diff --git a/.gitignore b/.gitignore index 3637755ae..8ba6bdf5c 100644 --- a/.gitignore +++ b/.gitignore @@ -171,4 +171,4 @@ mock_display_output/ # Static Files added by install/update_vendors.sh /src/static/scripts/ -/src/static/styles/ +/src/static/styles/select2.min.css diff --git a/src/blueprints/system_info.py b/src/blueprints/system_info.py new file mode 100644 index 000000000..9f16c5b02 --- /dev/null +++ b/src/blueprints/system_info.py @@ -0,0 +1,949 @@ +import fnmatch +import json +import logging +import os +import platform +import re +import socket +from urllib.parse import urlparse +from datetime import datetime, timezone +from pathlib import Path + +from flask import Blueprint, current_app, jsonify, render_template + +try: + import psutil +except ImportError: + psutil = None + +logger = logging.getLogger(__name__) + +system_info_bp = Blueprint("system_info", __name__) + +_PLUGINS_DIR = Path(__file__).resolve().parent.parent / "plugins" + + +def _get_cpu_freq(): + """Return CPU frequency as a formatted string (e.g. '2.5 GHz'). + + Priority: psutil current -> psutil max -> /proc/cpuinfo cpu MHz -> + sysfs scaling_cur_freq -> sysfs cpuinfo_max_freq -> None. + """ + # 1. psutil (preferred - works on ARM and x86) + if psutil is not None: + try: + freq = psutil.cpu_freq() + if freq is not None: + mhz = freq.current if freq.current and freq.current > 0 else freq.max + if mhz and mhz > 0: + return f"{round(mhz / 1000, 1)} GHz" + except Exception: + pass + + # 2. /proc/cpuinfo "cpu MHz" line + try: + with open("/proc/cpuinfo") as f: + for line in f: + if line.lower().startswith("cpu mhz"): + mhz = float(line.split(":")[1].strip()) + if mhz > 0: + return f"{round(mhz / 1000, 1)} GHz" + except (FileNotFoundError, PermissionError, ValueError): + pass + + # 3. sysfs frequency files (kHz) + freq_paths = [ + "/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq", + "/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq", + ] + for path in freq_paths: + try: + with open(path) as f: + freq_khz = int(f.read().strip()) + if freq_khz > 0: + return f"{round(freq_khz / 1_000_000, 1)} GHz" + except (FileNotFoundError, PermissionError, ValueError): + continue + + return None + + +def _read_sysfs_freq(path): + """Read a sysfs frequency file (kHz) and return formatted GHz string or None.""" + try: + with open(path) as f: + freq_khz = int(f.read().strip()) + if freq_khz > 0: + return f"{round(freq_khz / 1_000_000, 1)} GHz" + except (FileNotFoundError, PermissionError, ValueError): + pass + return None + + +def _get_cpu_cur_freq(): + """Return current CPU frequency as a formatted string or None. + + Priority: sysfs scaling_cur_freq -> psutil current -> /proc/cpuinfo cpu MHz. + """ + # 1. sysfs (Linux / Raspberry Pi) + val = _read_sysfs_freq("/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq") + if val: + return val + + # 2. psutil current frequency + if psutil is not None: + try: + freq = psutil.cpu_freq() + if freq is not None and freq.current and freq.current > 0: + return f"{round(freq.current / 1000, 1)} GHz" + except Exception: + pass + + # 3. /proc/cpuinfo "cpu MHz" + try: + with open("/proc/cpuinfo") as f: + for line in f: + if line.lower().startswith("cpu mhz"): + mhz = float(line.split(":")[1].strip()) + if mhz > 0: + return f"{round(mhz / 1000, 1)} GHz" + except (FileNotFoundError, PermissionError, ValueError): + pass + + return None + + +def _get_cpu_max_freq(): + """Return max CPU frequency as a formatted string or None. + + Priority: sysfs scaling_max_freq -> psutil max -> lscpu CPU max MHz. + """ + # 1. sysfs (Linux / Raspberry Pi) + val = _read_sysfs_freq("/sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq") + if val: + return val + + # 2. psutil max frequency + if psutil is not None: + try: + freq = psutil.cpu_freq() + if freq is not None and freq.max and freq.max > 0: + return f"{round(freq.max / 1000, 1)} GHz" + except Exception: + pass + + # 3. lscpu (available on most Linux distros and WSL) + import subprocess + try: + result = subprocess.run( + ["lscpu"], capture_output=True, text=True, timeout=5, + ) + if result.returncode == 0: + max_mhz = None + cur_mhz = None + for line in result.stdout.splitlines(): + lower = line.lower() + if "cpu max mhz" in lower: + try: + max_mhz = float(line.split(":")[1].strip().replace(",", ".")) + except ValueError: + pass + elif "cpu mhz" in lower and "max" not in lower and "min" not in lower: + try: + cur_mhz = float(line.split(":")[1].strip().replace(",", ".")) + except ValueError: + pass + mhz = max_mhz or cur_mhz + if mhz and mhz > 0: + return f"{round(mhz / 1000, 1)} GHz" + except (FileNotFoundError, subprocess.TimeoutExpired, ValueError, OSError): + pass + + return None + + +# -- ARM CPU part ID to model name mapping -- +_ARM_CPU_PART_MAP = { + "0xb76": "ARM1176JZF-S", + "0xc07": "ARM Cortex-A7", + "0xc08": "ARM Cortex-A8", + "0xc09": "ARM Cortex-A9", + "0xc0f": "ARM Cortex-A15", + "0xd01": "ARM Cortex-A32", + "0xd03": "ARM Cortex-A53", + "0xd04": "ARM Cortex-A35", + "0xd05": "ARM Cortex-A55", + "0xd07": "ARM Cortex-A57", + "0xd08": "ARM Cortex-A72", + "0xd09": "ARM Cortex-A73", + "0xd0a": "ARM Cortex-A75", + "0xd0b": "ARM Cortex-A76", + "0xd0c": "ARM Neoverse N1", + "0xd0d": "ARM Cortex-A77", + "0xd41": "ARM Cortex-A78", + "0xd44": "ARM Cortex-X1", + "0xd46": "ARM Cortex-A510", + "0xd47": "ARM Cortex-A710", + "0xd48": "ARM Cortex-X2", +} + + +def _get_arm_cpu_model(): + """Detect ARM CPU model from /proc/cpuinfo CPU part field. + + Returns a friendly name like 'ARM Cortex-A53' or None. + """ + try: + with open("/proc/cpuinfo") as f: + for line in f: + if line.startswith("CPU part"): + part = line.split(":")[1].strip().lower() + return _ARM_CPU_PART_MAP.get(part) + except (FileNotFoundError, PermissionError): + pass + return None + + +def _get_cpu_info(): + """Return CPU model name, frequency strings, and core count. + + Detection order for model name: + 1. ARM CPU part mapping (Raspberry Pi / ARM SoCs) + 2. /proc/cpuinfo ``model name`` field (x86, modern ARM) + 3. /proc/cpuinfo ``Hardware`` field (older Raspberry Pi kernels) + 4. /proc/device-tree/model (Raspberry Pi device-tree) + 5. platform.processor() + 6. ``CPU not detected`` (never shows "Unknown") + + Results are cached after the first call. + """ + if hasattr(_get_cpu_info, "_cache"): + return _get_cpu_info._cache + + model = None + hardware = None + cores = None + + # Try ARM CPU part mapping first (most accurate for Raspberry Pi) + arm_model = _get_arm_cpu_model() + if arm_model: + model = arm_model + + try: + with open("/proc/cpuinfo") as f: + core_count = 0 + for line in f: + if line.startswith("model name") and not model: + model = line.split(":")[1].strip() + if line.startswith("Hardware") and not hardware: + hardware = line.split(":")[1].strip() + if line.startswith("processor"): + core_count += 1 + if core_count > 0: + cores = core_count + except (FileNotFoundError, PermissionError): + pass + + if not model or model.lower() == "unknown": + model = hardware + + if not model or model.lower() == "unknown": + try: + with open("/proc/device-tree/model") as f: + model = f.read().strip().rstrip("\x00") + except (FileNotFoundError, PermissionError): + pass + + if not model or model.lower() == "unknown": + proc = platform.processor() + if proc: + model = proc + + if not model or model.lower() == "unknown": + model = "CPU not detected" + + freq = _get_cpu_freq() + cur_freq = _get_cpu_cur_freq() + max_freq = _get_cpu_max_freq() + # Cross-fallback: if one is unavailable, use the other or the general freq + if not cur_freq: + cur_freq = max_freq or freq + if not max_freq: + max_freq = cur_freq or freq + result = { + "model": model, + "freq": freq, + "cur_freq": cur_freq, + "max_freq": max_freq, + "cores": cores, + } + _get_cpu_info._cache = result + return result + + +def _is_wsl(): + """Detect if running inside Windows Subsystem for Linux.""" + try: + with open("/proc/version") as f: + return "microsoft" in f.read().lower() + except (FileNotFoundError, PermissionError): + return False + + +def _get_host_physical_memory(): + """Try to get actual physical RAM from Windows host (WSL2 only). + + Uses PowerShell interop to query the host's total physical memory. + Returns the value in bytes or None if unavailable. + """ + import subprocess + + try: + result = subprocess.run( + [ + "powershell.exe", + "-NoProfile", + "-Command", + "(Get-CimInstance Win32_ComputerSystem).TotalPhysicalMemory", + ], + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode == 0 and result.stdout.strip(): + return int(result.stdout.strip()) + except (FileNotFoundError, subprocess.TimeoutExpired, ValueError, OSError): + pass + return None + + +def _get_installed_ram(): + """Return total installed physical RAM via vcgencmd (Raspberry Pi). + + Sums ``vcgencmd get_mem arm`` and ``vcgencmd get_mem gpu`` to obtain the + full physical memory (including GPU-reserved portion invisible to Linux). + Returns a formatted string like '512 MB' or None if vcgencmd is unavailable. + """ + import subprocess + + total_mb = 0 + for region in ("arm", "gpu"): + try: + result = subprocess.run( + ["vcgencmd", "get_mem", region], + capture_output=True, text=True, timeout=3, + ) + if result.returncode == 0 and "=" in result.stdout: + val = result.stdout.split("=")[1].strip() + # e.g. "448M" or "64M" + num = int(re.sub(r"[^\d]", "", val)) + total_mb += num + except (FileNotFoundError, subprocess.TimeoutExpired, ValueError, OSError): + return None + if total_mb > 0: + if total_mb >= 1024: + return f"{round(total_mb / 1024, 1)} GB" + return f"{total_mb} MB" + return None + + +def _get_memory_info(): + """Return total and used RAM in human-readable format. + + On WSL, attempts to report the host's physical RAM via PowerShell. + Falls back to /proc/meminfo MemTotal with an annotation when the + true installed amount cannot be determined. + + Also attempts to detect installed physical RAM via vcgencmd on + Raspberry Pi (``installed`` key). + """ + installed = _get_installed_ram() + try: + with open("/proc/meminfo") as f: + meminfo = {} + for line in f: + parts = line.split(":") + if len(parts) == 2: + key = parts[0].strip() + val = parts[1].strip().split()[0] # value in kB + meminfo[key] = int(val) + + total_kb = meminfo.get("MemTotal", 0) + available_kb = meminfo.get("MemAvailable", 0) + used_kb = total_kb - available_kb + + allocated = _format_bytes(total_kb * 1024) + used = _format_bytes(used_kb * 1024) + + if _is_wsl(): + host_mem = _get_host_physical_memory() + if host_mem: + return { + "total": _format_bytes(host_mem), + "used": used, + "installed": installed, + "allocated": allocated, + "note": f"WSL allocated: {allocated}", + } + return { + "total": allocated, + "used": used, + "installed": installed, + "allocated": allocated, + "note": "WSL allocated", + } + + return {"total": allocated, "used": used, "installed": installed, "allocated": None, "note": None} + except (FileNotFoundError, PermissionError): + return {"total": "N/A", "used": "N/A", "installed": installed, "allocated": None, "note": None} + + +def _get_storage_info(): + """Return total and used disk space for the root filesystem.""" + try: + stat = os.statvfs("/") + total = stat.f_frsize * stat.f_blocks + used = stat.f_frsize * (stat.f_blocks - stat.f_bfree) + return { + "total": _format_bytes(total), + "used": _format_bytes(used), + } + except OSError: + return {"total": "N/A", "used": "N/A"} + + +def _get_os_info(): + """Return OS name, version, distribution ID, and pretty name.""" + name = "Unknown" + version = None + distro_id = None + pretty_name = None + + try: + with open("/etc/os-release") as f: + for line in f: + if line.startswith("PRETTY_NAME="): + pretty_name = line.split("=", 1)[1].strip().strip('"') + elif line.startswith("VERSION="): + version = line.split("=", 1)[1].strip().strip('"') + elif line.startswith("NAME=") and not line.startswith("NAME=\"\n"): + name = line.split("=", 1)[1].strip().strip('"') + elif line.startswith("ID="): + distro_id = line.split("=", 1)[1].strip().strip('"') + except (FileNotFoundError, PermissionError): + name = f"{platform.system()} {platform.release()}" + + return { + "name": name, + "version": version, + "distro": distro_id, + "pretty_name": pretty_name or name, + } + + +def _get_architecture(): + """Return the system architecture (e.g. x86_64, aarch64).""" + return platform.machine() or "Unknown" + + +def _get_device_model(): + """Return the device model (e.g. Raspberry Pi 4 Model B).""" + try: + with open("/proc/device-tree/model") as f: + return f.read().strip().rstrip("\x00") + except (FileNotFoundError, PermissionError): + return platform.machine() or "Unknown" + + +def _get_temperature(): + """Return CPU temperature as a formatted string. + + Priority: psutil sensors -> vcgencmd -> thermal_zone0 sysfs -> None. + """ + # 1. psutil (cross-platform) + if psutil is not None: + try: + temps = psutil.sensors_temperatures() + for name in ("cpu_thermal", "cpu-thermal", "coretemp", "k10temp"): + if name in temps and temps[name]: + current = temps[name][0].current + if current and current > 0: + return f"{current:.0f} °C" + # Fallback: first available sensor + for entries in temps.values(): + if entries and entries[0].current > 0: + return f"{entries[0].current:.0f} °C" + except Exception: + pass + + # 2. vcgencmd (Raspberry Pi) + import subprocess + try: + result = subprocess.run( + ["vcgencmd", "measure_temp"], + capture_output=True, text=True, timeout=3, + ) + if result.returncode == 0 and "temp=" in result.stdout: + temp_str = result.stdout.split("=")[1].split("'")[0] + return f"{float(temp_str):.0f} °C" + except (FileNotFoundError, subprocess.TimeoutExpired, ValueError, OSError): + pass + + # 3. sysfs thermal zone + try: + with open("/sys/class/thermal/thermal_zone0/temp") as f: + millideg = int(f.read().strip()) + if millideg > 0: + return f"{millideg / 1000:.0f} °C" + except (FileNotFoundError, PermissionError, ValueError): + pass + + return None + +def _get_uptime(): + """Return system uptime as a human-readable string.""" + try: + with open("/proc/uptime") as f: + seconds = int(float(f.read().split()[0])) + days, remainder = divmod(seconds, 86400) + hours, remainder = divmod(remainder, 3600) + minutes, _ = divmod(remainder, 60) + parts = [] + if days: + parts.append(f"{days}d") + if hours: + parts.append(f"{hours}h") + parts.append(f"{minutes}m") + return " ".join(parts) + except (FileNotFoundError, PermissionError): + return "N/A" + + +def _get_last_boot(): + """Return last boot time as a formatted string.""" + try: + with open("/proc/uptime") as f: + uptime_seconds = float(f.read().split()[0]) + import time + boot_timestamp = time.time() - uptime_seconds + from datetime import datetime + boot_dt = datetime.fromtimestamp(boot_timestamp) + return boot_dt.strftime("%Y-%m-%d %H:%M") + except (FileNotFoundError, PermissionError): + return "N/A" + + +def _get_local_ip(): + """Return the local IP address.""" + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + s.settimeout(2) + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + return ip + finally: + try: + s.close() + except Exception: + pass + except (OSError, socket.error): + return "N/A" + + +def _get_hostname(): + """Return the system hostname.""" + return socket.gethostname() + + +def _get_display_info(display_manager): + """Return display information using the same detection path as SystemStatus. + + Mirrors the ``_get_display_value`` / ``_parse_epd_code`` logic already used + by the SystemStatus plugin so that the System Info page shows the identical + resolved display name. No manual per-model catalog is maintained - the EPD + code is parsed dynamically from the ``display_type`` config value. + """ + device_config = display_manager.device_config + display_type = device_config.get_config("display_type", default="unknown") + + name = _resolve_display_name(display_type) + + # Resolution from the same config source used by display rendering + resolution = None + try: + w, h = device_config.get_resolution() + resolution = f"{w} × {h}" + except (TypeError, ValueError, KeyError): + pass + + return {"name": name, "type": display_type, "resolution": resolution} + + +# -- Display name resolution (mirrors SystemStatus._get_display_value) -- + +_DISPLAY_NAME_MAP = { + "inky": "Inky e-Paper", + "mock": "Mock Display", +} + +_EPD_PATTERN = re.compile( + r"^epd(\d+)in(\d+)([a-z]*)(?:_(v\d+|hd))?(?:([a-z]*)(?:_(v\d+|hd))?)?$", + re.IGNORECASE, +) + + +def _parse_epd_code(code): + """Parse a Waveshare EPD code into a friendly name. + + Examples:: + + epd7in3e -> Waveshare 7.3inch e-Paper + epd5in83_v2 -> Waveshare 5.83inch e-Paper V2 + epd7in5b_hd -> Waveshare 7.5inch e-Paper HD + epd13in3k -> Waveshare 13.3inch e-Paper + """ + m = _EPD_PATTERN.match(code) + if not m: + return None + inches = m.group(1) + decimal = m.group(2) + size = f"{inches}.{decimal}" + + suffixes = [] + for g in (m.group(4), m.group(5), m.group(6)): + if g: + suffixes.append(g.upper()) + + suffix_str = f" {' '.join(suffixes)}" if suffixes else "" + return f"Waveshare {size}inch e-Paper{suffix_str}" + + +def _resolve_display_name(display_type): + """Return a human-readable display name from the config display_type value. + + Uses the same strategy as SystemStatus._get_display_value: + 1. Check the static name map (inky, mock) + 2. Parse Waveshare EPD codes dynamically + 3. Fall back to the raw display_type string + """ + if not display_type: + return "Unknown" + + normalized = str(display_type).strip().lower() + if not normalized: + return "Unknown" + + friendly = _DISPLAY_NAME_MAP.get(normalized) + if friendly: + return friendly + + if fnmatch.fnmatch(normalized, "epd*in*"): + parsed = _parse_epd_code(normalized) + if parsed: + return f"{parsed} ({display_type})" + return f"Waveshare e-Paper ({display_type})" + + return display_type + + +def _get_kernel_info(): + """Return kernel version string.""" + return platform.release() + + +def _get_device_name(device_config): + """Return the configured device name from InkyPi config.""" + return device_config.get_config("name", default="InkyPi") + + +def _format_bytes(num_bytes): + """Format bytes into a human-readable string (e.g. 3.7 GB).""" + for unit in ("B", "KB", "MB", "GB", "TB"): + if abs(num_bytes) < 1024.0: + return f"{num_bytes:.1f} {unit}" + num_bytes /= 1024.0 + return f"{num_bytes:.1f} PB" + + +def _ram_secondary(mem): + """Build secondary text for the RAM card, annotating WSL when applicable.""" + usage = f"{mem['used']} of {mem['total']} used" + note = mem.get("note") + if note: + return f"{usage} ({note})" + return usage + + +def _collect_system_info(display_manager): + """Collect all system information split into highlight cards and specification sections.""" + cpu = _get_cpu_info() + mem = _get_memory_info() + storage = _get_storage_info() + os_info = _get_os_info() + display = _get_display_info(display_manager) + local_ip = _get_local_ip() + uptime = _get_uptime() + temperature = _get_temperature() + device_config = display_manager.device_config + + cards = [ + { + "icon": "display", + "label": "Display", + "value": display["name"], + "secondary": display["resolution"], + }, + { + "icon": "memory", + "label": "Installed RAM", + "value": mem["installed"] or mem["total"], + "secondary": _ram_secondary(mem), + }, + { + "icon": "cpu", + "label": "CPU", + "value": cpu["model"], + "secondary": cpu["max_freq"] or cpu["freq"], + }, + { + "icon": "temperature", + "label": "Temperature", + "value": temperature or "N/A", + }, + { + "icon": "uptime", + "label": "Uptime", + "value": uptime, + }, + { + "icon": "network", + "label": "Local IP", + "value": local_ip, + }, + ] + + device_specs = [ + {"label": "Device name", "value": _get_device_name(device_config)}, + {"label": "Network name", "value": _get_hostname()}, + {"label": "Architecture", "value": _get_architecture()}, + {"label": "CPU", "value": cpu["model"]}, + {"label": "CPU cores", "value": str(cpu["cores"]) if cpu["cores"] else "N/A"}, + {"label": "Current frequency", "value": cpu["cur_freq"] or "N/A"}, + {"label": "Max frequency", "value": cpu["max_freq"] or "N/A"}, + ] + + wsl_allocated = mem.get("allocated") + if wsl_allocated: + # WSL: total may be host physical RAM, allocated is the WSL-visible memory + if mem["total"] != wsl_allocated: + device_specs.append({"label": "Installed RAM", "value": mem["total"]}) + device_specs.append({"label": "Usable RAM", "value": wsl_allocated}) + device_specs.append({"label": "RAM used", "value": f"{mem['used']} of {wsl_allocated} used"}) + else: + # Non-WSL: show vcgencmd physical RAM (RPi) if available, then system totals + if mem.get("installed"): + device_specs.append({"label": "Installed RAM", "value": mem["installed"]}) + device_specs.append({"label": "Usable RAM", "value": mem["total"]}) + device_specs.append({"label": "RAM used", "value": f"{mem['used']} of {mem['total']} used"}) + else: + # Fallback when physical 'installed' value is unavailable + device_specs.append({"label": "Installed RAM", "value": mem["total"]}) + device_specs.append({"label": "Usable RAM", "value": f"{mem['used']} of {mem['total']} used"}) + + device_specs.extend([ + {"label": "Storage", "value": storage["total"]}, + {"label": "Storage used", "value": f"{storage['used']} of {storage['total']} used"}, + ]) + + system_specs = [ + {"label": "OS name", "value": os_info["name"]}, + {"label": "OS version", "value": os_info["version"] or "N/A"}, + {"label": "Distribution", "value": os_info["distro"] or "N/A"}, + {"label": "Kernel", "value": _get_kernel_info()}, + ] + + return cards, device_specs, system_specs + + +def _format_time_ago(dt): + """Return a human-readable 'X ago' string from a datetime.""" + now = datetime.now(timezone.utc) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + delta = now - dt + total_seconds = int(delta.total_seconds()) + if total_seconds < 60: + return "just now" + minutes = total_seconds // 60 + if minutes < 60: + return f"{minutes} min ago" + hours = minutes // 60 + if hours < 24: + return f"{hours}h ago" + days = hours // 24 + return f"{days}d ago" + + +def _collect_overview(): + """Collect live system overview data. + + Returns a tuple of two lists (line1, line2), each containing + dicts with 'label' and 'value' keys. + line1: Active playlist (shown alone on first row). + line2: Active plugin, Last refresh, Installed plugins. + """ + line1 = [] + line2 = [] + + device_config = current_app.config.get("DEVICE_CONFIG") + + if device_config: + # Active playlist (line 1) + playlist_value = "None (no active schedule now)" + playlist_manager = device_config.get_playlist_manager() + if playlist_manager: + active = playlist_manager.determine_active_playlist(datetime.now()) + if active: + playlist_value = f"{active.name} ({active.start_time}\u2013{active.end_time})" + line1.append({"label": "Active playlist", "value": playlist_value}) + + # Active plugin (line 2) + refresh_info = device_config.get_refresh_info() + plugin_name = "None" + if refresh_info and refresh_info.plugin_id: + plugin_name = refresh_info.plugin_id + plugin_cfg = device_config.get_plugin(refresh_info.plugin_id) + if plugin_cfg: + plugin_name = plugin_cfg.get("display_name", plugin_name) + line2.append({"label": "Active plugin", "value": plugin_name}) + + # Last refresh (line 2) + last_refresh = "None" + if refresh_info and refresh_info.refresh_time: + refresh_dt = refresh_info.get_refresh_datetime() + if refresh_dt: + last_refresh = _format_time_ago(refresh_dt) + line2.append({"label": "Last refresh", "value": last_refresh}) + + # Installed plugins (line 2) + plugins = device_config.get_plugins() + if plugins: + line2.append({"label": "Installed plugins", "value": str(len(plugins))}) + + return line1, line2 + + +def _collect_plugin_info(): + """Collect installed plugin metadata from plugin-info.json files. + + Returns a dict with 'builtin' and 'third_party' lists plus counts. + Uses the same logic as the CLI ``inkypi plugin list`` command: + a plugin is third-party when its plugin-info.json has a non-empty + ``repository`` field, builtin otherwise. + """ + plugins_dir = _PLUGINS_DIR + + builtin = [] + third_party = [] + + if not plugins_dir.is_dir(): + return { + "builtin": builtin, + "third_party": third_party, + "total": 0, + "builtin_count": 0, + "third_party_count": 0, + } + + for entry in sorted(plugins_dir.iterdir()): + if not entry.is_dir(): + continue + plugin_id = entry.name + if plugin_id in ("base_plugin", "__pycache__"): + continue + + info_file = entry / "plugin-info.json" + if not info_file.is_file(): + continue + + display_name = plugin_id.replace("_", " ").title() + repository = "" + + try: + with open(info_file) as f: + info = json.load(f) + display_name = info.get("display_name", display_name) + repository = info.get("repository", "") + except (json.JSONDecodeError, OSError): + pass + + plugin_data = {"id": plugin_id, "name": display_name} + + # If metadata contains a repository the plugin is third-party. + # Only include the `repository` field when it's a safe http(s) + # URL with a non-empty netloc; otherwise omit the field but keep + # the plugin classified as third-party. + repo = (repository or "").strip() + if repo: + try: + parsed = urlparse(repo) + scheme = (parsed.scheme or "").lower() + if scheme in ("http", "https") and parsed.netloc: + plugin_data["repository"] = repo + except Exception: + pass + third_party.append(plugin_data) + else: + builtin.append(plugin_data) + + builtin.sort(key=lambda p: p["name"].casefold()) + third_party.sort(key=lambda p: p["name"].casefold()) + + total = len(builtin) + len(third_party) + return { + "builtin": builtin, + "third_party": third_party, + "total": total, + "builtin_count": len(builtin), + "third_party_count": len(third_party), + } + + +@system_info_bp.route("/system-info") +def system_info_page(): + display_manager = current_app.config["DISPLAY_MANAGER"] + hostname = _get_hostname() + device_name = _get_device_name(display_manager.device_config) + cards, device_specs, system_specs = _collect_system_info(display_manager) + overview_line1, overview_line2 = _collect_overview() + plugin_info = _collect_plugin_info() + return render_template( + "system_info.html", + hostname=hostname, + device_name=device_name, + overview_line1=overview_line1, + overview_line2=overview_line2, + cards=cards, + device_specs=device_specs, + system_specs=system_specs, + plugin_info=plugin_info, + ) + + +@system_info_bp.route("/api/system-info") +def system_info_api(): + display_manager = current_app.config["DISPLAY_MANAGER"] + hostname = _get_hostname() + cards, device_specs, system_specs = _collect_system_info(display_manager) + overview_line1, overview_line2 = _collect_overview() + plugin_info = _collect_plugin_info() + return jsonify({ + "hostname": hostname, + "overview_line1": overview_line1, + "overview_line2": overview_line2, + "cards": cards, + "device_specs": device_specs, + "system_specs": system_specs, + "plugin_info": plugin_info, + }) diff --git a/src/inkypi.py b/src/inkypi.py index 5dc4de57b..f7cd6fda7 100755 --- a/src/inkypi.py +++ b/src/inkypi.py @@ -30,6 +30,7 @@ from blueprints.plugin import plugin_bp from blueprints.playlist import playlist_bp from blueprints.apikeys import apikeys_bp +from blueprints.system_info import system_info_bp from jinja2 import ChoiceLoader, FileSystemLoader from plugins.plugin_registry import load_plugins from waitress import serve @@ -80,6 +81,7 @@ app.register_blueprint(plugin_bp) app.register_blueprint(playlist_bp) app.register_blueprint(apikeys_bp) +app.register_blueprint(system_info_bp) # Register opener for HEIF/HEIC images register_heif_opener() diff --git a/src/static/icons/system_info.png b/src/static/icons/system_info.png new file mode 100644 index 000000000..59f2353da Binary files /dev/null and b/src/static/icons/system_info.png differ diff --git a/src/static/styles/system_info.css b/src/static/styles/system_info.css new file mode 100644 index 000000000..f8ff4b7af --- /dev/null +++ b/src/static/styles/system_info.css @@ -0,0 +1,283 @@ +/* System Info Page Styles */ + +.sysinfo-hostname { + font-size: 0.9rem; + color: var(--text-secondary); + font-weight: 400; +} + +.sysinfo-icon { + width: 30px; + height: 30px; + color: var(--text-secondary); +} + +/* ── System Overview ── */ + +.sysinfo-overview { + display: flex; + flex-direction: column; + gap: 4px; + padding: 10px 0 16px; +} + +.sysinfo-overview-line { + display: flex; + flex-wrap: wrap; + align-items: center; + gap: 4px; +} + +.sysinfo-overview-item { + font-size: 0.85rem; + line-height: 1.4; + white-space: nowrap; +} + +.sysinfo-overview-sep { + font-size: 0.85rem; + color: var(--text-secondary); + margin: 0 4px; +} + +.sysinfo-overview-label { + color: var(--text-secondary); + font-weight: 400; +} + +.sysinfo-overview-value { + color: var(--text-primary); + font-weight: 600; +} + +/* ── Highlight Cards Grid ── */ + +.sysinfo-grid { + display: grid; + grid-template-columns: repeat(3, 1fr); + gap: 16px; + padding: 10px 0 24px; +} + +/* Individual card */ +.sysinfo-card { + background-color: var(--bg-primary); + border: 1px solid var(--border-color); + border-radius: 12px; + padding: 16px 16px; + display: flex; + flex-direction: column; + gap: 2px; + transition: background-color 0.3s ease, border-color 0.3s ease; +} + +/* Card header: icon + label on same row */ +.sysinfo-card-header { + display: flex; + align-items: center; + gap: 8px; + margin-bottom: 10px; +} + +.sysinfo-card-icon svg { + width: 18px; + height: 18px; + color: var(--text-secondary); + stroke: var(--text-secondary); + flex-shrink: 0; +} + +.sysinfo-card-label { + font-size: 0.8rem; + font-weight: 400; + color: var(--text-secondary); + line-height: 1; +} + +/* Card main value */ +.sysinfo-card-value { + font-size: 1.05rem; + font-weight: 700; + color: var(--text-primary); + word-wrap: break-word; + overflow-wrap: break-word; + line-height: 1.3; +} + +/* Card secondary text */ +.sysinfo-card-secondary { + font-size: 0.8rem; + color: var(--text-secondary); + margin-top: 6px; +} + +/* ── Device Specifications Section ── */ + +.sysinfo-specs-section { + padding: 24px 0 10px; +} + +.sysinfo-specs-title { + font-size: 1rem; + font-weight: 600; + color: var(--text-primary); + margin-bottom: 12px; +} + +.sysinfo-specs-table { + display: flex; + flex-direction: column; +} + +.sysinfo-spec-row { + display: flex; + justify-content: space-between; + align-items: baseline; + padding: 9px 0; + border-bottom: 1px solid var(--border-color); + gap: 16px; + transition: border-color 0.3s ease; +} + +.sysinfo-spec-row:last-child { + border-bottom: none; +} + +.sysinfo-spec-label { + font-size: 0.9rem; + color: var(--text-secondary); + flex-shrink: 0; +} + +.sysinfo-spec-value { + font-size: 0.9rem; + font-weight: 600; + color: var(--text-primary); + text-align: right; + word-break: break-word; +} + +/* ── Responsive ── */ + +@media (max-width: 720px) { + .sysinfo-grid { + grid-template-columns: repeat(2, 1fr); + } +} + +@media (max-width: 480px) { + .sysinfo-grid { + grid-template-columns: 1fr; + } +} + +/* ── Installed Plugins Section ── */ + +.sysinfo-plugin-summary { + display: flex; + flex-wrap: wrap; + align-items: center; + gap: 4px; + padding-top: 9px; + margin-bottom: 20px; + font-size: 0.85rem; +} + +.sysinfo-plugin-summary-item { + white-space: nowrap; +} + +.sysinfo-plugin-group { + margin-bottom: 24px; +} + +.sysinfo-plugin-group:last-child { + margin-bottom: 0; +} + +.sysinfo-plugin-group-title { + font-size: 0.85rem; + font-weight: 500; + color: var(--text-secondary); + margin-bottom: 12px; +} + +.sysinfo-plugin-chips { + display: flex; + flex-wrap: wrap; + gap: 8px; +} + +.sysinfo-plugin-chip { + display: inline-block; + font-size: 0.78rem; + font-weight: 400; + color: var(--text-secondary); + background-color: var(--bg-secondary); + border: 1px solid var(--border-color); + border-radius: 16px; + padding: 3px 10px; + line-height: 1.4; + cursor: default; +} + +.sysinfo-plugin-chip--thirdparty { + color: var(--text-primary); + font-weight: 500; + background-color: var(--bg-primary); + position: relative; + cursor: pointer; + text-decoration: none; + transition: background-color 0.15s ease, box-shadow 0.15s ease; +} + +.sysinfo-plugin-chip--thirdparty:hover { + background-color: var(--bg-secondary); + box-shadow: 0 1px 4px rgba(0, 0, 0, 0.1); +} + +.sysinfo-plugin-exticon { + font-size: 0.78rem; + opacity: 0.75; + margin-left: 2px; +} + +.sysinfo-plugin-tooltip { + display: none; + position: absolute; + bottom: calc(100% + 6px); + left: 50%; + transform: translateX(-50%); + background-color: var(--bg-secondary); + border: 1px solid var(--border-color); + border-radius: 6px; + padding: 6px 10px; + font-size: 0.75rem; + font-weight: 400; + color: var(--text-secondary); + white-space: nowrap; + z-index: 10; + flex-direction: column; + gap: 2px; + box-shadow: 0 2px 6px rgba(0, 0, 0, 0.1); +} + +.sysinfo-plugin-chip--thirdparty:hover .sysinfo-plugin-tooltip, +.sysinfo-plugin-chip--thirdparty:focus .sysinfo-plugin-tooltip, +.sysinfo-plugin-chip--thirdparty:focus-visible .sysinfo-plugin-tooltip, +.sysinfo-plugin-chip--thirdparty:focus-within .sysinfo-plugin-tooltip { + display: flex; +} + +.sysinfo-plugin-chip--thirdparty:focus, +.sysinfo-plugin-chip--thirdparty:focus-visible { + outline: none; + box-shadow: 0 0 0 3px var(--input-focus); + background-color: var(--bg-secondary); +} + +.sysinfo-plugin-none { + font-size: 0.85rem; + color: var(--text-secondary); + font-style: italic; +} diff --git a/src/templates/inky.html b/src/templates/inky.html index 97b85a697..302170d32 100644 --- a/src/templates/inky.html +++ b/src/templates/inky.html @@ -68,6 +68,9 @@

{{ config.name }}

playlists icon + + system info icon + settings icon diff --git a/src/templates/system_info.html b/src/templates/system_info.html new file mode 100644 index 000000000..f435f85dd --- /dev/null +++ b/src/templates/system_info.html @@ -0,0 +1,215 @@ + + + + + + System Info + + + + + + +
+ + + + +
+
+
+ System Info +
+

System Info

+
{{ device_name if device_name else hostname }}
+ {% if device_name and device_name != hostname %} +
{{ hostname }}
+ {% endif %} +
+
+
+
+ +
+ + + {% if overview_line1 or overview_line2 %} +
+ {% if overview_line1 %} +
+ {% for item in overview_line1 %} + + {{ item.label }}: + {{ item.value }} + {% if item.label == "Active playlist" %} + + {% endif %} + + {% endfor %} +
+ {% endif %} + {% if overview_line2 %} +
+ {% for item in overview_line2 %} + {% if not loop.first %}{% endif %} + + {{ item.label }}: + {{ item.value }} + + {% endfor %} +
+ {% endif %} +
+ {% endif %} + + +
+ {% for card in cards %} +
+
+
+ {% if card.icon == 'storage' %} + + + + + + {% elif card.icon == 'memory' %} + + + + + + + + {% elif card.icon == 'cpu' %} + + + + + + + + + {% elif card.icon == 'display' %} + + + + + + {% elif card.icon == 'network' %} + + + + + + + {% elif card.icon == 'uptime' %} + + + + + {% elif card.icon == 'temperature' %} + + + + {% endif %} +
+ {{ card.label }} +
+ {{ card.value }} + {% if card.secondary %} + {{ card.secondary }} + {% endif %} +
+ {% endfor %} +
+ + +
+

Device specifications

+
+ {% for spec in device_specs %} +
+ {{ spec.label }} + {{ spec.value }} +
+ {% endfor %} +
+
+ + +
+

System specifications

+
+ {% for spec in system_specs %} +
+ {{ spec.label }} + {{ spec.value }} +
+ {% endfor %} +
+
+ + +
+

Installed plugins

+ +
+ + Total: + {{ plugin_info.total }} + + + + Built-in: + {{ plugin_info.builtin_count }} + + + + Third-party: + {{ plugin_info.third_party_count }} + +
+ + {% if plugin_info.third_party %} +
+

Third-party

+
+ {% for p in plugin_info.third_party %} + + {{ p.name }} + + id: {{ p.id }} + {{ p.repository | replace('https://', '') }} + + + {% endfor %} +
+
+ {% endif %} + +
+

Built-in

+ {% if plugin_info.builtin %} +
+ {% for p in plugin_info.builtin %} + {{ p.name }} + {% endfor %} +
+ {% else %} + None + {% endif %} +
+
+
+ + diff --git a/tests/test_system_info.py b/tests/test_system_info.py new file mode 100644 index 000000000..fe93f214c --- /dev/null +++ b/tests/test_system_info.py @@ -0,0 +1,708 @@ +import pytest +from unittest.mock import patch, MagicMock + +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +from blueprints.system_info import ( + _format_bytes, + _get_cpu_info, + _get_cpu_freq, + _get_cpu_cur_freq, + _get_cpu_max_freq, + _get_arm_cpu_model, + _get_installed_ram, + _read_sysfs_freq, + _get_memory_info, + _get_storage_info, + _get_os_info, + _get_device_model, + _get_uptime, + _get_last_boot, + _get_local_ip, + _get_hostname, + _get_display_info, + _get_kernel_info, + _get_device_name, + _get_architecture, + _get_temperature, + _is_wsl, + _get_host_physical_memory, + _ram_secondary, + _parse_epd_code, + _resolve_display_name, + _collect_system_info, + _format_time_ago, + _collect_overview, + _collect_plugin_info, +) + + +class TestFormatBytes: + def test_bytes(self): + assert _format_bytes(500) == "500.0 B" + + def test_kilobytes(self): + assert _format_bytes(1024) == "1.0 KB" + + def test_megabytes(self): + assert _format_bytes(1024 * 1024) == "1.0 MB" + + def test_gigabytes(self): + assert _format_bytes(1024 ** 3) == "1.0 GB" + + def test_terabytes(self): + assert _format_bytes(1024 ** 4) == "1.0 TB" + + +class TestGetCpuFreq: + @patch("blueprints.system_info.psutil") + def test_psutil_current(self, mock_psutil): + mock_psutil.cpu_freq.return_value = MagicMock(current=2500.0, max=3000.0) + assert _get_cpu_freq() == "2.5 GHz" + + @patch("blueprints.system_info.psutil") + def test_psutil_max_fallback(self, mock_psutil): + mock_psutil.cpu_freq.return_value = MagicMock(current=0.0, max=1800.0) + assert _get_cpu_freq() == "1.8 GHz" + + @patch("blueprints.system_info.psutil") + def test_psutil_none_falls_through(self, mock_psutil): + mock_psutil.cpu_freq.return_value = None + with patch("builtins.open", side_effect=FileNotFoundError): + assert _get_cpu_freq() is None + + @patch("blueprints.system_info.psutil", new=None) + def test_no_psutil_reads_proc_cpuinfo(self): + cpuinfo = "cpu MHz\t: 1000.000\n" + with patch("builtins.open", MagicMock( + return_value=MagicMock( + __enter__=MagicMock(return_value=iter(cpuinfo.splitlines(True))), + __exit__=MagicMock(return_value=False), + ) + )): + assert _get_cpu_freq() == "1.0 GHz" + + +class TestGetCpuInfo: + def setup_method(self): + if hasattr(_get_cpu_info, "_cache"): + del _get_cpu_info._cache + + @patch("blueprints.system_info._get_cpu_max_freq", return_value=None) + @patch("blueprints.system_info._get_cpu_cur_freq", return_value=None) + @patch("blueprints.system_info._get_cpu_freq", return_value=None) + @patch("blueprints.system_info._get_arm_cpu_model", return_value=None) + @patch("builtins.open", side_effect=FileNotFoundError) + @patch("platform.processor", return_value="x86_64") + def test_fallback_to_platform(self, mock_proc, mock_open, mock_arm, mock_freq, mock_cur, mock_max): + result = _get_cpu_info() + assert result["model"] == "x86_64" + assert result["freq"] is None + assert result["cur_freq"] is None + assert result["max_freq"] is None + assert result["cores"] is None + + @patch("blueprints.system_info._get_cpu_max_freq", return_value="3.0 GHz") + @patch("blueprints.system_info._get_cpu_cur_freq", return_value="2.5 GHz") + @patch("blueprints.system_info._get_cpu_freq", return_value="2.5 GHz") + @patch("blueprints.system_info._get_arm_cpu_model", return_value=None) + @patch("builtins.open") + def test_reads_proc_cpuinfo(self, mock_open, mock_arm, mock_freq, mock_cur, mock_max): + cpuinfo_content = "processor\t: 0\nmodel name\t: Intel(R) Core(TM) i5-12400\nprocessor\t: 1\nmodel name\t: Intel(R) Core(TM) i5-12400\n" + mock_open.return_value = MagicMock( + __enter__=MagicMock(return_value=iter(cpuinfo_content.splitlines(True))), + __exit__=MagicMock(return_value=False), + ) + result = _get_cpu_info() + assert "i5-12400" in result["model"] + assert result["cores"] == 2 + assert result["freq"] == "2.5 GHz" + assert result["cur_freq"] == "2.5 GHz" + assert result["max_freq"] == "3.0 GHz" + + +class TestIsWsl: + @patch("builtins.open") + def test_detects_wsl(self, mock_open): + mock_open.return_value = MagicMock( + __enter__=MagicMock(return_value=MagicMock(read=MagicMock( + return_value="Linux version 5.15.0 (Microsoft)" + ))), + __exit__=MagicMock(return_value=False), + ) + assert _is_wsl() is True + + @patch("builtins.open") + def test_non_wsl(self, mock_open): + mock_open.return_value = MagicMock( + __enter__=MagicMock(return_value=MagicMock(read=MagicMock( + return_value="Linux version 6.1.0-rpi7" + ))), + __exit__=MagicMock(return_value=False), + ) + assert _is_wsl() is False + + @patch("builtins.open", side_effect=FileNotFoundError) + def test_fallback_on_missing(self, mock_open): + assert _is_wsl() is False + + +class TestGetHostPhysicalMemory: + @patch("subprocess.run") + def test_returns_bytes(self, mock_run): + import subprocess + mock_run.return_value = MagicMock(returncode=0, stdout="34359738368\n") + result = _get_host_physical_memory() + assert result == 34359738368 + + @patch("subprocess.run", side_effect=FileNotFoundError) + def test_returns_none_on_failure(self, mock_run): + assert _get_host_physical_memory() is None + + +class TestRamSecondary: + def test_normal(self): + mem = {"total": "4.0 GB", "used": "2.0 GB", "note": None} + assert _ram_secondary(mem) == "2.0 GB of 4.0 GB used" + + def test_wsl_with_host_ram(self): + mem = {"total": "32.0 GB", "used": "8.0 GB", "note": "WSL allocated: 15.5 GB"} + assert _ram_secondary(mem) == "8.0 GB of 32.0 GB used (WSL allocated: 15.5 GB)" + + def test_wsl_no_host_ram(self): + mem = {"total": "15.5 GB", "used": "8.0 GB", "note": "WSL allocated"} + assert _ram_secondary(mem) == "8.0 GB of 15.5 GB used (WSL allocated)" + + +class TestGetMemoryInfo: + @patch("builtins.open", side_effect=FileNotFoundError) + def test_fallback_on_missing_proc(self, mock_open): + result = _get_memory_info() + assert result["total"] == "N/A" + assert result["used"] == "N/A" + assert result["note"] is None + + +class TestGetStorageInfo: + @patch("os.statvfs") + def test_returns_storage(self, mock_statvfs): + mock_statvfs.return_value = MagicMock( + f_frsize=4096, f_blocks=2621440, f_bfree=1310720 + ) + result = _get_storage_info() + assert "GB" in result["total"] or "MB" in result["total"] + + @patch("os.statvfs", side_effect=OSError) + def test_fallback_on_error(self, mock_statvfs): + result = _get_storage_info() + assert result["total"] == "N/A" + + +class TestGetOsInfo: + @patch("builtins.open", side_effect=FileNotFoundError) + def test_fallback_to_platform(self, mock_open): + result = _get_os_info() + assert result["name"] != "Unknown" + assert result["pretty_name"] is not None + assert result["distro"] is None + + +class TestGetDeviceModel: + @patch("builtins.open", side_effect=FileNotFoundError) + @patch("platform.machine", return_value="x86_64") + def test_fallback_to_platform(self, mock_machine, mock_open): + result = _get_device_model() + assert result == "x86_64" + + +class TestGetTemperature: + @patch("blueprints.system_info.psutil") + def test_psutil_cpu_thermal(self, mock_psutil): + mock_psutil.sensors_temperatures.return_value = { + "cpu_thermal": [MagicMock(current=55.0)], + } + assert _get_temperature() == "55 °C" + + @patch("blueprints.system_info.psutil") + def test_psutil_coretemp(self, mock_psutil): + mock_psutil.sensors_temperatures.return_value = { + "coretemp": [MagicMock(current=62.0)], + } + assert _get_temperature() == "62 °C" + + @patch("blueprints.system_info.psutil", new=None) + @patch("subprocess.run", side_effect=FileNotFoundError) + @patch("builtins.open", side_effect=FileNotFoundError) + def test_returns_none_when_unavailable(self, mock_open, mock_run): + assert _get_temperature() is None + + @patch("blueprints.system_info.psutil") + def test_psutil_empty_falls_to_sysfs(self, mock_psutil): + mock_psutil.sensors_temperatures.return_value = {} + with patch("subprocess.run", side_effect=FileNotFoundError): + with patch("builtins.open") as mock_open: + mock_open.return_value = MagicMock( + __enter__=MagicMock(return_value=MagicMock( + read=MagicMock(return_value="45000") + )), + __exit__=MagicMock(return_value=False), + ) + assert _get_temperature() == "45 °C" + + +class TestGetUptime: + @patch("builtins.open", side_effect=FileNotFoundError) + def test_fallback_on_missing(self, mock_open): + assert _get_uptime() == "N/A" + + @patch("builtins.open") + def test_parses_uptime(self, mock_open): + mock_open.return_value.__enter__ = MagicMock( + return_value=MagicMock(read=MagicMock(return_value="90061.23 180000.00")) + ) + mock_open.return_value.__exit__ = MagicMock(return_value=False) + result = _get_uptime() + assert "1d" in result + assert "h" in result + + +class TestGetLastBoot: + @patch("builtins.open", side_effect=FileNotFoundError) + def test_fallback_on_missing(self, mock_open): + assert _get_last_boot() == "N/A" + + +class TestGetLocalIp: + @patch("socket.socket") + def test_returns_ip(self, mock_socket_cls): + mock_sock = MagicMock() + mock_socket_cls.return_value = mock_sock + mock_sock.getsockname.return_value = ("192.168.1.100", 0) + assert _get_local_ip() == "192.168.1.100" + + @patch("socket.socket", side_effect=OSError) + def test_fallback_on_error(self, mock_socket_cls): + assert _get_local_ip() == "N/A" + + +class TestGetHostname: + @patch("socket.gethostname", return_value="inkypi") + def test_returns_hostname(self, mock_hostname): + assert _get_hostname() == "inkypi" + + +class TestParseEpdCode: + def test_simple_model(self): + assert _parse_epd_code("epd7in3e") == "Waveshare 7.3inch e-Paper" + + def test_decimal_model(self): + assert _parse_epd_code("epd5in83") == "Waveshare 5.83inch e-Paper" + + def test_version_suffix(self): + assert _parse_epd_code("epd5in83_v2") == "Waveshare 5.83inch e-Paper V2" + + def test_hd_suffix(self): + assert _parse_epd_code("epd7in5b_hd") == "Waveshare 7.5inch e-Paper HD" + + def test_large_model(self): + assert _parse_epd_code("epd13in3k") == "Waveshare 13.3inch e-Paper" + + def test_invalid_code(self): + assert _parse_epd_code("not_an_epd") is None + + +class TestResolveDisplayName: + def test_mock(self): + assert _resolve_display_name("mock") == "Mock Display" + + def test_inky(self): + assert _resolve_display_name("inky") == "Inky e-Paper" + + def test_waveshare_parsed(self): + result = _resolve_display_name("epd7in3e") + assert "Waveshare 7.3inch e-Paper" in result + assert "epd7in3e" in result + + def test_waveshare_unparseable(self): + result = _resolve_display_name("epd_custom_in_format") + assert "Waveshare e-Paper" in result + + def test_unknown_type(self): + assert _resolve_display_name("something_else") == "something_else" + + def test_empty(self): + assert _resolve_display_name("") == "Unknown" + + def test_none(self): + assert _resolve_display_name(None) == "Unknown" + + +class TestGetDisplayInfo: + def test_mock_display(self): + mock_dm = MagicMock() + mock_dm.device_config.get_config.return_value = "mock" + mock_dm.device_config.get_resolution.return_value = (800, 480) + result = _get_display_info(mock_dm) + assert result["name"] == "Mock Display" + assert result["type"] == "mock" + assert result["resolution"] == "800 × 480" + + def test_inky_display(self): + mock_dm = MagicMock() + mock_dm.device_config.get_config.return_value = "inky" + mock_dm.device_config.get_resolution.return_value = (400, 300) + result = _get_display_info(mock_dm) + assert result["name"] == "Inky e-Paper" + assert result["type"] == "inky" + assert result["resolution"] == "400 × 300" + + def test_waveshare_display(self): + mock_dm = MagicMock() + mock_dm.device_config.get_config.return_value = "epd7in3e" + mock_dm.device_config.get_resolution.return_value = (800, 480) + result = _get_display_info(mock_dm) + assert "Waveshare 7.3inch e-Paper" in result["name"] + assert result["type"] == "epd7in3e" + assert result["resolution"] == "800 × 480" + + def test_resolution_unavailable(self): + mock_dm = MagicMock() + mock_dm.device_config.get_config.return_value = "mock" + mock_dm.device_config.get_resolution.side_effect = KeyError + result = _get_display_info(mock_dm) + assert result["resolution"] is None + + +class TestGetKernelInfo: + @patch("platform.release", return_value="6.1.0-rpi7-rpi-v8") + def test_returns_kernel(self, mock_release): + assert _get_kernel_info() == "6.1.0-rpi7-rpi-v8" + + +class TestGetArchitecture: + @patch("platform.machine", return_value="aarch64") + def test_returns_arch(self, mock_machine): + assert _get_architecture() == "aarch64" + + @patch("platform.machine", return_value="") + def test_fallback_on_empty(self, mock_machine): + assert _get_architecture() == "Unknown" + + +class TestGetDeviceName: + def test_returns_config_name(self): + mock_config = MagicMock() + mock_config.get_config.return_value = "My InkyPi" + assert _get_device_name(mock_config) == "My InkyPi" + + def test_returns_default(self): + mock_config = MagicMock() + mock_config.get_config.return_value = "InkyPi" + assert _get_device_name(mock_config) == "InkyPi" + + +class TestCollectSystemInfo: + @patch("blueprints.system_info._get_temperature", return_value=None) + @patch("blueprints.system_info._get_local_ip", return_value="192.168.1.1") + @patch("blueprints.system_info._get_last_boot", return_value="2025-01-01 10:00") + @patch("blueprints.system_info._get_uptime", return_value="5d 3h 20m") + @patch("blueprints.system_info._get_device_model", return_value="Raspberry Pi 4") + @patch("blueprints.system_info._get_os_info", return_value={"name": "Debian GNU/Linux", "version": "11", "distro": "debian", "pretty_name": "Debian GNU/Linux 11 (bullseye)"}) + @patch("blueprints.system_info._get_storage_info", return_value={"total": "32.0 GB", "used": "10.0 GB"}) + @patch("blueprints.system_info._get_memory_info", return_value={"total": "4.0 GB", "used": "2.0 GB", "installed": None, "note": None}) + @patch("blueprints.system_info._get_cpu_info", return_value={"model": "ARM Cortex-A72", "freq": "1.5 GHz", "cur_freq": "1.2 GHz", "max_freq": "1.5 GHz", "cores": 4}) + @patch("blueprints.system_info._get_kernel_info", return_value="6.1.0-rpi7") + @patch("blueprints.system_info._get_hostname", return_value="inkypi") + @patch("blueprints.system_info._get_device_name", return_value="My InkyPi") + @patch("blueprints.system_info._get_architecture", return_value="aarch64") + def test_returns_cards_and_specs(self, *mocks): + mock_dm = MagicMock() + mock_dm.device_config.get_config.return_value = "mock" + mock_dm.device_config.get_resolution.return_value = (800, 480) + + cards, device_specs, system_specs = _collect_system_info(mock_dm) + + # Verify cards – Display-centric order, temperature always present + card_labels = [c["label"] for c in cards] + assert card_labels[0] == "Display" + assert "Installed RAM" in card_labels + assert "CPU" in card_labels + assert "Temperature" in card_labels + assert "Uptime" in card_labels + assert "Local IP" in card_labels + assert "Storage" not in card_labels + assert "OS" not in card_labels + assert len(cards) == 6 + + # When temperature is None, card shows "N/A" + temp_card = next(c for c in cards if c["label"] == "Temperature") + assert temp_card["value"] == "N/A" + + # Verify device specs + dev_labels = [s["label"] for s in device_specs] + assert "Device name" in dev_labels + assert "Network name" in dev_labels + assert "Model" not in dev_labels + assert "Architecture" in dev_labels + assert "CPU" in dev_labels + assert "CPU cores" in dev_labels + assert "Current frequency" in dev_labels + assert "Max frequency" in dev_labels + assert "Installed RAM" in dev_labels + assert "Usable RAM" in dev_labels + assert "Storage" in dev_labels + assert "Storage used" in dev_labels + # device_specs count varies based on installed RAM availability + + # Verify system specs + sys_labels = [s["label"] for s in system_specs] + assert "OS name" in sys_labels + assert "OS version" in sys_labels + assert "Distribution" in sys_labels + assert "Kernel" in sys_labels + assert len(system_specs) == 4 + + @patch("blueprints.system_info._get_temperature", return_value="55 °C") + @patch("blueprints.system_info._get_local_ip", return_value="192.168.1.1") + @patch("blueprints.system_info._get_last_boot", return_value="2025-01-01 10:00") + @patch("blueprints.system_info._get_uptime", return_value="5d 3h 20m") + @patch("blueprints.system_info._get_device_model", return_value="Raspberry Pi 4") + @patch("blueprints.system_info._get_os_info", return_value={"name": "Debian GNU/Linux", "version": "11", "distro": "debian", "pretty_name": "Debian GNU/Linux 11 (bullseye)"}) + @patch("blueprints.system_info._get_storage_info", return_value={"total": "32.0 GB", "used": "10.0 GB"}) + @patch("blueprints.system_info._get_memory_info", return_value={"total": "4.0 GB", "used": "2.0 GB", "installed": None, "allocated": None, "note": None}) + @patch("blueprints.system_info._get_cpu_info", return_value={"model": "ARM Cortex-A72", "freq": "1.5 GHz", "cur_freq": "1.2 GHz", "max_freq": "1.5 GHz", "cores": 4}) + @patch("blueprints.system_info._get_kernel_info", return_value="6.1.0-rpi7") + @patch("blueprints.system_info._get_hostname", return_value="inkypi") + @patch("blueprints.system_info._get_device_name", return_value="My InkyPi") + @patch("blueprints.system_info._get_architecture", return_value="aarch64") + def test_temperature_card_shows_value_when_available(self, *mocks): + mock_dm = MagicMock() + mock_dm.device_config.get_config.return_value = "mock" + mock_dm.device_config.get_resolution.return_value = (800, 480) + + cards, _, _ = _collect_system_info(mock_dm) + temp_card = next(c for c in cards if c["label"] == "Temperature") + assert temp_card["value"] == "55 °C" + assert len(cards) == 6 + + +class TestFormatTimeAgo: + def test_just_now(self): + from datetime import datetime, timezone, timedelta + now = datetime.now(timezone.utc) + assert _format_time_ago(now) == "just now" + + def test_minutes_ago(self): + from datetime import datetime, timezone, timedelta + dt = datetime.now(timezone.utc) - timedelta(minutes=5) + assert _format_time_ago(dt) == "5 min ago" + + def test_hours_ago(self): + from datetime import datetime, timezone, timedelta + dt = datetime.now(timezone.utc) - timedelta(hours=3) + assert _format_time_ago(dt) == "3h ago" + + def test_days_ago(self): + from datetime import datetime, timezone, timedelta + dt = datetime.now(timezone.utc) - timedelta(days=2) + assert _format_time_ago(dt) == "2d ago" + + def test_naive_datetime(self): + from datetime import datetime, timezone, timedelta + dt = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(minutes=10) + assert _format_time_ago(dt) == "10 min ago" + + +class TestCollectOverview: + def test_returns_all_items(self): + from flask import Flask + app = Flask(__name__) + + mock_refresh_info = MagicMock() + mock_refresh_info.plugin_id = "clock" + mock_refresh_info.refresh_time = "2026-03-27T10:00:00+00:00" + from datetime import datetime, timezone + mock_refresh_info.get_refresh_datetime.return_value = datetime( + 2026, 3, 27, 10, 0, tzinfo=timezone.utc + ) + + mock_config = MagicMock() + mock_config.get_refresh_info.return_value = mock_refresh_info + mock_config.get_plugin.return_value = {"display_name": "Clock", "id": "clock"} + mock_config.get_plugins.return_value = [{"id": "clock"}, {"id": "weather"}] + + mock_playlist = MagicMock() + mock_playlist.name = "Default" + mock_playlist.start_time = "00:00" + mock_playlist.end_time = "24:00" + mock_config.get_playlist_manager.return_value.determine_active_playlist.return_value = mock_playlist + + app.config["DEVICE_CONFIG"] = mock_config + + with app.app_context(): + line1, line2 = _collect_overview() + + # Line 1: Active playlist + l1_labels = [i["label"] for i in line1] + assert "Active playlist" in l1_labels + playlist = next(i for i in line1 if i["label"] == "Active playlist") + assert playlist["value"] == "Default (00:00\u201324:00)" + + # Line 2: Active plugin, Last refresh, Installed plugins + l2_labels = [i["label"] for i in line2] + assert "Active plugin" in l2_labels + assert "Last refresh" in l2_labels + assert "Installed plugins" in l2_labels + + plugin = next(i for i in line2 if i["label"] == "Active plugin") + assert plugin["value"] == "Clock" + + plugins_count = next(i for i in line2 if i["label"] == "Installed plugins") + assert plugins_count["value"] == "2" + + def test_empty_when_no_config(self): + from flask import Flask + app = Flask(__name__) + + with app.app_context(): + line1, line2 = _collect_overview() + + assert line1 == [] + assert line2 == [] + + def test_skips_missing_fields(self): + from flask import Flask + app = Flask(__name__) + + mock_refresh_info = MagicMock() + mock_refresh_info.plugin_id = None + mock_refresh_info.refresh_time = None + + mock_config = MagicMock() + mock_config.get_refresh_info.return_value = mock_refresh_info + mock_config.get_plugins.return_value = [] + mock_config.get_playlist_manager.return_value.determine_active_playlist.return_value = None + + app.config["DEVICE_CONFIG"] = mock_config + + with app.app_context(): + line1, line2 = _collect_overview() + + # Playlist always present with fallback + playlist = next(i for i in line1 if i["label"] == "Active playlist") + assert playlist["value"] == "None (no active schedule now)" + + # Active plugin and Last refresh always present with "None" + l2_labels = [i["label"] for i in line2] + assert "Active plugin" in l2_labels + assert "Last refresh" in l2_labels + assert "Installed plugins" not in l2_labels + + plugin = next(i for i in line2 if i["label"] == "Active plugin") + assert plugin["value"] == "None" + + refresh = next(i for i in line2 if i["label"] == "Last refresh") + assert refresh["value"] == "None" + + +class TestCollectPluginInfo: + def test_reads_real_plugins_dir(self): + result = _collect_plugin_info() + assert result["total"] > 0 + assert result["total"] == result["builtin_count"] + result["third_party_count"] + assert isinstance(result["builtin"], list) + assert isinstance(result["third_party"], list) + for p in result["builtin"] + result["third_party"]: + assert "id" in p + assert "name" in p + + def test_empty_when_dir_missing(self, tmp_path): + # Point the plugins directory to a non-existent path and call the real + # implementation to verify the fallback behavior. + fake_dir = tmp_path / "nonexistent" + with patch("blueprints.system_info._PLUGINS_DIR", fake_dir): + result = _collect_plugin_info() + + assert result["total"] == 0 + assert result["builtin_count"] == 0 + assert result["third_party_count"] == 0 + assert result["builtin"] == [] + assert result["third_party"] == [] + + def test_third_party_detection(self, tmp_path): + import json + plugins_dir = tmp_path / "plugins" + plugins_dir.mkdir() + + # Builtin plugin (no repository) + builtin = plugins_dir / "my_builtin" + builtin.mkdir() + (builtin / "plugin-info.json").write_text( + json.dumps({"display_name": "My Builtin", "id": "my_builtin"}) + ) + + # Third-party plugin (has repository) + thirdparty = plugins_dir / "my_thirdparty" + thirdparty.mkdir() + (thirdparty / "plugin-info.json").write_text( + json.dumps({ + "display_name": "My Third Party", + "id": "my_thirdparty", + "repository": "https://github.com/example/plugin", + }) + ) + + with patch( + "blueprints.system_info._PLUGINS_DIR", + plugins_dir, + ): + result = _collect_plugin_info() + + assert result["total"] == 2 + assert result["builtin_count"] == 1 + assert result["third_party_count"] == 1 + assert result["third_party"][0]["name"] == "My Third Party" + assert result["third_party"][0]["repository"] == "https://github.com/example/plugin" + assert result["builtin"][0]["name"] == "My Builtin" + + def test_skips_dir_without_info_file(self, tmp_path): + plugins_dir = tmp_path / "plugins" + plugins_dir.mkdir() + + no_info = plugins_dir / "some_plugin" + no_info.mkdir() + + with patch( + "blueprints.system_info._PLUGINS_DIR", + plugins_dir, + ): + result = _collect_plugin_info() + + assert result["total"] == 0 + assert result["builtin"] == [] + assert result["third_party"] == [] + + def test_skips_base_plugin_and_pycache(self, tmp_path): + import json + plugins_dir = tmp_path / "plugins" + plugins_dir.mkdir() + + (plugins_dir / "base_plugin").mkdir() + (plugins_dir / "__pycache__").mkdir() + real = plugins_dir / "real_plugin" + real.mkdir() + (real / "plugin-info.json").write_text( + json.dumps({"display_name": "Real Plugin", "id": "real_plugin"}) + ) + + with patch( + "blueprints.system_info._PLUGINS_DIR", + plugins_dir, + ): + result = _collect_plugin_info() + + assert result["total"] == 1 + assert result["builtin"][0]["id"] == "real_plugin" + assert result["builtin"][0]["name"] == "Real Plugin"