#!/usr/bin/env python3 from __future__ import annotations import argparse import collections import datetime as dt import json import os import pwd import re import shutil import subprocess import sys import time from pathlib import Path REPO_ROOT = Path("{{REPO_ROOT}}") RUNTIME_ROOT = Path("{{RUNTIME_ROOT}}") HEALTHCHECK_PATH = Path("/usr/local/sbin/metin-login-healthcheck") INCIDENT_COLLECTOR_PATH = Path("/usr/local/sbin/metin-collect-incident") INCIDENT_ROOT = Path("/var/lib/metin/incidents") AUTH_SYSLOG_PATH = RUNTIME_ROOT / "channels" / "auth" / "syslog.log" SOURCE_REPO_ROOT = RUNTIME_ROOT.parent.parent / "repos" / "m2dev-server-src" sys.path.insert(0, str(REPO_ROOT)) import channel_inventory AUTH_LOG_LINE_RE = re.compile(r"^\[(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[[^]]+\] (?P.*)$") AUTH_CONN_RE = re.compile(r"SYSTEM: new connection from \[(?P[^]]+)\].* ptr (?P0x[0-9a-fA-F]+)") AUTH_LOGIN_RE = re.compile(r"InputAuth::Login : (?P[^(]+)\(\d+\) desc (?P0x[0-9a-fA-F]+)") AUTH_INVALID_LOGIN_RE = re.compile(r"InputAuth::Login : IS_NOT_VALID_LOGIN_STRING\((?P[^)]+)\) desc (?P0x[0-9a-fA-F]+)") AUTH_START_RE = re.compile(r"(?:AUTH_LOGIN_DIRECT|QID_AUTH_LOGIN): START \d+ (?P0x[0-9a-fA-F]+)") AUTH_SUCCESS_DIRECT_RE = re.compile(r"AUTH_LOGIN_DIRECT: SUCCESS (?P.+)$") AUTH_FAILURE_RE = re.compile(r"^(NOID|WRONGPWD|NOTAVAIL|ALREADY)$") SMOKE_LOGIN_PREFIXES = ("smk", "smkhc", "smkdel", "smkfull", "smkneg", "smoke_", "csmk") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Operational CLI for the Debian Metin runtime") subparsers = parser.add_subparsers(dest="command", required=True) inventory_parser = subparsers.add_parser("inventory", help="Show declared channel inventory") inventory_parser.add_argument("--json", action="store_true", help="Print raw JSON") subparsers.add_parser("units", help="List managed systemd units") summary_parser = subparsers.add_parser("summary", help="Show an operational summary") summary_parser.add_argument("--hours", type=int, default=24, help="Auth activity window in hours") summary_parser.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins in auth summary") summary_parser.add_argument("--json", action="store_true", help="Print raw JSON") auth_activity = subparsers.add_parser("auth-activity", help="Show recent auth success/failure activity") auth_activity.add_argument("--hours", type=int, default=24, help="How many hours back to inspect") auth_activity.add_argument("--limit", type=int, default=30, help="Maximum events to show") auth_activity.add_argument("--status", choices=("all", "success", "failure"), default="all", help="Filter by auth result") auth_activity.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins") auth_activity.add_argument("--json", action="store_true", help="Print raw JSON") auth_ips = subparsers.add_parser("auth-ips", help="Summarize auth activity by source IP") auth_ips.add_argument("--hours", type=int, default=24, help="How many hours back to inspect") auth_ips.add_argument("--limit", type=int, default=20, help="Maximum IPs to show") auth_ips.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins") auth_ips.add_argument("--json", action="store_true", help="Print raw JSON") sessions = subparsers.add_parser("sessions", help="Show recent login sessions from loginlog2") sessions.add_argument("--hours", type=int, default=24, help="How many hours back to inspect") sessions.add_argument("--limit", type=int, default=20, help="Maximum sessions to show") sessions.add_argument("--active-only", action="store_true", help="Show only sessions without logout_time") sessions.add_argument("--include-orphans", action="store_true", help="Include rows whose account login no longer exists") sessions.add_argument("--json", action="store_true", help="Print raw JSON") session_audit = subparsers.add_parser("session-audit", help="Show stale open sessions without logout") session_audit.add_argument("--hours", type=int, default=72, help="How many hours back to inspect") session_audit.add_argument("--stale-minutes", type=int, default=30, help="Minimum age for an open session to be considered stale") session_audit.add_argument("--limit", type=int, default=20, help="Maximum sessions to show") session_audit.add_argument("--include-orphans", action="store_true", help="Include rows whose account login no longer exists") session_audit.add_argument("--json", action="store_true", help="Print raw JSON") status_parser = subparsers.add_parser("status", help="Show current unit state") status_parser.add_argument("target", nargs="?", default="all", help="stack, db, auth, game, channel:, instance:") ports_parser = subparsers.add_parser("ports", help="Show declared listener ports") ports_parser.add_argument("--live", action="store_true", help="Also show whether the port is currently listening") for action in ("start", "stop", "restart"): action_parser = subparsers.add_parser(action, help=f"{action.title()} a managed target") action_parser.add_argument("target", help="stack, db, auth, game, channel:, instance:") logs_parser = subparsers.add_parser("logs", help="Show journalctl logs for a managed target") logs_parser.add_argument("target", help="stack, db, auth, game, channel:, instance:") logs_parser.add_argument("-n", "--lines", type=int, default=100, help="Number of journal lines") logs_parser.add_argument("-f", "--follow", action="store_true", help="Follow the journal") cores_parser = subparsers.add_parser("cores", help="List core files under the runtime tree") cores_parser.add_argument("--json", action="store_true", help="Print raw JSON") incidents_parser = subparsers.add_parser("incidents", help="List collected incident bundles") incidents_parser.add_argument("--limit", type=int, default=10, help="Maximum number of bundles to show") incident_collect = subparsers.add_parser("incident-collect", help="Collect an incident bundle") incident_collect.add_argument("--tag", default="manual", help="Short incident tag") incident_collect.add_argument("--since", default="-30 minutes", help="journalctl --since value") incident_collect.add_argument("--include-cores", action="store_true", help="Copy matching core files into the bundle") auth_failures = subparsers.add_parser("auth-failures", help="Show recent auth failures from auth syslog") auth_failures.add_argument("--hours", type=int, default=24, help="How many hours back to inspect") auth_failures.add_argument("--limit", type=int, default=20, help="Maximum failures to show") auth_failures.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins") auth_failures.add_argument("--json", action="store_true", help="Print raw JSON") wait_ready = subparsers.add_parser("wait-ready", help="Wait until the runtime passes the login-ready probe") wait_ready.add_argument("--timeout", type=int, default=120, help="Maximum seconds to wait") wait_ready.add_argument("--interval", type=float, default=5.0, help="Seconds between healthcheck attempts") healthcheck = subparsers.add_parser("healthcheck", help="Run the root-only headless healthcheck") healthcheck.add_argument("--mode", choices=("ready", "full"), default="full", help="Healthcheck depth") return parser.parse_args() def build_command(command: list[str], require_root: bool = False) -> list[str]: if require_root and os.geteuid() != 0: return ["sudo", *command] return command def run(command: list[str], require_root: bool = False, capture_output: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]: return subprocess.run( build_command(command, require_root=require_root), check=check, capture_output=capture_output, text=True, ) def run_as_repo_owner(command: list[str], repo_path: Path, capture_output: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]: effective_command = list(command) if repo_path.exists(): owner_uid = repo_path.stat().st_uid if os.geteuid() == 0 and owner_uid != 0: owner_name = pwd.getpwuid(owner_uid).pw_name effective_command = ["sudo", "-u", owner_name, *effective_command] return subprocess.run( effective_command, check=check, capture_output=capture_output, text=True, ) def get_unit_state(unit: str) -> tuple[str, str, str]: active = run(["systemctl", "is-active", unit], capture_output=True, check=False).stdout.strip() or "unknown" enabled = run(["systemctl", "is-enabled", unit], capture_output=True, check=False).stdout.strip() or "unknown" sub_state = run(["systemctl", "show", unit, "--property=SubState", "--value"], capture_output=True, check=False).stdout.strip() or "-" return active, sub_state, enabled def print_table(headers: list[str], rows: list[list[str]]) -> None: widths = [len(header) for header in headers] for row in rows: for index, value in enumerate(row): widths[index] = max(widths[index], len(value)) header_line = " ".join(header.ljust(widths[index]) for index, header in enumerate(headers)) print(header_line) print(" ".join("-" * widths[index] for index in range(len(headers)))) for row in rows: print(" ".join(value.ljust(widths[index]) for index, value in enumerate(row))) def iter_port_rows() -> list[dict[str, str]]: rows = [ { "scope": "db", "name": "db", "port": str(channel_inventory.get_db()["port"]), "p2p_port": "-", "unit": channel_inventory.DB_UNIT, "visibility": "internal", }, { "scope": "auth", "name": "auth", "port": str(channel_inventory.get_auth()["port"]), "p2p_port": str(channel_inventory.get_auth()["p2p_port"]), "unit": channel_inventory.AUTH_UNIT, "visibility": "public", }, ] for channel in channel_inventory.iter_channels(): channel_id = int(channel["id"]) visibility = "public" if channel.get("public") else "internal" for core in sorted(channel["cores"], key=lambda item: int(item["id"])): core_id = int(core["id"]) instance = channel_inventory.instance_name(channel_id, core_id) rows.append( { "scope": f"channel:{channel_id}", "name": instance, "port": str(core["port"]), "p2p_port": str(core["p2p_port"]), "unit": channel_inventory.game_unit(instance), "visibility": visibility, } ) return rows def iter_core_files() -> list[Path]: return [path for path in sorted(RUNTIME_ROOT.glob("channels/**/core*")) if path.is_file()] def count_incident_bundles() -> int: if not INCIDENT_ROOT.exists(): return 0 return sum(1 for path in INCIDENT_ROOT.iterdir() if path.is_dir()) def git_summary(repo_path: Path) -> dict[str, object]: summary: dict[str, object] = {"path": str(repo_path), "present": repo_path.exists()} if not repo_path.exists(): return summary head = run_as_repo_owner(["git", "-C", str(repo_path), "rev-parse", "--short", "HEAD"], repo_path, capture_output=True, check=False) branch = run_as_repo_owner(["git", "-C", str(repo_path), "rev-parse", "--abbrev-ref", "HEAD"], repo_path, capture_output=True, check=False) status = run_as_repo_owner(["git", "-C", str(repo_path), "status", "--short"], repo_path, capture_output=True, check=False) summary.update( { "head": head.stdout.strip() or "unknown", "branch": branch.stdout.strip() or "unknown", "dirty": bool(status.stdout.strip()), "status_count": len([line for line in status.stdout.splitlines() if line.strip()]), } ) return summary def is_smoke_login(login: str) -> bool: lowered = login.lower() return lowered.startswith(SMOKE_LOGIN_PREFIXES) def parse_auth_timestamp(value: str) -> dt.datetime: return dt.datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f") def load_auth_activity(hours: int) -> list[dict[str, object]]: if not AUTH_SYSLOG_PATH.exists(): return [] cutoff = dt.datetime.now() - dt.timedelta(hours=hours) desc_ips: dict[str, str] = {} pending_by_desc: dict[str, dict[str, object]] = {} active_desc: str | None = None events: list[dict[str, object]] = [] with AUTH_SYSLOG_PATH.open("r", encoding="utf-8", errors="replace") as handle: for raw_line in handle: line = raw_line.replace("\x00", "").rstrip("\n") match = AUTH_LOG_LINE_RE.match(line) if not match: continue timestamp = parse_auth_timestamp(match.group("timestamp")) if timestamp < cutoff: continue message = match.group("message").strip() conn_match = AUTH_CONN_RE.search(message) if conn_match: desc_ips[conn_match.group("desc")] = conn_match.group("ip") continue invalid_login_match = AUTH_INVALID_LOGIN_RE.match(message) if invalid_login_match: login = invalid_login_match.group("login") events.append( { "time": timestamp, "login": login, "ip": desc_ips.get(invalid_login_match.group("desc"), "-"), "status": "failure", "reason": "INVALID_LOGIN_STRING", "smoke": is_smoke_login(login), } ) continue login_match = AUTH_LOGIN_RE.match(message) if login_match and " key " not in message: login = login_match.group("login").strip() desc = login_match.group("desc") pending_by_desc[desc] = { "time": timestamp, "login": login, "ip": desc_ips.get(desc, "-"), "smoke": is_smoke_login(login), } continue start_match = AUTH_START_RE.match(message) if start_match: active_desc = start_match.group("desc") continue failure_match = AUTH_FAILURE_RE.match(message) if failure_match and active_desc and active_desc in pending_by_desc: entry = pending_by_desc.pop(active_desc) entry.update({"status": "failure", "reason": failure_match.group(1)}) events.append(entry) active_desc = None continue success_direct_match = AUTH_SUCCESS_DIRECT_RE.match(message) if success_direct_match and active_desc and active_desc in pending_by_desc: entry = pending_by_desc.pop(active_desc) entry.update({"status": "success", "reason": "SUCCESS"}) events.append(entry) active_desc = None continue if message.startswith("QID_AUTH_LOGIN: SUCCESS") and active_desc and active_desc in pending_by_desc: entry = pending_by_desc.pop(active_desc) entry.update({"status": "success", "reason": "SUCCESS"}) events.append(entry) active_desc = None return events def summarize_auth_activity(hours: int, include_smoke: bool) -> dict[str, object]: events = load_auth_activity(hours) filtered = [event for event in events if include_smoke or not event["smoke"]] successes = [event for event in filtered if event["status"] == "success"] failures = [event for event in filtered if event["status"] == "failure"] reasons = collections.Counter(str(event["reason"]) for event in failures) return { "window_hours": hours, "include_smoke": include_smoke, "success_count": len(successes), "failure_count": len(failures), "failure_reasons": dict(reasons), "latest_success": successes[-1] if successes else None, "latest_failure": failures[-1] if failures else None, } def filter_auth_events(hours: int, include_smoke: bool, status: str) -> list[dict[str, object]]: events = load_auth_activity(hours) filtered = [event for event in events if include_smoke or not event["smoke"]] if status != "all": filtered = [event for event in filtered if event["status"] == status] return filtered def run_mariadb_query(query: str) -> list[list[str]]: completed = run(["mariadb", "-N", "-B", "-e", query], require_root=True, capture_output=True) rows: list[list[str]] = [] for line in completed.stdout.splitlines(): if not line.strip(): continue rows.append(line.split("\t")) return rows def fetch_recent_sessions(hours: int, limit: int, active_only: bool, include_orphans: bool) -> list[dict[str, str]]: where_clauses = [f"l.login_time >= NOW() - INTERVAL {int(hours)} HOUR"] if active_only: where_clauses.append("l.logout_time IS NULL") if not include_orphans: where_clauses.append("a.login IS NOT NULL") query = f""" SELECT DATE_FORMAT(l.login_time, '%Y-%m-%d %H:%i:%s'), COALESCE(DATE_FORMAT(l.logout_time, '%Y-%m-%d %H:%i:%s'), ''), l.type, COALESCE(a.login, ''), l.account_id, l.pid, COALESCE(INET_NTOA(l.ip), ''), COALESCE(l.client_version, '') FROM log.loginlog2 l LEFT JOIN account.account a ON a.id = l.account_id WHERE {' AND '.join(where_clauses)} ORDER BY l.id DESC LIMIT {int(limit)} """.strip() entries: list[dict[str, str]] = [] for row in run_mariadb_query(query): while len(row) < 8: row.append("") login_time, logout_time, raw_type, login, account_id, pid, ip, client_version = row[:8] entries.append( { "login_time": login_time, "logout_time": logout_time, "raw_type": raw_type, "session_state": "open" if not logout_time else "closed", "login": login or f"", "account_id": account_id, "pid": pid, "ip": ip or "-", "client_version": client_version or "-", } ) return entries def fetch_stale_sessions(hours: int, stale_minutes: int, limit: int, include_orphans: bool) -> list[dict[str, str]]: where_clauses = [ f"l.login_time >= NOW() - INTERVAL {int(hours)} HOUR", "l.logout_time IS NULL", f"TIMESTAMPDIFF(MINUTE, l.login_time, NOW()) >= {int(stale_minutes)}", ] if not include_orphans: where_clauses.append("a.login IS NOT NULL") query = f""" SELECT DATE_FORMAT(l.login_time, '%Y-%m-%d %H:%i:%s'), l.type, COALESCE(a.login, ''), l.account_id, l.pid, COALESCE(INET_NTOA(l.ip), ''), TIMESTAMPDIFF(MINUTE, l.login_time, NOW()) FROM log.loginlog2 l LEFT JOIN account.account a ON a.id = l.account_id WHERE {' AND '.join(where_clauses)} ORDER BY l.login_time DESC LIMIT {int(limit)} """.strip() entries: list[dict[str, str]] = [] for row in run_mariadb_query(query): while len(row) < 7: row.append("") login_time, raw_type, login, account_id, pid, ip, age_minutes = row[:7] entries.append( { "login_time": login_time, "raw_type": raw_type, "login": login or f"", "account_id": account_id, "pid": pid, "ip": ip or "-", "age_minutes": age_minutes or "0", } ) return entries def count_stale_sessions(hours: int, stale_minutes: int, include_orphans: bool) -> int: where_clauses = [ f"l.login_time >= NOW() - INTERVAL {int(hours)} HOUR", "l.logout_time IS NULL", f"TIMESTAMPDIFF(MINUTE, l.login_time, NOW()) >= {int(stale_minutes)}", ] if not include_orphans: where_clauses.append("a.login IS NOT NULL") query = f""" SELECT COUNT(*) FROM log.loginlog2 l LEFT JOIN account.account a ON a.id = l.account_id WHERE {' AND '.join(where_clauses)} """.strip() rows = run_mariadb_query(query) if not rows or not rows[0]: return 0 try: return int(rows[0][0]) except ValueError: return 0 def live_ports() -> set[int]: if shutil.which("ss") is None: return set() completed = run(["ss", "-ltnH"], capture_output=True, check=True) ports: set[int] = set() for line in completed.stdout.splitlines(): fields = line.split() if len(fields) < 4: continue local = fields[3] if ":" not in local: continue try: ports.add(int(local.rsplit(":", 1)[1])) except ValueError: continue return ports def print_inventory(as_json: bool) -> int: if as_json: print(json.dumps(channel_inventory.load_inventory(), indent=2)) return 0 rows: list[list[str]] = [] for channel in channel_inventory.iter_channels(): channel_id = int(channel["id"]) ports = ",".join(str(core["port"]) for core in sorted(channel["cores"], key=lambda item: int(item["id"]))) rows.append( [ str(channel_id), channel["name"], "yes" if channel.get("public") else "no", "yes" if channel.get("client_visible") else "no", str(len(channel["cores"])), ports, ] ) print_table(["channel", "name", "public", "visible", "cores", "ports"], rows) return 0 def print_units() -> int: rows = [ ["stack", channel_inventory.STACK_UNIT], ["db", channel_inventory.DB_UNIT], ["db-ready", channel_inventory.DB_READY_UNIT], ["auth", channel_inventory.AUTH_UNIT], ] for unit in channel_inventory.get_game_units(): rows.append(["game", unit]) print_table(["kind", "unit"], rows) return 0 def print_summary(hours: int, include_smoke: bool, as_json: bool) -> int: units = [ channel_inventory.STACK_UNIT, channel_inventory.DB_UNIT, channel_inventory.DB_READY_UNIT, channel_inventory.AUTH_UNIT, *channel_inventory.get_game_units(), ] unit_rows = [] for unit in units: active, sub_state, enabled = get_unit_state(unit) unit_rows.append({"unit": unit, "active": active, "sub": sub_state, "enabled": enabled}) game_units = [row for row in unit_rows if row["unit"].startswith("metin-game@")] enabled_game_units = [row for row in game_units if row["enabled"] == "enabled"] game_active = sum(1 for row in enabled_game_units if row["active"] == "active") listening = live_ports() port_rows = iter_port_rows() auth_summary = summarize_auth_activity(hours, include_smoke) stale_session_count = count_stale_sessions(hours=max(hours, 1), stale_minutes=30, include_orphans=False) stale_total_count = count_stale_sessions(hours=max(hours, 1), stale_minutes=30, include_orphans=True) stale_orphan_count = max(stale_total_count - stale_session_count, 0) repos = { "m2dev-server": git_summary(REPO_ROOT), "m2dev-server-src": git_summary(SOURCE_REPO_ROOT), } incident_count = count_incident_bundles() core_count = len(iter_core_files()) payload = { "repos": repos, "units": unit_rows, "game_active": game_active, "game_enabled": len(enabled_game_units), "game_declared": len(game_units), "ports": [ { **row, "live": int(row["port"]) in listening, } for row in port_rows ], "auth": auth_summary, "stale_open_sessions": { "user_count": stale_session_count, "orphan_count": stale_orphan_count, "total_count": stale_total_count, }, "core_count": core_count, "incident_count": incident_count, } if as_json: print(json.dumps(payload, indent=2, default=str)) return 0 repo_rows = [] for name, summary in repos.items(): repo_rows.append( [ name, str(summary.get("branch", "unknown")), str(summary.get("head", "unknown")), "yes" if summary.get("dirty") else "no", ] ) public_ports = [row for row in payload["ports"] if row["visibility"] == "public"] public_port_rows = [ [row["name"], row["port"], row["p2p_port"], "yes" if row["live"] else "no"] for row in public_ports ] print("Repos") print_table(["repo", "branch", "head", "dirty"], repo_rows) print() print("Runtime") print_table(["unit", "active", "sub", "enabled"], [[row["unit"], row["active"], row["sub"], row["enabled"]] for row in unit_rows[:4]]) print(f"game instances active: {game_active}/{len(enabled_game_units)} enabled ({len(game_units)} declared)") print(f"core files: {core_count}") print(f"incident bundles: {incident_count}") print(f"stale open sessions (>30m): {stale_session_count} user, {stale_orphan_count} orphan") print() print("Public Ports") print_table(["name", "port", "p2p", "live"], public_port_rows) print() print(f"Auth ({hours}h)") print(f"successes: {auth_summary['success_count']}") print(f"failures: {auth_summary['failure_count']}") if auth_summary["failure_reasons"]: reason_line = ", ".join(f"{reason}={count}" for reason, count in sorted(auth_summary["failure_reasons"].items())) print(f"failure reasons: {reason_line}") latest_success = auth_summary["latest_success"] if latest_success: print( f"latest success: {latest_success['time'].strftime('%Y-%m-%d %H:%M:%S')} " f"{latest_success['login']} from {latest_success['ip']}" ) latest_failure = auth_summary["latest_failure"] if latest_failure: print( f"latest failure: {latest_failure['time'].strftime('%Y-%m-%d %H:%M:%S')} " f"{latest_failure['login']} from {latest_failure['ip']} reason={latest_failure['reason']}" ) return 0 def print_auth_activity(hours: int, limit: int, status: str, include_smoke: bool, as_json: bool) -> int: events = filter_auth_events(hours, include_smoke, status) events = events[-limit:] payload = { "window_hours": hours, "limit": limit, "status": status, "include_smoke": include_smoke, "count": len(events), "entries": [ { "time": event["time"].strftime("%Y-%m-%d %H:%M:%S"), "status": event["status"], "login": event["login"], "ip": event["ip"], "reason": event["reason"], } for event in events ], } if as_json: print(json.dumps(payload, indent=2)) return 0 if not events: print(f"No auth activity in the last {hours}h for status={status}.") return 0 rows = [ [ event["time"].strftime("%Y-%m-%d %H:%M:%S"), str(event["status"]), str(event["login"]), str(event["ip"]), str(event["reason"]), ] for event in events ] print_table(["time", "status", "login", "ip", "reason"], rows) return 0 def print_auth_ips(hours: int, limit: int, include_smoke: bool, as_json: bool) -> int: events = filter_auth_events(hours, include_smoke, "all") grouped: dict[str, dict[str, object]] = {} for event in events: ip = str(event["ip"]) bucket = grouped.setdefault( ip, { "ip": ip, "success_count": 0, "failure_count": 0, "last_seen": event["time"], "last_login": str(event["login"]), "last_reason": str(event["reason"]), }, ) if event["status"] == "success": bucket["success_count"] = int(bucket["success_count"]) + 1 else: bucket["failure_count"] = int(bucket["failure_count"]) + 1 if event["time"] >= bucket["last_seen"]: bucket["last_seen"] = event["time"] bucket["last_login"] = str(event["login"]) bucket["last_reason"] = str(event["reason"]) rows = sorted( grouped.values(), key=lambda item: ( int(item["failure_count"]), int(item["success_count"]), item["last_seen"], ), reverse=True, )[:limit] payload = { "window_hours": hours, "limit": limit, "include_smoke": include_smoke, "count": len(rows), "entries": [ { "ip": str(row["ip"]), "success_count": int(row["success_count"]), "failure_count": int(row["failure_count"]), "last_seen": row["last_seen"].strftime("%Y-%m-%d %H:%M:%S"), "last_login": str(row["last_login"]), "last_reason": str(row["last_reason"]), } for row in rows ], } if as_json: print(json.dumps(payload, indent=2)) return 0 if not rows: print(f"No auth IP activity in the last {hours}h.") return 0 table_rows = [ [ str(row["ip"]), str(row["success_count"]), str(row["failure_count"]), row["last_seen"].strftime("%Y-%m-%d %H:%M:%S"), str(row["last_login"]), str(row["last_reason"]), ] for row in rows ] print_table(["ip", "success", "failure", "last_seen", "last_login", "last_reason"], table_rows) return 0 def resolve_target_units(target: str) -> list[str]: normalized = target.strip().lower() if normalized in {"all", "stack", "server"}: return [channel_inventory.STACK_UNIT] if normalized == "db": return [channel_inventory.DB_UNIT] if normalized in {"db-ready", "db_ready"}: return [channel_inventory.DB_READY_UNIT] if normalized == "auth": return [channel_inventory.AUTH_UNIT] if normalized in {"game", "games"}: return channel_inventory.get_game_units() if normalized.startswith("channel:"): channel_id = int(normalized.split(":", 1)[1]) return channel_inventory.get_game_units([channel_id]) if normalized.startswith("instance:"): return [channel_inventory.game_unit(target.split(":", 1)[1])] if normalized.startswith("channel") and "_core" in normalized: return [channel_inventory.game_unit(target)] raise SystemExit(f"Unknown target: {target}") def print_status(target: str) -> int: if target == "all": units = [ channel_inventory.STACK_UNIT, channel_inventory.DB_UNIT, channel_inventory.DB_READY_UNIT, channel_inventory.AUTH_UNIT, *channel_inventory.get_game_units(), ] else: units = resolve_target_units(target) rows: list[list[str]] = [] for unit in units: active, sub_state, enabled = get_unit_state(unit) rows.append([unit, active, sub_state, enabled]) print_table(["unit", "active", "sub", "enabled"], rows) return 0 def print_ports(show_live: bool) -> int: listening = live_ports() if show_live else set() headers = ["scope", "name", "port", "p2p", "visibility", "unit"] if show_live: headers.append("live") rows: list[list[str]] = [] for row in iter_port_rows(): values = [row["scope"], row["name"], row["port"], row["p2p_port"], row["visibility"], row["unit"]] if show_live: values.append("yes" if int(row["port"]) in listening else "no") rows.append(values) print_table(headers, rows) return 0 def print_cores(as_json: bool) -> int: entries = [] for path in iter_core_files(): stat = path.stat() entries.append( { "path": str(path), "relative_path": str(path.relative_to(RUNTIME_ROOT)), "size_bytes": stat.st_size, "mtime_epoch": int(stat.st_mtime), } ) if as_json: print(json.dumps(entries, indent=2)) return 0 if not entries: print("No core files found under the runtime tree.") return 0 rows = [[entry["relative_path"], str(entry["size_bytes"]), str(entry["mtime_epoch"])] for entry in entries] print_table(["path", "size_bytes", "mtime_epoch"], rows) return 0 def print_incidents(limit: int) -> int: if not INCIDENT_ROOT.exists(): print(f"No incident directory: {INCIDENT_ROOT}") return 0 bundles = sorted((path for path in INCIDENT_ROOT.iterdir() if path.is_dir()), reverse=True)[:limit] if not bundles: print(f"No incident bundles in {INCIDENT_ROOT}") return 0 rows = [[bundle.name, str(bundle)] for bundle in bundles] print_table(["bundle", "path"], rows) return 0 def run_unit_action(action: str, target: str) -> int: units = resolve_target_units(target) run(["systemctl", action, *units], require_root=True) return 0 def run_logs(target: str, lines: int, follow: bool) -> int: units = resolve_target_units(target) command = ["journalctl", "--no-pager", f"-n{lines}"] for unit in units: command.extend(["-u", unit]) if follow: command = ["journalctl", f"-n{lines}", "-f", *sum((["-u", unit] for unit in units), [])] run(command, require_root=True) return 0 def print_auth_failures(hours: int, limit: int, include_smoke: bool, as_json: bool) -> int: events = load_auth_activity(hours) failures = [event for event in events if event["status"] == "failure" and (include_smoke or not event["smoke"])] failures = failures[-limit:] reason_counts = collections.Counter(str(event["reason"]) for event in failures) payload = { "window_hours": hours, "limit": limit, "include_smoke": include_smoke, "count": len(failures), "reasons": dict(reason_counts), "entries": [ { "time": event["time"].strftime("%Y-%m-%d %H:%M:%S"), "login": event["login"], "ip": event["ip"], "reason": event["reason"], } for event in failures ], } if as_json: print(json.dumps(payload, indent=2)) return 0 if not failures: print(f"No auth failures in the last {hours}h.") return 0 if reason_counts: print(", ".join(f"{reason}={count}" for reason, count in sorted(reason_counts.items()))) print() rows = [ [event["time"].strftime("%Y-%m-%d %H:%M:%S"), str(event["login"]), str(event["ip"]), str(event["reason"])] for event in failures ] print_table(["time", "login", "ip", "reason"], rows) return 0 def print_sessions(hours: int, limit: int, active_only: bool, include_orphans: bool, as_json: bool) -> int: entries = fetch_recent_sessions(hours, limit, active_only, include_orphans) payload = { "window_hours": hours, "limit": limit, "active_only": active_only, "include_orphans": include_orphans, "count": len(entries), "entries": entries, } if as_json: print(json.dumps(payload, indent=2)) return 0 if not entries: print(f"No sessions in the last {hours}h.") return 0 rows = [ [ entry["login_time"], entry["logout_time"] or "-", entry["session_state"], entry["login"], entry["account_id"], entry["pid"], entry["ip"], ] for entry in entries ] print_table(["login_time", "logout_time", "state", "login", "account", "pid", "ip"], rows) return 0 def print_session_audit(hours: int, stale_minutes: int, limit: int, include_orphans: bool, as_json: bool) -> int: entries = fetch_stale_sessions(hours, stale_minutes, limit, include_orphans) payload = { "window_hours": hours, "stale_minutes": stale_minutes, "limit": limit, "include_orphans": include_orphans, "count": len(entries), "entries": entries, } if as_json: print(json.dumps(payload, indent=2)) return 0 if not entries: print(f"No stale open sessions older than {stale_minutes} minutes in the last {hours}h.") return 0 rows = [ [ entry["login_time"], entry["age_minutes"], entry["login"], entry["account_id"], entry["pid"], entry["ip"], entry["raw_type"], ] for entry in entries ] print_table(["login_time", "age_min", "login", "account", "pid", "ip", "raw_type"], rows) return 0 def run_healthcheck(mode: str) -> int: if not HEALTHCHECK_PATH.exists(): raise SystemExit(f"Missing healthcheck wrapper: {HEALTHCHECK_PATH}") run([str(HEALTHCHECK_PATH), "--mode", mode], require_root=True) return 0 def run_wait_ready(timeout_seconds: int, interval_seconds: float) -> int: if not HEALTHCHECK_PATH.exists(): raise SystemExit(f"Missing healthcheck wrapper: {HEALTHCHECK_PATH}") deadline = time.time() + timeout_seconds attempt = 0 last_returncode = 1 while time.time() < deadline: attempt += 1 print(f"Healthcheck attempt {attempt}...") completed = subprocess.run( build_command([str(HEALTHCHECK_PATH), "--mode", "ready"], require_root=True), check=False, text=True, ) if completed.returncode == 0: return 0 last_returncode = completed.returncode remaining = deadline - time.time() if remaining <= 0: break time.sleep(min(interval_seconds, remaining)) raise SystemExit(f"Timed out waiting for login-ready state. Last healthcheck exit code: {last_returncode}") def run_incident_collect(tag: str, since: str, include_cores: bool) -> int: if not INCIDENT_COLLECTOR_PATH.exists(): raise SystemExit(f"Missing incident collector: {INCIDENT_COLLECTOR_PATH}") command = [str(INCIDENT_COLLECTOR_PATH), "--tag", tag, "--since", since] if include_cores: command.append("--include-cores") run(command, require_root=True) return 0 def main() -> int: args = parse_args() if args.command == "inventory": return print_inventory(args.json) if args.command == "units": return print_units() if args.command == "summary": return print_summary(args.hours, args.include_smoke, args.json) if args.command == "auth-activity": return print_auth_activity(args.hours, args.limit, args.status, args.include_smoke, args.json) if args.command == "auth-ips": return print_auth_ips(args.hours, args.limit, args.include_smoke, args.json) if args.command == "status": return print_status(args.target) if args.command == "ports": return print_ports(args.live) if args.command == "cores": return print_cores(args.json) if args.command == "incidents": return print_incidents(args.limit) if args.command == "auth-failures": return print_auth_failures(args.hours, args.limit, args.include_smoke, args.json) if args.command == "sessions": return print_sessions(args.hours, args.limit, args.active_only, args.include_orphans, args.json) if args.command == "session-audit": return print_session_audit(args.hours, args.stale_minutes, args.limit, args.include_orphans, args.json) if args.command in {"start", "stop", "restart"}: return run_unit_action(args.command, args.target) if args.command == "logs": return run_logs(args.target, args.lines, args.follow) if args.command == "incident-collect": return run_incident_collect(args.tag, args.since, args.include_cores) if args.command == "healthcheck": return run_healthcheck(args.mode) if args.command == "wait-ready": return run_wait_ready(args.timeout, args.interval) raise SystemExit(f"Unsupported command: {args.command}") if __name__ == "__main__": raise SystemExit(main())