Files
signage/app.py
Erik Thiele 0fa3c00319 Version 3.6 mit URL Zoom Funktion
Co-authored-by: Copilot <copilot@github.com>
2026-04-27 13:03:45 +02:00

525 lines
16 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import socket
import hashlib
from datetime import datetime
from flask import (
Flask, render_template,
send_from_directory, redirect,
request, abort
)
from flask_login import (
LoginManager, login_user,
login_required, logout_user,
UserMixin
)
from werkzeug.utils import secure_filename
# -------------------------------------------------
# Grundkonfiguration
# -------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MEDIA_DIR = os.path.join(BASE_DIR, "media")
CONFIG_FILE = os.path.join(BASE_DIR, "config.json")
APP_VERSION = "3.6.0"
UPLOAD_EXTENSIONS = {".jpg", ".jpeg", ".png", ".mp4"}
app = Flask(__name__)
app.secret_key = "CHANGE_THIS_SECRET!!!"
login_manager = LoginManager(app)
login_manager.login_view = "login"
# -------------------------------------------------
# Config Helpers
# -------------------------------------------------
def load_config():
if not os.path.exists(CONFIG_FILE):
return {"admin": {}, "screens": {}}
with open(CONFIG_FILE) as f:
return json.load(f)
def save_config(cfg):
with open(CONFIG_FILE, "w") as f:
json.dump(cfg, f, indent=2)
f.flush()
os.fsync(f.fileno())
print("✅ config.json gespeichert")
def is_url(value):
"""Check if value is a URL (string or dict with 'url' key)"""
if isinstance(value, str):
return value.lower().startswith(("http://", "https://"))
if isinstance(value, dict) and "url" in value:
url = value.get("url", "")
return isinstance(url, str) and url.lower().startswith(("http://", "https://"))
return False
def normalize_url(value):
"""Convert URL string to dict with zoom factor, or return existing dict"""
if isinstance(value, dict) and "url" in value:
return value
if isinstance(value, str) and value.lower().startswith(("http://", "https://")):
return {"url": value, "zoom": 1.0}
return None
# -------------------------------------------------
# Auth / Benutzer
# -------------------------------------------------
class Admin(UserMixin):
id = 1
@login_manager.user_loader
def load_user(user_id):
return Admin()
# -------------------------------------------------
# Login / Logout
# -------------------------------------------------
@app.route("/login", methods=["GET", "POST"])
def login():
config = load_config()
error = None
if request.method == "POST":
if (
request.form.get("username") == config["admin"].get("username")
and request.form.get("password") == config["admin"].get("password")
):
login_user(Admin())
return redirect("/admin")
error = "Ungültige Zugangsdaten"
return render_template("login.html", error=error)
@app.route("/logout")
def logout():
logout_user()
return redirect("/login")
# -------------------------------------------------
# Medien ausliefern
# -------------------------------------------------
@app.route("/media/<screen>/<path:filename>")
def media(screen, filename):
path = os.path.join(MEDIA_DIR, screen)
return send_from_directory(path, filename)
# -------------------------------------------------
# Player
# -------------------------------------------------
@app.route("/player/<screen>")
def player(screen):
config = load_config()
screens_cfg = config.setdefault("screens", {})
screen_cfg = screens_cfg.get(screen)
if not screen_cfg:
abort(404)
folder = os.path.join(MEDIA_DIR, screen)
playlist = screen_cfg.get("playlist", [])
show_images = screen_cfg.get("show_images", True)
show_videos = screen_cfg.get("show_videos", True)
def allowed_file(name):
if name.startswith("._"):
return False
ext = name.lower()
if ext.endswith((".jpg", ".jpeg", ".png")):
return show_images
if ext.endswith(".mp4"):
return show_videos
return False
# ------------------------------
# Normale Screen-Playlist
# ------------------------------
normal_files = []
# 1. Playlist-Reihenfolge
for item in playlist:
if is_url(item):
url_data = normalize_url(item)
if url_data:
normal_files.append({"kind": "url", "url": url_data["url"], "zoom": url_data.get("zoom", 1.0)})
continue
if os.path.exists(os.path.join(folder, item)) and allowed_file(item):
normal_files.append({"kind": "file", "name": item})
existing_file_names = {
item["name"] for item in normal_files if item["kind"] == "file"
}
# 2. Neue Dateien hintendran
for f in sorted(os.listdir(folder)):
if f in existing_file_names or f.startswith("._"):
continue
if allowed_file(f):
normal_files.append({"kind": "file", "name": f})
# ------------------------------
# Priority-Playlist (global)
# WICHTIG: IMMER definieren!
# ------------------------------
priority_cfg = config.get("priority", {})
if priority_cfg.get("enabled", False):
prio_files = []
for item in priority_cfg.get("playlist", []):
if is_url(item):
url_data = normalize_url(item)
if url_data:
prio_files.append({"kind": "url", "url": url_data["url"], "zoom": url_data.get("zoom", 1.0)})
continue
file_path = os.path.join(MEDIA_DIR, "priority", item)
if os.path.exists(file_path):
prio_files.append({"kind": "file", "name": item})
else:
prio_files = []
return render_template(
"player.html",
screen=screen,
normal_files=normal_files, # ✅ explizit
prio_files=prio_files, # ✅ IMMER Liste
interval=screen_cfg.get("interval", 10),
newsticker_text=screen_cfg.get("newsticker_text", ""),
newsticker_enabled=screen_cfg.get("newsticker_enabled", False)
)
# -------------------------------------------------
# Hash für automatisches Player-Reload
# -------------------------------------------------
@app.route("/playlist/<screen>/hash")
def playlist_hash(screen):
config = load_config()
# ✅ Globaler Hash über ALLE relevanten Daten
relevant = {
"screens": {
screen: config["screens"].get(screen, {})
},
"priority": config.get("priority", {})
}
blob = json.dumps(relevant, sort_keys=True).encode()
return hashlib.md5(blob).hexdigest()
# -------------------------------------------------
# Admin Dashboard
# -------------------------------------------------
@app.route("/admin")
@login_required
def admin():
cfg = load_config()
screens = {}
media_files = {}
screen_status = {}
for screen in os.listdir(MEDIA_DIR):
if screen == "priority":
continue # ✅ kein echter Screen
path = os.path.join(MEDIA_DIR, screen)
if not os.path.isdir(path):
continue
files = []
screen_cfg = cfg.setdefault("screens", {}).setdefault(screen, {
"interval": 10,
"show_images": True,
"show_videos": True,
"playlist": []
})
playlist = screen_cfg.get("playlist", [])
screen_cfg["newsticker_text"] = screen_cfg.get("newsticker_text", "")
screen_cfg["newsticker_enabled"] = screen_cfg.get("newsticker_enabled", False)
# 1⃣ Playlist-Reihenfolge
for item in playlist:
if is_url(item):
url_data = normalize_url(item)
if url_data:
files.append({
"name": url_data["url"],
"type": "url",
"size": "URL",
"zoom": url_data.get("zoom", 1.0)
})
continue
file_path = os.path.join(path, item)
if not os.path.exists(file_path):
continue
ext = os.path.splitext(item)[1].lower()
ftype = "video" if ext == ".mp4" else "image"
size = os.path.getsize(file_path) // 1024
files.append({
"name": item,
"type": ftype,
"size": size
})
# 2⃣ Neue Dateien anhängen (nicht in Playlist)
for f in sorted(os.listdir(path)):
if f in playlist or f.startswith("._"):
continue
file_path = os.path.join(path, f)
ext = os.path.splitext(f)[1].lower()
ftype = "video" if ext == ".mp4" else "image"
size = os.path.getsize(file_path) // 1024
files.append({
"name": f,
"type": ftype,
"size": size
})
screens[screen] = screen_cfg
media_files[screen] = files
screen_status[screen] = "active" if files else "empty"
# --- PRIORITY EXTENSION: Admin ---
prio_dir = os.path.join(MEDIA_DIR, "priority")
os.makedirs(prio_dir, exist_ok=True)
priority_files = []
prio_playlist = cfg["priority"].get("playlist", [])
# 1⃣ Playlist-Reihenfolge
for item in prio_playlist:
if is_url(item):
url_data = normalize_url(item)
if url_data:
priority_files.append({
"name": url_data["url"],
"type": "url",
"size": "URL",
"zoom": url_data.get("zoom", 1.0)
})
continue
file_path = os.path.join(prio_dir, item)
if not os.path.exists(file_path):
continue
ext = os.path.splitext(item)[1].lower()
ftype = "video" if ext == ".mp4" else "image"
size = os.path.getsize(file_path) // 1024
priority_files.append({
"name": item,
"type": ftype,
"size": size
})
# 2⃣ Neue Dateien anhängen (nicht in Playlist)
for f in sorted(os.listdir(prio_dir)):
if f in prio_playlist or f.startswith("._"):
continue
file_path = os.path.join(prio_dir, f)
ext = os.path.splitext(f)[1].lower()
ftype = "video" if ext == ".mp4" else "image"
size = os.path.getsize(file_path) // 1024
priority_files.append({
"name": f,
"type": ftype,
"size": size
})
return render_template(
"admin.html",
screens=screens,
media_files=media_files,
screen_status=screen_status,
version=APP_VERSION,
year=datetime.now().year,
hostname=os.uname().nodename,
priority_files=priority_files
)
# -------------------------------------------------
# Admin: Screen-Einstellungen speichern
# -------------------------------------------------
@app.route("/admin/update/<screen>", methods=["POST"])
@login_required
def update_screen(screen):
config = load_config()
cfg = config.setdefault("screens", {}).setdefault(screen, {
"interval": 10,
"show_images": True,
"show_videos": True,
"playlist": []
})
cfg["interval"] = int(request.form.get("interval", 10))
cfg["show_images"] = "show_images" in request.form
cfg["show_videos"] = "show_videos" in request.form
cfg["newsticker_text"] = request.form.get("newsticker_text", "")
cfg["newsticker_enabled"] = "newsticker_enabled" in request.form
save_config(config)
return redirect("/admin")
# -------------------------------------------------
# Admin: Upload
# -------------------------------------------------
@app.route("/admin/upload/<screen>", methods=["POST"])
@login_required
def upload(screen):
file = request.files.get("file")
if not file:
return redirect("/admin")
ext = os.path.splitext(file.filename)[1].lower()
if ext not in UPLOAD_EXTENSIONS:
return "Ungültiger Dateityp", 400
filename = secure_filename(file.filename)
file.save(os.path.join(MEDIA_DIR, screen, filename))
return redirect("/admin")
# -------------------------------------------------
# Admin: URL hinzufügen
# -------------------------------------------------
@app.route("/admin/add-url/<screen>", methods=["POST"])
@login_required
def add_url(screen):
url = request.form.get("url", "").strip()
zoom = request.form.get("zoom", "1.0").strip()
if not is_url(url):
return redirect("/admin")
try:
zoom_factor = float(zoom)
if zoom_factor <= 0:
zoom_factor = 1.0
except (ValueError, TypeError):
zoom_factor = 1.0
# Store as object with zoom factor
url_data = {"url": url, "zoom": zoom_factor}
config = load_config()
if screen == "priority":
config.setdefault("priority", {}).setdefault("playlist", []).append(url_data)
else:
screen_cfg = config.setdefault("screens", {}).setdefault(screen, {
"interval": 10,
"show_images": True,
"show_videos": True,
"playlist": []
})
screen_cfg.setdefault("playlist", []).append(url_data)
save_config(config)
return redirect("/admin")
# -------------------------------------------------
# Admin: Datei löschen
# -------------------------------------------------
@app.route("/admin/delete/<screen>/<path:filename>", methods=["POST"])
@login_required
def delete_file(screen, filename):
cfg = load_config()
# Datei löschen
path = os.path.join(MEDIA_DIR, screen, filename)
if os.path.exists(path):
os.remove(path)
# --- Playlist bereinigen ---
if screen == "priority":
playlist = cfg.get("priority", {}).get("playlist", [])
# Remove both string URLs and dict-based URLs
playlist[:] = [item for item in playlist if not (
(isinstance(item, str) and item == filename) or
(isinstance(item, dict) and item.get("url") == filename)
)]
else:
playlist = cfg["screens"].get(screen, {}).get("playlist", [])
# Remove both string URLs and dict-based URLs
playlist[:] = [item for item in playlist if not (
(isinstance(item, str) and item == filename) or
(isinstance(item, dict) and item.get("url") == filename)
)]
save_config(cfg)
return redirect("/admin")
# -------------------------------------------------
# Admin: Playlist speichern
# -------------------------------------------------
@app.route("/admin/playlist/<screen>", methods=["POST"])
@login_required
def save_playlist(screen):
cfg = load_config()
data = request.get_json()
new_order = data["playlist"] # List of filenames/URLs as strings
# Get the current playlist with all data (including zoom factors)
if screen == "priority":
old_playlist = cfg.get("priority", {}).get("playlist", [])
else:
old_playlist = cfg.get("screens", {}).get(screen, {}).get("playlist", [])
# Create a mapping: filename/url -> full item (preserves zoom factor)
item_map = {}
for item in old_playlist:
if isinstance(item, dict) and "url" in item:
key = item["url"] # URL as key
else:
key = item # Filename as key
item_map[key] = item
# Build new playlist: use the mapping to preserve zoom factors
new_playlist = []
for key in new_order:
if key in item_map:
new_playlist.append(item_map[key])
else:
# Item not found in mapping - add as is
new_playlist.append(key)
if screen == "priority":
cfg.setdefault("priority", {})["playlist"] = new_playlist
else:
cfg.setdefault("screens", {}).setdefault(screen, {})["playlist"] = new_playlist
save_config(cfg)
return "", 204
# -------------------------------------------------
# Main
# -------------------------------------------------
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5005)