#!/usr/bin/env python3
"""Operational CLI for the Debian Metin runtime.

Sub-commands:

* ``inventory``   -- show the declared channel inventory (table or JSON)
* ``units``       -- list the managed systemd units
* ``status``      -- show active / sub / enabled state for a target
* ``ports``       -- show declared listener ports, optionally with live state
* ``start`` / ``stop`` / ``restart`` -- run the systemctl action on a target
* ``logs``        -- show journalctl output for a target
* ``healthcheck`` -- run the healthcheck wrapper at ``HEALTHCHECK_PATH``

Commands that need privileges are prefixed with ``sudo`` when the effective
user is not root.
"""
from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
from pathlib import Path

# NOTE(review): the "{{...}}" values look like template placeholders that are
# substituted when this script is installed -- confirm against the installer.
REPO_ROOT = Path("{{REPO_ROOT}}")
RUNTIME_ROOT = Path("{{RUNTIME_ROOT}}")
HEALTHCHECK_PATH = Path("/usr/local/sbin/metin-login-healthcheck")

# channel_inventory is looked up via REPO_ROOT, so the import has to come
# after the path has been extended.
sys.path.insert(0, str(REPO_ROOT))

import channel_inventory  # noqa: E402

# Help text shared by every sub-command that accepts a target; the accepted
# forms mirror what resolve_target_units() parses.
_TARGET_HELP = "stack, db, auth, game, channel:<id>, instance:<name>"


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command line.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The parsed namespace; ``command`` holds the chosen sub-command.
    """
    parser = argparse.ArgumentParser(description="Operational CLI for the Debian Metin runtime")
    subparsers = parser.add_subparsers(dest="command", required=True)

    inventory_parser = subparsers.add_parser("inventory", help="Show declared channel inventory")
    inventory_parser.add_argument("--json", action="store_true", help="Print raw JSON")

    subparsers.add_parser("units", help="List managed systemd units")

    status_parser = subparsers.add_parser("status", help="Show current unit state")
    status_parser.add_argument("target", nargs="?", default="all", help=_TARGET_HELP)

    ports_parser = subparsers.add_parser("ports", help="Show declared listener ports")
    ports_parser.add_argument(
        "--live",
        action="store_true",
        help="Also show whether the port is currently listening",
    )

    for action in ("start", "stop", "restart"):
        action_parser = subparsers.add_parser(action, help=f"{action.title()} a managed target")
        action_parser.add_argument("target", help=_TARGET_HELP)

    logs_parser = subparsers.add_parser("logs", help="Show journalctl logs for a managed target")
    logs_parser.add_argument("target", help=_TARGET_HELP)
    logs_parser.add_argument("-n", "--lines", type=int, default=100, help="Number of journal lines")
    logs_parser.add_argument("-f", "--follow", action="store_true", help="Follow the journal")

    subparsers.add_parser("healthcheck", help="Run the root-only headless healthcheck")

    return parser.parse_args(argv)


def build_command(command: list[str], require_root: bool = False) -> list[str]:
    """Return *command*, prefixed with ``sudo`` when root is required but not held."""
    if require_root and os.geteuid() != 0:
        return ["sudo", *command]
    return command


def run(
    command: list[str],
    require_root: bool = False,
    capture_output: bool = False,
    check: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run *command* in text mode and return the completed process.

    Args:
        command: Argument vector to execute (no shell is involved).
        require_root: Prefix the command with ``sudo`` when not already root.
        capture_output: Capture stdout/stderr instead of inheriting them.
        check: Raise ``subprocess.CalledProcessError`` on a non-zero exit.
    """
    return subprocess.run(
        build_command(command, require_root=require_root),
        check=check,
        capture_output=capture_output,
        text=True,
    )


def get_unit_state(unit: str) -> tuple[str, str, str]:
    """Query systemd for one unit.

    Returns:
        ``(active, sub_state, enabled)``. Empty systemctl output is replaced
        by ``"unknown"`` (active, enabled) or ``"-"`` (sub state).
    """
    # check=False: is-active / is-enabled exit non-zero for inactive or
    # disabled units, which is a normal answer here rather than an error.
    active = run(["systemctl", "is-active", unit], capture_output=True, check=False).stdout.strip() or "unknown"
    enabled = run(["systemctl", "is-enabled", unit], capture_output=True, check=False).stdout.strip() or "unknown"
    sub_state = (
        run(
            ["systemctl", "show", unit, "--property=SubState", "--value"],
            capture_output=True,
            check=False,
        ).stdout.strip()
        or "-"
    )
    return active, sub_state, enabled


def print_table(headers: list[str], rows: list[list[str]]) -> None:
    """Print *rows* under *headers* as left-aligned, width-padded columns.

    Each column is as wide as its longest cell (header included); a row of
    dashes separates the header from the data.
    """
    widths = [len(header) for header in headers]
    for row in rows:
        for index, value in enumerate(row):
            widths[index] = max(widths[index], len(value))

    print(" ".join(header.ljust(widths[index]) for index, header in enumerate(headers)))
    print(" ".join("-" * width for width in widths))
    for row in rows:
        print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row)))


def iter_port_rows() -> list[dict[str, str]]:
    """Build one row per declared listener: db, auth, then every channel core.

    Returns:
        Dicts with the keys ``scope``, ``name``, ``port``, ``p2p_port``,
        ``unit`` and ``visibility``; all values are strings. Cores are listed
        in ascending core id within each channel.
    """
    auth = channel_inventory.get_auth()
    rows = [
        {
            "scope": "db",
            "name": "db",
            "port": str(channel_inventory.get_db()["port"]),
            "p2p_port": "-",
            "unit": channel_inventory.DB_UNIT,
            "visibility": "internal",
        },
        {
            "scope": "auth",
            "name": "auth",
            "port": str(auth["port"]),
            "p2p_port": str(auth["p2p_port"]),
            "unit": channel_inventory.AUTH_UNIT,
            "visibility": "public",
        },
    ]

    for channel in channel_inventory.iter_channels():
        channel_id = int(channel["id"])
        visibility = "public" if channel.get("public") else "internal"
        for core in sorted(channel["cores"], key=lambda item: int(item["id"])):
            instance = channel_inventory.instance_name(channel_id, int(core["id"]))
            rows.append(
                {
                    "scope": f"channel:{channel_id}",
                    "name": instance,
                    "port": str(core["port"]),
                    "p2p_port": str(core["p2p_port"]),
                    "unit": channel_inventory.game_unit(instance),
                    "visibility": visibility,
                }
            )
    return rows


def live_ports() -> set[int]:
    """Return the TCP ports currently in LISTEN state according to ``ss -ltnH``.

    Returns an empty set when ``ss`` is not installed, so callers cannot
    distinguish "nothing is listening" from "could not check".
    """
    if shutil.which("ss") is None:
        return set()

    completed = run(["ss", "-ltnH"], capture_output=True, check=True)
    ports: set[int] = set()
    for line in completed.stdout.splitlines():
        fields = line.split()
        if len(fields) < 4:
            continue
        # Fourth column is the local "address:port" pair.
        local = fields[3]
        if ":" not in local:
            continue
        try:
            # rsplit keeps IPv6 addresses (which contain ':') intact.
            ports.add(int(local.rsplit(":", 1)[1]))
        except ValueError:
            continue
    return ports


def print_inventory(as_json: bool) -> int:
    """Print the channel inventory as indented JSON or as a summary table.

    Returns:
        Always ``0``.
    """
    if as_json:
        print(json.dumps(channel_inventory.load_inventory(), indent=2))
        return 0

    rows: list[list[str]] = []
    for channel in channel_inventory.iter_channels():
        channel_id = int(channel["id"])
        ports = ",".join(
            str(core["port"])
            for core in sorted(channel["cores"], key=lambda item: int(item["id"]))
        )
        rows.append(
            [
                str(channel_id),
                channel["name"],
                "yes" if channel.get("public") else "no",
                "yes" if channel.get("client_visible") else "no",
                str(len(channel["cores"])),
                ports,
            ]
        )
    print_table(["channel", "name", "public", "visible", "cores", "ports"], rows)
    return 0


def print_units() -> int:
    """Print every managed systemd unit together with its kind. Returns ``0``."""
    rows = [
        ["stack", channel_inventory.STACK_UNIT],
        ["db", channel_inventory.DB_UNIT],
        ["db-ready", channel_inventory.DB_READY_UNIT],
        ["auth", channel_inventory.AUTH_UNIT],
    ]
    rows.extend(["game", unit] for unit in channel_inventory.get_game_units())
    print_table(["kind", "unit"], rows)
    return 0


def resolve_target_units(target: str) -> list[str]:
    """Translate a user-supplied target into the systemd units it denotes.

    Keywords are matched case-insensitively after stripping whitespace:
    ``all`` / ``stack`` / ``server``, ``db``, ``db-ready`` / ``db_ready``,
    ``auth``, ``game`` / ``games``, ``channel:<id>``, ``instance:<name>``, or
    a bare instance name containing ``channel`` ... ``_core``.

    Raises:
        SystemExit: The target is unknown, the channel id is not an integer,
            or the instance name is empty.
    """
    stripped = target.strip()
    normalized = stripped.lower()

    if normalized in {"all", "stack", "server"}:
        return [channel_inventory.STACK_UNIT]
    if normalized == "db":
        return [channel_inventory.DB_UNIT]
    if normalized in {"db-ready", "db_ready"}:
        return [channel_inventory.DB_READY_UNIT]
    if normalized == "auth":
        return [channel_inventory.AUTH_UNIT]
    if normalized in {"game", "games"}:
        return channel_inventory.get_game_units()

    if normalized.startswith("channel:"):
        try:
            channel_id = int(normalized.split(":", 1)[1])
        except ValueError:
            # Report bad ids the same way as any other bad target instead of
            # letting a ValueError traceback reach the user.
            raise SystemExit(f"Invalid channel id in target: {target}") from None
        return channel_inventory.get_game_units([channel_id])

    if normalized.startswith("instance:"):
        # The name is taken from the un-lowercased text so its case survives.
        instance = stripped.split(":", 1)[1].strip()
        if not instance:
            raise SystemExit(f"Missing instance name in target: {target}")
        return [channel_inventory.game_unit(instance)]

    if normalized.startswith("channel") and "_core" in normalized:
        return [channel_inventory.game_unit(stripped)]

    raise SystemExit(f"Unknown target: {target}")


def _require_units(target: str) -> list[str]:
    """Resolve *target* and exit with a message when it maps to no units."""
    units = resolve_target_units(target)
    if not units:
        raise SystemExit(f"No managed units match target: {target}")
    return units


def print_status(target: str) -> int:
    """Print active / sub / enabled state for every unit behind *target*.

    ``all`` (case-insensitive) expands to the stack, db, db-ready and auth
    units plus every game unit; any other value goes through
    ``resolve_target_units``. Returns ``0``.
    """
    # Normalise the same way resolve_target_units() does, so "ALL" is not
    # silently reduced to the stack unit alone.
    if target.strip().lower() == "all":
        units = [
            channel_inventory.STACK_UNIT,
            channel_inventory.DB_UNIT,
            channel_inventory.DB_READY_UNIT,
            channel_inventory.AUTH_UNIT,
            *channel_inventory.get_game_units(),
        ]
    else:
        units = resolve_target_units(target)

    rows: list[list[str]] = []
    for unit in units:
        active, sub_state, enabled = get_unit_state(unit)
        rows.append([unit, active, sub_state, enabled])
    print_table(["unit", "active", "sub", "enabled"], rows)
    return 0


def print_ports(show_live: bool) -> int:
    """Print the declared ports; with *show_live* add a ``live`` yes/no column.

    The live column reflects the main ``port`` only, not ``p2p_port``.
    Returns ``0``.
    """
    listening = live_ports() if show_live else set()
    headers = ["scope", "name", "port", "p2p", "visibility", "unit"]
    if show_live:
        headers.append("live")

    rows: list[list[str]] = []
    for row in iter_port_rows():
        values = [
            row["scope"],
            row["name"],
            row["port"],
            row["p2p_port"],
            row["visibility"],
            row["unit"],
        ]
        if show_live:
            values.append("yes" if int(row["port"]) in listening else "no")
        rows.append(values)
    print_table(headers, rows)
    return 0


def run_unit_action(action: str, target: str) -> int:
    """Run ``systemctl <action>`` (as root) on the units behind *target*.

    Raises:
        SystemExit: The target is invalid or resolves to no units.
        subprocess.CalledProcessError: systemctl exited non-zero.
    """
    units = _require_units(target)
    run(["systemctl", action, *units], require_root=True)
    return 0


def run_logs(target: str, lines: int, follow: bool) -> int:
    """Show the last *lines* journal entries for *target*, optionally following.

    Raises:
        SystemExit: The target is invalid or resolves to no units. Without
            this guard journalctl would run with no ``-u`` filter at all.
        subprocess.CalledProcessError: journalctl exited non-zero.
    """
    units = _require_units(target)
    if follow:
        command = ["journalctl", f"-n{lines}", "-f"]
    else:
        command = ["journalctl", "--no-pager", f"-n{lines}"]
    for unit in units:
        command += ["-u", unit]
    run(command, require_root=True)
    return 0


def run_healthcheck() -> int:
    """Run the healthcheck wrapper at ``HEALTHCHECK_PATH`` as root.

    Raises:
        SystemExit: The wrapper does not exist.
        subprocess.CalledProcessError: The wrapper exited non-zero.
    """
    if not HEALTHCHECK_PATH.exists():
        raise SystemExit(f"Missing healthcheck wrapper: {HEALTHCHECK_PATH}")
    run([str(HEALTHCHECK_PATH)], require_root=True)
    return 0


def main(argv: list[str] | None = None) -> int:
    """Entry point: parse arguments, dispatch the sub-command, return an exit code.

    A failing child process is reported on stderr and its exit status is
    returned instead of surfacing a traceback; Ctrl-C returns 130.
    """
    args = parse_args(argv)
    try:
        if args.command == "inventory":
            return print_inventory(args.json)
        if args.command == "units":
            return print_units()
        if args.command == "status":
            return print_status(args.target)
        if args.command == "ports":
            return print_ports(args.live)
        if args.command in {"start", "stop", "restart"}:
            return run_unit_action(args.command, args.target)
        if args.command == "logs":
            return run_logs(args.target, args.lines, args.follow)
        if args.command == "healthcheck":
            return run_healthcheck()
    except subprocess.CalledProcessError as error:
        # stderr is only populated when the call captured its output.
        if error.stderr:
            sys.stderr.write(error.stderr)
        failed = " ".join(str(part) for part in error.cmd)
        print(f"Command failed with exit status {error.returncode}: {failed}", file=sys.stderr)
        # A negative returncode means the child died from a signal.
        return error.returncode if error.returncode > 0 else 1
    except KeyboardInterrupt:
        # Typical when leaving `logs -f`; 130 is the shell's 128 + SIGINT.
        return 130
    raise SystemExit(f"Unsupported command: {args.command}")


if __name__ == "__main__":
    raise SystemExit(main())