Files
signage/app.py
2026-05-05 13:09:56 +02:00

617 lines
19 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import hashlib
# Import welcome page functions
import generate_welcome_page
from datetime import datetime
from flask import (
Flask, render_template,
send_from_directory, redirect,
request, abort
)
from flask_login import (
LoginManager, login_user,
login_required, logout_user,
UserMixin
)
from werkzeug.utils import secure_filename
# -------------------------------------------------
# Grundkonfiguration
# -------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MEDIA_DIR = os.path.join(BASE_DIR, "media")
CONFIG_FILE = os.path.join(BASE_DIR, "config.json")
APP_VERSION = "4.0.2"
UPLOAD_EXTENSIONS = {".jpg", ".jpeg", ".png", ".mp4"}
app = Flask(__name__)
app.secret_key = "CHANGE_THIS_SECRET!!!"
login_manager = LoginManager(app)
login_manager.login_view = "login"
# -------------------------------------------------
# Config Helpers
# -------------------------------------------------
def load_config():
if not os.path.exists(CONFIG_FILE):
return {"admin": {}, "screens": {}}
with open(CONFIG_FILE) as f:
return json.load(f)
def save_config(cfg):
with open(CONFIG_FILE, "w") as f:
json.dump(cfg, f, indent=2)
f.flush()
os.fsync(f.fileno())
print("✅ config.json gespeichert")
def is_url(value):
"""Check if value is a URL (string or dict with 'url' key)"""
if isinstance(value, str):
return value.lower().startswith(("http://", "https://"))
if isinstance(value, dict) and "url" in value:
url = value.get("url", "")
return isinstance(url, str) and url.lower().startswith(("http://", "https://"))
return False
def normalize_url(value):
"""Convert URL string to dict with zoom factor, or return existing dict"""
if isinstance(value, dict) and "url" in value:
return value
if isinstance(value, str) and value.lower().startswith(("http://", "https://")):
return {"url": value, "zoom": 1.0}
return None
# -------------------------------------------------
# Auth / Benutzer
# -------------------------------------------------
class Admin(UserMixin):
id = 1
@login_manager.user_loader
def load_user(user_id):
return Admin()
# -------------------------------------------------
# Login / Logout
# -------------------------------------------------
@app.route("/login", methods=["GET", "POST"])
def login():
config = load_config()
error = None
if request.method == "POST":
if (
request.form.get("username") == config["admin"].get("username")
and request.form.get("password") == config["admin"].get("password")
):
login_user(Admin())
return redirect("/admin")
error = "Ungültige Zugangsdaten"
return render_template("login.html", error=error)
@app.route("/logout")
def logout():
logout_user()
return redirect("/login")
# -------------------------------------------------
# Customer / Willkommensseite
# -------------------------------------------------
@app.route("/customer", methods=["GET", "POST"])
@login_required
def add_customer():
"""Add new customer with logo search and welcome page"""
error = None
success = None
logo_url = None
if request.method == "POST":
customer_name = request.form.get("customer_name", "").strip()
if not customer_name:
error = "Kundenname erforderlich"
else:
try:
# Search for logo
logo_url = generate_welcome_page.search_customer_logo(customer_name)
if not logo_url:
error = "Logo konnte nicht gefunden werden"
else:
# Generate and save welcome page
html_filename = generate_welcome_page.save_welcome_page(customer_name, logo_url)
if html_filename:
# Add to lobby playlist
if generate_welcome_page.add_customer_to_lobby_playlist(html_filename):
success = f"✅ Kunde '{customer_name}' erfolgreich hinzugefügt!"
else:
error = "Fehler beim Hinzufügen zur Playliste"
else:
error = "Fehler beim Speichern der Willkommensseite"
except ValueError as e:
error = f"Konfigurationsfehler: {str(e)}"
except Exception as e:
error = f"Fehler: {str(e)}"
return render_template("customer.html", error=error, success=success, logo_url=logo_url)
@app.route("/api/customer", methods=["POST"])
@login_required
def api_add_customer():
"""API endpoint for customer creation"""
try:
data = request.get_json()
customer_name = data.get("customer_name", "").strip()
if not customer_name:
return jsonify({"error": "Customer name required"}), 400
# Search for logo
logo_url = generate_welcome_page.search_customer_logo(customer_name)
if not logo_url:
return jsonify({"error": "Logo not found"}), 400
# Generate and save welcome page
html_filename = generate_welcome_page.save_welcome_page(customer_name, logo_url)
if not html_filename:
return jsonify({"error": "Failed to create welcome page"}), 500
# Add to lobby playlist
if generate_welcome_page.add_customer_to_lobby_playlist(html_filename):
return jsonify({
"success": True,
"message": f"Customer '{customer_name}' added successfully",
"welcome_page": html_filename,
"logo_url": logo_url
}), 201
else:
return jsonify({"error": "Failed to add to playlist"}), 500
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
# -------------------------------------------------
# Medien ausliefern
# -------------------------------------------------
@app.route("/media/<screen>/<path:filename>")
def media(screen, filename):
path = os.path.join(MEDIA_DIR, screen)
return send_from_directory(path, filename)
# -------------------------------------------------
# Player
# -------------------------------------------------
@app.route("/player/<screen>")
def player(screen):
config = load_config()
screens_cfg = config.setdefault("screens", {})
screen_cfg = screens_cfg.get(screen)
if not screen_cfg:
abort(404)
folder = os.path.join(MEDIA_DIR, screen)
playlist = screen_cfg.get("playlist", [])
show_images = screen_cfg.get("show_images", True)
show_videos = screen_cfg.get("show_videos", True)
def allowed_file(name):
if name.startswith("._"):
return False
ext = name.lower()
if ext.endswith((".jpg", ".jpeg", ".png")):
return show_images
if ext.endswith(".mp4"):
return show_videos
if ext.endswith((".html", ".htm")):
return True
return False
# ------------------------------
# Normale Screen-Playlist
# ------------------------------
normal_files = []
# 1. Playlist-Reihenfolge
for item in playlist:
if is_url(item):
url_data = normalize_url(item)
if url_data:
normal_files.append({"kind": "url", "url": url_data["url"], "zoom": url_data.get("zoom", 1.0)})
continue
if os.path.exists(os.path.join(folder, item)) and allowed_file(item):
normal_files.append({"kind": "file", "name": item})
existing_file_names = {
item["name"] for item in normal_files if item["kind"] == "file"
}
# 2. Neue Dateien hintendran
for f in sorted(os.listdir(folder)):
if f in existing_file_names or f.startswith("._"):
continue
if allowed_file(f):
normal_files.append({"kind": "file", "name": f})
# ------------------------------
# Priority-Playlist (global)
# WICHTIG: IMMER definieren!
# ------------------------------
priority_cfg = config.get("priority", {})
if priority_cfg.get("enabled", False):
prio_files = []
for item in priority_cfg.get("playlist", []):
if is_url(item):
url_data = normalize_url(item)
if url_data:
prio_files.append({"kind": "url", "url": url_data["url"], "zoom": url_data.get("zoom", 1.0)})
continue
file_path = os.path.join(MEDIA_DIR, "priority", item)
if os.path.exists(file_path):
prio_files.append({"kind": "file", "name": item})
else:
prio_files = []
return render_template(
"player.html",
screen=screen,
normal_files=normal_files, # ✅ explizit
prio_files=prio_files, # ✅ IMMER Liste
interval=screen_cfg.get("interval", 10),
newsticker_text=screen_cfg.get("newsticker_text", ""),
newsticker_enabled=screen_cfg.get("newsticker_enabled", False)
)
# -------------------------------------------------
# Hash für automatisches Player-Reload
# -------------------------------------------------
@app.route("/playlist/<screen>/hash")
def playlist_hash(screen):
config = load_config()
# ✅ Globaler Hash über ALLE relevanten Daten
relevant = {
"screens": {
screen: config["screens"].get(screen, {})
},
"priority": config.get("priority", {})
}
blob = json.dumps(relevant, sort_keys=True).encode()
return hashlib.md5(blob).hexdigest()
# -------------------------------------------------
# Admin Dashboard
# -------------------------------------------------
@app.route("/admin")
@login_required
def admin():
cfg = load_config()
screens = {}
media_files = {}
screen_status = {}
for screen in os.listdir(MEDIA_DIR):
if screen == "priority":
continue # ✅ kein echter Screen
path = os.path.join(MEDIA_DIR, screen)
if not os.path.isdir(path):
continue
files = []
screen_cfg = cfg.setdefault("screens", {}).setdefault(screen, {
"interval": 10,
"show_images": True,
"show_videos": True,
"playlist": []
})
playlist = screen_cfg.get("playlist", [])
screen_cfg["newsticker_text"] = screen_cfg.get("newsticker_text", "")
screen_cfg["newsticker_enabled"] = screen_cfg.get("newsticker_enabled", False)
# 1⃣ Playlist-Reihenfolge
for item in playlist:
if is_url(item):
url_data = normalize_url(item)
if url_data:
files.append({
"name": url_data["url"],
"type": "url",
"size": "URL",
"zoom": url_data.get("zoom", 1.0)
})
continue
file_path = os.path.join(path, item)
if not os.path.exists(file_path):
continue
ext = os.path.splitext(item)[1].lower()
if ext == ".mp4":
ftype = "video"
if ext in (".jpg", ".jpeg", ".png"):
ftype = "image"
if ext == ".html":
ftype = "html"
size = os.path.getsize(file_path) // 1024
files.append({
"name": item,
"type": ftype,
"size": size
})
# 2⃣ Neue Dateien anhängen (nicht in Playlist)
for f in sorted(os.listdir(path)):
if f in playlist or f.startswith("._"):
continue
file_path = os.path.join(path, f)
ext = os.path.splitext(f)[1].lower()
ftype = "video" if ext == ".mp4" else "image"
size = os.path.getsize(file_path) // 1024
files.append({
"name": f,
"type": ftype,
"size": size
})
screens[screen] = screen_cfg
media_files[screen] = files
screen_status[screen] = "active" if files else "empty"
# --- PRIORITY EXTENSION: Admin ---
prio_dir = os.path.join(MEDIA_DIR, "priority")
os.makedirs(prio_dir, exist_ok=True)
priority_files = []
prio_playlist = cfg["priority"].get("playlist", [])
# 1⃣ Playlist-Reihenfolge
for item in prio_playlist:
if is_url(item):
url_data = normalize_url(item)
if url_data:
priority_files.append({
"name": url_data["url"],
"type": "url",
"size": "URL",
"zoom": url_data.get("zoom", 1.0)
})
continue
file_path = os.path.join(prio_dir, item)
if not os.path.exists(file_path):
continue
ext = os.path.splitext(item)[1].lower()
ftype = "video" if ext == ".mp4" else "image"
size = os.path.getsize(file_path) // 1024
priority_files.append({
"name": item,
"type": ftype,
"size": size
})
# 2⃣ Neue Dateien anhängen (nicht in Playlist)
for f in sorted(os.listdir(prio_dir)):
if f in prio_playlist or f.startswith("._"):
continue
file_path = os.path.join(prio_dir, f)
ext = os.path.splitext(f)[1].lower()
ftype = "video" if ext == ".mp4" else "image"
size = os.path.getsize(file_path) // 1024
priority_files.append({
"name": f,
"type": ftype,
"size": size
})
return render_template(
"admin.html",
screens=screens,
media_files=media_files,
screen_status=screen_status,
version=APP_VERSION,
year=datetime.now().year,
hostname=os.uname().nodename,
priority_files=priority_files
)
# -------------------------------------------------
# Admin: Screen-Einstellungen speichern
# -------------------------------------------------
@app.route("/admin/update/<screen>", methods=["POST"])
@login_required
def update_screen(screen):
config = load_config()
cfg = config.setdefault("screens", {}).setdefault(screen, {
"interval": 10,
"show_images": True,
"show_videos": True,
"playlist": []
})
cfg["interval"] = int(request.form.get("interval", 10))
cfg["show_images"] = "show_images" in request.form
cfg["show_videos"] = "show_videos" in request.form
cfg["newsticker_text"] = request.form.get("newsticker_text", "")
cfg["newsticker_enabled"] = "newsticker_enabled" in request.form
save_config(config)
return redirect("/admin")
# -------------------------------------------------
# Admin: Upload
# -------------------------------------------------
@app.route("/admin/upload/<screen>", methods=["POST"])
@login_required
def upload(screen):
file = request.files.get("file")
if not file:
return redirect("/admin")
ext = os.path.splitext(file.filename)[1].lower()
if ext not in UPLOAD_EXTENSIONS:
return "Ungültiger Dateityp", 400
filename = secure_filename(file.filename)
file.save(os.path.join(MEDIA_DIR, screen, filename))
return redirect("/admin")
# -------------------------------------------------
# Admin: URL hinzufügen
# -------------------------------------------------
@app.route("/admin/add-url/<screen>", methods=["POST"])
@login_required
def add_url(screen):
url = request.form.get("url", "").strip()
zoom = request.form.get("zoom", "1.0").strip()
if not is_url(url):
return redirect("/admin")
try:
zoom_factor = float(zoom)
if zoom_factor <= 0:
zoom_factor = 1.0
except (ValueError, TypeError):
zoom_factor = 1.0
# Store as object with zoom factor
url_data = {"url": url, "zoom": zoom_factor}
config = load_config()
if screen == "priority":
config.setdefault("priority", {}).setdefault("playlist", []).append(url_data)
else:
screen_cfg = config.setdefault("screens", {}).setdefault(screen, {
"interval": 10,
"show_images": True,
"show_videos": True,
"playlist": []
})
screen_cfg.setdefault("playlist", []).append(url_data)
save_config(config)
return redirect("/admin")
# -------------------------------------------------
# Admin: Datei löschen
# -------------------------------------------------
@app.route("/admin/delete/<screen>/<path:filename>", methods=["POST"])
@login_required
def delete_file(screen, filename):
cfg = load_config()
# Datei löschen
path = os.path.join(MEDIA_DIR, screen, filename)
if os.path.exists(path):
os.remove(path)
# --- Playlist bereinigen ---
if screen == "priority":
playlist = cfg.get("priority", {}).get("playlist", [])
# Remove both string URLs and dict-based URLs
playlist[:] = [item for item in playlist if not (
(isinstance(item, str) and item == filename) or
(isinstance(item, dict) and item.get("url") == filename)
)]
else:
playlist = cfg["screens"].get(screen, {}).get("playlist", [])
# Remove both string URLs and dict-based URLs
playlist[:] = [item for item in playlist if not (
(isinstance(item, str) and item == filename) or
(isinstance(item, dict) and item.get("url") == filename)
)]
save_config(cfg)
return redirect("/admin")
# -------------------------------------------------
# Admin: Playlist speichern
# -------------------------------------------------
@app.route("/admin/playlist/<screen>", methods=["POST"])
@login_required
def save_playlist(screen):
cfg = load_config()
data = request.get_json()
new_order = data["playlist"] # List of filenames/URLs as strings
# Get the current playlist with all data (including zoom factors)
if screen == "priority":
old_playlist = cfg.get("priority", {}).get("playlist", [])
else:
old_playlist = cfg.get("screens", {}).get(screen, {}).get("playlist", [])
# Create a mapping: filename/url -> full item (preserves zoom factor)
item_map = {}
for item in old_playlist:
if isinstance(item, dict) and "url" in item:
key = item["url"] # URL as key
else:
key = item # Filename as key
item_map[key] = item
# Build new playlist: use the mapping to preserve zoom factors
new_playlist = []
for key in new_order:
if key in item_map:
new_playlist.append(item_map[key])
else:
# Item not found in mapping - add as is
new_playlist.append(key)
if screen == "priority":
cfg.setdefault("priority", {})["playlist"] = new_playlist
else:
cfg.setdefault("screens", {}).setdefault(screen, {})["playlist"] = new_playlist
save_config(cfg)
return "", 204
# -------------------------------------------------
# Main
# -------------------------------------------------
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5005)