224 lines
6.9 KiB
Python
224 lines
6.9 KiB
Python
|
|
import os
|
||
|
|
import sqlite3
|
||
|
|
import string
|
||
|
|
import secrets
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from functools import wraps
|
||
|
|
|
||
|
|
from flask import (
|
||
|
|
Flask, g, request, redirect, render_template,
|
||
|
|
session, url_for, abort, flash
|
||
|
|
)
|
||
|
|
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# Config
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# Runtime settings come from the environment; the two secrets are mandatory.
SECRET_KEY = os.environ.get("SECRET_KEY")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD")
DB_PATH = os.environ.get("DB_PATH", "/data/links.db")

# Fail fast at import time instead of running with a missing secret.
if not ADMIN_PASSWORD:
    raise RuntimeError("ADMIN_PASSWORD env var ist nicht gesetzt")
if not SECRET_KEY:
    raise RuntimeError("SECRET_KEY env var ist nicht gesetzt")

# Length and alphabet (digits, lowercase, uppercase) of generated short codes.
CODE_LENGTH = 6
BASE62 = "".join((string.digits, string.ascii_lowercase, string.ascii_uppercase))

app = Flask(__name__)
app.secret_key = SECRET_KEY
app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE="Lax",
    # The Secure flag stays on unless COOKIE_SECURE is set to something
    # other than "true" (case-insensitive).
    SESSION_COOKIE_SECURE=os.environ.get("COOKIE_SECURE", "true").lower() == "true",
)
|
||
|
|
|
||
|
|
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# DB Helpers
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
def get_db():
    """Return the SQLite connection for the current application context.

    The connection is opened lazily on first use and cached on ``g`` so that
    later calls in the same context reuse it. Rows are returned as
    ``sqlite3.Row`` and foreign-key enforcement is switched on.

    Returns:
        The ``sqlite3.Connection`` stored on ``g.db``.
    """
    if "db" not in g:
        # A bare filename has no directory component; os.makedirs("") would
        # raise FileNotFoundError, so only create the directory if there is one.
        db_dir = os.path.dirname(DB_PATH)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA foreign_keys = ON")
        except Exception:
            # Do not cache (or leak) a connection that failed to configure.
            conn.close()
            raise
        g.db = conn
    return g.db
|
||
|
|
|
||
|
|
|
||
|
|
@app.teardown_appcontext
def close_db(exception=None):
    """Close the context's SQLite connection, if one was opened.

    Registered as an app-context teardown hook. ``exception`` is accepted to
    match the hook signature and is not used.
    """
    connection = g.pop("db", None)
    if connection is None:
        return
    connection.close()
|
||
|
|
|
||
|
|
|
||
|
|
def init_db():
    """Create the ``links`` table if it does not exist yet.

    Uses its own short-lived connection to ``DB_PATH`` (not the per-context
    one from ``get_db``), so it can run outside an application context. The
    connection is always closed, even if creating the table fails.
    """
    # A bare filename has no directory component; os.makedirs("") would
    # raise FileNotFoundError, so only create the directory if there is one.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS links (
                code TEXT PRIMARY KEY,
                target TEXT NOT NULL,
                is_alias INTEGER NOT NULL DEFAULT 0,
                clicks INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            )
        """)
        conn.commit()
    finally:
        conn.close()
|
||
|
|
|
||
|
|
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# Code generation
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
def generate_code(length=CODE_LENGTH):
    """Return a cryptographically random base62 code that is not yet in use.

    Up to 20 candidates of ``length`` characters are drawn from ``BASE62``
    and checked against the ``links`` table; the first free one is returned.

    Raises:
        RuntimeError: if all 20 candidates are already taken.
    """
    conn = get_db()
    attempts_left = 20
    while attempts_left > 0:
        attempts_left -= 1
        candidate = "".join([secrets.choice(BASE62) for _ in range(length)])
        taken = conn.execute(
            "SELECT 1 FROM links WHERE code = ?", (candidate,)
        ).fetchone()
        if taken:
            continue
        return candidate
    # Extremely unlikely, but better safe than sorry.
    raise RuntimeError("Konnte keinen freien Code generieren, Namespace evtl. zu voll")
|
||
|
|
|
||
|
|
|
||
|
|
def normalize_target(url: str) -> str:
    """Return *url* stripped of surrounding whitespace and with a scheme.

    If the value does not already start with ``http://`` or ``https://``
    (compared case-insensitively), ``https://`` is prepended. A URL that
    already carries one of those schemes is returned with its original
    casing.

    Raises:
        ValueError: if *url* is empty or consists only of whitespace.
    """
    url = url.strip()
    if not url:
        raise ValueError("Ziel-URL darf nicht leer sein")
    # URL schemes are case-insensitive; without lower() an input such as
    # "HTTP://example.com" would be turned into "https://HTTP://example.com".
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url
    return url
|
||
|
|
|
||
|
|
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# Auth
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
def login_required(view):
    """Decorate *view* so that only a logged-in admin can reach it.

    When the session has no truthy ``is_admin`` entry, the request is
    redirected to the ``admin_login`` endpoint with the current path passed
    as ``next``. Otherwise the wrapped view is called unchanged.
    """
    @wraps(view)
    def wrapped(*args, **kwargs):
        if session.get("is_admin"):
            return view(*args, **kwargs)
        login_url = url_for("admin_login", next=request.path)
        return redirect(login_url)

    return wrapped
|
||
|
|
|
||
|
|
|
||
|
|
def _is_safe_next(target):
    """Return True if *target* is a same-site, path-only redirect target."""
    if not target or not target.startswith("/"):
        return False
    # Browsers treat "//host" and "/\host" as protocol-relative URLs.
    if target.startswith(("//", "/\\")):
        return False
    # Control characters (tab, CR, LF, ...) can be stripped by browsers and
    # turn an innocent-looking path into "//host"; reject them outright.
    if any(ord(ch) < 0x20 for ch in target):
        return False
    return True


@app.route("/admin/login", methods=["GET", "POST"])
def admin_login():
    """Render the admin login form and process login attempts.

    GET renders ``admin_login.html``. POST compares the submitted
    ``password`` form field with ``ADMIN_PASSWORD``. On success the session
    is reset, ``is_admin`` is set, and the client is redirected to the
    ``next`` query parameter if it is a local path, otherwise to the admin
    dashboard. On failure an error is flashed and the form is rendered again.
    """
    if request.method == "POST":
        password = request.form.get("password", "")
        # compare_digest protects against timing attacks. It only accepts
        # ASCII-only str, so compare UTF-8 bytes to avoid a TypeError (and
        # thus a 500) when a non-ASCII password is submitted.
        if secrets.compare_digest(
            password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
        ):
            session.clear()
            session["is_admin"] = True
            next_url = request.args.get("next")
            # Never redirect to an attacker-supplied absolute URL.
            if not _is_safe_next(next_url):
                next_url = url_for("admin_dashboard")
            return redirect(next_url)
        flash("Falsches Passwort", "error")
    return render_template("admin_login.html")
|
||
|
|
|
||
|
|
|
||
|
|
@app.route("/admin/logout")
def admin_logout():
    """Drop the whole session and send the client back to the login page."""
    session.clear()
    login_page = url_for("admin_login")
    return redirect(login_page)
|
||
|
|
|
||
|
|
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# Admin Dashboard
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
@app.route("/admin")
@login_required
def admin_dashboard():
    """Render ``admin.html`` with every stored link, newest first."""
    query = "SELECT * FROM links ORDER BY created_at DESC"
    all_links = get_db().execute(query).fetchall()
    return render_template("admin.html", links=all_links)
|
||
|
|
|
||
|
|
|
||
|
|
@app.route("/admin/create", methods=["POST"])
@login_required
def admin_create():
    """Create a short link from the submitted form and return to the dashboard.

    Reads ``target`` (required) and ``code`` (optional custom alias) from the
    form. Without an alias a random code is generated. Validation problems
    and code collisions are reported via ``flash`` rather than as errors;
    every outcome redirects to the admin dashboard.
    """
    db = get_db()
    target = request.form.get("target", "")
    custom_code = request.form.get("code", "").strip()

    try:
        target = normalize_target(target)
    except ValueError as e:
        flash(str(e), "error")
        return redirect(url_for("admin_dashboard"))

    if custom_code:
        # Alias rules: URL-safe characters only.
        allowed = set(string.ascii_letters + string.digits + "-_")
        if not set(custom_code) <= allowed:
            flash("Alias darf nur Buchstaben, Zahlen, - und _ enthalten", "error")
            return redirect(url_for("admin_dashboard"))
        # "/admin" and "/healthz" are fixed routes of this app and take
        # precedence over "/<code>", so such an alias could never be reached.
        reserved = {"admin", "healthz"}
        if custom_code in reserved:
            flash(f"Alias '{custom_code}' ist reserviert", "error")
            return redirect(url_for("admin_dashboard"))
        existing = db.execute(
            "SELECT 1 FROM links WHERE code = ?", (custom_code,)
        ).fetchone()
        if existing:
            flash(f"Alias '{custom_code}' ist bereits vergeben", "error")
            return redirect(url_for("admin_dashboard"))
        code = custom_code
        is_alias = 1
    else:
        code = generate_code()
        is_alias = 0

    try:
        db.execute(
            "INSERT INTO links (code, target, is_alias, clicks, created_at) VALUES (?, ?, ?, 0, ?)",
            (code, target, is_alias, datetime.now(timezone.utc).isoformat()),
        )
        db.commit()
    except sqlite3.IntegrityError:
        # The availability check above and this INSERT are not atomic; a
        # concurrent request may have claimed the code in between.
        db.rollback()
        flash(f"Code '{code}' ist bereits vergeben", "error")
        return redirect(url_for("admin_dashboard"))

    flash(f"Erstellt: /{code} → {target}", "success")
    return redirect(url_for("admin_dashboard"))
|
||
|
|
|
||
|
|
|
||
|
|
@app.route("/admin/delete/<code>", methods=["POST"])
@login_required
def admin_delete(code):
    """Delete the link stored under *code* and return to the dashboard.

    A success message is flashed regardless of whether a row matched.
    """
    conn = get_db()
    conn.execute("DELETE FROM links WHERE code = ?", (code,))
    conn.commit()
    flash(f"Gelöscht: /{code}", "success")
    dashboard_url = url_for("admin_dashboard")
    return redirect(dashboard_url)
|
||
|
|
|
||
|
|
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# Public Redirect
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
@app.route("/")
def index():
    """Send visitors of the bare root URL to the admin login page."""
    login_page = url_for("admin_login")
    return redirect(login_page)
|
||
|
|
|
||
|
|
|
||
|
|
@app.route("/<code>")
def do_redirect(code):
    """Resolve a short code: count the click and redirect to its target.

    Responds with 404 when no link is stored under *code*; otherwise
    increments the link's ``clicks`` column and issues a 302 redirect.
    """
    conn = get_db()
    link = conn.execute("SELECT * FROM links WHERE code = ?", (code,)).fetchone()
    if link is not None:
        conn.execute("UPDATE links SET clicks = clicks + 1 WHERE code = ?", (code,))
        conn.commit()
        return redirect(link["target"], code=302)
    abort(404)
|
||
|
|
|
||
|
|
|
||
|
|
@app.errorhandler(404)
def not_found(e):
    """Render the custom ``404.html`` page with a 404 status code."""
    page = render_template("404.html")
    return page, 404
|
||
|
|
|
||
|
|
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
# Health check (nützlich für docker-compose healthcheck)
|
||
|
|
# --------------------------------------------------------------------------
|
||
|
|
@app.route("/healthz")
def healthz():
    """Liveness probe: always answers 200 with a small status payload."""
    payload = {"status": "ok"}
    return payload, 200
|
||
|
|
|
||
|
|
|
||
|
|
# Runs at import time so the schema exists however the module is loaded
# (imported by a server process or executed directly).
init_db()


if __name__ == "__main__":
    # Direct execution: listen on all interfaces, port 5001.
    app.run(port=5001, host="0.0.0.0")
|