from __future__ import annotations import os from pathlib import Path from flask import Flask, abort, jsonify, redirect, render_template, request, send_from_directory, url_for from werkzeug.utils import secure_filename BASE_DIR = Path(__file__).resolve().parent UPLOAD_DIR = Path(os.environ.get("UPLOAD_DIR", str(BASE_DIR / "uploads"))) UPLOAD_DIR.mkdir(parents=True, exist_ok=True) ALLOWED_EXTENSIONS = { "mp4", "webm", "ogg", "mp3", "wav", "m4a", "jpg", "jpeg", "png", "gif", } app = Flask(__name__) app.config["UPLOAD_FOLDER"] = str(UPLOAD_DIR) # ── Player State ────────────────────────────────────────────────────── player_state = { "version": 0, "current": None, "playing": False, "seek": 0, "volume": 1.0, } def _bump(): player_state["version"] += 1 def _set_current(name: str, kind: str): player_state["current"] = {"name": name, "kind": kind} def _clear_seek(): player_state["seek"] = 0 def _reset_seek(): player_state["seek"] = 0 # ── Helpers ─────────────────────────────────────────────────────────── def allowed_file(filename: str) -> bool: return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS def _kind_from_suffix(suffix: str) -> str: suffix = suffix.lower() if suffix in {"mp4", "webm", "ogg"}: return "video" if suffix in {"mp3", "wav", "m4a"}: return "audio" return "image" def media_items() -> list[dict[str, str]]: items: list[dict[str, str]] = [] for entry in sorted(UPLOAD_DIR.iterdir(), key=lambda p: p.name.lower()): if entry.is_file() and allowed_file(entry.name): suffix = entry.suffix.lower().lstrip(".") items.append({"name": entry.name, "kind": _kind_from_suffix(suffix)}) return items def _next_item(name: str) -> dict | None: items = media_items() for i, item in enumerate(items): if item["name"] == name: return items[(i + 1) % len(items)] return items[0] if items else None def _prev_item(name: str) -> dict | None: items = media_items() for i, item in enumerate(items): if item["name"] == name: return items[(i - 1) % len(items)] return items[-1] if items else None # ── Page Routes ─────────────────────────────────────────────────────── @app.route("/") def index(): return redirect(url_for("controller")) @app.route("/display") def display(): return render_template("display.html") @app.route("/controller") def controller(): return render_template("controller.html", items=media_items()) # ── Player API ──────────────────────────────────────────────────────── @app.route("/api/state") def api_state(): return jsonify(player_state) @app.route("/api/play", methods=["POST"]) def api_play(): data = request.get_json(silent=True) or {} name = data.get("name") if name: items = media_items() match = next((i for i in items if i["name"] == name), None) if not match: return jsonify({"error": "not found"}), 404 _set_current(match["name"], match["kind"]) _clear_seek() player_state["playing"] = True _bump() return jsonify(player_state) @app.route("/api/stop", methods=["POST"]) def api_stop(): _clear_seek() player_state["playing"] = False _bump() return jsonify(player_state) @app.route("/api/next", methods=["POST"]) def api_next(): current = player_state["current"] if current: nxt = _next_item(current["name"]) if nxt: _set_current(nxt["name"], nxt["kind"]) else: items = media_items() if items: _set_current(items[0]["name"], items[0]["kind"]) _clear_seek() player_state["playing"] = True _bump() return jsonify(player_state) @app.route("/api/prev", methods=["POST"]) def api_prev(): current = player_state["current"] if current: prv = _prev_item(current["name"]) if prv: _set_current(prv["name"], prv["kind"]) else: items = media_items() if items: _set_current(items[-1]["name"], items[-1]["kind"]) _clear_seek() player_state["playing"] = True _bump() return jsonify(player_state) @app.route("/api/seek", methods=["POST"]) def api_seek(): data = request.get_json(silent=True) or {} seconds = data.get("seconds", 0) player_state["seek"] = seconds player_state["playing"] = True _bump() return jsonify(player_state) @app.route("/api/seek-ack", methods=["POST"]) def api_seek_ack(): player_state["seek"] = 0 return jsonify({"ok": True}) @app.route("/api/volume", methods=["POST"]) def api_volume(): data = request.get_json(silent=True) or {} vol = float(data.get("volume", 1.0)) vol = max(0.0, min(1.0, vol)) player_state["volume"] = vol _bump() return jsonify(player_state) # ── Upload / Media / Delete ─────────────────────────────────────────── @app.route("/api/media", methods=["GET"]) def api_media(): return jsonify(media_items()) @app.route("/upload", methods=["POST"]) def upload(): files = request.files.getlist("files") if not files: return redirect(url_for("controller")) for file in files: if not file.filename: continue if not allowed_file(file.filename): continue filename = secure_filename(file.filename) file.save(UPLOAD_DIR / filename) return redirect(url_for("controller")) @app.route("/media/") def media(filename: str): if not allowed_file(filename): abort(404) return send_from_directory(app.config["UPLOAD_FOLDER"], filename) @app.route("/api/state/media", methods=["GET"]) def api_state_media(): if player_state["current"]: name = player_state["current"]["name"] if not allowed_file(name): abort(404) return send_from_directory(app.config["UPLOAD_FOLDER"], name) abort(404) @app.route("/delete/", methods=["POST"]) def delete(filename: str): if not allowed_file(filename): abort(404) path = UPLOAD_DIR / filename if path.exists(): path.unlink() if player_state["current"] and player_state["current"]["name"] == filename: player_state["current"] = None player_state["playing"] = False _clear_seek() _bump() return redirect(url_for("controller")) if __name__ == "__main__": app.run( host="0.0.0.0", port=int(os.environ.get("PORT", "5008")), debug=True, )