Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 3 additions & 32 deletions usr/lib/python3/dist-packages/linuxmusterTools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,5 @@
import logging
"""Overlay package for linuxmusterTools extensions."""

logger = logging.getLogger(__name__)
logger_handler = logging.StreamHandler()
logger.addHandler(logger_handler)
logger.setLevel(logging.INFO)
from pkgutil import extend_path

class LMNToolsFormatter(logging.Formatter):
grey = '\x1b[38;5;245m'
green = '\x1b[38;5;82m'
orange = '\x1b[38;5;214m'
red = '\x1b[38;5;196m'
bold_red = '\x1b[31;1m'
violet = '\x1b[38;5;129m'
reset = '\x1b[0m'

def __init__(self):
super().__init__()
self.fmtprefix = "%(asctime)s %(name)60s" + self.violet + " %(filename)20s:%(lineno)d " + self.reset
self.fmt = f"%(levelname)8s - %(message)s"
self.FORMATS = {
logging.DEBUG: self.fmtprefix + self.grey + self.fmt + self.reset,
logging.INFO: self.fmtprefix + self.green + self.fmt + self.reset,
logging.WARNING: self.fmtprefix + self.orange + self.fmt + self.reset,
logging.ERROR: self.fmtprefix + self.red + self.fmt + self.reset,
logging.CRITICAL: self.fmtprefix + self.bold_red + self.fmt + self.reset
}

def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%Y-%m-%d %H:%M:%S")
return formatter.format(record)

logger_handler.setFormatter(LMNToolsFormatter())
__path__ = extend_path(__path__, __name__)
57 changes: 54 additions & 3 deletions usr/lib/python3/dist-packages/linuxmusterTools/linbo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,54 @@
from .images import *
from .config import *
from .command import *
"""LINBO modules provided by this overlay package."""

from importlib import import_module
from pkgutil import extend_path
import subprocess

__path__ = extend_path(__path__, __name__)

from .wol import *
from .host_status import *
from .boot_logs import *
from .hooks import *
from .multicast import *
from .torrent import *
from .ssh import *
from .terminal import *
from .firmware import *
from .drivers import *
from .kernel import *
from .linbofs import *
from .wlan import *
from .image_sync import *
from .linbo_update import *
from .grub_generator import *
from .dhcp import *

# NOTE: hosts, config, grub, images, changes are provided by
# upstream linuxmuster-tools — do NOT re-export here

_LEGACY_MODULES = ("config", "images", "command", "changes", "hosts", "grub")


def __getattr__(name: str):
"""Resolve legacy upstream exports lazily.

This keeps package import lightweight while still allowing callers to
access upstream objects such as LinboConfigManager when available.
"""
if name in _LEGACY_MODULES:
module = import_module(f"{__name__}.{name}")
globals()[name] = module
return module

for module_name in _LEGACY_MODULES:
try:
module = import_module(f"{__name__}.{module_name}")
except (ImportError, OSError, subprocess.SubprocessError):
continue
if hasattr(module, name):
value = getattr(module, name)
globals()[name] = value
return value

raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Centralized validation helpers backed by upstream NameChecker."""

from linuxmusterTools.common.checks import NameChecker

_checker = NameChecker()


def _check_segmented_name(name: str, check_part) -> bool:
"""Allow dot-separated names while validating each segment upstream-style."""
if not isinstance(name, str) or not name:
return False
if "/" in name or "\\" in name or "\0" in name or ".." in name:
return False

parts = name.split(".")
return all(part and check_part(part) for part in parts)


def check_ip(ip: str) -> bool:
"""Validate IPv4 address."""
return _checker.check_ip_name(ip)


def check_mac(mac: str) -> bool:
"""Validate MAC address (colon or hyphen separated)."""
if not isinstance(mac, str) or not mac:
return False
value = mac.strip()
return _checker.check_mac1_name(value) or _checker.check_mac2_name(value)


def normalize_mac(mac: str) -> str | None:
"""Normalize MAC to uppercase colon-separated. Returns None if invalid."""
if not isinstance(mac, str) or not mac:
return None
normalized = _checker.normalize_mac(mac.strip())
if normalized is None:
return None
return normalized.upper()


def check_linbo_image_name(name: str) -> bool:
"""Validate LINBO image name."""
return _check_segmented_name(name, _checker.check_linbo_image_name)


def check_linbo_conf_name(name: str) -> bool:
"""Validate LINBO config / group name."""
return _check_segmented_name(name, _checker.check_linbo_conf_name)
142 changes: 142 additions & 0 deletions usr/lib/python3/dist-packages/linuxmusterTools/linbo/boot_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""
LINBO Boot Logs — read client boot logs from /var/log/linuxmuster/linbo/.

Provides safe file listing, reading, and deletion with path traversal protection.
"""

import logging
import os
import re
from datetime import datetime, timezone
from pathlib import Path

logger = logging.getLogger(__name__)

DEFAULT_LOG_DIR = "/var/log/linuxmuster/linbo"
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5 MB
_SAFE_FILENAME = re.compile(r"^[a-zA-Z0-9._-]+$")
_IMAGE_STATUS_PATTERN = re.compile(r"^(\d{12})\s+(\w+):\s+(\S+)(?:\s+\"(\d+)\")?")


def get_host_image_status(log_dir: str | None = None) -> dict:
"""Parse _image.status files for per-host last sync info.

Reads *_image.status files from the LINBO log directory. Each file
contains a single line like: 202603241142 applied: win11_pro_edu.qcow2 "202601271107"

Args:
log_dir: Override log directory (default: /var/log/linuxmuster/linbo)

Returns:
Dict mapping hostname to {lastSync, action, image, imageVersion}
"""
base = Path(log_dir) if log_dir else Path(DEFAULT_LOG_DIR)
if not base.is_dir():
return {}

result = {}
try:
entries = os.listdir(base)
except OSError:
return {}

status_files = [f for f in entries if f.endswith("_image.status")]

for filename in sorted(status_files):
hostname = filename.removesuffix("_image.status")
filepath = base / filename
try:
content = filepath.read_text(encoding="utf-8", errors="replace").strip()
except OSError:
continue

m = _IMAGE_STATUS_PATTERN.match(content)
if m:
ts = m.group(1) # YYYYMMDDHHMI
year, month, day = ts[0:4], ts[4:6], ts[6:8]
hour, minute = ts[8:10], ts[10:12]

result[hostname] = {
"lastSync": f"{year}-{month}-{day}T{hour}:{minute}:00.000Z",
"action": m.group(2),
"image": m.group(3),
"imageVersion": m.group(4) or None,
}

return result


class LinboBootLogs:
"""Read-only access to LINBO client boot logs."""

def __init__(self, log_dir: str = DEFAULT_LOG_DIR):
self.log_dir = Path(log_dir)

def list_logs(self) -> list[dict]:
"""List all log files with metadata.

Returns:
List of {filename, size, modifiedAt} dicts, sorted newest first
"""
if not self.log_dir.is_dir():
return []

logs = []
for f in self.log_dir.iterdir():
if not f.is_file():
continue
stat = f.stat()
logs.append({
"filename": f.name,
"size": stat.st_size,
"modifiedAt": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
})

return sorted(logs, key=lambda x: x["modifiedAt"], reverse=True)

def read_log(self, filename: str) -> str | None:
"""Read a log file's content.

Args:
filename: Name of the log file (validated for path traversal)

Returns:
File content as string, or None if not found

Raises:
ValueError: If filename contains unsafe characters
ValueError: If file exceeds MAX_FILE_SIZE
"""
if not _SAFE_FILENAME.match(filename):
raise ValueError(f"Unsafe filename: {filename}")

filepath = self.log_dir / filename
if not filepath.is_file():
return None

if filepath.stat().st_size > MAX_FILE_SIZE:
raise ValueError(f"File too large: {filepath.stat().st_size} bytes (max {MAX_FILE_SIZE})")

return filepath.read_text(encoding="utf-8", errors="replace")

def delete_log(self, filename: str) -> bool:
"""Delete a log file.

Args:
filename: Name of the log file

Returns:
True if deleted, False if not found

Raises:
ValueError: If filename contains unsafe characters
"""
if not _SAFE_FILENAME.match(filename):
raise ValueError(f"Unsafe filename: {filename}")

filepath = self.log_dir / filename
if not filepath.is_file():
return False

filepath.unlink()
return True
106 changes: 106 additions & 0 deletions usr/lib/python3/dist-packages/linuxmusterTools/linbo/changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
LINBO Change Tracker — cursor-based delta detection via filesystem mtimes.

Compares file modification times against a unix-timestamp cursor to
determine which hosts, start.confs, and configs have changed.
"""

import logging
import time
from datetime import datetime, timezone

from .hosts import LinboHostProvider, devices_csv_path, get_mtime
from .config import LinboConfigManager
from .grub import LinboGrubReader

logger = logging.getLogger(__name__)


class LinboChangeTracker:
"""Cursor-based change detection using filesystem mtimes."""

def __init__(self, school: str = "default-school"):
self.school = school
self.host_provider = LinboHostProvider(school)
self.config_manager = LinboConfigManager()
self.grub_reader = LinboGrubReader()

def get_changes(self, since_cursor: str = "0") -> dict:
"""Compare filesystem state against cursor, return delta.

Args:
since_cursor: Unix timestamp string. '0' = full snapshot.

Returns:
Dict with nextCursor, hostsChanged, startConfsChanged,
configsChanged, dhcpChanged, deletedHosts, deletedStartConfs,
allHostMacs, allStartConfIds, allConfigIds.
"""
try:
cursor_ts = int(since_cursor) if since_cursor else 0
except ValueError:
cursor_ts = 0

cursor_dt = (
datetime.fromtimestamp(cursor_ts, tz=timezone.utc) if cursor_ts > 0
else None
)

# Parse all known entities
all_hosts, _ = self.host_provider.parse_devices_csv()
school_groups = {h["hostgroup"] for h in all_hosts}
all_host_macs = [h["mac"] for h in all_hosts]
all_startconf_ids = [
id for id in self.config_manager.list_startconf_ids()
if id in school_groups
]
all_config_ids = [
id for id in self.grub_reader.list_grub_cfg_ids()
if id in school_groups
]

# Detect host changes via devices.csv mtime
devices_mtime = get_mtime(devices_csv_path(self.school))
hosts_changed_macs: list[str] = []
deleted_hosts: list[str] = []
dhcp_changed = False

devices_modified = (
cursor_dt is None
or devices_mtime is None
or (devices_mtime > cursor_dt)
)

if devices_modified:
hosts_changed_macs = list(all_host_macs)
dhcp_changed = True

# Check start.conf files
startconfs_changed: list[str] = []
deleted_startconfs: list[str] = []
for group in all_startconf_ids:
mtime = self.config_manager.get_startconf_mtime(group)
if cursor_dt is None or (mtime and mtime > cursor_dt):
startconfs_changed.append(group)

# Check GRUB configs
configs_changed: list[str] = []
for group in all_config_ids:
mtime = self.grub_reader.get_cfg_mtime(group)
if cursor_dt is None or (mtime and mtime > cursor_dt):
configs_changed.append(group)

next_cursor = str(int(time.time()))

return {
"nextCursor": next_cursor,
"hostsChanged": hosts_changed_macs,
"startConfsChanged": startconfs_changed,
"configsChanged": configs_changed,
"dhcpChanged": dhcp_changed,
"deletedHosts": deleted_hosts,
"deletedStartConfs": deleted_startconfs,
"allHostMacs": all_host_macs,
"allStartConfIds": all_startconf_ids,
"allConfigIds": all_config_ids,
}
Loading