diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/__init__.py b/usr/lib/python3/dist-packages/linuxmusterTools/__init__.py index e8a15e7..3b98ef8 100644 --- a/usr/lib/python3/dist-packages/linuxmusterTools/__init__.py +++ b/usr/lib/python3/dist-packages/linuxmusterTools/__init__.py @@ -1,34 +1,5 @@ -import logging +"""Overlay package for linuxmusterTools extensions.""" -logger = logging.getLogger(__name__) -logger_handler = logging.StreamHandler() -logger.addHandler(logger_handler) -logger.setLevel(logging.INFO) +from pkgutil import extend_path -class LMNToolsFormatter(logging.Formatter): - grey = '\x1b[38;5;245m' - green = '\x1b[38;5;82m' - orange = '\x1b[38;5;214m' - red = '\x1b[38;5;196m' - bold_red = '\x1b[31;1m' - violet = '\x1b[38;5;129m' - reset = '\x1b[0m' - - def __init__(self): - super().__init__() - self.fmtprefix = "%(asctime)s %(name)60s" + self.violet + " %(filename)20s:%(lineno)d " + self.reset - self.fmt = f"%(levelname)8s - %(message)s" - self.FORMATS = { - logging.DEBUG: self.fmtprefix + self.grey + self.fmt + self.reset, - logging.INFO: self.fmtprefix + self.green + self.fmt + self.reset, - logging.WARNING: self.fmtprefix + self.orange + self.fmt + self.reset, - logging.ERROR: self.fmtprefix + self.red + self.fmt + self.reset, - logging.CRITICAL: self.fmtprefix + self.bold_red + self.fmt + self.reset - } - - def format(self, record): - log_fmt = self.FORMATS.get(record.levelno) - formatter = logging.Formatter(log_fmt, "%Y-%m-%d %H:%M:%S") - return formatter.format(record) - -logger_handler.setFormatter(LMNToolsFormatter()) +__path__ = extend_path(__path__, __name__) diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/__init__.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/__init__.py index 6c826cf..5d33187 100644 --- a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/__init__.py +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/__init__.py @@ -1,3 +1,54 @@ -from .images import * -from .config import * -from .command import * +"""LINBO modules provided by this overlay package.""" + +from importlib import import_module +from pkgutil import extend_path +import subprocess + +__path__ = extend_path(__path__, __name__) + +from .wol import * +from .host_status import * +from .boot_logs import * +from .hooks import * +from .multicast import * +from .torrent import * +from .ssh import * +from .terminal import * +from .firmware import * +from .drivers import * +from .kernel import * +from .linbofs import * +from .wlan import * +from .image_sync import * +from .linbo_update import * +from .grub_generator import * +from .dhcp import * + +# NOTE: hosts, config, grub, images, changes are provided by +# upstream linuxmuster-tools — do NOT re-export here + +_LEGACY_MODULES = ("config", "images", "command", "changes", "hosts", "grub") + + +def __getattr__(name: str): + """Resolve legacy upstream exports lazily. + + This keeps package import lightweight while still allowing callers to + access upstream objects such as LinboConfigManager when available. + """ + if name in _LEGACY_MODULES: + module = import_module(f"{__name__}.{name}") + globals()[name] = module + return module + + for module_name in _LEGACY_MODULES: + try: + module = import_module(f"{__name__}.{module_name}") + except (ImportError, OSError, subprocess.SubprocessError): + continue + if hasattr(module, name): + value = getattr(module, name) + globals()[name] = value + return value + + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/_validation.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/_validation.py new file mode 100644 index 0000000..9eef306 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/_validation.py @@ -0,0 +1,49 @@ +"""Centralized validation helpers backed by upstream NameChecker.""" + +from linuxmusterTools.common.checks import NameChecker + +_checker = NameChecker() + + +def _check_segmented_name(name: str, check_part) -> bool: + """Allow dot-separated names while validating each segment upstream-style.""" + if not isinstance(name, str) or not name: + return False + if "/" in name or "\\" in name or "\0" in name or ".." in name: + return False + + parts = name.split(".") + return all(part and check_part(part) for part in parts) + + +def check_ip(ip: str) -> bool: + """Validate IPv4 address.""" + return _checker.check_ip_name(ip) + + +def check_mac(mac: str) -> bool: + """Validate MAC address (colon or hyphen separated).""" + if not isinstance(mac, str) or not mac: + return False + value = mac.strip() + return _checker.check_mac1_name(value) or _checker.check_mac2_name(value) + + +def normalize_mac(mac: str) -> str | None: + """Normalize MAC to uppercase colon-separated. Returns None if invalid.""" + if not isinstance(mac, str) or not mac: + return None + normalized = _checker.normalize_mac(mac.strip()) + if normalized is None: + return None + return normalized.upper() + + +def check_linbo_image_name(name: str) -> bool: + """Validate LINBO image name.""" + return _check_segmented_name(name, _checker.check_linbo_image_name) + + +def check_linbo_conf_name(name: str) -> bool: + """Validate LINBO config / group name.""" + return _check_segmented_name(name, _checker.check_linbo_conf_name) diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/boot_logs.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/boot_logs.py new file mode 100644 index 0000000..9c006a2 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/boot_logs.py @@ -0,0 +1,142 @@ +""" +LINBO Boot Logs — read client boot logs from /var/log/linuxmuster/linbo/. + +Provides safe file listing, reading, and deletion with path traversal protection. +""" + +import logging +import os +import re +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger(__name__) + +DEFAULT_LOG_DIR = "/var/log/linuxmuster/linbo" +MAX_FILE_SIZE = 5 * 1024 * 1024 # 5 MB +_SAFE_FILENAME = re.compile(r"^[a-zA-Z0-9._-]+$") +_IMAGE_STATUS_PATTERN = re.compile(r"^(\d{12})\s+(\w+):\s+(\S+)(?:\s+\"(\d+)\")?") + + +def get_host_image_status(log_dir: str | None = None) -> dict: + """Parse _image.status files for per-host last sync info. + + Reads *_image.status files from the LINBO log directory. Each file + contains a single line like: 202603241142 applied: win11_pro_edu.qcow2 "202601271107" + + Args: + log_dir: Override log directory (default: /var/log/linuxmuster/linbo) + + Returns: + Dict mapping hostname to {lastSync, action, image, imageVersion} + """ + base = Path(log_dir) if log_dir else Path(DEFAULT_LOG_DIR) + if not base.is_dir(): + return {} + + result = {} + try: + entries = os.listdir(base) + except OSError: + return {} + + status_files = [f for f in entries if f.endswith("_image.status")] + + for filename in sorted(status_files): + hostname = filename.removesuffix("_image.status") + filepath = base / filename + try: + content = filepath.read_text(encoding="utf-8", errors="replace").strip() + except OSError: + continue + + m = _IMAGE_STATUS_PATTERN.match(content) + if m: + ts = m.group(1) # YYYYMMDDHHMI + year, month, day = ts[0:4], ts[4:6], ts[6:8] + hour, minute = ts[8:10], ts[10:12] + + result[hostname] = { + "lastSync": f"{year}-{month}-{day}T{hour}:{minute}:00.000Z", + "action": m.group(2), + "image": m.group(3), + "imageVersion": m.group(4) or None, + } + + return result + + +class LinboBootLogs: + """Read-only access to LINBO client boot logs.""" + + def __init__(self, log_dir: str = DEFAULT_LOG_DIR): + self.log_dir = Path(log_dir) + + def list_logs(self) -> list[dict]: + """List all log files with metadata. + + Returns: + List of {filename, size, modifiedAt} dicts, sorted newest first + """ + if not self.log_dir.is_dir(): + return [] + + logs = [] + for f in self.log_dir.iterdir(): + if not f.is_file(): + continue + stat = f.stat() + logs.append({ + "filename": f.name, + "size": stat.st_size, + "modifiedAt": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), + }) + + return sorted(logs, key=lambda x: x["modifiedAt"], reverse=True) + + def read_log(self, filename: str) -> str | None: + """Read a log file's content. + + Args: + filename: Name of the log file (validated for path traversal) + + Returns: + File content as string, or None if not found + + Raises: + ValueError: If filename contains unsafe characters + ValueError: If file exceeds MAX_FILE_SIZE + """ + if not _SAFE_FILENAME.match(filename): + raise ValueError(f"Unsafe filename: {filename}") + + filepath = self.log_dir / filename + if not filepath.is_file(): + return None + + if filepath.stat().st_size > MAX_FILE_SIZE: + raise ValueError(f"File too large: {filepath.stat().st_size} bytes (max {MAX_FILE_SIZE})") + + return filepath.read_text(encoding="utf-8", errors="replace") + + def delete_log(self, filename: str) -> bool: + """Delete a log file. + + Args: + filename: Name of the log file + + Returns: + True if deleted, False if not found + + Raises: + ValueError: If filename contains unsafe characters + """ + if not _SAFE_FILENAME.match(filename): + raise ValueError(f"Unsafe filename: {filename}") + + filepath = self.log_dir / filename + if not filepath.is_file(): + return False + + filepath.unlink() + return True diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/changes.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/changes.py new file mode 100644 index 0000000..1aa7780 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/changes.py @@ -0,0 +1,106 @@ +""" +LINBO Change Tracker — cursor-based delta detection via filesystem mtimes. + +Compares file modification times against a unix-timestamp cursor to +determine which hosts, start.confs, and configs have changed. +""" + +import logging +import time +from datetime import datetime, timezone + +from .hosts import LinboHostProvider, devices_csv_path, get_mtime +from .config import LinboConfigManager +from .grub import LinboGrubReader + +logger = logging.getLogger(__name__) + + +class LinboChangeTracker: + """Cursor-based change detection using filesystem mtimes.""" + + def __init__(self, school: str = "default-school"): + self.school = school + self.host_provider = LinboHostProvider(school) + self.config_manager = LinboConfigManager() + self.grub_reader = LinboGrubReader() + + def get_changes(self, since_cursor: str = "0") -> dict: + """Compare filesystem state against cursor, return delta. + + Args: + since_cursor: Unix timestamp string. '0' = full snapshot. + + Returns: + Dict with nextCursor, hostsChanged, startConfsChanged, + configsChanged, dhcpChanged, deletedHosts, deletedStartConfs, + allHostMacs, allStartConfIds, allConfigIds. + """ + try: + cursor_ts = int(since_cursor) if since_cursor else 0 + except ValueError: + cursor_ts = 0 + + cursor_dt = ( + datetime.fromtimestamp(cursor_ts, tz=timezone.utc) if cursor_ts > 0 + else None + ) + + # Parse all known entities + all_hosts, _ = self.host_provider.parse_devices_csv() + school_groups = {h["hostgroup"] for h in all_hosts} + all_host_macs = [h["mac"] for h in all_hosts] + all_startconf_ids = [ + id for id in self.config_manager.list_startconf_ids() + if id in school_groups + ] + all_config_ids = [ + id for id in self.grub_reader.list_grub_cfg_ids() + if id in school_groups + ] + + # Detect host changes via devices.csv mtime + devices_mtime = get_mtime(devices_csv_path(self.school)) + hosts_changed_macs: list[str] = [] + deleted_hosts: list[str] = [] + dhcp_changed = False + + devices_modified = ( + cursor_dt is None + or devices_mtime is None + or (devices_mtime > cursor_dt) + ) + + if devices_modified: + hosts_changed_macs = list(all_host_macs) + dhcp_changed = True + + # Check start.conf files + startconfs_changed: list[str] = [] + deleted_startconfs: list[str] = [] + for group in all_startconf_ids: + mtime = self.config_manager.get_startconf_mtime(group) + if cursor_dt is None or (mtime and mtime > cursor_dt): + startconfs_changed.append(group) + + # Check GRUB configs + configs_changed: list[str] = [] + for group in all_config_ids: + mtime = self.grub_reader.get_cfg_mtime(group) + if cursor_dt is None or (mtime and mtime > cursor_dt): + configs_changed.append(group) + + next_cursor = str(int(time.time())) + + return { + "nextCursor": next_cursor, + "hostsChanged": hosts_changed_macs, + "startConfsChanged": startconfs_changed, + "configsChanged": configs_changed, + "dhcpChanged": dhcp_changed, + "deletedHosts": deleted_hosts, + "deletedStartConfs": deleted_startconfs, + "allHostMacs": all_host_macs, + "allStartConfIds": all_startconf_ids, + "allConfigIds": all_config_ids, + } diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/dhcp.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/dhcp.py new file mode 100644 index 0000000..9811780 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/dhcp.py @@ -0,0 +1,158 @@ +""" +LINBO DHCP Exporter — generates and reads DHCP config data. + +Supports dnsmasq proxy-DHCP config generation from host data, +and ISC DHCP config reading from linuxmuster-generated files. +""" + +import hashlib +import logging +import re +from datetime import datetime, timezone +from pathlib import Path + +from .hosts import get_mtime + +logger = logging.getLogger(__name__) + +_TAG_RE = re.compile(r"[^a-zA-Z0-9_-]") + +DHCP_SUBNETS_PATH = Path("/etc/dhcp/subnets.conf") +DHCP_DEVICES_DIR = Path("/etc/dhcp/devices") + + +def _get_server_ip() -> str: + """Auto-detect server IP from setup.ini. + + Raises RuntimeError if setup.ini is missing or has no serverip. + """ + from linuxmusterTools.lmnfile import LMNFile + + with LMNFile("/var/lib/linuxmuster/setup.ini", "r") as setup: + data = setup.read() + ini = data.get("setup", {}) if isinstance(data, dict) else {} + ip = ini.get("serverip", "") + if ip: + return ip + raise RuntimeError( + "Cannot determine server IP: /var/lib/linuxmuster/setup.ini " + "missing or has no 'serverip' entry" + ) + + +class LinboDhcpExporter: + """Generate and read DHCP configurations.""" + + def generate_dnsmasq_proxy( + self, + hosts: list[dict], + server_ip: str | None = None, + ) -> str: + """Generate dnsmasq proxy-DHCP config from host list. + + Args: + hosts: List of host dicts with 'pxeEnabled', 'mac', 'hostgroup' + server_ip: TFTP/boot server IP (auto-detected from setup.ini if None) + + Returns: + The complete dnsmasq configuration as a string. + """ + if server_ip is None: + server_ip = _get_server_ip() + + # Derive network address from server IP (replace last octet with 0) + parts = server_ip.rsplit(".", 1) + if len(parts) != 2: + raise ValueError(f"Invalid server IP format: {server_ip}") + dhcp_range = f"{parts[0]}.0" + + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + pxe_hosts = [h for h in hosts if h["pxeEnabled"]] + + lines = [ + "#", + "# LINBO - dnsmasq Configuration (proxy mode)", + f"# Generated: {ts}", + f"# Hosts: {len(pxe_hosts)}", + "#", + "", + "# Proxy DHCP mode - no IP assignment, PXE only", + "port=0", + f"dhcp-range={dhcp_range},proxy", + "log-dhcp", + "", + "interface=eth0", + "bind-interfaces", + "", + "# PXE boot architecture detection", + "dhcp-match=set:bios,option:client-arch,0", + "dhcp-match=set:efi32,option:client-arch,6", + "dhcp-match=set:efi64,option:client-arch,7", + "dhcp-match=set:efi64,option:client-arch,9", + "", + f"dhcp-boot=tag:bios,boot/grub/i386-pc/core.0,{server_ip}", + f"dhcp-boot=tag:efi32,boot/grub/i386-efi/core.efi,{server_ip}", + f"dhcp-boot=tag:efi64,boot/grub/x86_64-efi/core.efi,{server_ip}", + "", + ] + + if pxe_hosts: + config_groups: dict[str, list[dict]] = {} + for h in pxe_hosts: + config_groups.setdefault(h["hostgroup"], []).append(h) + + lines.append("# Host config assignments") + for h in pxe_hosts: + tag = _TAG_RE.sub("_", h["hostgroup"]) + lines.append(f"dhcp-host={h['mac']},set:{tag}") + lines.append("") + + lines.append("# Config name via NIS-Domain (Option 40)") + for config_name in config_groups: + if config_name: + tag = _TAG_RE.sub("_", config_name) + lines.append(f"dhcp-option=tag:{tag},40,{config_name}") + lines.append("") + + return "\n".join(lines) + + def get_isc_dhcp(self, school: str = "default-school") -> dict: + """Read ISC DHCP config files for a school. + + Returns dict with subnets, devices content, and mtimes. + subnets.conf is shared (not per-school). + devices/{school}.conf is per-school. + """ + devices_path = DHCP_DEVICES_DIR / f"{school}.conf" + + subnets = "" + if DHCP_SUBNETS_PATH.is_file(): + try: + subnets = DHCP_SUBNETS_PATH.read_text(encoding="utf-8") + except OSError as exc: + logger.warning("Failed to read subnets.conf: %s", exc) + + devices = "" + if devices_path.is_file(): + try: + devices = devices_path.read_text(encoding="utf-8") + except OSError as exc: + logger.warning("Failed to read %s: %s", devices_path, exc) + else: + logger.info("No DHCP devices config for school %s at %s", school, devices_path) + + subnets_mtime = get_mtime(DHCP_SUBNETS_PATH) + devices_mtime = get_mtime(devices_path) + + return { + "school": school, + "subnets": subnets, + "devices": devices, + "subnetsUpdatedAt": subnets_mtime.isoformat() if subnets_mtime else None, + "devicesUpdatedAt": devices_mtime.isoformat() if devices_mtime else None, + } + + @staticmethod + def content_etag(content: str) -> str: + """Generate ETag for content string.""" + return hashlib.md5(content.encode()).hexdigest() diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/drivers.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/drivers.py new file mode 100644 index 0000000..bbdaf86 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/drivers.py @@ -0,0 +1,209 @@ +""" +LINBO Driver Management — driver profile creation and match.conf handling. + +Manages driver profiles that match hardware (via DMI vendor/product) +to driver sets for injection into linbofs64. +""" + +import configparser +import logging +import os +import re +import shutil +import zipfile +from pathlib import Path + +logger = logging.getLogger(__name__) + +DRIVERS_BASE = os.environ.get("DRIVERS_BASE", "/var/lib/linbo/drivers") +MAX_ZIP_ENTRIES = 50_000 +MAX_ZIP_SIZE = 4 * 1024 * 1024 * 1024 # 4 GB + +_SAFE_NAME = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,99}$") +_SAFE_REL_PATH = re.compile(r"^[a-zA-Z0-9._/-]+$") + + +def sanitize_name(name: str) -> str: + """Validate and return a sanitized driver profile name. + + Raises: + ValueError: If name is invalid + """ + if not name or not isinstance(name, str): + raise ValueError("Name must not be empty") + name = name.strip() + if not _SAFE_NAME.match(name): + raise ValueError( + "Name must start with alphanumeric, contain only [a-zA-Z0-9._-], max 100 chars" + ) + return name + + +def sanitize_relative_path(rel_path: str) -> str: + """Validate a relative file path (no traversal). + + Raises: + ValueError: If path is unsafe + """ + if not rel_path or not isinstance(rel_path, str): + raise ValueError("Path must not be empty") + rel_path = rel_path.strip() + + if rel_path.startswith("/"): + raise ValueError("Absolute paths not allowed") + if "\\" in rel_path: + raise ValueError("Backslashes not allowed") + if "\0" in rel_path: + raise ValueError("NUL bytes not allowed") + for seg in rel_path.split("/"): + if seg == "..": + raise ValueError("Path traversal not allowed") + + return re.sub(r"/+", "/", rel_path).rstrip("/") + + +class LinboDriverManager: + """Manage LINBO driver profiles.""" + + def __init__(self, drivers_base: str | None = None): + self.base = Path(drivers_base or DRIVERS_BASE) + + def list_profiles(self) -> list[dict]: + """List all driver profiles with match.conf info. + + Returns: + List of {name, path, hasMatchConf, matchConf} dicts + """ + if not self.base.is_dir(): + return [] + + profiles = [] + for d in sorted(self.base.iterdir()): + if not d.is_dir() or d.name.startswith("."): + continue + match_conf = d / "match.conf" + profile = { + "name": d.name, + "path": str(d), + "hasMatchConf": match_conf.is_file(), + } + if match_conf.is_file(): + profile["matchConf"] = self._parse_match_conf(match_conf) + profiles.append(profile) + + return profiles + + def get_profile(self, name: str) -> dict | None: + """Get a single profile by name.""" + name = sanitize_name(name) + profile_dir = self.base / name + if not profile_dir.is_dir(): + return None + + match_conf = profile_dir / "match.conf" + files = self._list_files(profile_dir) + + return { + "name": name, + "path": str(profile_dir), + "hasMatchConf": match_conf.is_file(), + "matchConf": self._parse_match_conf(match_conf) if match_conf.is_file() else None, + "files": files, + "totalSize": sum(f["size"] for f in files), + } + + def create_profile(self, name: str, vendor: str = "", product: str = "") -> dict: + """Create a new driver profile with match.conf. + + Args: + name: Profile name + vendor: DMI sys_vendor string + product: DMI product_name string + + Returns: + Created profile dict + """ + name = sanitize_name(name) + profile_dir = self.base / name + profile_dir.mkdir(parents=True, exist_ok=True) + + match_conf = profile_dir / "match.conf" + match_conf.write_text( + f"[match]\n" + f"sys_vendor = {vendor}\n" + f"product_name = {product}\n", + encoding="utf-8", + ) + + return self.get_profile(name) + + def delete_profile(self, name: str) -> bool: + """Delete a driver profile entirely.""" + name = sanitize_name(name) + profile_dir = self.base / name + if not profile_dir.is_dir(): + return False + shutil.rmtree(str(profile_dir)) + return True + + def extract_archive(self, name: str, archive_path: str) -> dict: + """Extract a ZIP archive into a driver profile. + + Args: + name: Profile name + archive_path: Path to ZIP file + + Returns: + {extracted, totalSize} + + Raises: + ValueError: If archive exceeds limits + """ + name = sanitize_name(name) + profile_dir = self.base / name + profile_dir.mkdir(parents=True, exist_ok=True) + + with zipfile.ZipFile(archive_path, "r") as zf: + entries = zf.infolist() + if len(entries) > MAX_ZIP_ENTRIES: + raise ValueError(f"Too many entries: {len(entries)} (max {MAX_ZIP_ENTRIES})") + + total_size = sum(e.file_size for e in entries) + if total_size > MAX_ZIP_SIZE: + raise ValueError(f"Archive too large: {total_size} bytes (max {MAX_ZIP_SIZE})") + + # Validate paths (no traversal) + for entry in entries: + if entry.filename.startswith("/") or ".." in entry.filename: + raise ValueError(f"Unsafe path in archive: {entry.filename}") + + zf.extractall(str(profile_dir)) + + return { + "extracted": len(entries), + "totalSize": total_size, + } + + def _parse_match_conf(self, path: Path) -> dict: + """Parse match.conf INI file.""" + result = {} + try: + cp = configparser.ConfigParser() + cp.read(str(path)) + if cp.has_section("match"): + result = dict(cp["match"]) + except configparser.Error: + pass + return result + + def _list_files(self, directory: Path) -> list[str]: + """List files in a profile directory (excluding match.conf).""" + files = [] + for f in sorted(directory.rglob("*")): + if f.is_file() and f.name != "match.conf": + stat = f.stat() + files.append({ + "name": str(f.relative_to(directory)), + "size": stat.st_size, + }) + return files diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/firmware.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/firmware.py new file mode 100644 index 0000000..ddc345d --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/firmware.py @@ -0,0 +1,261 @@ +""" +LINBO Firmware Management — detect missing firmware and manage config. + +Combines dmesg parsing, filesystem scanning, and firmware config file +management for LINBO client firmware provisioning. +""" + +import logging +import os +import re +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger(__name__) + +FIRMWARE_DIR = Path("/lib/firmware") +CONFIG_DIR = os.environ.get("CONFIG_DIR", "/etc/linuxmuster/linbo") +FIRMWARE_CONFIG = Path(CONFIG_DIR) / "firmware" +LINBO_LOG_DIR = Path(os.environ.get("LINBO_LOG_DIR", "/var/log/linuxmuster/linbo")) + +# dmesg patterns +_MISSING_PATTERNS = [ + re.compile(r"firmware:\s+failed to load\s+(\S+)", re.IGNORECASE), + re.compile(r"Direct firmware load for\s+(\S+)\s+failed", re.IGNORECASE), + re.compile(r"request_firmware\s+failed.*?:\s+(\S+)", re.IGNORECASE), +] +_LOADED_PATTERN = re.compile(r"loaded firmware\s+(?:version\s+)?(\S+)", re.IGNORECASE) +_DRIVER_PATTERN = re.compile(r"\]\s+(\S+)\s+\S+:\s+(?:firmware|Direct firmware)") +_SAFE_PATH = re.compile(r"^[a-zA-Z0-9._/-]+$") + + +def _clean_filename(raw: str) -> str: + """Clean firmware filename from dmesg output.""" + name = raw + if name.startswith("/lib/firmware/"): + name = name[len("/lib/firmware/"):] + name = re.sub(r"[,;:)]+$", "", name) + return name + + +def parse_dmesg_firmware(output: str) -> list[dict]: + """Parse dmesg output for firmware events. + + Args: + output: Raw dmesg output + + Returns: + List of {filename, driver, status} dicts + """ + if not output or not isinstance(output, str): + return [] + + events = [] + seen = set() + + for line in output.splitlines(): + if not line.strip(): + continue + + matched = False + for pattern in _MISSING_PATTERNS: + m = pattern.search(line) + if m: + filename = _clean_filename(m.group(1)) + driver_m = _DRIVER_PATTERN.search(line) + driver = driver_m.group(1) if driver_m else None + key = f"missing:{filename}" + if key not in seen: + seen.add(key) + events.append({"filename": filename, "driver": driver, "status": "missing"}) + matched = True + break + + if not matched: + m = _LOADED_PATTERN.search(line) + if m: + filename = _clean_filename(m.group(1)) + driver_m = _DRIVER_PATTERN.search(line) + driver = driver_m.group(1) if driver_m else None + key = f"loaded:{filename}" + if key not in seen: + seen.add(key) + events.append({"filename": filename, "driver": driver, "status": "loaded"}) + + return events + + +def extract_missing_firmware_paths(output: str) -> list[str]: + """Extract sorted, deduplicated list of missing firmware paths.""" + events = parse_dmesg_firmware(output) + return sorted({e["filename"] for e in events if e["status"] == "missing"}) + + +_FW_FAILED_PATTERN = re.compile(r"for\s+(\S+)\s+failed", re.IGNORECASE) + + +def get_firmware_health(log_dir: str | Path | None = None) -> dict: + """Parse LINBO client boot logs for firmware errors. + + Reads *_linbo.log files from the log directory and extracts missing + firmware names using the same regex as update-linbofs parse_firmware_logs. + Falls back to compressed .1.gz logs if the current log is empty. + + Args: + log_dir: Override log directory (default: /var/log/linuxmuster/linbo) + + Returns: + {clients: [{hostname, status, missingFirmware}], summary: {total, ok, missing, noLog}} + """ + base = Path(log_dir) if log_dir else LINBO_LOG_DIR + if not base.is_dir(): + return {"clients": [], "summary": {"total": 0, "ok": 0, "missing": 0, "noLog": 0}} + + results = [] + try: + entries = os.listdir(base) + except OSError: + return {"clients": [], "summary": {"total": 0, "ok": 0, "missing": 0, "noLog": 0}} + + log_files = [f for f in entries if f.endswith("_linbo.log") and not f.startswith("UNKNOWN")] + + for log_file in sorted(log_files): + hostname = log_file.removesuffix("_linbo.log") + log_path = base / log_file + compressed_path = base / (log_file + ".1.gz") + + log_content = "" + try: + log_content = log_path.read_text(encoding="utf-8", errors="replace") + except OSError: + pass + + # Fallback to compressed previous log + if not log_content.strip(): + try: + import gzip + with gzip.open(compressed_path, "rt", encoding="utf-8", errors="replace") as gz: + log_content = gz.read() + except (OSError, ImportError): + pass + + if not log_content.strip(): + results.append({"hostname": hostname, "status": "no-log", "missingFirmware": []}) + continue + + # Parse firmware errors + missing_firmware = [] + seen = set() + for line in log_content.splitlines(): + lower = line.lower() + if "firmware" in lower and ("failed" in lower or "error" in lower): + m = _FW_FAILED_PATTERN.search(line) + if m and m.group(1) not in seen: + seen.add(m.group(1)) + missing_firmware.append(m.group(1)) + + results.append({ + "hostname": hostname, + "status": "missing" if missing_firmware else "ok", + "missingFirmware": missing_firmware, + }) + + ok = sum(1 for r in results if r["status"] == "ok") + missing = sum(1 for r in results if r["status"] == "missing") + no_log = sum(1 for r in results if r["status"] == "no-log") + + return { + "clients": results, + "summary": {"total": len(results), "ok": ok, "missing": missing, "noLog": no_log}, + } + + +class LinboFirmwareManager: + """Manage LINBO firmware configuration.""" + + def __init__(self, config_path: str | None = None): + self.config_path = Path(config_path) if config_path else FIRMWARE_CONFIG + + def read_config(self) -> list[str]: + """Read firmware config (list of firmware paths, one per line).""" + try: + text = self.config_path.read_text(encoding="utf-8") + return [l.strip() for l in text.splitlines() + if l.strip() and not l.strip().startswith("#")] + except FileNotFoundError: + return [] + + def write_config(self, entries: list[str]) -> None: + """Write firmware config file. + + Args: + entries: List of firmware paths + + Raises: + ValueError: If any entry contains unsafe characters + """ + for entry in entries: + if not _SAFE_PATH.match(entry): + raise ValueError(f"Unsafe firmware path: {entry}") + if ".." in entry or "\0" in entry: + raise ValueError(f"Path traversal not allowed: {entry}") + + header = f"# LINBO firmware config — generated {datetime.now(timezone.utc).isoformat()}\n" + self.config_path.write_text( + header + "\n".join(entries) + "\n", + encoding="utf-8", + ) + + def add_entry(self, entry: str) -> list[str]: + """Add a firmware entry. Returns updated list.""" + entries = self.read_config() + if entry not in entries: + entries.append(entry) + self.write_config(entries) + return entries + + def remove_entry(self, entry: str) -> list[str]: + """Remove a firmware entry. Returns updated list.""" + entries = self.read_config() + entries = [e for e in entries if e != entry] + self.write_config(entries) + return entries + + def validate_entry(self, entry: str) -> dict: + """Check if a firmware file exists on disk. + + Returns: + {path, exists, zstExists} + """ + fw_path = FIRMWARE_DIR / entry + zst_path = FIRMWARE_DIR / f"{entry}.zst" + return { + "path": entry, + "exists": fw_path.is_file(), + "zstExists": zst_path.is_file(), + } + + def scan_available(self, base_dir: str | None = None, max_depth: int = 3) -> list[str]: + """Scan /lib/firmware/ for available firmware files. + + Args: + base_dir: Override firmware directory + max_depth: Maximum directory depth + + Returns: + Sorted list of relative firmware paths + """ + root = Path(base_dir) if base_dir else FIRMWARE_DIR + if not root.is_dir(): + return [] + + results = [] + for dirpath, _dirnames, filenames in os.walk(root): + depth = len(Path(dirpath).relative_to(root).parts) + if depth > max_depth: + continue + for f in filenames: + rel = str(Path(dirpath, f).relative_to(root)) + results.append(rel) + + return sorted(results) diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/grub.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/grub.py new file mode 100644 index 0000000..f7475b9 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/grub.py @@ -0,0 +1,108 @@ +""" +LINBO GRUB Config Reader — reads existing GRUB config files from disk. + +Does NOT generate GRUB configs — that is done by the linbo-api locally. +This module only reads what linuxmuster-import-devices or linbo-api has written. +""" + +import logging +from datetime import datetime, timezone +from pathlib import Path + +from .hosts import get_mtime + +logger = logging.getLogger(__name__) + +GRUB_DIR_DEFAULT = '/srv/linbo/boot/grub' + + +class LinboGrubReader: + """Read-only access to GRUB config files.""" + + def __init__(self, grub_dir: str = GRUB_DIR_DEFAULT): + self.grub_dir = Path(grub_dir) + + def list_grub_cfg_ids(self) -> list[str]: + """Return sorted list of GRUB config group IDs (stems of *.cfg files).""" + ids = [] + for p in sorted(self.grub_dir.glob("*.cfg")): + group = p.stem + if group: + ids.append(group) + return ids + + def get_configs_by_ids(self, ids: list[str]) -> list[dict]: + """Return GRUB config content and mtime for given group IDs. + + Returns list of {id, content, updatedAt} dicts. + Skips IDs whose .cfg file does not exist. + """ + results = [] + for group_id in ids: + cfg_path = self.grub_dir / f"{group_id}.cfg" + if not cfg_path.is_file(): + continue + try: + content = cfg_path.read_text(encoding="utf-8") + except OSError: + continue + + mtime = get_mtime(cfg_path) + results.append({ + "id": group_id, + "content": content, + "updatedAt": mtime.isoformat() if mtime else None, + }) + + return results + + def get_all_grub_configs(self, school_groups: set[str] | None = None) -> list[dict]: + """Return all GRUB configs, optionally filtered by school groups. + + Always includes main grub.cfg. Group configs are filtered by + school_groups if provided. + + Returns list of {id, filename, content, updatedAt} dicts. + """ + configs = [] + + # Always include main grub.cfg + main_cfg = self.grub_dir / "grub.cfg" + if main_cfg.is_file(): + try: + content = main_cfg.read_text(encoding="utf-8") + mtime = get_mtime(main_cfg) + configs.append({ + "id": "grub", + "filename": "grub.cfg", + "content": content, + "updatedAt": mtime.isoformat() if mtime else None, + }) + except OSError as exc: + logger.warning("Failed to read grub.cfg: %s", exc) + + # Group configs + for p in sorted(self.grub_dir.glob("*.cfg")): + if p.name == "grub.cfg": + continue + group = p.stem + if school_groups is not None and group not in school_groups: + continue + try: + content = p.read_text(encoding="utf-8") + mtime = get_mtime(p) + configs.append({ + "id": group, + "filename": f"{group}.cfg", + "content": content, + "updatedAt": mtime.isoformat() if mtime else None, + }) + except OSError: + continue + + return configs + + def get_cfg_mtime(self, group_id: str) -> datetime | None: + """Return mtime for a specific GRUB config file.""" + cfg_path = self.grub_dir / f"{group_id}.cfg" + return get_mtime(cfg_path) diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/grub_generator.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/grub_generator.py new file mode 100644 index 0000000..e469bc6 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/grub_generator.py @@ -0,0 +1,167 @@ +""" +LINBO GRUB Config Generator — generate GRUB boot configs from data objects. + +Pure template-based generation: +- Main grub.cfg (PXE boot with MAC→group mapping) +- Per-config {group}.cfg (OS menu entries) +- Hostcfg symlinks (hostname.cfg + 01-{mac}.cfg) + +Device path translation, OS type detection, template variable substitution. +""" + +import logging +import os +import re +from pathlib import Path + +logger = logging.getLogger(__name__) + +LINBO_DIR = os.environ.get("LINBO_DIR", "/srv/linbo") +GRUB_DIR = Path(LINBO_DIR) / "boot" / "grub" +HOSTCFG_DIR = GRUB_DIR / "hostcfg" + + +# ── Pure Helper Functions ──────────────────────────────────────────── + + +def get_grub_part(device: str | None) -> str: + """Convert Linux device path to GRUB partition format. + + /dev/sda1 → (hd0,1), /dev/nvme0n1p2 → (hd0,2), /dev/mmcblk0p1 → (hd0,1) + """ + if not device: + return "(hd0,1)" + dev = device.replace("/dev/", "") + + m = re.match(r"^nvme(\d+)n\d+p(\d+)$", dev) + if m: + return f"(hd{m.group(1)},{m.group(2)})" + + m = re.match(r"^mmcblk(\d+)p(\d+)$", dev) + if m: + return f"(hd{m.group(1)},{m.group(2)})" + + m = re.match(r"^([shv]d)([a-z])(\d+)$", dev) + if m: + disk_num = ord(m.group(2)) - ord("a") + return f"(hd{disk_num},{m.group(3)})" + + return "(hd0,1)" + + +def get_grub_ostype(osname: str | None) -> str: + """Get GRUB OS type from OS name for menu icon classes.""" + if not osname: + return "unknown" + name = osname.lower() + + os_map = [ + ("windows 11", "win11"), ("win11", "win11"), + ("windows 10", "win10"), ("win10", "win10"), + ("windows 8", "win8"), ("win8", "win8"), + ("windows 7", "win7"), ("win7", "win7"), + ("windows", "windows"), + ("ubuntu", "ubuntu"), ("debian", "debian"), + ("mint", "linuxmint"), ("fedora", "fedora"), + ("opensuse", "opensuse"), ("suse", "opensuse"), + ("arch", "arch"), ("manjaro", "manjaro"), + ("centos", "centos"), + ("rhel", "rhel"), ("red hat", "rhel"), + ("linux", "linux"), + ] + for pattern, ostype in os_map: + if pattern in name: + return ostype + return "unknown" + + +def find_cache_partition(partitions: list[dict] | None) -> dict | None: + """Find cache partition from partition list.""" + if not partitions or not isinstance(partitions, list): + return None + + by_label = next((p for p in partitions + if p.get("label", "").lower() == "cache"), None) + if by_label: + return by_label + + return next((p for p in partitions + if p.get("fsType") in ("ext4", "btrfs") + and p.get("partitionId") not in ("ef00", "0c01") + and "windows" not in (p.get("label") or "").lower() + and "efi" not in (p.get("label") or "").lower()), None) + + +def get_os_partition_index(partitions: list[dict] | None, root_device: str | None) -> int: + """Find 1-based partition index for an OS's root device.""" + if not partitions or not root_device: + return 1 + for i, p in enumerate(partitions): + if p.get("device") == root_device: + return i + 1 + return 1 + + +def hex_to_grub_rgb(hex_color: str | None) -> str: + """Convert hex color (#RRGGBB) to GRUB RGB format (R,G,B).""" + if not hex_color or not re.match(r"^#[0-9a-fA-F]{6}$", hex_color): + return "42,68,87" + return ",".join(str(int(hex_color[i:i+2], 16)) for i in (1, 3, 5)) + + +def get_os_label(partitions: list[dict] | None, root_device: str | None) -> str: + """Get partition label for a root device.""" + if not partitions or not root_device: + return "" + p = next((p for p in partitions if p.get("device") == root_device), None) + return (p or {}).get("label", "") + + +def apply_template(template: str, replacements: dict) -> str: + """Replace @@key@@ placeholders in template.""" + result = template + for key, value in replacements.items(): + result = result.replace(f"@@{key}@@", str(value) if value is not None else "") + return result + + +def mac_to_grub_filename(mac: str) -> str: + """Convert MAC to GRUB hostcfg filename: AA:BB:CC:DD:EE:FF → 01-aa-bb-cc-dd-ee-ff""" + return "01-" + mac.lower().replace(":", "-") + + +def get_linbo_setting(settings: dict | None, key: str): + """Case-insensitive lookup in linbo settings dict.""" + if not settings: + return None + if key in settings: + return settings[key] + lower = key.lower() + if lower in settings: + return settings[lower] + for k, v in settings.items(): + if k.lower() == lower: + return v + return None + + +def read_kernel_options(config_id: str, linbo_dir: str | None = None) -> str: + """Read KernelOptions from start.conf file.""" + base = Path(linbo_dir or LINBO_DIR) + try: + content = (base / f"start.conf.{config_id}").read_text(encoding="utf-8") + m = re.search(r"^\s*KernelOptions\s*=\s*(.+)$", content, re.MULTILINE | re.IGNORECASE) + if m: + return m.group(1).strip() + except OSError: + pass + return "quiet splash" + + +def build_kernel_options(raw_options: str, server: str, config_id: str) -> str: + """Build clean kernel options string with server and group injected.""" + kopts = re.sub(r"\bserver=\S+", "", raw_options) + kopts = re.sub(r"\bgroup=\S+", "", kopts) + kopts = re.sub(r"\bhostgroup=\S+", "", kopts) + kopts = re.sub(r"\s+", " ", kopts).strip() + return f"{kopts} server={server} group={config_id} hostgroup={config_id}".strip() diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/hooks.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/hooks.py new file mode 100644 index 0000000..8a2a750 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/hooks.py @@ -0,0 +1,66 @@ +""" +LINBO Hook Management — discover and inspect update-linbofs hooks. + +Scans pre/post hook directories and reads build manifest for execution history. +""" + +import json +import logging +import os +from pathlib import Path + +logger = logging.getLogger(__name__) + +DEFAULT_HOOKS_DIR = "/etc/linuxmuster/linbo/hooks" +MANIFEST_PATH = "/srv/linbo/linbofs-build-manifest.json" + + +class LinboHookManager: + """Read-only access to update-linbofs hooks.""" + + def __init__(self, hooks_dir: str = DEFAULT_HOOKS_DIR): + self.hooks_dir = Path(hooks_dir) + self.pre_dir = self.hooks_dir / "update-linbofs.pre.d" + self.post_dir = self.hooks_dir / "update-linbofs.post.d" + + def get_hooks(self) -> list[dict]: + """List all pre/post hooks with metadata. + + Returns: + List of {name, type, path, executable, size} dicts + """ + hooks = [] + manifest = self._read_manifest() + + for hook_type, hook_dir in [("pre", self.pre_dir), ("post", self.post_dir)]: + if not hook_dir.is_dir(): + continue + for f in sorted(hook_dir.iterdir()): + if not f.is_file(): + continue + stat = f.stat() + hook = { + "name": f.name, + "type": hook_type, + "path": str(f), + "executable": os.access(f, os.X_OK), + "size": stat.st_size, + } + # Merge manifest data if available + hook_key = f"{hook_type}/{f.name}" + if hook_key in manifest: + hook["lastExitCode"] = manifest[hook_key].get("exitCode") + hook["lastRunAt"] = manifest[hook_key].get("runAt") + + hooks.append(hook) + + return hooks + + def _read_manifest(self) -> dict: + """Read build manifest for hook execution history.""" + try: + text = Path(MANIFEST_PATH).read_text(encoding="utf-8") + data = json.loads(text) + return data.get("hooks", {}) + except (OSError, json.JSONDecodeError): + return {} diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/host_status.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/host_status.py new file mode 100644 index 0000000..b1c354a --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/host_status.py @@ -0,0 +1,93 @@ +""" +LINBO Host Status Scanner — TCP port probing for online detection. + +Probes port 2222 (Dropbear SSH on LINBO clients) to determine +if a host is online. Supports concurrent scanning. +""" + +import asyncio +import logging +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + +DEFAULT_PORT = 2222 +DEFAULT_TIMEOUT = 2.0 +DEFAULT_CONCURRENCY = 20 + + +async def probe_host(ip: str, port: int = DEFAULT_PORT, timeout: float = DEFAULT_TIMEOUT) -> bool: + """Test if a host is reachable via TCP connect. + + Args: + ip: Target IP address + port: TCP port to probe (default: 2222) + timeout: Connection timeout in seconds (default: 2.0) + + Returns: + True if host is reachable, False otherwise + """ + try: + _, writer = await asyncio.wait_for( + asyncio.open_connection(ip, port), + timeout=timeout, + ) + writer.close() + await writer.wait_closed() + return True + except (asyncio.TimeoutError, OSError): + return False + + +async def scan_hosts( + hosts: list[dict], + port: int = DEFAULT_PORT, + timeout: float = DEFAULT_TIMEOUT, + concurrency: int = DEFAULT_CONCURRENCY, +) -> list[dict]: + """Scan multiple hosts concurrently for online status. + + Args: + hosts: List of host dicts with at least 'ip' and 'mac' keys + port: TCP port to probe + timeout: Per-host connection timeout + concurrency: Max concurrent probes + + Returns: + List of {mac, ip, hostname, online, lastSeen} dicts + """ + semaphore = asyncio.Semaphore(concurrency) + now = datetime.now(timezone.utc).isoformat() + + async def _probe(host: dict) -> dict: + async with semaphore: + ip = host.get("ip") + if not ip: + return { + "mac": host.get("mac"), + "ip": None, + "hostname": host.get("hostname"), + "online": False, + "lastSeen": None, + } + online = await probe_host(ip, port=port, timeout=timeout) + return { + "mac": host.get("mac"), + "ip": ip, + "hostname": host.get("hostname"), + "online": online, + "lastSeen": now if online else None, + } + + tasks = [_probe(h) for h in hosts] + return await asyncio.gather(*tasks) + + +def scan_hosts_sync( + hosts: list[dict], + port: int = DEFAULT_PORT, + timeout: float = DEFAULT_TIMEOUT, + concurrency: int = DEFAULT_CONCURRENCY, +) -> list[dict]: + """Synchronous wrapper for scan_hosts (for use outside async context).""" + return asyncio.run(scan_hosts(hosts, port=port, timeout=timeout, concurrency=concurrency)) diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/hosts.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/hosts.py new file mode 100644 index 0000000..6c5dce5 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/hosts.py @@ -0,0 +1,117 @@ +""" +LINBO Host Provider — reads host data from devices.csv. + +Provides school-aware access to host records with MAC normalization, +IP validation, and batch lookup functionality. +""" + +import logging +from datetime import datetime, timezone +from pathlib import Path + +from ._validation import check_mac, check_ip, normalize_mac + +logger = logging.getLogger(__name__) + + +def validate_school(school: str) -> bool: + """Validate school name to prevent path traversal.""" + from ._validation import check_linbo_conf_name + return check_linbo_conf_name(school) + + +def get_mtime(path: Path) -> datetime | None: + """Return file mtime as UTC datetime, or None if missing.""" + try: + return datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc) + except OSError: + return None + + +def devices_csv_path(school: str = "default-school") -> Path: + """Resolve devices.csv path for a given school (LMN convention).""" + if school != "default-school": + prefix = f"{school}." + else: + prefix = "" + return Path(f"/etc/linuxmuster/sophomorix/{school}/{prefix}devices.csv") + + +class LinboHostProvider: + """Read-only host provider from devices.csv.""" + + def __init__(self, school: str = "default-school"): + self.school = school + + def parse_devices_csv(self) -> tuple[list[dict], datetime | None]: + """Parse devices.csv into a list of host dicts. + + Returns (hosts, file_mtime). Skips comment lines and invalid MACs. + CSV columns (semicolon-separated): + 0=room, 1=hostname, 2=hostgroup, 3=mac, 4=ip, 10=pxeFlag + + Raises FileNotFoundError if devices.csv does not exist. + """ + csv_path = devices_csv_path(self.school) + text = csv_path.read_text(encoding="utf-8") + mtime = get_mtime(csv_path) + hosts = [] + + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + + fields = line.split(";") + if len(fields) < 5: + continue + + while len(fields) < 15: + fields.append("") + + mac = normalize_mac(fields[3]) + if mac is None: + continue + + raw_ip = fields[4].strip() + ip = raw_ip if raw_ip and check_ip(raw_ip) else None + config = fields[2].strip() + + try: + pxe_flag = int(fields[10].strip()) if fields[10].strip() else 1 + except ValueError: + pxe_flag = 1 + + pxe_enabled = pxe_flag > 0 and config.lower() != "nopxe" + + hosts.append({ + "mac": mac, + "hostname": fields[1].strip(), + "ip": ip, + "room": fields[0].strip(), + "school": self.school, + "hostgroup": config, + "pxeEnabled": pxe_enabled, + "pxeFlag": pxe_flag, + "dhcpOptions": "", + "startConfId": config, + "updatedAt": mtime.isoformat() if mtime else None, + }) + + return hosts, mtime + + def get_hosts_by_macs(self, macs: list[str]) -> list[dict]: + """Return host records matching the given MAC addresses.""" + hosts, _ = self.parse_devices_csv() + macs_upper = {m.upper().replace("-", ":") for m in macs} + return [h for h in hosts if h["mac"] in macs_upper] + + def get_all_host_macs(self) -> list[str]: + """Return all known MAC addresses.""" + hosts, _ = self.parse_devices_csv() + return [h["mac"] for h in hosts] + + def get_school_groups(self) -> set[str]: + """Return set of unique hostgroup names for this school.""" + hosts, _ = self.parse_devices_csv() + return {h["hostgroup"] for h in hosts} diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/image_sync.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/image_sync.py new file mode 100644 index 0000000..ab871b7 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/image_sync.py @@ -0,0 +1,375 @@ +""" +LINBO Image Sync — download QCOW2 images from remote server. + +Supports HTTP Range requests for resume capability, MD5 verification, +and atomic directory swap on completion. +""" + +import hashlib +import logging +import os +import shutil +from datetime import datetime, timezone +from pathlib import Path +from urllib.error import URLError +from urllib.request import Request, urlopen + +logger = logging.getLogger(__name__) + +IMAGES_DIR = Path(os.environ.get("LINBO_DIR", "/srv/linbo")) / "images" +CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB + + +def _file_md5(path: Path) -> str: + """Return the MD5 of a file without loading it fully into memory.""" + md5_hash = hashlib.md5() + with path.open("rb") as handle: + while True: + chunk = handle.read(CHUNK_SIZE) + if not chunk: + break + md5_hash.update(chunk) + return md5_hash.hexdigest() + + +class LinboImageSync: + """Download and verify LINBO images from a remote server.""" + + def __init__(self, images_dir: str | None = None): + self.images_dir = Path(images_dir) if images_dir else IMAGES_DIR + + def compare_manifests(self, local_images: list[dict], remote_images: list[dict]) -> dict: + """Compare local and remote image lists. + + Returns: + {toDownload, toDelete, upToDate} — lists of image names + """ + local_map = {img["name"]: img for img in local_images} + remote_map = {img["name"]: img for img in remote_images} + + to_download = [] + up_to_date = [] + + for name, remote in remote_map.items(): + local = local_map.get(name) + if not local: + to_download.append(name) + elif local.get("md5") != remote.get("md5"): + to_download.append(name) + else: + up_to_date.append(name) + + to_delete = [name for name in local_map if name not in remote_map] + + return { + "toDownload": to_download, + "toDelete": to_delete, + "upToDate": up_to_date, + } + + @staticmethod + def _validate_image_name(name: str) -> str: + """Validate image name for path safety.""" + if not name or not isinstance(name, str): + raise ValueError("Image name must not be empty") + if "/" in name or "\\" in name or ".." in name or "\0" in name: + raise ValueError(f"Unsafe image name: {name}") + from ._validation import check_linbo_image_name + if not check_linbo_image_name(name): + raise ValueError(f"Invalid image name characters: {name}") + return name + + def download_image( + self, + url: str, + image_name: str, + expected_md5: str | None = None, + headers: dict | None = None, + on_progress=None, + ) -> dict: + """Download an image file with resume support. + + Args: + url: Full URL to the image file + image_name: Target image name (e.g. "ubuntu22.qcow2") + expected_md5: Expected MD5 for verification + headers: Additional HTTP headers (e.g. auth) + on_progress: Callback(bytes_received, total_bytes) + + Returns: + {success, path, size, md5, duration} + """ + image_name = self._validate_image_name(image_name) + base = image_name.rsplit(".", 1)[0] if "." in image_name else image_name + target_dir = self.images_dir / base + incoming_dir = self.images_dir / ".incoming" / base + incoming_dir.mkdir(parents=True, exist_ok=True) + target_file = incoming_dir / image_name + + start = datetime.now(timezone.utc) + + # Resume support + resume_offset = 0 + if target_file.is_file(): + resume_offset = target_file.stat().st_size + + req_headers = dict(headers or {}) + if resume_offset > 0: + req_headers["Range"] = f"bytes={resume_offset}-" + + try: + req = Request(url, headers=req_headers) + with urlopen(req, timeout=30) as resp: + total = int(resp.headers.get("Content-Length", 0)) + resume_offset + mode = "ab" if resume_offset > 0 else "wb" + + received = resume_offset + + with open(target_file, mode) as f: + while True: + chunk = resp.read(CHUNK_SIZE) + if not chunk: + break + f.write(chunk) + received += len(chunk) + if on_progress: + on_progress(received, total) + + duration = (datetime.now(timezone.utc) - start).total_seconds() + actual_md5 = _file_md5(target_file) + + # Verify MD5 + if expected_md5 and actual_md5 != expected_md5: + logger.error("MD5 mismatch for %s: expected %s, got %s", + image_name, expected_md5, actual_md5) + return { + "success": False, + "error": f"MD5 mismatch: expected {expected_md5}, got {actual_md5}", + "duration": duration, + } + + # Backup existing image before overwriting + target_dir.mkdir(parents=True, exist_ok=True) + final_path = target_dir / image_name + if final_path.is_file(): + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M") + backup_dir = target_dir / "backup" / timestamp + backup_dir.mkdir(parents=True, exist_ok=True) + for existing in target_dir.iterdir(): + if existing.is_file(): + shutil.copy2(str(existing), str(backup_dir / existing.name)) + logger.info("Backed up existing image to %s", backup_dir) + + # Move new image into place + shutil.move(str(target_file), str(final_path)) + + # Cleanup incoming dir + try: + incoming_dir.rmdir() + except OSError: + pass + + return { + "success": True, + "path": str(final_path), + "size": received, + "md5": actual_md5, + "duration": duration, + } + + except (URLError, OSError) as e: + duration = (datetime.now(timezone.utc) - start).total_seconds() + return { + "success": False, + "error": str(e), + "duration": duration, + } + + def delete_image(self, image_name: str) -> bool: + """Delete an image and its directory.""" + image_name = self._validate_image_name(image_name) + base = image_name.rsplit(".", 1)[0] if "." in image_name else image_name + target_dir = self.images_dir / base + if target_dir.is_dir(): + shutil.rmtree(str(target_dir)) + return True + return False + + +# ============================================================================= +# Image Serving (server-side: serve images TO caching servers) +# ============================================================================= + + +IMAGE_EXTS = {".qcow2", ".qdiff", ".cloop"} +INCOMING_DIR_NAME = ".incoming" + + +def validate_image_path(name: str) -> None: + """Validate an image or filename for path safety. + + Raises ValueError if unsafe. + """ + if not name or not isinstance(name, str): + raise ValueError("Name must not be empty") + if "/" in name or "\\" in name or ".." in name or "\0" in name: + raise ValueError(f"Unsafe path component: {name}") + + +def resolve_image_file(images_dir: Path, image_name: str, filename: str) -> Path: + """Resolve and validate an image file path. + + Returns the absolute Path if valid and file exists. + Raises ValueError for invalid paths, FileNotFoundError if missing. + """ + validate_image_path(image_name) + validate_image_path(filename) + + file_path = (images_dir / image_name / filename).resolve() + if not file_path.is_relative_to(images_dir.resolve()): + raise ValueError("Path escapes images directory") + if not file_path.is_file(): + raise FileNotFoundError(f"File not found: {filename}") + return file_path + + +def get_image_file_info(file_path: Path) -> dict: + """Get metadata for an image file. + + Returns {size, mtime_ts, etag, last_modified}. + """ + stat = file_path.stat() + etag = hashlib.md5( + f"{file_path}:{stat.st_mtime}:{stat.st_size}".encode() + ).hexdigest() + mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc) + return { + "size": stat.st_size, + "mtime_ts": stat.st_mtime, + "etag": etag, + "last_modified": mtime.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + + +def receive_upload_chunk( + images_dir: Path, image_name: str, filename: str, + data: bytes, offset: int | None = None, +) -> dict: + """Write a chunk of upload data to the staging directory. + + Args: + images_dir: Base images directory + image_name: Target image name + filename: Target filename + data: Chunk bytes + offset: Byte offset for ranged writes (None = overwrite) + + Returns {received, offset}. + """ + validate_image_path(image_name) + validate_image_path(filename) + + staging_dir = images_dir / INCOMING_DIR_NAME / image_name + staging_dir.mkdir(parents=True, exist_ok=True) + file_path = staging_dir / filename + + if offset is not None and offset > 0: + if not file_path.exists(): + raise ValueError("Cannot resume upload without an existing staged file") + current_size = file_path.stat().st_size + if current_size != offset: + raise ValueError(f"Offset mismatch: expected {current_size}, got {offset}") + with open(file_path, "r+b") as f: + f.seek(offset) + f.write(data) + else: + file_path.write_bytes(data) + + return {"received": len(data), "offset": (offset or 0) + len(data)} + + +def get_upload_status(images_dir: Path, image_name: str, filename: str) -> dict: + """Check how many bytes have been received for a staged upload. + + Returns {bytesReceived, complete}. + """ + validate_image_path(image_name) + validate_image_path(filename) + + file_path = images_dir / INCOMING_DIR_NAME / image_name / filename + if not file_path.is_file(): + return {"bytesReceived": 0, "complete": False} + return {"bytesReceived": file_path.stat().st_size, "complete": False} + + +def finalize_upload(images_dir: Path, image_name: str) -> dict: + """Move staged files to final directory with backup. + + Backs up existing image files to a timestamped subdirectory + before replacing them. + + Returns {finalized, files, backup}. + Raises FileNotFoundError if no staged files exist. + """ + validate_image_path(image_name) + + staging_dir = images_dir / INCOMING_DIR_NAME / image_name + if not staging_dir.is_dir(): + raise FileNotFoundError("No staged files found") + + target_dir = images_dir / image_name + target_dir.mkdir(parents=True, exist_ok=True) + + # Backup existing image files before overwriting + backup_dir = None + existing_images = [ + f for f in target_dir.iterdir() + if f.is_file() and f.suffix in IMAGE_EXTS + ] + if existing_images: + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + backup_dir = target_dir / "backup" / timestamp + backup_dir.mkdir(parents=True, exist_ok=True) + + for f in target_dir.iterdir(): + if f.is_file(): + try: + shutil.copy2(str(f), str(backup_dir / f.name)) + except OSError as e: + logger.warning("Backup failed for %s: %s", f.name, e) + + logger.info("Backed up existing image to %s", backup_dir) + + # Move staged files to target + moved = [] + for f in staging_dir.iterdir(): + if f.is_file(): + target = target_dir / f.name + shutil.move(str(f), str(target)) + moved.append(f.name) + + try: + staging_dir.rmdir() + except OSError: + pass + + logger.info("Image upload finalized: %s (%d files)", image_name, len(moved)) + return { + "finalized": True, + "files": moved, + "backup": str(backup_dir) if backup_dir else None, + } + + +def cancel_upload(images_dir: Path, image_name: str) -> dict: + """Clean up staged upload files. + + Returns {cleaned, detail?}. + """ + validate_image_path(image_name) + + staging_dir = images_dir / INCOMING_DIR_NAME / image_name + if staging_dir.is_dir(): + shutil.rmtree(str(staging_dir)) + return {"cleaned": True} + return {"cleaned": False, "detail": "No staging directory found"} diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/kernel.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/kernel.py new file mode 100644 index 0000000..71295bf --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/kernel.py @@ -0,0 +1,269 @@ +""" +LINBO Kernel Management — variant switching and rebuild control. + +Supports stable, longterm, and legacy kernel variants. +Persists state via JSON file, uses file-based locking for rebuilds. +""" + +import json +import logging +import os +import shutil +import subprocess +import tempfile +from pathlib import Path + +logger = logging.getLogger(__name__) + +CONFIG_DIR = os.environ.get("CONFIG_DIR", "/etc/linuxmuster/linbo") +LINBO_DIR = os.environ.get("LINBO_DIR", "/srv/linbo") + +VALID_VARIANTS = ["stable", "longterm", "legacy"] +CUSTOM_KERNEL_FILE = Path(CONFIG_DIR) / "custom_kernel" +KERNEL_STATE_FILE = Path(CONFIG_DIR) / "kernel_state.json" +REBUILD_LOCK = Path(CONFIG_DIR) / ".rebuild.lock" + +DEFAULT_STATE = { + "lastSwitchAt": None, + "lastError": None, + "lastRequestedVariant": None, + "lastSuccessfulVariant": None, + "rebuildStatus": "completed", +} + + +class LinboKernelManager: + """Manage kernel variant switching and rebuild state.""" + + def __init__(self): + self._rebuild_active = False + + def read_state(self) -> dict: + """Read kernel state from JSON file. + + Detects interrupted rebuilds (crash recovery). + """ + try: + raw = KERNEL_STATE_FILE.read_text(encoding="utf-8") + state = {**DEFAULT_STATE, **json.loads(raw)} + + if state["rebuildStatus"] == "running" and not self._rebuild_active: + state["rebuildStatus"] = "failed" + state["lastError"] = "Rebuild interrupted (process restart)" + self.write_state(state) + + return state + except FileNotFoundError: + return {**DEFAULT_STATE} + except json.JSONDecodeError: + logger.error("Corrupt kernel_state.json, resetting") + return {**DEFAULT_STATE} + + def write_state(self, updates: dict) -> dict: + """Write kernel state atomically (temp + rename).""" + try: + raw = KERNEL_STATE_FILE.read_text(encoding="utf-8") + state = {**DEFAULT_STATE, **json.loads(raw), **updates} + except (FileNotFoundError, json.JSONDecodeError): + state = {**DEFAULT_STATE, **updates} + + fd, tmp = tempfile.mkstemp(dir=CONFIG_DIR, suffix=".tmp") + try: + with os.fdopen(fd, "w") as f: + json.dump(state, f, indent=2) + os.rename(tmp, str(KERNEL_STATE_FILE)) + except Exception: + os.unlink(tmp) + raise + + return state + + def read_custom_kernel_config(self) -> dict: + """Read custom_kernel config file. + + Returns: + {variant, raw, valid, warning} + """ + try: + raw = CUSTOM_KERNEL_FILE.read_text(encoding="utf-8") + lines = [l for l in raw.splitlines() + if l.strip() and not l.strip().startswith("#")] + kernel_lines = [l for l in lines if "KERNELPATH=" in l] + + if not kernel_lines: + return {"variant": "stable", "raw": raw, "valid": True, "warning": None} + + last = kernel_lines[-1] + value = last.split("=", 1)[1].strip().strip('"').strip() + + if value in VALID_VARIANTS: + return {"variant": value, "raw": raw, "valid": True, "warning": None} + if value == "": + return {"variant": "stable", "raw": raw, "valid": True, "warning": None} + + return { + "variant": value, + "raw": raw, + "valid": False, + "warning": f"Unknown variant: {value}", + } + except FileNotFoundError: + return {"variant": "stable", "raw": "", "valid": True, "warning": None} + + def write_custom_kernel_config(self, variant: str) -> None: + """Write custom_kernel config file. + + Args: + variant: Kernel variant (stable, longterm, legacy) + + Raises: + ValueError: If variant is not valid + """ + if variant not in VALID_VARIANTS: + raise ValueError(f"Invalid variant: {variant}. Must be one of {VALID_VARIANTS}") + + content = f'KERNELPATH="{variant}"\n' + CUSTOM_KERNEL_FILE.write_text(content, encoding="utf-8") + + def get_status(self) -> dict: + """Get combined kernel status. + + Returns: + {state, customKernel, variants} + """ + state = self.read_state() + config = self.read_custom_kernel_config() + + # List available variants from filesystem + variants = [] + for v in VALID_VARIANTS: + kernel_path = Path(LINBO_DIR) / v / "linbo64" + variants.append({ + "name": v, + "available": kernel_path.is_file(), + "active": config["variant"] == v, + }) + + return { + "state": state, + "customKernel": config, + "variants": variants, + } + + def switch_variant(self, variant: str, rebuild: bool = True) -> dict: + """Switch kernel variant and optionally trigger linbofs rebuild. + + Args: + variant: Target variant + rebuild: Whether to trigger linbofs rebuild + + Returns: + {success, state, rebuild} + """ + if variant not in VALID_VARIANTS: + raise ValueError(f"Invalid variant: {variant}") + + from datetime import datetime, timezone + now = datetime.now(timezone.utc).isoformat() + + self.write_custom_kernel_config(variant) + state = self.write_state({ + "lastSwitchAt": now, + "lastRequestedVariant": variant, + "rebuildStatus": "pending" if rebuild else "completed", + }) + + rebuild_result = None + if rebuild: + rebuild_result = self.trigger_rebuild(variant) + return { + "success": rebuild_result.get("success", False), + "state": self.read_state(), + "rebuild": rebuild_result, + } + + return {"success": True, "state": state, "rebuild": rebuild_result} + + def trigger_rebuild(self, variant: str) -> dict: + """Trigger linbofs64 rebuild via update-linbofs. + + Runs /usr/sbin/update-linbofs with a 5-minute timeout. + Tracks state as running → completed/failed. + + Args: + variant: The kernel variant being built (for state tracking) + + Returns: + {success, output} on success, {success, error} on failure + """ + script_path = os.environ.get("UPDATE_LINBOFS_SCRIPT", "/usr/sbin/update-linbofs") + + # Acquire file-based lock + lock_fd = None + try: + lock_fd = os.open(str(REBUILD_LOCK), os.O_CREAT | os.O_EXCL | os.O_WRONLY) + except FileExistsError: + return {"success": False, "error": "Rebuild already in progress (lock exists)"} + + self._rebuild_active = True + self.write_state({"rebuildStatus": "running"}) + + try: + env = { + **os.environ, + "LINBO_DIR": LINBO_DIR, + "CONFIG_DIR": CONFIG_DIR, + } + cmd = [script_path] + if os.getuid() != 0 and shutil.which("fakeroot"): + cmd.insert(0, "fakeroot") + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=300, + env=env, + ) + + self._rebuild_active = False + output = result.stdout + result.stderr + + if result.returncode == 0: + self.write_state({ + "rebuildStatus": "completed", + "lastSuccessfulVariant": variant, + "lastError": None, + }) + return {"success": True, "output": output} + else: + error_msg = result.stderr.strip() or f"Exit code {result.returncode}" + self.write_state({ + "rebuildStatus": "failed", + "lastError": error_msg, + }) + return {"success": False, "error": error_msg, "output": output} + + except subprocess.TimeoutExpired: + self._rebuild_active = False + self.write_state({ + "rebuildStatus": "failed", + "lastError": "Rebuild timed out after 300s", + }) + return {"success": False, "error": "Rebuild timed out after 300s"} + + except Exception as e: + self._rebuild_active = False + self.write_state({ + "rebuildStatus": "failed", + "lastError": str(e), + }) + return {"success": False, "error": str(e)} + + finally: + if lock_fd is not None: + os.close(lock_fd) + try: + os.unlink(str(REBUILD_LOCK)) + except OSError: + pass diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/linbo_update.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/linbo_update.py new file mode 100644 index 0000000..c411663 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/linbo_update.py @@ -0,0 +1,161 @@ +""" +LINBO Update Service — check and install linuxmuster-linbo7 package updates. + +Uses APT package management to check for updates, download, and install. +Supports version comparison and status tracking. +""" + +import json +import logging +import os +import re +import subprocess +import tempfile +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger(__name__) + +PACKAGE_NAME = "linuxmuster-linbo7" +STATE_FILE = Path(os.environ.get("CONFIG_DIR", "/etc/linuxmuster/linbo")) / "update_state.json" + + +class LinboUpdateManager: + """Manage LINBO package updates.""" + + def check_version(self) -> dict: + """Check installed and available LINBO versions. + + Returns: + {installed, available, updateAvailable} + """ + installed = self._get_installed_version() + available = self._get_available_version() + + update_available = False + if installed and available and installed != available: + update_available = self._version_gt(available, installed) + + return { + "installed": installed, + "available": available, + "updateAvailable": update_available, + } + + def start_update(self, timeout: int = 600) -> dict: + """Run apt-get install to update the LINBO package. + + Args: + timeout: Max seconds for the update (default: 10 min) + + Returns: + {success, output, errors, duration, installedVersion} + """ + state = self._read_state() + if state.get("status") == "running": + return {"success": False, "errors": "Update already in progress"} + + self._write_state({"status": "running", "startedAt": datetime.now(timezone.utc).isoformat()}) + + start = datetime.now(timezone.utc) + try: + # Update package lists first + subprocess.run( + ["apt-get", "update", "-qq"], + capture_output=True, text=True, timeout=120, + ) + + # Install/upgrade + result = subprocess.run( + ["apt-get", "install", "-y", "--only-upgrade", PACKAGE_NAME], + capture_output=True, text=True, timeout=timeout, + env={**os.environ, "DEBIAN_FRONTEND": "noninteractive"}, + ) + + duration = (datetime.now(timezone.utc) - start).total_seconds() + installed = self._get_installed_version() + + self._write_state({ + "status": "completed" if result.returncode == 0 else "failed", + "completedAt": datetime.now(timezone.utc).isoformat(), + "installedVersion": installed, + "exitCode": result.returncode, + }) + + return { + "success": result.returncode == 0, + "output": result.stdout, + "errors": result.stderr if result.returncode != 0 else None, + "duration": duration, + "installedVersion": installed, + } + + except subprocess.TimeoutExpired: + duration = (datetime.now(timezone.utc) - start).total_seconds() + self._write_state({"status": "failed", "error": f"Timeout after {timeout}s"}) + return {"success": False, "errors": f"Timeout after {timeout}s", "duration": duration} + + except Exception as e: + self._write_state({"status": "failed", "error": str(e)}) + return {"success": False, "errors": str(e)} + + def get_status(self) -> dict: + """Get current update status.""" + return self._read_state() + + def _get_installed_version(self) -> str | None: + """Get installed package version via dpkg.""" + try: + result = subprocess.run( + ["dpkg-query", "-W", "-f=${Version}", PACKAGE_NAME], + capture_output=True, text=True, timeout=5, + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + return None + + def _get_available_version(self) -> str | None: + """Get available package version via apt-cache.""" + try: + result = subprocess.run( + ["apt-cache", "policy", PACKAGE_NAME], + capture_output=True, text=True, timeout=10, + ) + if result.returncode == 0: + m = re.search(r"Candidate:\s+(\S+)", result.stdout) + if m and m.group(1) != "(none)": + return m.group(1) + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + return None + + @staticmethod + def _version_gt(a: str, b: str) -> bool: + """Compare Debian version strings (simplified).""" + try: + result = subprocess.run( + ["dpkg", "--compare-versions", a, "gt", b], + capture_output=True, timeout=5, + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError): + return a > b + + def _read_state(self) -> dict: + try: + return json.loads(STATE_FILE.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {"status": "idle"} + + def _write_state(self, updates: dict): + state = {**self._read_state(), **updates} + fd, tmp = tempfile.mkstemp(suffix=".tmp", dir=str(STATE_FILE.parent)) + try: + with os.fdopen(fd, "w") as f: + json.dump(state, f, indent=2) + os.rename(tmp, str(STATE_FILE)) + except Exception: + os.unlink(tmp) + raise diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/linbofs.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/linbofs.py new file mode 100644 index 0000000..2819a35 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/linbofs.py @@ -0,0 +1,180 @@ +""" +LINBO linbofs Rebuild Service — execute update-linbofs with locking. + +Manages linbofs64 rebuilds with: +- File-based mutual exclusion +- Build log rotation (keeps 3 most recent) +- Fakeroot detection +- Output buffering with size limits +""" + +import logging +import os +import shutil +import subprocess +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger(__name__) + +UPDATE_SCRIPT = os.environ.get("UPDATE_LINBOFS_SCRIPT", "/usr/sbin/update-linbofs") +LINBO_DIR = os.environ.get("LINBO_DIR", "/srv/linbo") +CONFIG_DIR = os.environ.get("CONFIG_DIR", "/etc/linuxmuster/linbo") +LOCK_FILE = Path(CONFIG_DIR) / ".linbofs-rebuild.lock" +MAX_LOG_FILES = 3 + + +def _is_fakeroot_available() -> bool: + """Check if fakeroot is available.""" + return shutil.which("fakeroot") is not None + + +def rotate_build_logs() -> int: + """Rotate build logs, keeping only the 3 most recent. + + Returns: + Number of deleted log files + """ + log_dir = Path(LINBO_DIR) + if not log_dir.is_dir(): + return 0 + + logs = [] + for f in log_dir.iterdir(): + if f.name.startswith(".linbofs-build") and f.name.endswith(".log"): + try: + logs.append((f, f.stat().st_mtime)) + except OSError: + pass + + logs.sort(key=lambda x: x[1], reverse=True) + + deleted = 0 + for f, _ in logs[MAX_LOG_FILES:]: + try: + f.unlink() + deleted += 1 + except OSError: + pass + + return deleted + + +def update_linbofs( + linbo_dir: str | None = None, + config_dir: str | None = None, + timeout: int = 300, +) -> dict: + """Execute update-linbofs script. + + Args: + linbo_dir: Override LINBO directory + config_dir: Override config directory + timeout: Maximum execution time in seconds (default: 5 min) + + Returns: + {success, output, errors, duration} + """ + script = Path(UPDATE_SCRIPT) + if not script.is_file(): + return {"success": False, "errors": f"Script not found: {UPDATE_SCRIPT}"} + + # Atomic lock acquisition (O_CREAT|O_EXCL prevents race condition) + try: + fd = os.open(str(LOCK_FILE), os.O_CREAT | os.O_EXCL | os.O_WRONLY) + os.write(fd, str(os.getpid()).encode()) + os.close(fd) + except FileExistsError: + return {"success": False, "errors": "linbofs update already in progress"} + except OSError as e: + return {"success": False, "errors": f"Cannot create lock: {e}"} + + rotate_build_logs() + + env = { + **os.environ, + "LINBO_DIR": linbo_dir or LINBO_DIR, + "CONFIG_DIR": config_dir or CONFIG_DIR, + } + + start = datetime.now(timezone.utc) + log_name = f".linbofs-build.{start.strftime('%Y%m%d%H%M%S')}.log" + log_path = Path(linbo_dir or LINBO_DIR) / log_name + + try: + # Use fakeroot if available and not running as root + cmd = [str(script)] + if os.getuid() != 0 and _is_fakeroot_available(): + cmd = ["fakeroot"] + cmd + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + env=env, + ) + + duration = (datetime.now(timezone.utc) - start).total_seconds() + + # Write build log + try: + log_path.write_text( + f"# update-linbofs {start.isoformat()}\n" + f"# Exit code: {result.returncode}\n" + f"# Duration: {duration:.1f}s\n\n" + f"=== STDOUT ===\n{result.stdout}\n\n" + f"=== STDERR ===\n{result.stderr}\n" + ) + except OSError: + pass + + return { + "success": result.returncode == 0, + "output": result.stdout, + "errors": result.stderr if result.returncode != 0 else None, + "duration": duration, + "logFile": str(log_path), + } + + except subprocess.TimeoutExpired: + duration = (datetime.now(timezone.utc) - start).total_seconds() + return { + "success": False, + "errors": f"Timeout after {timeout}s", + "duration": duration, + } + finally: + # Release lock + try: + LOCK_FILE.unlink(missing_ok=True) + except OSError: + pass + + +def get_linbofs_info() -> dict: + """Get linbofs64 file info. + + Returns: + {exists, size, modifiedAt, md5} + """ + linbofs = Path(LINBO_DIR) / "linbofs64" + if not linbofs.is_file(): + return {"exists": False} + + stat = linbofs.stat() + + # Read MD5 if available + md5 = None + md5_path = Path(LINBO_DIR) / "linbofs64.md5" + try: + md5 = md5_path.read_text().strip().split()[0] + except OSError: + pass + + return { + "exists": True, + "size": stat.st_size, + "modifiedAt": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), + "md5": md5, + } diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/multicast.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/multicast.py new file mode 100644 index 0000000..87b88e4 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/multicast.py @@ -0,0 +1,111 @@ +""" +LINBO Multicast Service — wrapper around /usr/sbin/linbo-multicast. + +Controls multicast image distribution for fast client provisioning. +""" + +import logging +import subprocess +from pathlib import Path + +from ._validation import check_linbo_image_name + +logger = logging.getLogger(__name__) + +MULTICAST_BIN = "/usr/sbin/linbo-multicast" +CONFIG_PATH = "/etc/default/linbo-multicast" + + +class LinboMulticast: + """Manage multicast image distribution.""" + + def status(self) -> list[dict]: + """Get active multicast sessions. + + Returns: + List of {image, port, pid, startedAt} dicts + """ + try: + result = subprocess.run( + [MULTICAST_BIN, "status"], + capture_output=True, text=True, timeout=10, + ) + return self._parse_status(result.stdout) + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + logger.warning("multicast status failed: %s", e) + return [] + + @staticmethod + def _validate_image(image: str) -> str: + if not check_linbo_image_name(image): + raise ValueError(f"Unsafe image name: {image}") + return image + + def start(self, image: str | None = None) -> dict: + """Start multicast seeding. + + Args: + image: Specific image name, or None for all images + + Returns: + {success, message} + """ + cmd = [MULTICAST_BIN, "start"] + if image: + cmd.append(self._validate_image(image)) + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return { + "success": result.returncode == 0, + "message": result.stdout.strip() or result.stderr.strip(), + } + + def stop(self, image: str | None = None) -> dict: + """Stop multicast seeding. + + Args: + image: Specific image, or None for all + + Returns: + {success, message} + """ + cmd = [MULTICAST_BIN, "stop"] + if image: + cmd.append(self._validate_image(image)) + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return { + "success": result.returncode == 0, + "message": result.stdout.strip() or result.stderr.strip(), + } + + def get_config(self) -> dict: + """Read multicast configuration from /etc/default/linbo-multicast.""" + config = {} + try: + for line in Path(CONFIG_PATH).read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" in line: + key, _, value = line.partition("=") + config[key.strip()] = value.strip().strip('"') + except OSError: + pass + return config + + def _parse_status(self, output: str) -> list[dict]: + """Parse linbo-multicast status output.""" + sessions = [] + for line in output.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + # Format varies, parse what we can + parts = line.split() + if len(parts) >= 2: + sessions.append({ + "image": parts[0], + "info": " ".join(parts[1:]), + }) + return sessions diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/ssh.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/ssh.py new file mode 100644 index 0000000..ac20fc7 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/ssh.py @@ -0,0 +1,279 @@ +""" +LINBO SSH Service — execute commands on LINBO clients via SSH. + +Uses the system ssh client. Key loading is lazy: the private key is loaded +on first use, not at module import time. + +Default connection: root@host:2222 with RSA key from +/etc/linuxmuster/linbo/ssh_host_rsa_key_client +""" + +import logging +import os +import re +import subprocess +from pathlib import Path + +logger = logging.getLogger(__name__) + +# Key paths +_KEY_PATH = os.environ.get( + "LINBO_CLIENT_SSH_KEY", + "/etc/linuxmuster/linbo/ssh_host_rsa_key_client", +) +_SSH_BIN = os.environ.get("LINBO_CLIENT_SSH_BIN", "ssh") +_SSH_PORT = int(os.environ.get("LINBO_CLIENT_SSH_PORT", "2222")) +_SSH_TIMEOUT = int(os.environ.get("SSH_TIMEOUT", "10")) +_MAX_OUTPUT = 10 * 1024 * 1024 # 10 MB + +# Lazy-loaded key cache +_cached_key = None + +# Validation patterns for shell-safe parameters +SAFE_OSNAME_RE = re.compile(r"^[A-Za-z0-9 ._-]{1,100}$") +SAFE_PARTITION_RE = re.compile(r"^/dev/[a-z]{2,8}[0-9]{1,3}$") +SAFE_DOWNLOAD_TYPE_RE = re.compile(r"^(rsync|multicast|torrent)$") + + +def _get_private_key_path() -> str: + """Get SSH private key path. Tries primary, then fallback. + + Returns: + Path to the private key file + + Raises: + FileNotFoundError: If no key file is available + """ + global _cached_key + if _cached_key is not None: + return _cached_key + + if Path(_KEY_PATH).is_file(): + _cached_key = _KEY_PATH + logger.info("Loaded LINBO client key from %s", _KEY_PATH) + return _cached_key + + fallback = os.environ.get("SSH_PRIVATE_KEY") + if fallback and Path(fallback).is_file(): + _cached_key = fallback + logger.warning("Using fallback SSH key: %s", fallback) + return _cached_key + + raise FileNotFoundError( + f"SSH private key not available. Expected: {_KEY_PATH}" + ) + + +def _build_ssh_command( + host: str, + command: str, + port: int | None = None, + timeout: float | None = None, +) -> list[str]: + """Build the ssh command used for remote execution.""" + if port is None: + port = _SSH_PORT + if timeout is None: + timeout = _SSH_TIMEOUT + + key_path = _get_private_key_path() + connect_timeout = max(int(timeout), 1) + + return [ + _SSH_BIN, + "-i", + key_path, + "-p", + str(port), + "-o", + "BatchMode=yes", + "-o", + "StrictHostKeyChecking=no", + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + f"ConnectTimeout={connect_timeout}", + f"root@{host}", + command, + ] + + +def execute_command( + host: str, + command: str, + port: int | None = None, + timeout: float | None = None, +) -> dict: + """Execute a command on a remote host via SSH. + + Args: + host: Target hostname or IP + command: Shell command to execute + port: SSH port (default: 2222) + timeout: Connection timeout in seconds + + Returns: + {stdout, stderr, code} + """ + if timeout is None: + timeout = _SSH_TIMEOUT + + result = subprocess.run( + _build_ssh_command(host, command, port=port, timeout=timeout), + capture_output=True, + text=True, + timeout=max(timeout, 1), + ) + + return { + "stdout": result.stdout[:_MAX_OUTPUT], + "stderr": result.stderr[:_MAX_OUTPUT], + "code": result.returncode, + } + + +def execute_commands( + host: str, + commands: list[str], + port: int | None = None, + timeout: float | None = None, + continue_on_error: bool = False, +) -> list[dict]: + """Execute multiple commands sequentially. + + Args: + host: Target hostname or IP + commands: List of shell commands + continue_on_error: Continue even if a command fails + + Returns: + List of {command, success, stdout, stderr, code} dicts + """ + results = [] + for cmd in commands: + try: + result = execute_command(host, cmd, port=port, timeout=timeout) + results.append({ + "command": cmd, + "success": result["code"] == 0, + **result, + }) + if result["code"] != 0 and not continue_on_error: + break + except Exception as e: + results.append({ + "command": cmd, + "success": False, + "error": str(e), + "code": -1, + }) + if not continue_on_error: + break + return results + + +def test_connection(host: str, port: int | None = None) -> dict: + """Test SSH connectivity to a host. + + Returns: + {success, connected} or {success: False, error} + """ + try: + result = execute_command(host, 'echo "connected"', port=port, timeout=5) + return { + "success": True, + "connected": result["stdout"].strip() == "connected", + } + except Exception as e: + return {"success": False, "error": str(e)} + + +def _validate_param(value: str, name: str, pattern: re.Pattern) -> None: + """Validate a parameter for shell safety.""" + if not pattern.match(value): + raise ValueError(f'Invalid {name}: "{value}"') + + +def execute_linbo_command(host: str, linbo_command: str, params: dict | None = None) -> dict: + """Execute a LINBO command on a host. + + Supported commands: sync, start, reboot, shutdown, halt, initcache, partition, format + + Args: + host: Target hostname or IP + linbo_command: LINBO command name + params: Optional parameters (osName, forceNew, downloadType, partition) + + Returns: + {stdout, stderr, code} + """ + if params is None: + params = {} + + def _build_cmd() -> str: + if linbo_command == "sync": + cmd = "linbo_cmd synconly" + if params.get("forceNew"): + cmd += " -f" + if params.get("osName"): + _validate_param(params["osName"], "osName", SAFE_OSNAME_RE) + cmd += " " + params["osName"] + return cmd + + if linbo_command == "start": + cmd = "linbo_cmd start" + if params.get("osName"): + _validate_param(params["osName"], "osName", SAFE_OSNAME_RE) + cmd += " " + params["osName"] + return cmd + + if linbo_command in ("reboot", "shutdown", "halt", "partition"): + return "linbo_cmd " + linbo_command + + if linbo_command == "initcache": + cmd = "linbo_cmd initcache" + if params.get("downloadType"): + _validate_param(params["downloadType"], "downloadType", SAFE_DOWNLOAD_TYPE_RE) + cmd += " " + params["downloadType"] + return cmd + + if linbo_command == "format": + cmd = "linbo_cmd format" + if params.get("partition"): + _validate_param(params["partition"], "partition", SAFE_PARTITION_RE) + cmd += " " + params["partition"] + return cmd + + raise ValueError(f"Unknown LINBO command: {linbo_command}") + + command = _build_cmd() + return execute_command(host, command, port=params.get("port")) + + +def get_linbo_status(host: str, port: int | None = None) -> dict: + """Get LINBO status from a host (images, cache, disks). + + Returns: + {success, images, cache, disks} or {success: False, error} + """ + try: + results = execute_commands(host, [ + "linbo_cmd listimages", + "df -h /cache", + "lsblk -J", + ], port=port, continue_on_error=True) + + return { + "success": True, + "images": results[0]["stdout"].strip().split("\n") if results[0]["success"] else [], + "cache": results[1]["stdout"] if len(results) > 1 and results[1]["success"] else "", + "disks": results[2]["stdout"] if len(results) > 2 and results[2]["success"] else "", + } + except Exception as e: + return {"success": False, "error": str(e)} + + +def reset_key_cache() -> None: + """Reset the cached SSH key (for testing).""" + global _cached_key + _cached_key = None diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/terminal.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/terminal.py new file mode 100644 index 0000000..9c5ba39 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/terminal.py @@ -0,0 +1,125 @@ +""" +LINBO Terminal Service — interactive SSH session management. + +Manages SSH sessions with PTY support for web-based terminal access +to LINBO clients. Supports session lifecycle, input/output streaming, +PTY resize, and idle timeout enforcement. +""" + +import logging +import os +import uuid +from datetime import datetime, timezone + +from ._validation import check_ip + +logger = logging.getLogger(__name__) + +MAX_SESSIONS = int(os.environ.get("TERMINAL_MAX_SESSIONS", "10")) +IDLE_TIMEOUT = int(os.environ.get("TERMINAL_IDLE_TIMEOUT", str(30 * 60))) # 30 min + + +class TerminalSession: + """A single interactive SSH terminal session.""" + + def __init__(self, session_id: str, host_ip: str, user_id: str): + self.id = session_id + self.host_ip = host_ip + self.user_id = user_id + self.mode = "pty" + self.created_at = datetime.now(timezone.utc) + self.last_activity = self.created_at + self._client = None + self._channel = None + + def to_dict(self) -> dict: + return { + "id": self.id, + "hostIp": self.host_ip, + "userId": self.user_id, + "mode": self.mode, + "createdAt": self.created_at.isoformat(), + "lastActivity": self.last_activity.isoformat(), + } + + def touch(self): + """Update last activity timestamp.""" + self.last_activity = datetime.now(timezone.utc) + + def is_idle(self) -> bool: + """Check if session has exceeded idle timeout.""" + elapsed = (datetime.now(timezone.utc) - self.last_activity).total_seconds() + return elapsed > IDLE_TIMEOUT + + +class LinboTerminalManager: + """Manage interactive SSH terminal sessions.""" + + def __init__(self): + self._sessions: dict[str, TerminalSession] = {} + + def list_sessions(self) -> list[dict]: + """List all active sessions.""" + self._cleanup_idle() + return [s.to_dict() for s in self._sessions.values()] + + def get_session(self, session_id: str) -> TerminalSession | None: + """Get a session by ID.""" + return self._sessions.get(session_id) + + def create_session(self, host_ip: str, user_id: str) -> TerminalSession: + """Create a new terminal session. + + Args: + host_ip: Target LINBO client IP (must be valid IPv4) + user_id: ID of the user creating the session + + Returns: + New TerminalSession + + Raises: + ValueError: If host_ip is not a valid IPv4 address + RuntimeError: If max sessions reached + """ + if not host_ip or not check_ip(host_ip): + raise ValueError(f"Invalid IP address: {host_ip}") + if not user_id or not isinstance(user_id, str): + raise ValueError("user_id must be a non-empty string") + + self._cleanup_idle() + + if len(self._sessions) >= MAX_SESSIONS: + raise RuntimeError(f"Maximum sessions ({MAX_SESSIONS}) reached") + + session_id = str(uuid.uuid4()) + session = TerminalSession(session_id, host_ip, user_id) + self._sessions[session_id] = session + + logger.info("Terminal session %s created for %s by %s", session_id, host_ip, user_id) + return session + + def destroy_session(self, session_id: str) -> bool: + """Destroy a terminal session. + + Returns: + True if session was found and destroyed + """ + session = self._sessions.pop(session_id, None) + if session is None: + return False + + logger.info("Terminal session %s destroyed", session_id) + return True + + def destroy_all(self) -> int: + """Destroy all sessions. Returns count.""" + count = len(self._sessions) + self._sessions.clear() + return count + + def _cleanup_idle(self): + """Remove sessions that have exceeded idle timeout.""" + idle = [sid for sid, s in self._sessions.items() if s.is_idle()] + for sid in idle: + logger.info("Terminal session %s idle timeout", sid) + self._sessions.pop(sid, None) diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/torrent.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/torrent.py new file mode 100644 index 0000000..83ed0e7 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/torrent.py @@ -0,0 +1,106 @@ +""" +LINBO Torrent Service — wrapper around /usr/sbin/linbo-torrent. + +Controls BitTorrent seeding for peer-to-peer image distribution. +""" + +import logging +import subprocess +from pathlib import Path + +from ._validation import check_linbo_image_name + +logger = logging.getLogger(__name__) + +TORRENT_BIN = "/usr/sbin/linbo-torrent" +CONFIG_PATH = "/etc/default/linbo-torrent" + + +class LinboTorrent: + """Manage BitTorrent image seeding.""" + + @staticmethod + def _validate_image(image: str) -> str: + if not check_linbo_image_name(image): + raise ValueError(f"Unsafe image name: {image}") + return image + + def status(self) -> list[dict]: + """Get active torrent sessions. + + Returns: + List of {image, info} dicts + """ + try: + result = subprocess.run( + [TORRENT_BIN, "status"], + capture_output=True, text=True, timeout=10, + ) + sessions = [] + for line in result.stdout.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split() + if parts: + sessions.append({ + "image": parts[0], + "info": " ".join(parts[1:]) if len(parts) > 1 else "", + }) + return sessions + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + logger.warning("torrent status failed: %s", e) + return [] + + def start(self, image: str | None = None) -> dict: + """Start torrent seeding. + + Args: + image: Specific image, or None for all + + Returns: + {success, message} + """ + cmd = [TORRENT_BIN, "start"] + if image: + cmd.append(self._validate_image(image)) + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return { + "success": result.returncode == 0, + "message": result.stdout.strip() or result.stderr.strip(), + } + + def stop(self, image: str | None = None) -> dict: + """Stop torrent seeding. + + Args: + image: Specific image, or None for all + + Returns: + {success, message} + """ + cmd = [TORRENT_BIN, "stop"] + if image: + cmd.append(self._validate_image(image)) + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return { + "success": result.returncode == 0, + "message": result.stdout.strip() or result.stderr.strip(), + } + + def get_config(self) -> dict: + """Read torrent configuration.""" + config = {} + try: + for line in Path(CONFIG_PATH).read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" in line: + key, _, value = line.partition("=") + config[key.strip()] = value.strip().strip('"') + except OSError: + pass + return config diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/wlan.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/wlan.py new file mode 100644 index 0000000..5b187a9 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/wlan.py @@ -0,0 +1,118 @@ +""" +LINBO WLAN Configuration — manage wireless network settings for linbofs. + +Reads and writes wpa_supplicant configuration for LINBO clients. +""" + +import logging +import re +from pathlib import Path + +logger = logging.getLogger(__name__) + +DEFAULT_CONFIG_PATH = "/etc/linuxmuster/linbo/wlan.conf" + + +class LinboWlanConfig: + """Manage WLAN configuration for LINBO clients.""" + + def __init__(self, config_path: str | None = None): + self.config_path = Path(config_path or DEFAULT_CONFIG_PATH) + + def get_config(self) -> dict: + """Read WLAN configuration. + + Returns: + {enabled, ssid, keyMgmt, hasPsk, scanSsid} — PSK is never returned + """ + if not self.config_path.is_file(): + return {"enabled": False, "ssid": "", "keyMgmt": "NONE", "hasPsk": False} + + try: + content = self.config_path.read_text(encoding="utf-8") + except OSError: + return {"enabled": False, "ssid": "", "keyMgmt": "NONE", "hasPsk": False} + + ssid = "" + key_mgmt = "NONE" + has_psk = False + scan_ssid = False + + for line in content.splitlines(): + line = line.strip() + m = re.match(r'ssid="(.+)"', line) + if m: + ssid = m.group(1) + if re.match(r"key_mgmt=", line): + key_mgmt = line.split("=", 1)[1].strip() + if re.match(r'psk="', line) or re.match(r"psk=", line): + has_psk = True + if re.match(r"scan_ssid=1", line): + scan_ssid = True + + return { + "enabled": bool(ssid), + "ssid": ssid, + "keyMgmt": key_mgmt, + "hasPsk": has_psk, + "scanSsid": scan_ssid, + } + + def update_config( + self, + ssid: str, + key_mgmt: str = "WPA-PSK", + psk: str | None = None, + scan_ssid: bool = False, + ) -> dict: + """Write WLAN configuration. + + Args: + ssid: Network SSID (1-32 chars) + key_mgmt: Key management (WPA-PSK or NONE) + psk: Pre-shared key (required for WPA-PSK) + scan_ssid: Enable scan_ssid=1 for hidden networks + + Returns: + Updated config dict + + Raises: + ValueError: If parameters are invalid + """ + if not ssid or len(ssid) > 32: + raise ValueError("SSID must be 1-32 characters") + if key_mgmt not in ("WPA-PSK", "NONE"): + raise ValueError("keyMgmt must be WPA-PSK or NONE") + if key_mgmt == "WPA-PSK" and not psk: + raise ValueError("PSK required for WPA-PSK") + if psk and len(psk) > 128: + raise ValueError("PSK must be max 128 characters") + + lines = [ + "# LINBO WLAN configuration", + "# Managed by linuxmuster-tools", + "network={", + f' ssid="{ssid}"', + f" key_mgmt={key_mgmt}", + ] + if psk and key_mgmt == "WPA-PSK": + lines.append(f' psk="{psk}"') + if scan_ssid: + lines.append(" scan_ssid=1") + lines.append("}") + + self.config_path.parent.mkdir(parents=True, exist_ok=True) + self.config_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + return self.get_config() + + def reset_config(self) -> bool: + """Delete WLAN configuration. + + Returns: + True if file was deleted + """ + if self.config_path.is_file(): + self.config_path.unlink() + return True + return False diff --git a/usr/lib/python3/dist-packages/linuxmusterTools/linbo/wol.py b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/wol.py new file mode 100644 index 0000000..b86d437 --- /dev/null +++ b/usr/lib/python3/dist-packages/linuxmusterTools/linbo/wol.py @@ -0,0 +1,116 @@ +""" +LINBO Wake-on-LAN — send magic packets to wake up hosts. + +Supports direct UDP broadcast and optional Networkbox API proxy. +""" + +import logging +import os +import re +import socket + +from ._validation import check_mac + +logger = logging.getLogger(__name__) + + +def is_valid_mac(mac: str) -> bool: + """Validate MAC address format (XX:XX:XX:XX:XX:XX or XX-XX-XX-XX-XX-XX).""" + return check_mac(mac) + + +def normalize_mac(mac: str) -> str: + """Normalize MAC to lowercase hex string (for WoL packet construction).""" + return re.sub(r"[:\-]", "", mac).lower() + + +def create_magic_packet(mac_address: str) -> bytes: + """Create IEEE 802.3 Wake-on-LAN magic packet. + + 6 bytes of 0xFF followed by the MAC address repeated 16 times = 102 bytes. + + Args: + mac_address: MAC address (XX:XX:XX:XX:XX:XX or XX-XX-XX-XX-XX-XX) + + Returns: + 102-byte magic packet + + Raises: + ValueError: If MAC address is invalid + """ + mac = re.sub(r"[:\-]", "", mac_address) + if len(mac) != 12 or not re.match(r"^[0-9a-fA-F]+$", mac): + raise ValueError(f"Invalid MAC address: {mac_address}") + + mac_bytes = bytes.fromhex(mac) + return b"\xff" * 6 + mac_bytes * 16 + + +def send_wol( + mac_address: str, + broadcast: str | None = None, + port: int = 9, + count: int = 3, +) -> dict: + """Send Wake-on-LAN packet via UDP broadcast. + + Args: + mac_address: Target MAC address + broadcast: Broadcast address (default: WOL_BROADCAST_ADDRESS env or 255.255.255.255) + port: UDP port (default: 9) + count: Number of packets to send (default: 3) + + Returns: + Dict with macAddress, packetsSent, broadcastAddress, port + """ + if broadcast is None: + broadcast = os.environ.get("WOL_BROADCAST_ADDRESS", "255.255.255.255") + + packet = create_magic_packet(mac_address) + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + try: + for _ in range(count): + sock.sendto(packet, (broadcast, port)) + finally: + sock.close() + + return { + "macAddress": mac_address, + "packetsSent": count, + "broadcastAddress": broadcast, + "port": port, + } + + +def send_wol_bulk( + mac_addresses: list[str], + broadcast: str | None = None, + port: int = 9, + count: int = 3, +) -> dict: + """Send Wake-on-LAN to multiple hosts. + + Returns: + Dict with total, successful, failed, and per-host results + """ + results = [] + successful = 0 + failed = 0 + + for mac in mac_addresses: + try: + send_wol(mac, broadcast=broadcast, port=port, count=count) + results.append({"macAddress": mac, "success": True, "error": None}) + successful += 1 + except Exception as e: + results.append({"macAddress": mac, "success": False, "error": str(e)}) + failed += 1 + + return { + "total": len(mac_addresses), + "successful": successful, + "failed": failed, + "results": results, + }