"""Inventory web app for door chips, parking cards and pool vehicles.

This part of the module holds the application setup, database helpers,
the authentication decorators and the login / password views.
"""

import csv
import io
import logging
import os
import sqlite3
from collections import deque
from datetime import datetime
from functools import wraps
from pathlib import Path

from flask import Flask, flash, g, redirect, render_template, request, session, url_for
from werkzeug.security import check_password_hash, generate_password_hash

app = Flask(__name__)
# The session-signing key can be overridden through the environment; the
# literal fallback keeps existing development setups working unchanged.
app.config["SECRET_KEY"] = os.environ.get("INVENTORY_SECRET_KEY") or "dev-secret-key"

APP_VERSION = "2.1.0"
DATABASE = Path(__file__).with_name("inventory.db")
LOGFILE = Path(__file__).with_name("inventory.log")

# The log is written as UTF-8 explicitly because read_recent_logs() reads it
# back as UTF-8; relying on the locale default breaks on non-UTF-8 systems.
logging.basicConfig(
    filename=LOGFILE,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    encoding="utf-8",
)

# Display names for the internal asset type keys.
ASSET_LABELS = {
    "chip": "Tuerchip",
    "parking_card": "Parkkarte",
    "pool_vehicle": "Poolfahrzeug",
}

# Display names for the transaction actions stored in the database.
ACTION_LABELS = {
    "assign": "Ausgabe",
    "return": "Rueckgabe",
}

# Headline text per action, passed to the print template.
PRINT_DESCRIPTIONS = {
    "assign": "Empfangsbestaetigung fuer die Ausgabe eines Mediums",
    "return": "Rueckgabebestaetigung fuer die Ruecknahme eines Mediums",
}

# Example values shown in the asset code input, per asset type.
INPUT_PLACEHOLDERS = {
    "chip": "z. B. 100",
    "parking_card": "z. B. 200199887755123",
    "pool_vehicle": "z. B. GZ-CC-123",
}

# Accepted (lower-cased) values of the "Aktion" column in an import row.
IMPORT_ACTIONS = {"import", "ausgabe", "assign"}
# Lower-cased header row that is skipped when it appears in an import.
IMPORT_HEADER = ["user", "typ", "kennung", "aktion"]


def get_db() -> sqlite3.Connection:
    """Return the SQLite connection of the current application context.

    The connection is opened on first use, stored on ``flask.g`` and reused
    afterwards. Rows are returned as ``sqlite3.Row`` so columns can be read
    by name.
    """
    if "db" not in g:
        g.db = sqlite3.connect(DATABASE)
        g.db.row_factory = sqlite3.Row
    return g.db


@app.context_processor
def inject_app_meta() -> dict[str, str | int]:
    """Provide version, host name and current year to every template."""
    # Only the part before the first ":" is kept, which drops the port.
    host = request.host.split(":", 1)[0] if request.host else "-"
    return {
        "app_version": APP_VERSION,
        "app_host": host,
        "app_year": datetime.now().year,
    }


@app.teardown_appcontext
def close_db(_: object | None) -> None:
    """Close the context's database connection, if one was opened."""
    db = g.pop("db", None)
    if db is not None:
        db.close()


def init_db() -> None:
    """Create missing tables, apply schema migrations and seed the admin.

    The function is idempotent: tables are created with ``IF NOT EXISTS``,
    columns are only added when absent, and the default ``admin`` account is
    only inserted when no admin exists yet.
    """
    db = get_db()
    db.executescript(
        """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            full_name TEXT NOT NULL,
            email TEXT,
            department TEXT,
            chip_code TEXT,
            chip_assigned_at TEXT,
            parking_card_code TEXT,
            parking_card_assigned_at TEXT,
            pool_vehicle_code TEXT,
            pool_vehicle_assigned_at TEXT,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS transactions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            asset_type TEXT NOT NULL CHECK (asset_type IN ('chip', 'parking_card', 'pool_vehicle')),
            asset_code TEXT NOT NULL,
            handled_by TEXT,
            action TEXT NOT NULL CHECK (action IN ('assign', 'return')),
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id)
        );
        CREATE TABLE IF NOT EXISTS staff_users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL UNIQUE,
            full_name TEXT NOT NULL,
            role TEXT NOT NULL CHECK (role IN ('admin', 'staff')),
            password_hash TEXT,
            must_set_password INTEGER NOT NULL DEFAULT 1,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        """
    )

    # Migration: databases created before "handled_by" existed.
    columns = {
        row["name"]
        for row in db.execute("PRAGMA table_info(transactions)").fetchall()
    }
    if "handled_by" not in columns:
        db.execute("ALTER TABLE transactions ADD COLUMN handled_by TEXT")

    # Migration: a transactions table whose stored DDL does not mention
    # 'pool_vehicle' has an outdated CHECK constraint and is rebuilt.
    # NOTE(review): the table is dropped without copying its rows, so the
    # transaction history of such a database is lost - confirm this is intended.
    transactions_sql = db.execute(
        "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = 'transactions'"
    ).fetchone()
    if transactions_sql and "pool_vehicle" not in (transactions_sql["sql"] or ""):
        db.execute("DROP TABLE transactions")
        db.execute(
            """
            CREATE TABLE transactions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                asset_type TEXT NOT NULL CHECK (asset_type IN ('chip', 'parking_card', 'pool_vehicle')),
                asset_code TEXT NOT NULL,
                handled_by TEXT,
                action TEXT NOT NULL CHECK (action IN ('assign', 'return')),
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
            """
        )

    # Migration: databases created before pool vehicles were supported.
    user_columns = {
        row["name"] for row in db.execute("PRAGMA table_info(users)").fetchall()
    }
    if "pool_vehicle_code" not in user_columns:
        db.execute("ALTER TABLE users ADD COLUMN pool_vehicle_code TEXT")
    if "pool_vehicle_assigned_at" not in user_columns:
        db.execute("ALTER TABLE users ADD COLUMN pool_vehicle_assigned_at TEXT")

    # Seed an admin without password; it is set on the first login.
    admin_exists = db.execute(
        "SELECT id FROM staff_users WHERE role = 'admin' LIMIT 1"
    ).fetchone()
    if admin_exists is None:
        db.execute(
            """
            INSERT INTO staff_users (username, full_name, role, password_hash, must_set_password)
            VALUES (?, ?, 'admin', NULL, 1)
            """,
            ("admin", "Administrator"),
        )
    db.commit()


def current_staff() -> sqlite3.Row | None:
    """Return the staff row for the id stored in the session, or ``None``.

    ``None`` is returned both when no staff id is in the session and when
    the id no longer matches a row in ``staff_users``.
    """
    staff_id = session.get("staff_user_id")
    if not staff_id:
        return None
    return get_db().execute(
        "SELECT id, username, full_name, role, must_set_password FROM staff_users WHERE id = ?",
        (staff_id,),
    ).fetchone()


def login_required(view):
    """Decorate *view* so that it is only reachable for signed-in staff.

    Anonymous requests are redirected to the login page. Staff who still
    have to set a password are redirected to the password form, except for
    the endpoints needed to do so. On success the staff row is stored in
    ``g.current_staff`` before the view runs.
    """

    @wraps(view)
    def wrapped_view(*args, **kwargs):
        staff = current_staff()
        endpoint = request.endpoint or ""
        allowed_without_password = {"set_password", "logout", "static"}
        if staff is None:
            return redirect(url_for("login"))
        if staff["must_set_password"] and endpoint not in allowed_without_password:
            flash("Bitte zuerst ein Passwort setzen.")
            return redirect(url_for("set_password"))
        g.current_staff = staff
        return view(*args, **kwargs)

    return wrapped_view


def admin_required(view):
    """Decorate *view* so that only signed-in staff with role ``admin`` pass.

    The login check runs first (via ``login_required``); non-admins are
    redirected to the index page with a flash message.
    """

    @wraps(view)
    @login_required
    def wrapped_view(*args, **kwargs):
        if g.current_staff["role"] != "admin":
            flash("Nur Admins duerfen diese Seite aufrufen.")
            return redirect(url_for("index"))
        return view(*args, **kwargs)

    return wrapped_view


def log_asset_event(
    action: str,
    user_name: str,
    asset_type: str,
    asset_code: str,
    handled_by: str,
) -> None:
    """Write one asset event to the application log.

    ``asset_type`` must be a key of ``ASSET_LABELS``; its display label is
    what ends up in the log line.
    """
    logging.info(
        "%s | user=%s | typ=%s | kennung=%s | bearbeiter=%s",
        action,
        user_name,
        ASSET_LABELS[asset_type],
        asset_code,
        handled_by,
    )


def read_recent_logs(limit: int = 20) -> list[str]:
    """Return up to *limit* of the newest log lines, newest first.

    Returns an empty list when the log file does not exist or when *limit*
    is zero or negative.
    """
    # Guard explicitly: slicing with "-0" would select the whole file.
    if limit <= 0:
        return []
    try:
        # errors="replace" keeps the page working even if an older log file
        # contains bytes that are not valid UTF-8.
        with LOGFILE.open(encoding="utf-8", errors="replace") as log_file:
            # A bounded deque keeps only the tail instead of the whole file.
            tail = deque(log_file, maxlen=limit)
    except FileNotFoundError:
        return []
    return [line.rstrip("\n") for line in reversed(tail)]


def asset_code_in_use(db: sqlite3.Connection, asset_type: str, asset_code: str) -> bool:
    """Return whether any user currently holds *asset_code* of *asset_type*.

    Raises ``KeyError`` for an unknown *asset_type*.
    """
    # The column name comes from this fixed mapping, never from user input,
    # so interpolating it into the statement is safe.
    column_name = {
        "chip": "chip_code",
        "parking_card": "parking_card_code",
        "pool_vehicle": "pool_vehicle_code",
    }[asset_type]
    existing = db.execute(
        f"SELECT id FROM users WHERE {column_name} = ?",
        (asset_code,),
    ).fetchone()
    return existing is not None


def normalize_asset_type(value: str) -> str | None:
    """Map a user-supplied asset type name to its internal key.

    Matching ignores case and surrounding whitespace. Returns ``None`` for
    names that are not known aliases.
    """
    normalized = value.strip().lower()
    aliases = {
        "chip": "chip",
        "tuerchip": "chip",
        "parkkarte": "parking_card",
        "parking_card": "parking_card",
        "parking card": "parking_card",
        "poolfahrzeug": "pool_vehicle",
        "pool vehicle": "pool_vehicle",
        "pool_vehicle": "pool_vehicle",
    }
    return aliases.get(normalized)


def is_import_header(parts: list[str]) -> bool:
    """Return whether *parts* is the import header row (case-insensitive)."""
    normalized = [part.strip().lower() for part in parts]
    return normalized == IMPORT_HEADER


def get_transaction_for_print(transaction_id: int) -> sqlite3.Row | None:
    """Return a transaction joined with its user's details, or ``None``."""
    return get_db().execute(
        """
        SELECT t.id, t.created_at, t.asset_type, t.asset_code, t.action, t.handled_by,
               u.full_name, u.email, u.department
        FROM transactions t
        JOIN users u ON u.id = t.user_id
        WHERE t.id = ?
        """,
        (transaction_id,),
    ).fetchone()


@app.before_request
def ensure_database() -> None:
    """Before each request: ensure the schema and load the signed-in staff."""
    init_db()
    g.current_staff = current_staff()


@app.route("/login", methods=["GET", "POST"])
def login() -> str:
    """Show the login form and sign staff in on POST."""
    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "")
        db = get_db()
        staff = db.execute(
            "SELECT * FROM staff_users WHERE username = ?",
            (username,),
        ).fetchone()

        if staff is None:
            flash("Benutzer wurde nicht gefunden.")
            return redirect(url_for("login"))

        # NOTE(review): accounts flagged must_set_password get a session
        # without any password check, so whoever knows the username of a new
        # or reset account can claim it - confirm this is acceptable.
        if staff["must_set_password"]:
            session.clear()
            session["staff_user_id"] = staff["id"]
            flash("Bitte jetzt Ihr Passwort vergeben.")
            return redirect(url_for("set_password"))

        if not staff["password_hash"] or not check_password_hash(staff["password_hash"], password):
            flash("Login fehlgeschlagen.")
            return redirect(url_for("login"))

        # Start from an empty session so nothing from a previous one survives.
        session.clear()
        session["staff_user_id"] = staff["id"]
        flash("Anmeldung erfolgreich.")
        return redirect(url_for("index"))

    return render_template("login.html")


@app.route("/set-password", methods=["GET", "POST"])
def set_password() -> str:
    """Let the signed-in staff member choose a (new) password.

    The password must be at least 8 characters long and match its
    confirmation; it is stored hashed and ``must_set_password`` is cleared.
    """
    staff = current_staff()
    if staff is None:
        flash("Bitte zuerst anmelden.")
        return redirect(url_for("login"))

    if request.method == "POST":
        password = request.form.get("password", "")
        password_confirm = request.form.get("password_confirm", "")

        if len(password) < 8:
            flash("Das Passwort muss mindestens 8 Zeichen lang sein.")
            return redirect(url_for("set_password"))
        if password != password_confirm:
            flash("Die Passwoerter stimmen nicht ueberein.")
            return redirect(url_for("set_password"))

        db = get_db()
        db.execute(
            "UPDATE staff_users SET password_hash = ?, must_set_password = 0 WHERE id = ?",
            (generate_password_hash(password), staff["id"]),
        )
        db.commit()
        flash("Passwort wurde gespeichert.")
        return redirect(url_for("index"))

    return render_template("set_password.html")


@app.route("/logout")
def logout() -> str:
    """Clear the session and return to the login page."""
    session.clear()
    flash("Sie wurden abgemeldet.")
    return redirect(url_for("login"))
# Columns on ``users`` that hold each asset type: (code, assigned-at).
_ASSET_COLUMNS = {
    "chip": ("chip_code", "chip_assigned_at"),
    "parking_card": ("parking_card_code", "parking_card_assigned_at"),
    "pool_vehicle": ("pool_vehicle_code", "pool_vehicle_assigned_at"),
}

# Flash message when a user already holds an asset of the requested type.
_ALREADY_ASSIGNED_TEXT = {
    "chip": "Dieser User hat bereits einen aktiven Tuerchip.",
    "parking_card": "Dieser User hat bereits eine aktive Parkkarte.",
    "pool_vehicle": "Dieser User hat bereits ein aktives Poolfahrzeug.",
}

# Tail of the import error when a user would get a second, different asset.
_IMPORT_CONFLICT_TEXT = {
    "chip": "hat bereits einen anderen Tuerchip.",
    "parking_card": "hat bereits eine andere Parkkarte.",
    "pool_vehicle": "hat bereits ein anderes Poolfahrzeug.",
}


def _store_user_asset(db, user_id, asset_type, asset_code) -> None:
    """Set (or, with ``asset_code=None``, clear) a user's asset of one type.

    Does not commit. Column names come from the fixed ``_ASSET_COLUMNS``
    mapping, so building the statement with an f-string is safe.
    """
    code_column, assigned_column = _ASSET_COLUMNS[asset_type]
    if asset_code is None:
        db.execute(
            f"UPDATE users SET {code_column} = NULL, {assigned_column} = NULL WHERE id = ?",
            (user_id,),
        )
    else:
        db.execute(
            f"UPDATE users SET {code_column} = ?, {assigned_column} = CURRENT_TIMESTAMP WHERE id = ?",
            (asset_code, user_id),
        )


@app.route("/admin/staff", methods=["GET", "POST"])
@admin_required
def manage_staff() -> str:
    """List staff accounts and create a new one on POST (admins only).

    New accounts are stored without a password; it is chosen by the staff
    member on first login.
    """
    db = get_db()

    if request.method == "POST":
        username = request.form.get("username", "").strip()
        full_name = request.form.get("full_name", "").strip()
        role = request.form.get("role", "staff").strip()

        if not username or not full_name or role not in {"admin", "staff"}:
            flash("Bitte alle Bearbeiterdaten korrekt eingeben.")
            return redirect(url_for("manage_staff"))

        try:
            db.execute(
                "INSERT INTO staff_users (username, full_name, role, password_hash, must_set_password) VALUES (?, ?, ?, NULL, 1)",
                (username, full_name, role),
            )
            db.commit()
        except sqlite3.IntegrityError:
            # Raised by the UNIQUE constraint on staff_users.username.
            flash("Der Benutzername ist bereits vergeben.")
            return redirect(url_for("manage_staff"))

        flash("Bearbeiter wurde angelegt. Passwort wird bei der ersten Anmeldung gesetzt.")
        return redirect(url_for("manage_staff"))

    staff_users = db.execute(
        "SELECT id, username, full_name, role, must_set_password, created_at FROM staff_users ORDER BY full_name COLLATE NOCASE"
    ).fetchall()
    return render_template("manage_staff.html", staff_users=staff_users)


# The <int:staff_id> converter supplies the view's ``staff_id`` argument.
@app.route("/admin/staff/<int:staff_id>/reset-password", methods=["POST"])
@admin_required
def reset_staff_password(staff_id: int) -> str:
    """Remove a staff member's password so a new one must be set (admins only)."""
    db = get_db()
    staff = db.execute(
        "SELECT id FROM staff_users WHERE id = ?",
        (staff_id,),
    ).fetchone()
    if staff is None:
        flash("Bearbeiter wurde nicht gefunden.")
        return redirect(url_for("manage_staff"))

    db.execute(
        "UPDATE staff_users SET password_hash = NULL, must_set_password = 1 WHERE id = ?",
        (staff_id,),
    )
    db.commit()
    flash("Passwort-Reset wurde gesetzt. Der Bearbeiter muss bei der naechsten Anmeldung ein neues Passwort vergeben.")
    return redirect(url_for("manage_staff"))


# The <int:staff_id> converter supplies the view's ``staff_id`` argument.
@app.route("/admin/staff/<int:staff_id>/delete", methods=["POST"])
@admin_required
def delete_staff(staff_id: int) -> str:
    """Delete a staff account (admins only).

    Refuses to delete the currently signed-in account and the last
    remaining admin.
    """
    db = get_db()
    staff = db.execute(
        "SELECT id, role, full_name FROM staff_users WHERE id = ?",
        (staff_id,),
    ).fetchone()
    if staff is None:
        flash("Bearbeiter wurde nicht gefunden.")
        return redirect(url_for("manage_staff"))

    if staff["id"] == g.current_staff["id"]:
        flash("Der aktuell angemeldete Bearbeiter kann nicht geloescht werden.")
        return redirect(url_for("manage_staff"))

    if staff["role"] == "admin":
        admin_count = db.execute(
            "SELECT COUNT(*) AS admin_count FROM staff_users WHERE role = 'admin'"
        ).fetchone()
        if admin_count["admin_count"] <= 1:
            flash("Der letzte Admin kann nicht geloescht werden.")
            return redirect(url_for("manage_staff"))

    db.execute("DELETE FROM staff_users WHERE id = ?", (staff_id,))
    db.commit()
    flash(f"Bearbeiter '{staff['full_name']}' wurde geloescht.")
    return redirect(url_for("manage_staff"))


def _read_import_rows(upload, raw_rows: str) -> list[list[str]]:
    """Return the non-empty import rows as lists of stripped cells.

    An uploaded file (semicolon-separated CSV, UTF-8 with optional BOM)
    takes precedence over the pasted text. Raises ``UnicodeDecodeError``
    when the file is not valid UTF-8.
    """
    if upload and upload.filename:
        decoded = upload.stream.read().decode("utf-8-sig")
        # Keep the cells exactly as parsed by the csv module; re-joining and
        # re-splitting on ";" would break quoted cells that contain ";".
        return [
            [cell.strip() for cell in row]
            for row in csv.reader(io.StringIO(decoded), delimiter=";")
            if any(cell.strip() for cell in row)
        ]
    return [
        [part.strip() for part in line.split(";")]
        for line in raw_rows.splitlines()
        if line.strip()
    ]


def _plan_import(db, rows: list[list[str]]) -> tuple[list[tuple[str, dict]], list[str]]:
    """Validate *rows* without touching the database.

    Returns ``(operations, errors)``. ``operations`` is an ordered list of
    ``("user", payload)`` entries for people to create and
    ``("assign", payload)`` entries for assets to hand out. ``errors`` holds
    one message per rejected row, numbered by position in *rows*.
    """
    operations: list[tuple[str, dict]] = []
    errors: list[str] = []

    for index, parts in enumerate(rows, start=1):
        if is_import_header(parts):
            continue
        if len(parts) != 4:
            errors.append(f"Zeile {index}: ungueltig. Erwartet wird: User;Typ;Kennung;Aktion")
            continue

        full_name, raw_asset_type, asset_code, raw_action = parts
        asset_type = normalize_asset_type(raw_asset_type)
        if not full_name or not asset_type or not asset_code:
            errors.append(f"Zeile {index}: enthaelt unvollstaendige oder ungueltige Werte.")
            continue
        if raw_action.strip().lower() not in IMPORT_ACTIONS:
            errors.append(f"Zeile {index}: ungueltige Aktion. Erlaubt: Import")
            continue

        name_key = full_name.lower()
        code_column = _ASSET_COLUMNS[asset_type][0]

        user = db.execute(
            "SELECT * FROM users WHERE full_name = ? COLLATE NOCASE",
            (full_name,),
        ).fetchone()
        # A person first mentioned earlier in this import is not in the
        # database yet; their planned state lives in a "user" operation.
        pending_user = next(
            (
                payload
                for kind, payload in operations
                if kind == "user" and payload["full_name"].lower() == name_key
            ),
            None,
        )

        if pending_user is not None:
            current_code = pending_user[code_column]
        elif user is not None:
            current_code = user[code_column]
        else:
            current_code = None
            pending_user = {
                "full_name": full_name,
                "chip_code": None,
                "parking_card_code": None,
                "pool_vehicle_code": None,
            }
            operations.append(("user", pending_user))

        # Rows planned earlier in this import count as already assigned, also
        # for people who exist in the database. Without this, two different
        # codes of one type for the same existing person would both pass and
        # the later row would silently overwrite the earlier one.
        for kind, payload in operations:
            if (
                kind == "assign"
                and payload["asset_type"] == asset_type
                and payload["full_name"].lower() == name_key
            ):
                current_code = payload["asset_code"]

        # A code that is taken in the database is only fine if this very
        # person is the holder.
        if asset_code_in_use(db, asset_type, asset_code) and current_code != asset_code:
            errors.append(f"Zeile {index}: Kennung '{asset_code}' ist bereits vergeben.")
            continue

        claimed_by_other_row = any(
            kind == "assign"
            and payload["asset_type"] == asset_type
            and payload["asset_code"] == asset_code
            and payload["full_name"].lower() != name_key
            for kind, payload in operations
        )
        if claimed_by_other_row:
            errors.append(f"Zeile {index}: Kennung '{asset_code}' wird im Import mehrfach vergeben.")
            continue

        if current_code and current_code != asset_code:
            errors.append(f"Zeile {index}: User '{full_name}' {_IMPORT_CONFLICT_TEXT[asset_type]}")
            continue

        if pending_user is not None:
            pending_user[code_column] = asset_code
        operations.append(
            (
                "assign",
                {
                    "full_name": full_name,
                    "user_id": user["id"] if user else None,
                    "asset_type": asset_type,
                    "asset_code": asset_code,
                },
            )
        )

    return operations, errors


def _apply_import(db, operations: list[tuple[str, dict]]) -> int:
    """Execute planned import operations in one commit; return the assign count."""
    # First create all new people so their ids are known for the assignments.
    user_ids_by_name: dict[str, int] = {}
    for kind, payload in operations:
        if kind == "user":
            cursor = db.execute(
                "INSERT INTO users (full_name) VALUES (?)",
                (payload["full_name"],),
            )
            user_ids_by_name[payload["full_name"].lower()] = cursor.lastrowid

    assignments = [payload for kind, payload in operations if kind == "assign"]
    for payload in assignments:
        resolved_user_id = payload["user_id"] or user_ids_by_name[payload["full_name"].lower()]
        _store_user_asset(db, resolved_user_id, payload["asset_type"], payload["asset_code"])
        db.execute(
            "INSERT INTO transactions (user_id, asset_type, asset_code, handled_by, action) VALUES (?, ?, ?, ?, 'assign')",
            (resolved_user_id, payload["asset_type"], payload["asset_code"], "Import"),
        )
    db.commit()

    # Log only after the commit so the log never reports rows that were not stored.
    for payload in assignments:
        log_asset_event("import", payload["full_name"], payload["asset_type"], payload["asset_code"], "Import")
    return len(assignments)


@app.route("/admin/import", methods=["GET", "POST"])
@admin_required
def import_data() -> str:
    """Bulk-import asset assignments from a CSV upload or pasted rows (admins only).

    Each row is ``User;Typ;Kennung;Aktion``. The import is all-or-nothing:
    if any row is rejected, nothing is written and all errors are flashed.
    """
    if request.method != "POST":
        return render_template("import_data.html")

    try:
        rows = _read_import_rows(
            request.files.get("import_file"),
            request.form.get("import_rows", ""),
        )
    except UnicodeDecodeError:
        flash("Die CSV-Datei muss UTF-8 kodiert sein.", "error")
        return redirect(url_for("import_data"))

    if not rows:
        flash("Bitte mindestens eine Importzeile eingeben.", "error")
        return redirect(url_for("import_data"))

    db = get_db()
    operations, errors = _plan_import(db, rows)
    if errors:
        flash(errors, "import-errors")
        return redirect(url_for("import_data"))

    imported_count = _apply_import(db, operations)
    flash(f"{imported_count} Importzeilen wurden verarbeitet.", "success")
    return redirect(url_for("index"))


@app.route("/")
@login_required
def index() -> str:
    """Render the dashboard: users (optionally filtered by ``q``), totals,
    the ten newest transactions and the newest log lines."""
    db = get_db()
    search_query = request.args.get("q", "").strip()
    params: tuple[str, ...] = ()

    user_query = """
        SELECT id, full_name, email, department,
               chip_code, chip_assigned_at,
               parking_card_code, parking_card_assigned_at,
               pool_vehicle_code, pool_vehicle_assigned_at
        FROM users
    """
    if search_query:
        like_value = f"%{search_query}%"
        user_query += """
            WHERE full_name LIKE ?
               OR email LIKE ?
               OR chip_code LIKE ?
               OR parking_card_code LIKE ?
               OR pool_vehicle_code LIKE ?
        """
        params = (like_value, like_value, like_value, like_value, like_value)
    user_query += " ORDER BY full_name COLLATE NOCASE"
    users = db.execute(user_query, params).fetchall()

    stats = db.execute(
        """
        SELECT COUNT(*) AS users,
               SUM(CASE WHEN chip_code IS NOT NULL AND chip_code != '' THEN 1 ELSE 0 END) AS active_chips,
               SUM(CASE WHEN parking_card_code IS NOT NULL AND parking_card_code != '' THEN 1 ELSE 0 END) AS active_cards,
               SUM(CASE WHEN pool_vehicle_code IS NOT NULL AND pool_vehicle_code != '' THEN 1 ELSE 0 END) AS active_pool_vehicles
        FROM users
        """
    ).fetchone()

    transactions = db.execute(
        """
        SELECT t.id, t.created_at, t.asset_type, t.asset_code, t.action, t.handled_by, u.full_name
        FROM transactions t
        JOIN users u ON u.id = t.user_id
        ORDER BY t.created_at DESC, t.id DESC
        LIMIT 10
        """
    ).fetchall()

    recent_logs = read_recent_logs()

    return render_template(
        "index.html",
        users=users,
        transactions=transactions,
        # SUM() over an empty table yields NULL; show 0 instead of None.
        stats={
            "users": stats["users"],
            "active_chips": stats["active_chips"] or 0,
            "active_cards": stats["active_cards"] or 0,
            "active_pool_vehicles": stats["active_pool_vehicles"] or 0,
        },
        search_query=search_query,
        asset_labels=ASSET_LABELS,
        action_labels=ACTION_LABELS,
        recent_logs=recent_logs,
    )


@app.route("/users/new", methods=["GET", "POST"])
@login_required
def create_user() -> str:
    """Show the new-user form and create the user on POST.

    Only the name is required; empty e-mail and department are stored as NULL.
    """
    if request.method == "POST":
        full_name = request.form.get("full_name", "").strip()
        email = request.form.get("email", "").strip() or None
        department = request.form.get("department", "").strip() or None

        if not full_name:
            flash("Bitte einen Namen eingeben.")
            return redirect(url_for("create_user"))

        db = get_db()
        db.execute(
            "INSERT INTO users (full_name, email, department) VALUES (?, ?, ?)",
            (full_name, email, department),
        )
        db.commit()
        flash("User wurde angelegt.")
        return redirect(url_for("index"))

    return render_template("create_user.html")


@app.route("/assign", methods=["GET", "POST"])
@login_required
def assign_asset() -> str:
    """Hand out an asset to a user and redirect to the printable receipt.

    The asset type comes from the form (POST) or query string (GET); unknown
    values fall back to ``chip``. An assignment is refused when the code is
    already held by someone or the user already has an asset of that type.
    """
    db = get_db()
    submitted = request.form if request.method == "POST" else request.args
    asset_type = submitted.get("asset_type", "chip").strip()
    if asset_type not in _ASSET_COLUMNS:
        asset_type = "chip"

    if request.method == "POST":
        user_id = request.form.get("user_id", "").strip()
        asset_code = request.form.get("asset_code", "").strip()
        handled_by = g.current_staff["full_name"]

        if not user_id or not asset_code:
            flash("Bitte alle Felder fuer die Ausgabe ausfuellen.")
            return redirect(url_for("assign_asset"))

        user = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        if user is None:
            flash("Ausgewaehlter User wurde nicht gefunden.")
            return redirect(url_for("assign_asset"))

        if asset_code_in_use(db, asset_type, asset_code):
            flash(
                "Dieses Kennzeichen ist bereits vergeben."
                if asset_type == "pool_vehicle"
                else "Diese Kennung ist bereits vergeben."
            )
            return redirect(url_for("assign_asset"))

        if user[_ASSET_COLUMNS[asset_type][0]]:
            flash(_ALREADY_ASSIGNED_TEXT[asset_type])
            return redirect(url_for("assign_asset"))

        _store_user_asset(db, user["id"], asset_type, asset_code)
        cursor = db.execute(
            "INSERT INTO transactions (user_id, asset_type, asset_code, handled_by, action) VALUES (?, ?, ?, ?, 'assign')",
            (user["id"], asset_type, asset_code, handled_by),
        )
        transaction_id = cursor.lastrowid
        db.commit()

        log_asset_event("ausgabe", user["full_name"], asset_type, asset_code, handled_by)
        flash("Ausgabe wurde gespeichert.")
        return redirect(url_for("print_transaction", transaction_id=transaction_id))

    users = db.execute("SELECT id, full_name FROM users ORDER BY full_name COLLATE NOCASE").fetchall()
    return render_template(
        "assign_asset.html",
        users=users,
        selected_asset_type=asset_type,
        input_placeholder=INPUT_PLACEHOLDERS[asset_type],
    )


@app.route("/return", methods=["GET", "POST"])
@login_required
def return_asset() -> str:
    """Take back a user's asset and redirect to the printable receipt.

    The asset type comes from the form (POST) or query string (GET); unknown
    values fall back to ``chip``. The returned code is the one currently
    stored for the user, so no code has to be entered.
    """
    db = get_db()
    submitted = request.form if request.method == "POST" else request.args
    asset_type = submitted.get("asset_type", "chip").strip()
    if asset_type not in _ASSET_COLUMNS:
        asset_type = "chip"

    if request.method == "POST":
        user_id = request.form.get("user_id", "").strip()
        handled_by = g.current_staff["full_name"]

        if not user_id:
            flash("Bitte User und Typ fuer die Rueckgabe waehlen.")
            return redirect(url_for("return_asset"))

        user = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        if user is None:
            flash("Ausgewaehlter User wurde nicht gefunden.")
            return redirect(url_for("return_asset"))

        asset_code = user[_ASSET_COLUMNS[asset_type][0]]
        if not asset_code:
            flash("Fuer diesen User ist kein entsprechendes Medium aktiv hinterlegt.")
            return redirect(url_for("return_asset"))

        _store_user_asset(db, user["id"], asset_type, None)
        cursor = db.execute(
            "INSERT INTO transactions (user_id, asset_type, asset_code, handled_by, action) VALUES (?, ?, ?, ?, 'return')",
            (user["id"], asset_type, asset_code, handled_by),
        )
        transaction_id = cursor.lastrowid
        db.commit()

        log_asset_event("rueckgabe", user["full_name"], asset_type, asset_code, handled_by)
        flash("Rueckgabe wurde gespeichert.")
        return redirect(url_for("print_transaction", transaction_id=transaction_id))

    users = db.execute("SELECT id, full_name FROM users ORDER BY full_name COLLATE NOCASE").fetchall()
    return render_template(
        "return_asset.html",
        users=users,
        selected_asset_type=asset_type,
    )


# The <int:transaction_id> converter supplies the view's argument and lets
# url_for("print_transaction", transaction_id=...) build a proper path.
@app.route("/transactions/<int:transaction_id>/print")
@login_required
def print_transaction(transaction_id: int) -> str:
    """Render the printable receipt for one transaction."""
    transaction = get_transaction_for_print(transaction_id)
    if transaction is None:
        flash("Druckbeleg wurde nicht gefunden.")
        return redirect(url_for("index"))

    return render_template(
        "print_transaction.html",
        transaction=transaction,
        asset_labels=ASSET_LABELS,
        action_labels=ACTION_LABELS,
        print_descriptions=PRINT_DESCRIPTIONS,
    )


if __name__ == "__main__":
    import os

    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default: it allows code execution from the browser.
    # Set INVENTORY_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get("INVENTORY_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5006, debug=debug_enabled)