Files
videoplayer/app.py
2026-05-27 23:08:12 +02:00

275 lines
7.5 KiB
Python

from __future__ import annotations
import os
import time
from pathlib import Path
from flask import Flask, abort, jsonify, redirect, render_template, request, send_from_directory, url_for
from werkzeug.utils import secure_filename
# Directory that contains this module; the default upload folder sits beside it.
BASE_DIR = Path(__file__).resolve().parent

# Upload location: the UPLOAD_DIR environment variable takes precedence when
# set, otherwise "<BASE_DIR>/uploads" is used. The directory is created at
# import time so later code can rely on it existing.
UPLOAD_DIR = Path(os.getenv("UPLOAD_DIR", str(BASE_DIR / "uploads")))
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Lower-case file extensions (without the dot) accepted by allowed_file().
ALLOWED_EXTENSIONS = {
    "mp4", "webm", "ogg",
    "mp3", "wav", "m4a",
    "jpg", "jpeg", "png", "gif",
}

app = Flask(__name__)
app.config.update(UPLOAD_FOLDER=str(UPLOAD_DIR))

# Exposed to templates as ``app_version`` by the context processor.
APP_VERSION = "1.0.0"
@app.context_processor
def inject_globals():
    """Provide template globals: current year, app version and request host.

    ``app_host`` is the part of ``request.host`` before the first ``:``.
    """
    hostname, _, _ = request.host.partition(":")
    return dict(
        app_year=time.localtime().tm_year,
        app_version=APP_VERSION,
        app_host=hostname,
    )
# ── Player State ──────────────────────────────────────────────────────
# Shared, in-memory playback state returned by the player API endpoints.
player_state = dict(
    version=0,      # incremented by _bump() whenever the state changes
    current=None,   # None, or {"name": ..., "kind": ...} of the selected item
    playing=False,  # True while playback is requested
    seek=0,         # pending seek position; reset to 0 once acknowledged
    volume=1.0,     # kept within 0.0 .. 1.0 by the volume endpoint
)

# time.time() of the most recent display ping (0.0 until the first ping).
_display_last_seen = 0.0

# Seconds since the last ping after which the display is reported offline.
DISPLAY_TIMEOUT = 8
def _bump():
    """Increment the state version counter by one."""
    player_state["version"] = player_state["version"] + 1
def _set_current(name: str, kind: str):
    """Record ``name``/``kind`` as the currently selected media item."""
    player_state["current"] = dict(name=name, kind=kind)
def _clear_seek():
    """Reset the pending seek position to 0."""
    player_state.update(seek=0)
def _reset_seek():
    """Reset the pending seek position to 0 (same effect as ``_clear_seek``)."""
    _clear_seek()
# ── Helpers ───────────────────────────────────────────────────────────
def allowed_file(filename: str) -> bool:
    """Return True when ``filename`` ends in an extension from ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    Names without any dot are rejected.
    """
    _, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def _kind_from_suffix(suffix: str) -> str:
    """Map a file extension (no leading dot) to "video", "audio" or "image".

    Matching is case-insensitive; any extension that is not a known video or
    audio type is classified as "image".
    """
    kind_by_extension = {
        "mp4": "video",
        "webm": "video",
        "ogg": "video",
        "mp3": "audio",
        "wav": "audio",
        "m4a": "audio",
    }
    return kind_by_extension.get(suffix.lower(), "image")
def media_items() -> list[dict[str, str]]:
    """List the playable files directly inside UPLOAD_DIR.

    Returns one ``{"name": ..., "kind": ...}`` dict per regular file whose name
    passes ``allowed_file``, ordered by case-insensitive file name.
    """
    ordered_paths = sorted(UPLOAD_DIR.iterdir(), key=lambda path: path.name.lower())
    return [
        {
            "name": path.name,
            "kind": _kind_from_suffix(path.suffix.lower().lstrip(".")),
        }
        for path in ordered_paths
        if path.is_file() and allowed_file(path.name)
    ]
def _next_item(name: str) -> dict | None:
    """Return the item following ``name`` in the media list, wrapping around.

    Falls back to the first item when ``name`` is not in the list, and to
    ``None`` when the list is empty.
    """
    playlist = media_items()
    if not playlist:
        return None
    names = [entry["name"] for entry in playlist]
    if name not in names:
        return playlist[0]
    return playlist[(names.index(name) + 1) % len(playlist)]
def _prev_item(name: str) -> dict | None:
    """Return the item preceding ``name`` in the media list, wrapping around.

    Falls back to the last item when ``name`` is not in the list, and to
    ``None`` when the list is empty.
    """
    playlist = media_items()
    if not playlist:
        return None
    names = [entry["name"] for entry in playlist]
    if name not in names:
        return playlist[-1]
    return playlist[(names.index(name) - 1) % len(playlist)]
# ── Page Routes ───────────────────────────────────────────────────────
@app.route("/")
def index():
    """Redirect the site root to the controller page."""
    controller_url = url_for("controller")
    return redirect(controller_url)
@app.route("/display")
def display():
    """Render the display page template."""
    template_name = "display.html"
    return render_template(template_name)
@app.route("/controller")
def controller():
    """Render the controller page with the current media list as ``items``."""
    playlist = media_items()
    return render_template("controller.html", items=playlist)
# ── Player API ────────────────────────────────────────────────────────
@app.route("/api/state")
def api_state():
    """Return a copy of the player state plus a ``display_online`` flag.

    The display counts as online when its last ping is less than
    DISPLAY_TIMEOUT seconds old.
    """
    seconds_since_ping = time.time() - _display_last_seen
    snapshot = {
        **player_state,
        "display_online": seconds_since_ping < DISPLAY_TIMEOUT,
    }
    return jsonify(snapshot)
@app.route("/api/play", methods=["POST"])
def api_play():
    """Start playback, optionally switching to the item named in the JSON body.

    Body: optional JSON object with a ``name`` key. When ``name`` is given it
    must match an entry of ``media_items()``; that entry becomes the current
    item and the pending seek is cleared. Without ``name`` the current
    selection is left unchanged and only ``playing`` is set.

    Returns the player state as JSON, a 404 JSON error when ``name`` is not in
    the media list, or a 400 JSON error when the body is JSON but not an
    object.
    """
    data = request.get_json(silent=True) or {}
    # A truthy non-object body (e.g. a JSON array) has no ``.get``; reject it
    # explicitly instead of failing with an unhandled AttributeError.
    if not isinstance(data, dict):
        return jsonify({"error": "invalid payload"}), 400

    name = data.get("name")
    if name:
        match = next((item for item in media_items() if item["name"] == name), None)
        if not match:
            return jsonify({"error": "not found"}), 404
        _set_current(match["name"], match["kind"])
        _clear_seek()

    player_state["playing"] = True
    _bump()
    return jsonify(player_state)
@app.route("/api/stop", methods=["POST"])
def api_stop():
    """Stop playback: clear the pending seek, unset ``playing``, bump the version."""
    player_state.update(playing=False)
    _clear_seek()
    _bump()
    return jsonify(player_state)
@app.route("/api/next", methods=["POST"])
def api_next():
    """Advance to the next media item and start playing.

    With a current item, the selection moves to its successor (wrapping
    around); without one, the first item of the list is selected. When no
    item is available the selection is left as it is. The seek is cleared,
    ``playing`` is set and the version is bumped in every case.
    """
    selected = player_state["current"]
    if selected:
        target = _next_item(selected["name"])
    else:
        playlist = media_items()
        target = playlist[0] if playlist else None

    if target:
        _set_current(target["name"], target["kind"])

    _clear_seek()
    player_state["playing"] = True
    _bump()
    return jsonify(player_state)
@app.route("/api/prev", methods=["POST"])
def api_prev():
    """Step back to the previous media item and start playing.

    With a current item, the selection moves to its predecessor (wrapping
    around); without one, the last item of the list is selected. When no
    item is available the selection is left as it is. The seek is cleared,
    ``playing`` is set and the version is bumped in every case.
    """
    selected = player_state["current"]
    if selected:
        target = _prev_item(selected["name"])
    else:
        playlist = media_items()
        target = playlist[-1] if playlist else None

    if target:
        _set_current(target["name"], target["kind"])

    _clear_seek()
    player_state["playing"] = True
    _bump()
    return jsonify(player_state)
@app.route("/api/seek", methods=["POST"])
def api_seek():
    """Store a pending seek position and resume playback.

    Body: JSON object with a numeric ``seconds`` value (defaults to 0).
    Numeric strings are accepted and converted to ``float``; numbers are
    stored unchanged.

    Returns the player state as JSON, or a 400 JSON error when the body is not
    a JSON object or ``seconds`` is not a finite number.
    """
    import math

    data = request.get_json(silent=True) or {}
    # A truthy non-object body (e.g. a JSON array) has no ``.get``.
    if not isinstance(data, dict):
        return jsonify({"error": "invalid payload"}), 400

    raw = data.get("seconds", 0)
    # bool is an int subclass, but true/false is not a meaningful position.
    if isinstance(raw, bool):
        return jsonify({"error": "invalid seconds"}), 400
    try:
        seconds = float(raw)
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "invalid seconds"}), 400
    # NaN/Infinity would be written into the shared state and then serialised
    # as invalid JSON for every client polling it.
    if not math.isfinite(seconds):
        return jsonify({"error": "invalid seconds"}), 400

    # Keep ints/floats exactly as sent; only string input is converted.
    player_state["seek"] = raw if isinstance(raw, (int, float)) else seconds
    player_state["playing"] = True
    _bump()
    return jsonify(player_state)
@app.route("/api/seek-ack", methods=["POST"])
def api_seek_ack():
    """Acknowledge a seek: reset the pending position to 0 without bumping the version."""
    player_state.update(seek=0)
    acknowledgement = {"ok": True}
    return jsonify(acknowledgement)
@app.route("/api/display/ping", methods=["POST"])
def api_display_ping():
    """Record the current time as the moment the display was last seen."""
    global _display_last_seen
    now = time.time()
    _display_last_seen = now
    acknowledgement = {"ok": True}
    return jsonify(acknowledgement)
@app.route("/api/volume", methods=["POST"])
def api_volume():
    """Set the playback volume, clamped to the range 0.0 .. 1.0.

    Body: JSON object with a ``volume`` value convertible to ``float``
    (defaults to 1.0).

    Returns the player state as JSON, or a 400 JSON error when the body is not
    a JSON object or ``volume`` is not a number.
    """
    import math

    data = request.get_json(silent=True) or {}
    # A truthy non-object body (e.g. a JSON array) has no ``.get``.
    if not isinstance(data, dict):
        return jsonify({"error": "invalid payload"}), 400

    try:
        vol = float(data.get("volume", 1.0))
    except (TypeError, ValueError, OverflowError):
        # null, lists, objects and non-numeric strings end up here.
        return jsonify({"error": "invalid volume"}), 400
    # NaN compares false against everything, so the clamp below would
    # silently turn it into 1.0 (full volume); reject it instead.
    if math.isnan(vol):
        return jsonify({"error": "invalid volume"}), 400

    player_state["volume"] = max(0.0, min(1.0, vol))
    _bump()
    return jsonify(player_state)
# ── Upload / Media / Delete ───────────────────────────────────────────
@app.route("/api/media", methods=["GET"])
def api_media():
    """Return the current media list as JSON."""
    listing = media_items()
    return jsonify(listing)
@app.route("/upload", methods=["POST"])
def upload():
    """Save uploaded files into UPLOAD_DIR and redirect to the controller.

    Reads the multipart field ``files``. Entries without a filename, or whose
    sanitized name does not carry an allowed extension, are skipped silently.
    A file with the same sanitized name as an existing one replaces it.
    """
    for file in request.files.getlist("files"):
        if not file.filename:
            continue
        # Validate the name that is actually written to disk:
        # secure_filename() can drop characters (e.g. a non-ASCII stem) and
        # leave a name without a usable extension, which would otherwise be
        # stored but never listed, served or deletable.
        filename = secure_filename(file.filename)
        if not filename or not allowed_file(filename):
            continue
        file.save(UPLOAD_DIR / filename)
    return redirect(url_for("controller"))
@app.route("/media/<path:filename>")
def media(filename: str):
    """Serve ``filename`` from the upload folder; 404 for disallowed extensions."""
    if allowed_file(filename):
        upload_folder = app.config["UPLOAD_FOLDER"]
        return send_from_directory(upload_folder, filename)
    abort(404)
@app.route("/api/state/media", methods=["GET"])
def api_state_media():
    """Serve the file of the currently selected item.

    Responds with 404 when nothing is selected or the selected name does not
    have an allowed extension.
    """
    selected = player_state["current"]
    if not selected or not allowed_file(selected["name"]):
        abort(404)
    return send_from_directory(app.config["UPLOAD_FOLDER"], selected["name"])
@app.route("/delete/<path:filename>", methods=["POST"])
def delete(filename: str):
    """Delete an uploaded file and redirect to the controller.

    Responds with 404 when ``filename`` has a disallowed extension or resolves
    to a location outside UPLOAD_DIR. A file that does not exist is not an
    error. When the deleted name is the current item, the selection, the
    ``playing`` flag and the pending seek are reset. The state version is
    bumped in every non-404 case.
    """
    if not allowed_file(filename):
        abort(404)

    upload_root = UPLOAD_DIR.resolve()
    target = (upload_root / filename).resolve()
    # The ``<path:...>`` converter lets ``..`` segments through, so the joined
    # path must be confined to the upload directory before anything is removed.
    if upload_root not in target.parents:
        abort(404)

    # is_file() avoids calling unlink() on a directory that happens to carry
    # an allowed extension.
    if target.is_file():
        try:
            target.unlink()
        except FileNotFoundError:
            # Removed by a concurrent request after the check; nothing to do.
            pass

    if player_state["current"] and player_state["current"]["name"] == filename:
        player_state["current"] = None
        player_state["playing"] = False
        _clear_seek()
    _bump()
    return redirect(url_for("controller"))
if __name__ == "__main__":
    # The server binds to every interface, so the Werkzeug interactive
    # debugger (which allows arbitrary code execution) must not be enabled
    # unconditionally. Opt in explicitly with FLASK_DEBUG=1 for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", "5008")),
        debug=debug_enabled,
    )