import os
import secrets
import sqlite3
import string
from contextlib import closing
from datetime import datetime, timezone
from functools import wraps

from flask import (
    Flask, g, request, redirect, render_template, session, url_for, abort, flash
)

# --------------------------------------------------------------------------
# Config
# --------------------------------------------------------------------------
DB_PATH = os.environ.get("DB_PATH", "/data/links.db")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD")
SECRET_KEY = os.environ.get("SECRET_KEY")

# Fail fast at import time: the app must not start without its secrets.
if not ADMIN_PASSWORD:
    raise RuntimeError("ADMIN_PASSWORD env var ist nicht gesetzt")
if not SECRET_KEY:
    raise RuntimeError("SECRET_KEY env var ist nicht gesetzt")

BASE62 = string.digits + string.ascii_lowercase + string.ascii_uppercase
CODE_LENGTH = 6

app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
# Secure cookies are on unless COOKIE_SECURE is set to something other than "true".
app.config["SESSION_COOKIE_SECURE"] = (
    os.environ.get("COOKIE_SECURE", "true").lower() == "true"
)


# --------------------------------------------------------------------------
# DB Helpers
# --------------------------------------------------------------------------
def _ensure_db_dir():
    """Create the directory that holds DB_PATH, if DB_PATH has one.

    A bare filename (e.g. ``links.db``) has an empty dirname, and
    ``os.makedirs("")`` raises FileNotFoundError, so that case is skipped.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)


def get_db():
    """Return the SQLite connection stored on ``flask.g``, opening it on first use.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    """
    if "db" not in g:
        _ensure_db_dir()
        g.db = sqlite3.connect(DB_PATH)
        g.db.row_factory = sqlite3.Row
        g.db.execute("PRAGMA foreign_keys = ON")
    return g.db


@app.teardown_appcontext
def close_db(exception=None):
    """Close the connection stored on ``flask.g``, if one was opened."""
    db = g.pop("db", None)
    if db is not None:
        db.close()


def init_db():
    """Create the ``links`` table if it does not exist yet."""
    _ensure_db_dir()
    # closing() guarantees the connection is released even if execute() raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS links (
                code TEXT PRIMARY KEY,
                target TEXT NOT NULL,
                is_alias INTEGER NOT NULL DEFAULT 0,
                clicks INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )
        """)
        conn.commit()


# --------------------------------------------------------------------------
# Code generation
# --------------------------------------------------------------------------
def generate_code(length=CODE_LENGTH):
    """Generate a cryptographically random base62 code that is not yet in use.

    Tries up to 20 candidates and returns the first one that has no row in
    ``links``. The check and a later INSERT are not atomic, so callers should
    still be prepared for a primary-key conflict.

    Raises:
        RuntimeError: if none of the 20 candidates was free.
    """
    db = get_db()
    for _ in range(20):
        code = "".join(secrets.choice(BASE62) for _ in range(length))
        exists = db.execute(
            "SELECT 1 FROM links WHERE code = ?", (code,)
        ).fetchone()
        if not exists:
            return code
    # Extremely unlikely, but better safe than sorry.
    raise RuntimeError("Konnte keinen freien Code generieren, Namespace evtl. zu voll")


def normalize_target(url: str) -> str:
    """Strip *url* and prepend ``https://`` when it has no http(s) scheme.

    The scheme check is case-insensitive so that ``HTTP://example.com`` is
    left alone instead of becoming ``https://HTTP://example.com``.

    Raises:
        ValueError: if *url* is empty after stripping.
    """
    url = url.strip()
    if not url:
        raise ValueError("Ziel-URL darf nicht leer sein")
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url
    return url


# --------------------------------------------------------------------------
# Auth
# --------------------------------------------------------------------------
def _is_safe_next(target):
    """Return True if *target* is a plain same-site path that is safe to redirect to.

    Only values that start with a single ``/`` are accepted. ``//host`` and
    anything containing a backslash are rejected because browsers may treat
    them as protocol-relative URLs; whitespace and control characters are
    rejected because browsers strip some of them before resolving the URL.
    """
    if not target or not target.startswith("/"):
        return False
    if target.startswith("//") or "\\" in target:
        return False
    return not any(ch.isspace() or ord(ch) < 0x20 for ch in target)


def login_required(view):
    """Decorator: redirect to the login page unless the session is marked admin."""
    @wraps(view)
    def wrapped(*args, **kwargs):
        if not session.get("is_admin"):
            return redirect(url_for("admin_login", next=request.path))
        return view(*args, **kwargs)
    return wrapped


@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Render the login form (GET) or check the submitted password (POST).

    On success the session is reset, marked as admin, and the user is sent to
    the ``next`` query parameter if it is a safe local path, otherwise to the
    dashboard.
    """
    if request.method == "POST":
        password = request.form.get("password", "")
        # compare_digest protects against timing attacks. Compare bytes, not
        # str: the str form only accepts ASCII and raises TypeError otherwise.
        if secrets.compare_digest(
            password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
        ):
            session.clear()
            session["is_admin"] = True
            next_url = request.args.get("next")
            # Never redirect to an attacker-supplied external URL.
            if not _is_safe_next(next_url):
                next_url = url_for("admin_dashboard")
            return redirect(next_url)
        flash("Falsches Passwort", "error")
    return render_template("admin_login.html")


@app.route("/admin/logout")
def admin_logout():
    """Clear the session and return to the login page."""
    session.clear()
    return redirect(url_for("admin_login"))


# --------------------------------------------------------------------------
# Admin Dashboard
# --------------------------------------------------------------------------
@app.route("/admin")
@login_required
def admin_dashboard():
    """List all links, newest first."""
    db = get_db()
    links = db.execute(
        "SELECT * FROM links ORDER BY created_at DESC"
    ).fetchall()
    return render_template("admin.html", links=links)
# Codes that would be shadowed by the fixed routes /admin and /healthz:
# a link stored under one of them could never be reached.
_RESERVED_CODES = frozenset({"admin", "healthz"})


@app.route("/admin/create", methods=["POST"])
@login_required
def admin_create():
    """Create a link from the form fields ``target`` and optional ``code``.

    With a custom ``code`` the link is stored as an alias after validating its
    characters and availability; otherwise a random code is generated.
    Validation problems are reported via ``flash`` and the user is sent back
    to the dashboard.
    """
    db = get_db()
    target = request.form.get("target", "")
    custom_code = request.form.get("code", "").strip()

    try:
        target = normalize_target(target)
    except ValueError as e:
        flash(str(e), "error")
        return redirect(url_for("admin_dashboard"))

    if custom_code:
        # Alias rules: URL-safe characters only.
        allowed = set(string.ascii_letters + string.digits + "-_")
        if not set(custom_code) <= allowed:
            flash("Alias darf nur Buchstaben, Zahlen, - und _ enthalten", "error")
            return redirect(url_for("admin_dashboard"))
        if custom_code in _RESERVED_CODES:
            flash(f"Alias '{custom_code}' ist reserviert", "error")
            return redirect(url_for("admin_dashboard"))
        existing = db.execute(
            "SELECT 1 FROM links WHERE code = ?", (custom_code,)
        ).fetchone()
        if existing:
            flash(f"Alias '{custom_code}' ist bereits vergeben", "error")
            return redirect(url_for("admin_dashboard"))
        code = custom_code
        is_alias = 1
    else:
        code = generate_code()
        is_alias = 0

    try:
        db.execute(
            "INSERT INTO links (code, target, is_alias, clicks, created_at) "
            "VALUES (?, ?, ?, 0, ?)",
            (code, target, is_alias, datetime.now(timezone.utc).isoformat()),
        )
        db.commit()
    except sqlite3.IntegrityError:
        # The availability check above is not atomic with the INSERT; a
        # concurrent request may have taken the code in between.
        db.rollback()
        flash(f"Code '{code}' ist bereits vergeben", "error")
        return redirect(url_for("admin_dashboard"))

    flash(f"Erstellt: /{code} → {target}", "success")
    return redirect(url_for("admin_dashboard"))


# The rule needs the <code> variable: the view takes a ``code`` argument.
@app.route("/admin/delete/<code>", methods=["POST"])
@login_required
def admin_delete(code):
    """Delete the link stored under *code* and return to the dashboard."""
    db = get_db()
    db.execute("DELETE FROM links WHERE code = ?", (code,))
    db.commit()
    flash(f"Gelöscht: /{code}", "success")
    return redirect(url_for("admin_dashboard"))


# --------------------------------------------------------------------------
# Public Redirect
# --------------------------------------------------------------------------
@app.route("/")
def index():
    """Send visitors of the bare root URL to the admin login."""
    return redirect(url_for("admin_login"))


# Must be "/<code>", not "/": otherwise the rule collides with index() and
# the view would be called without its ``code`` argument.
@app.route("/<code>")
def do_redirect(code):
    """Look up *code*, count the click, and redirect (302) to its target.

    Responds with 404 when no link with that code exists.
    """
    db = get_db()
    row = db.execute("SELECT * FROM links WHERE code = ?", (code,)).fetchone()
    if row is None:
        abort(404)
    db.execute("UPDATE links SET clicks = clicks + 1 WHERE code = ?", (code,))
    db.commit()
    return redirect(row["target"], code=302)


@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page."""
    return render_template("404.html"), 404


# --------------------------------------------------------------------------
# Health check (useful for docker-compose healthcheck)
# --------------------------------------------------------------------------
@app.route("/healthz")
def healthz():
    """Return a static OK payload."""
    return {"status": "ok"}, 200


# Runs at import time so the table also exists when the module is loaded
# by something other than the __main__ guard below.
init_db()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)