Files
keyVerwaltung/app.py
Erik Thiele f255e50dd3 first Commit
2026-05-18 21:14:04 +02:00

782 lines
26 KiB
Python

import csv
import io
import logging
import os
import sqlite3
from datetime import datetime
from functools import wraps
from pathlib import Path

from flask import Flask, flash, g, redirect, render_template, request, session, url_for
from werkzeug.security import check_password_hash, generate_password_hash
app = Flask(__name__)
# Session cookies are signed with this key. Prefer a value supplied through
# the FLASK_SECRET_KEY environment variable; the hard-coded fallback is only
# suitable for local development because it is publicly known.
app.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY") or "dev-secret-key"
APP_VERSION = "1.0.0"
# The SQLite database and the log file live next to this module.
DATABASE = Path(__file__).with_name("inventory.db")
LOGFILE = Path(__file__).with_name("inventory.log")
logging.basicConfig(
    filename=LOGFILE,
    # The log is read back as UTF-8 by read_recent_logs(); write it as UTF-8
    # too instead of relying on the platform's default encoding.
    encoding="utf-8",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
# Display names for the asset types stored in the database.
ASSET_LABELS = {
    "chip": "Tuerchip",
    "parking_card": "Parkkarte",
}
# Display names for the transaction actions stored in the database.
ACTION_LABELS = {
    "assign": "Ausgabe",
    "return": "Rueckgabe",
}
# Descriptions handed to the print receipt template, keyed by action.
PRINT_DESCRIPTIONS = {
    "assign": "Empfangsbestaetigung fuer die Ausgabe eines Mediums",
    "return": "Rueckgabebestaetigung fuer die Ruecknahme eines Mediums",
}
# Values accepted (case-insensitively) in the "aktion" column of an import row.
IMPORT_ACTIONS = {"import", "ausgabe", "assign"}
# Lower-cased column names of an optional header row in import data.
IMPORT_HEADER = ["user", "typ", "kennung", "aktion"]
def get_db() -> sqlite3.Connection:
    """Return the SQLite connection cached on ``g``, opening it on first use.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    """
    if "db" in g:
        return g.db
    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    g.db = connection
    return connection
@app.context_processor
def inject_app_meta() -> dict[str, str | int]:
    """Provide the app version, request host and current year to templates."""
    raw_host = request.host
    if raw_host:
        # Keep only the part before the first colon (normally drops the port).
        host = raw_host.partition(":")[0]
    else:
        host = "-"
    meta: dict[str, str | int] = {"app_version": APP_VERSION, "app_host": host}
    meta["app_year"] = datetime.now().year
    return meta
@app.teardown_appcontext
def close_db(_: object | None) -> None:
    """Close the connection stored on ``g`` (if one was opened) at teardown."""
    connection = g.pop("db", None)
    if connection is None:
        return
    connection.close()
def init_db() -> None:
    """Bring the database schema up to date and make sure an admin exists.

    Steps, in order:
    1. create the ``users``, ``transactions`` and ``staff_users`` tables if
       they are missing,
    2. add the ``handled_by`` column to ``transactions`` when it is absent,
    3. insert an ``admin`` account without a password (flagged so that a
       password must be set) when no admin account exists,
    4. commit.
    """
    connection = get_db()
    schema = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
full_name TEXT NOT NULL,
email TEXT,
department TEXT,
chip_code TEXT,
chip_assigned_at TEXT,
parking_card_code TEXT,
parking_card_assigned_at TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
asset_type TEXT NOT NULL CHECK (asset_type IN ('chip', 'parking_card')),
asset_code TEXT NOT NULL,
handled_by TEXT,
action TEXT NOT NULL CHECK (action IN ('assign', 'return')),
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS staff_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
full_name TEXT NOT NULL,
role TEXT NOT NULL CHECK (role IN ('admin', 'staff')),
password_hash TEXT,
must_set_password INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""
    connection.executescript(schema)

    # Migration for databases whose transactions table lacks handled_by.
    transaction_columns = [
        info["name"]
        for info in connection.execute("PRAGMA table_info(transactions)")
    ]
    if "handled_by" not in transaction_columns:
        connection.execute("ALTER TABLE transactions ADD COLUMN handled_by TEXT")

    # Seed the initial administrator; the password is chosen at first login.
    first_admin = connection.execute(
        "SELECT id FROM staff_users WHERE role = 'admin' LIMIT 1"
    ).fetchone()
    if first_admin is None:
        connection.execute(
            """
INSERT INTO staff_users (username, full_name, role, password_hash, must_set_password)
VALUES (?, ?, 'admin', NULL, 1)
""",
            ("admin", "Administrator"),
        )
    connection.commit()
def current_staff() -> sqlite3.Row | None:
    """Return the staff row referenced by the session, or ``None``.

    ``None`` is returned both when the session holds no staff id and when
    the stored id no longer matches a row in ``staff_users``.
    """
    staff_id = session.get("staff_user_id")
    if staff_id:
        lookup = get_db().execute(
            "SELECT id, username, full_name, role, must_set_password FROM staff_users WHERE id = ?",
            (staff_id,),
        )
        return lookup.fetchone()
    return None
def login_required(view):
    """Decorate *view* so it is only reachable for a logged-in staff member.

    Anonymous visitors are redirected to the login page. Staff members who
    still have to choose a password are redirected to the set-password page
    unless the requested endpoint is one of the exempted ones. On success
    the staff row is stored in ``g.current_staff`` before the view runs.
    """
    password_free_endpoints = {"set_password", "logout", "static"}

    @wraps(view)
    def wrapped_view(*args, **kwargs):
        staff = current_staff()
        if staff is None:
            return redirect(url_for("login"))
        endpoint = request.endpoint or ""
        if staff["must_set_password"] and endpoint not in password_free_endpoints:
            flash("Bitte zuerst ein Passwort setzen.")
            return redirect(url_for("set_password"))
        g.current_staff = staff
        return view(*args, **kwargs)

    return wrapped_view
def admin_required(view):
    """Decorate *view* so only logged-in staff with the admin role may use it.

    The login check runs first; non-admin staff get a flash message and are
    redirected to the index page.
    """

    def admin_only_view(*args, **kwargs):
        if g.current_staff["role"] == "admin":
            return view(*args, **kwargs)
        flash("Nur Admins duerfen diese Seite aufrufen.")
        return redirect(url_for("index"))

    # Same composition as stacking @wraps(view) on top of @login_required.
    return wraps(view)(login_required(admin_only_view))
def log_asset_event(
    action: str,
    user_name: str,
    asset_type: str,
    asset_code: str,
    handled_by: str,
) -> None:
    """Write one INFO line describing an asset event to the root logger.

    ``asset_type`` is translated through ``ASSET_LABELS``; an unknown type
    raises ``KeyError``.
    """
    asset_label = ASSET_LABELS[asset_type]
    fields = (action, user_name, asset_label, asset_code, handled_by)
    logging.info(
        "%s | user=%s | typ=%s | kennung=%s | bearbeiter=%s",
        *fields,
    )
def read_recent_logs(limit: int = 20) -> list[str]:
    """Return up to *limit* of the newest log lines, newest first.

    Args:
        limit: Maximum number of lines to return. Zero or a negative value
            yields an empty list.

    Returns:
        The last *limit* lines of the log file in reverse order, or an empty
        list when the log file does not exist.
    """
    # A slice like lines[-0:] would return the whole file, so handle
    # non-positive limits explicitly.
    if limit <= 0:
        return []
    try:
        # Bytes that are not valid UTF-8 (e.g. lines written with another
        # platform encoding) are replaced instead of raising.
        content = LOGFILE.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        return []
    lines = content.splitlines()
    return lines[-limit:][::-1]
def asset_code_in_use(db: sqlite3.Connection, asset_type: str, asset_code: str) -> bool:
    """Tell whether any user currently holds *asset_code* for *asset_type*.

    ``"chip"`` is looked up in ``chip_code``; every other type value is
    looked up in ``parking_card_code``.
    """
    # The column name comes from this fixed two-way choice, never from input.
    if asset_type == "chip":
        column_name = "chip_code"
    else:
        column_name = "parking_card_code"
    cursor = db.execute(
        f"SELECT id FROM users WHERE {column_name} = ?",
        (asset_code,),
    )
    return cursor.fetchone() is not None
def normalize_asset_type(value: str) -> str | None:
normalized = value.strip().lower()
aliases = {
"chip": "chip",
"tuerchip": "chip",
"parkkarte": "parking_card",
"parking_card": "parking_card",
"parking card": "parking_card",
}
return aliases.get(normalized)
def is_import_header(parts: list[str]) -> bool:
    """Tell whether *parts* is the import header row.

    Cells are compared with ``IMPORT_HEADER`` after stripping whitespace and
    lower-casing; the number of cells has to match as well.
    """
    if len(parts) != len(IMPORT_HEADER):
        return False
    return all(
        cell.strip().lower() == expected
        for cell, expected in zip(parts, IMPORT_HEADER)
    )
def get_transaction_for_print(transaction_id: int) -> sqlite3.Row | None:
    """Fetch one transaction joined with its user's name, e-mail and department.

    Returns ``None`` when no transaction has the given id.
    """
    receipt_query = """
SELECT t.id, t.created_at, t.asset_type, t.asset_code, t.action, t.handled_by,
u.full_name, u.email, u.department
FROM transactions t
JOIN users u ON u.id = t.user_id
WHERE t.id = ?
"""
    cursor = get_db().execute(receipt_query, (transaction_id,))
    return cursor.fetchone()
# Database files whose schema has already been initialised by this process.
_initialized_databases: set[Path] = set()


@app.before_request
def ensure_database() -> None:
    """Prepare the database and load the current staff member for a request.

    The schema setup in ``init_db()`` is idempotent but was executed for
    every single request. It now runs once per database file and process,
    and again if the file has disappeared in the meantime. The logged-in
    staff row (or ``None``) is stored in ``g.current_staff`` on every request.
    """
    if DATABASE not in _initialized_databases or not DATABASE.exists():
        init_db()
        _initialized_databases.add(DATABASE)
    g.current_staff = current_staff()
@app.route("/login", methods=["GET", "POST"])
def login() -> str:
    """Show the login form (GET) or process a login attempt (POST).

    POST outcomes:
    - unknown username: flash a message and return to the login page,
    - account flagged ``must_set_password``: start a session for it and
      redirect to the set-password page; the submitted password is not
      checked on this path,
    - wrong or missing password hash: flash a failure and return to login,
    - otherwise: start a fresh session and redirect to the index page.

    NOTE(review): on the ``must_set_password`` path knowing the username is
    enough to obtain a session - confirm this is acceptable for deployment.
    """
    if request.method != "POST":
        return render_template("login.html")

    form = request.form
    username = form.get("username", "").strip()
    password = form.get("password", "")
    account = get_db().execute(
        "SELECT * FROM staff_users WHERE username = ?",
        (username,),
    ).fetchone()
    if account is None:
        flash("Benutzer wurde nicht gefunden.")
        return redirect(url_for("login"))

    if account["must_set_password"]:
        session.clear()
        session["staff_user_id"] = account["id"]
        flash("Bitte jetzt Ihr Passwort vergeben.")
        return redirect(url_for("set_password"))

    stored_hash = account["password_hash"]
    if stored_hash and check_password_hash(stored_hash, password):
        session.clear()
        session["staff_user_id"] = account["id"]
        flash("Anmeldung erfolgreich.")
        return redirect(url_for("index"))

    flash("Login fehlgeschlagen.")
    return redirect(url_for("login"))
@app.route("/set-password", methods=["GET", "POST"])
def set_password() -> str:
    """Let the staff member of the current session choose a password.

    Without a valid session the visitor is sent to the login page. On POST
    the password must be at least 8 characters long and match its
    confirmation; it is then stored as a hash and the ``must_set_password``
    flag is cleared.
    """
    staff = current_staff()
    if staff is None:
        flash("Bitte zuerst anmelden.")
        return redirect(url_for("login"))
    if request.method != "POST":
        return render_template("set_password.html")

    new_password = request.form.get("password", "")
    confirmation = request.form.get("password_confirm", "")
    problem = None
    if len(new_password) < 8:
        problem = "Das Passwort muss mindestens 8 Zeichen lang sein."
    elif new_password != confirmation:
        problem = "Die Passwoerter stimmen nicht ueberein."
    if problem is not None:
        flash(problem)
        return redirect(url_for("set_password"))

    db = get_db()
    db.execute(
        "UPDATE staff_users SET password_hash = ?, must_set_password = 0 WHERE id = ?",
        (generate_password_hash(new_password), staff["id"]),
    )
    db.commit()
    flash("Passwort wurde gespeichert.")
    return redirect(url_for("index"))
@app.route("/logout")
def logout() -> str:
    """End the session and send the visitor back to the login page."""
    session.clear()
    # Flash after clearing so the message is stored in the fresh session.
    flash("Sie wurden abgemeldet.")
    login_page = url_for("login")
    return redirect(login_page)
@app.route("/admin/staff", methods=["GET", "POST"])
@admin_required
def manage_staff() -> str:
    """List staff accounts (GET) or create a new one (POST); admins only.

    New accounts are stored without a password and flagged so that the
    password is chosen at the first login. A duplicate username is reported
    via a flash message.
    """
    db = get_db()
    if request.method != "POST":
        staff_users = db.execute(
            "SELECT id, username, full_name, role, must_set_password, created_at FROM staff_users ORDER BY full_name COLLATE NOCASE"
        ).fetchall()
        return render_template("manage_staff.html", staff_users=staff_users)

    form = request.form
    username = form.get("username", "").strip()
    full_name = form.get("full_name", "").strip()
    role = form.get("role", "staff").strip()
    form_is_valid = bool(username) and bool(full_name) and role in {"admin", "staff"}
    if not form_is_valid:
        flash("Bitte alle Bearbeiterdaten korrekt eingeben.")
        return redirect(url_for("manage_staff"))

    try:
        db.execute(
            "INSERT INTO staff_users (username, full_name, role, password_hash, must_set_password) VALUES (?, ?, ?, NULL, 1)",
            (username, full_name, role),
        )
        db.commit()
    except sqlite3.IntegrityError:
        flash("Der Benutzername ist bereits vergeben.")
    else:
        flash("Bearbeiter wurde angelegt. Passwort wird bei der ersten Anmeldung gesetzt.")
    return redirect(url_for("manage_staff"))
@app.route("/admin/staff/<int:staff_id>/reset-password", methods=["POST"])
@admin_required
def reset_staff_password(staff_id: int) -> str:
    """Clear a staff member's password so a new one must be set; admins only."""
    db = get_db()
    target = db.execute(
        "SELECT id FROM staff_users WHERE id = ?",
        (staff_id,),
    ).fetchone()
    if target is not None:
        db.execute(
            "UPDATE staff_users SET password_hash = NULL, must_set_password = 1 WHERE id = ?",
            (staff_id,),
        )
        db.commit()
        message = "Passwort-Reset wurde gesetzt. Der Bearbeiter muss bei der naechsten Anmeldung ein neues Passwort vergeben."
    else:
        message = "Bearbeiter wurde nicht gefunden."
    flash(message)
    return redirect(url_for("manage_staff"))
@app.route("/admin/staff/<int:staff_id>/delete", methods=["POST"])
@admin_required
def delete_staff(staff_id: int) -> str:
    """Delete a staff account; admins only.

    Refused when the account does not exist, when it is the account of the
    admin performing the request, or when it is the last remaining admin.
    """
    db = get_db()
    target = db.execute(
        "SELECT id, role, full_name FROM staff_users WHERE id = ?",
        (staff_id,),
    ).fetchone()

    refusal = None
    if target is None:
        refusal = "Bearbeiter wurde nicht gefunden."
    elif target["id"] == g.current_staff["id"]:
        refusal = "Der aktuell angemeldete Bearbeiter kann nicht geloescht werden."
    elif target["role"] == "admin":
        remaining = db.execute(
            "SELECT COUNT(*) AS admin_count FROM staff_users WHERE role = 'admin'"
        ).fetchone()
        if remaining["admin_count"] <= 1:
            refusal = "Der letzte Admin kann nicht geloescht werden."
    if refusal is not None:
        flash(refusal)
        return redirect(url_for("manage_staff"))

    db.execute("DELETE FROM staff_users WHERE id = ?", (staff_id,))
    db.commit()
    flash(f"Bearbeiter '{target['full_name']}' wurde geloescht.")
    return redirect(url_for("manage_staff"))
def _read_import_rows(upload, raw_rows: str) -> list[list[str]] | None:
    """Return the non-empty import rows as lists of stripped cells.

    An uploaded file takes precedence over the text-area content. The file
    is parsed as semicolon-separated CSV and its cells are used directly, so
    quoted cells containing ``;`` or line breaks stay intact. Returns
    ``None`` when the uploaded file is not valid UTF-8.
    """
    if upload and upload.filename:
        try:
            decoded = upload.stream.read().decode("utf-8-sig")
        except UnicodeDecodeError:
            return None
        reader = csv.reader(io.StringIO(decoded), delimiter=";")
        stripped_rows = ([cell.strip() for cell in row] for row in reader)
        return [cells for cells in stripped_rows if any(cells)]
    return [
        [part.strip() for part in line.split(";")]
        for line in raw_rows.splitlines()
        if line.strip()
    ]


def _plan_import(
    db: sqlite3.Connection,
    rows: list[list[str]],
) -> tuple[dict[str, dict], list[dict], list[str]]:
    """Validate all rows without writing anything to the database.

    Returns ``(new_users, assignments, errors)``:
    ``new_users`` maps a lower-cased name to the user that has to be
    created, ``assignments`` lists the assignments in row order and
    ``errors`` holds one message per rejected row.
    """
    new_users: dict[str, dict] = {}
    # Codes of users already in the database, including the codes that
    # earlier rows of this import assign to them. Without this a single
    # import could hand two different chips or cards to the same user.
    existing_users: dict[int, dict] = {}
    # (asset type, code) -> lower-cased name of the user claiming it.
    claimed_by: dict[tuple[str, str], str] = {}
    assignments: list[dict] = []
    errors: list[str] = []

    for index, parts in enumerate(rows, start=1):
        if is_import_header(parts):
            continue
        if len(parts) != 4:
            errors.append(f"Zeile {index}: ungueltig. Erwartet wird: User;Typ;Kennung;Aktion")
            continue
        full_name, raw_asset_type, asset_code, raw_action = parts
        asset_type = normalize_asset_type(raw_asset_type)
        action = raw_action.strip().lower()
        if not full_name or not asset_type or not asset_code:
            errors.append(f"Zeile {index}: enthaelt unvollstaendige oder ungueltige Werte.")
            continue
        if action not in IMPORT_ACTIONS:
            errors.append(f"Zeile {index}: ungueltige Aktion. Erlaubt: Import")
            continue

        name_key = full_name.lower()
        user = db.execute(
            "SELECT * FROM users WHERE full_name = ? COLLATE NOCASE",
            (full_name,),
        ).fetchone()
        if user is not None:
            user_id = user["id"]
            state = existing_users.setdefault(
                user_id,
                {
                    "chip_code": user["chip_code"],
                    "parking_card_code": user["parking_card_code"],
                },
            )
        else:
            user_id = None
            state = new_users.setdefault(
                name_key,
                {
                    "full_name": full_name,
                    "chip_code": None,
                    "parking_card_code": None,
                },
            )

        code_column = "chip_code" if asset_type == "chip" else "parking_card_code"
        current_code = state[code_column]

        # A code stored in the database is only acceptable when this very
        # user already holds it.
        if asset_code_in_use(db, asset_type, asset_code) and current_code != asset_code:
            errors.append(f"Zeile {index}: Kennung '{asset_code}' ist bereits vergeben.")
            continue

        claimant = claimed_by.get((asset_type, asset_code))
        if claimant is not None and claimant != name_key:
            errors.append(f"Zeile {index}: Kennung '{asset_code}' wird im Import mehrfach vergeben.")
            continue

        if current_code and current_code != asset_code:
            if asset_type == "chip":
                errors.append(f"Zeile {index}: User '{full_name}' hat bereits einen anderen Tuerchip.")
            else:
                errors.append(f"Zeile {index}: User '{full_name}' hat bereits eine andere Parkkarte.")
            continue

        state[code_column] = asset_code
        claimed_by.setdefault((asset_type, asset_code), name_key)
        assignments.append(
            {
                "full_name": full_name,
                "user_id": user_id,
                "asset_type": asset_type,
                "asset_code": asset_code,
            }
        )

    return new_users, assignments, errors


def _apply_import(
    db: sqlite3.Connection,
    new_users: dict[str, dict],
    assignments: list[dict],
) -> int:
    """Create the new users, store all assignments, commit and log them.

    Returns the number of processed assignments.
    """
    user_ids_by_name: dict[str, int] = {}
    for name_key, pending in new_users.items():
        cursor = db.execute(
            "INSERT INTO users (full_name) VALUES (?)",
            (pending["full_name"],),
        )
        user_ids_by_name[name_key] = cursor.lastrowid

    for assignment in assignments:
        resolved_user_id = (
            assignment["user_id"]
            or user_ids_by_name[assignment["full_name"].lower()]
        )
        if assignment["asset_type"] == "chip":
            db.execute(
                "UPDATE users SET chip_code = ?, chip_assigned_at = CURRENT_TIMESTAMP WHERE id = ?",
                (assignment["asset_code"], resolved_user_id),
            )
        else:
            db.execute(
                "UPDATE users SET parking_card_code = ?, parking_card_assigned_at = CURRENT_TIMESTAMP WHERE id = ?",
                (assignment["asset_code"], resolved_user_id),
            )
        db.execute(
            "INSERT INTO transactions (user_id, asset_type, asset_code, handled_by, action) VALUES (?, ?, ?, ?, 'assign')",
            (resolved_user_id, assignment["asset_type"], assignment["asset_code"], "Import"),
        )
    db.commit()

    # Log only after the commit so the log never reports rows that were
    # not actually stored.
    for assignment in assignments:
        log_asset_event(
            "import",
            assignment["full_name"],
            assignment["asset_type"],
            assignment["asset_code"],
            "Import",
        )
    return len(assignments)


@app.route("/admin/import", methods=["GET", "POST"])
@admin_required
def import_data() -> str:
    """Show the import form (GET) or run a bulk import (POST); admins only.

    Each row has the form ``User;Typ;Kennung;Aktion``; a header row with
    exactly these column names is skipped. Rows come from the uploaded CSV
    file (UTF-8, semicolon separated) or, without a file, from the
    ``import_rows`` form field. The import is all-or-nothing: if any row is
    rejected, the collected error messages are flashed and nothing is
    written. Users that do not exist yet are created by name.
    """
    if request.method != "POST":
        return render_template("import_data.html")

    rows = _read_import_rows(
        request.files.get("import_file"),
        request.form.get("import_rows", ""),
    )
    if rows is None:
        flash("Die CSV-Datei muss UTF-8 kodiert sein.", "error")
        return redirect(url_for("import_data"))
    if not rows:
        flash("Bitte mindestens eine Importzeile eingeben.", "error")
        return redirect(url_for("import_data"))

    db = get_db()
    new_users, assignments, errors = _plan_import(db, rows)
    if errors:
        flash(errors, "import-errors")
        return redirect(url_for("import_data"))

    imported_count = _apply_import(db, new_users, assignments)
    flash(f"{imported_count} Importzeilen wurden verarbeitet.", "success")
    return redirect(url_for("index"))
@app.route("/")
@login_required
def index() -> str:
    """Render the dashboard: user list, statistics, transactions and log tail.

    The optional ``q`` query parameter filters users whose name, e-mail,
    chip code or parking card code contains the search text. The ten most
    recent transactions and the newest log lines are shown as well.
    """
    db = get_db()
    search_query = request.args.get("q", "").strip()
    params: tuple[str, ...] = ()
    user_query = """
SELECT id, full_name, email, department, chip_code, chip_assigned_at,
parking_card_code, parking_card_assigned_at
FROM users
"""
    if search_query:
        like_value = f"%{search_query}%"
        user_query += """
WHERE full_name LIKE ?
OR email LIKE ?
OR chip_code LIKE ?
OR parking_card_code LIKE ?
"""
        params = (like_value,) * 4
    user_query += " ORDER BY full_name COLLATE NOCASE"
    users = db.execute(user_query, params).fetchall()

    totals = db.execute(
        """
SELECT
COUNT(*) AS users,
SUM(CASE WHEN chip_code IS NOT NULL AND chip_code != '' THEN 1 ELSE 0 END) AS active_chips,
SUM(CASE WHEN parking_card_code IS NOT NULL AND parking_card_code != '' THEN 1 ELSE 0 END) AS active_cards
FROM users
"""
    ).fetchone()
    transactions = db.execute(
        """
SELECT t.id, t.created_at, t.asset_type, t.asset_code, t.action, t.handled_by, u.full_name
FROM transactions t
JOIN users u ON u.id = t.user_id
ORDER BY t.created_at DESC, t.id DESC
LIMIT 10
"""
    ).fetchall()
    recent_logs = read_recent_logs()

    return render_template(
        "index.html",
        users=users,
        transactions=transactions,
        stats={
            "users": totals["users"],
            # SUM() is NULL when the users table is empty; show 0, not None.
            "active_chips": totals["active_chips"] or 0,
            "active_cards": totals["active_cards"] or 0,
        },
        search_query=search_query,
        asset_labels=ASSET_LABELS,
        action_labels=ACTION_LABELS,
        recent_logs=recent_logs,
    )
@app.route("/users/new", methods=["GET", "POST"])
@login_required
def create_user() -> str:
    """Show the new-user form (GET) or store a new user (POST).

    The name is mandatory; empty e-mail and department values are stored
    as NULL.
    """
    if request.method != "POST":
        return render_template("create_user.html")

    form = request.form
    full_name = form.get("full_name", "").strip()
    email = form.get("email", "").strip() or None
    department = form.get("department", "").strip() or None
    if full_name:
        db = get_db()
        db.execute(
            "INSERT INTO users (full_name, email, department) VALUES (?, ?, ?)",
            (full_name, email, department),
        )
        db.commit()
        flash("User wurde angelegt.")
        return redirect(url_for("index"))

    flash("Bitte einen Namen eingeben.")
    return redirect(url_for("create_user"))
@app.route("/assign", methods=["GET", "POST"])
@login_required
def assign_asset() -> str:
    """Show the hand-out form (GET) or hand out a chip or parking card (POST).

    A hand-out is refused when a field is missing, the user is unknown, the
    code is already held by some user, or the user already holds an asset of
    that type. On success the user row is updated, an ``assign`` transaction
    is stored and logged, and the browser is sent to the print receipt.
    """
    db = get_db()
    if request.method != "POST":
        users = db.execute("SELECT id, full_name FROM users ORDER BY full_name COLLATE NOCASE").fetchall()
        return render_template("assign_asset.html", users=users)

    form = request.form
    user_id = form.get("user_id", "").strip()
    asset_type = form.get("asset_type", "").strip()
    asset_code = form.get("asset_code", "").strip()
    handled_by = g.current_staff["full_name"]
    form_complete = bool(user_id) and asset_type in {"chip", "parking_card"} and bool(asset_code)
    if not form_complete:
        flash("Bitte alle Felder fuer die Ausgabe ausfuellen.")
        return redirect(url_for("assign_asset"))

    user = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    if user is None:
        flash("Ausgewaehlter User wurde nicht gefunden.")
        return redirect(url_for("assign_asset"))
    if asset_code_in_use(db, asset_type, asset_code):
        flash("Diese Kennung ist bereits vergeben.")
        return redirect(url_for("assign_asset"))

    # Pick the per-type pieces once, then run the shared steps.
    if asset_type == "chip":
        held_code = user["chip_code"]
        conflict_message = "Dieser User hat bereits einen aktiven Tuerchip."
        update_sql = "UPDATE users SET chip_code = ?, chip_assigned_at = CURRENT_TIMESTAMP WHERE id = ?"
    else:
        held_code = user["parking_card_code"]
        conflict_message = "Dieser User hat bereits eine aktive Parkkarte."
        update_sql = "UPDATE users SET parking_card_code = ?, parking_card_assigned_at = CURRENT_TIMESTAMP WHERE id = ?"
    if held_code:
        flash(conflict_message)
        return redirect(url_for("assign_asset"))

    db.execute(update_sql, (asset_code, user_id))
    db.execute(
        "INSERT INTO transactions (user_id, asset_type, asset_code, handled_by, action) VALUES (?, ?, ?, ?, 'assign')",
        (user_id, asset_type, asset_code, handled_by),
    )
    id_row = db.execute("SELECT last_insert_rowid() AS id").fetchone()
    transaction_id = id_row["id"]
    db.commit()
    log_asset_event("ausgabe", user["full_name"], asset_type, asset_code, handled_by)
    flash("Ausgabe wurde gespeichert.")
    return redirect(url_for("print_transaction", transaction_id=transaction_id))
@app.route("/return", methods=["GET", "POST"])
@login_required
def return_asset() -> str:
    """Show the return form (GET) or take back a chip or parking card (POST).

    A return is refused when user or type is missing, the user is unknown,
    or the user holds no asset of that type. On success the code is cleared
    from the user row, a ``return`` transaction is stored and logged, and
    the browser is sent to the print receipt.
    """
    db = get_db()
    if request.method != "POST":
        users = db.execute("SELECT id, full_name FROM users ORDER BY full_name COLLATE NOCASE").fetchall()
        return render_template("return_asset.html", users=users)

    form = request.form
    user_id = form.get("user_id", "").strip()
    asset_type = form.get("asset_type", "").strip()
    handled_by = g.current_staff["full_name"]
    if not (user_id and asset_type in {"chip", "parking_card"}):
        flash("Bitte User und Typ fuer die Rueckgabe waehlen.")
        return redirect(url_for("return_asset"))

    user = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    if user is None:
        flash("Ausgewaehlter User wurde nicht gefunden.")
        return redirect(url_for("return_asset"))

    returning_chip = asset_type == "chip"
    if returning_chip:
        asset_code = user["chip_code"]
        clear_sql = "UPDATE users SET chip_code = NULL, chip_assigned_at = NULL WHERE id = ?"
    else:
        asset_code = user["parking_card_code"]
        clear_sql = "UPDATE users SET parking_card_code = NULL, parking_card_assigned_at = NULL WHERE id = ?"
    if not asset_code:
        flash("Fuer diesen User ist kein entsprechendes Medium aktiv hinterlegt.")
        return redirect(url_for("return_asset"))

    db.execute(clear_sql, (user_id,))
    db.execute(
        "INSERT INTO transactions (user_id, asset_type, asset_code, handled_by, action) VALUES (?, ?, ?, ?, 'return')",
        (user_id, asset_type, asset_code, handled_by),
    )
    id_row = db.execute("SELECT last_insert_rowid() AS id").fetchone()
    transaction_id = id_row["id"]
    db.commit()
    log_asset_event("rueckgabe", user["full_name"], asset_type, asset_code, handled_by)
    flash("Rueckgabe wurde gespeichert.")
    return redirect(url_for("print_transaction", transaction_id=transaction_id))
@app.route("/transactions/<int:transaction_id>/print")
@login_required
def print_transaction(transaction_id: int) -> str:
    """Render the printable receipt for one transaction.

    An unknown transaction id leads back to the index page with a flash
    message.
    """
    transaction = get_transaction_for_print(transaction_id)
    if transaction is not None:
        template_context = {
            "transaction": transaction,
            "asset_labels": ASSET_LABELS,
            "action_labels": ACTION_LABELS,
            "print_descriptions": PRINT_DESCRIPTIONS,
        }
        return render_template("print_transaction.html", **template_context)

    flash("Druckbeleg wurde nicht gefunden.")
    return redirect(url_for("index"))
# Start Flask's built-in server when this file is executed directly.
# NOTE(review): debug=True is hard-coded here - confirm this entry point is
# only used for local development and never for a deployment.
if __name__ == "__main__":
    app.run(debug=True)