diff --git a/deploy/systemd/bin/metinctl.in b/deploy/systemd/bin/metinctl.in index 8864d0b..dd10ca1 100644 --- a/deploy/systemd/bin/metinctl.in +++ b/deploy/systemd/bin/metinctl.in @@ -2,8 +2,12 @@ from __future__ import annotations import argparse +import collections +import datetime as dt import json import os +import pwd +import re import shutil import subprocess import sys @@ -15,12 +19,24 @@ RUNTIME_ROOT = Path("{{RUNTIME_ROOT}}") HEALTHCHECK_PATH = Path("/usr/local/sbin/metin-login-healthcheck") INCIDENT_COLLECTOR_PATH = Path("/usr/local/sbin/metin-collect-incident") INCIDENT_ROOT = Path("/var/lib/metin/incidents") +AUTH_SYSLOG_PATH = RUNTIME_ROOT / "channels" / "auth" / "syslog.log" +SOURCE_REPO_ROOT = RUNTIME_ROOT.parent.parent / "repos" / "m2dev-server-src" sys.path.insert(0, str(REPO_ROOT)) import channel_inventory +AUTH_LOG_LINE_RE = re.compile(r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[[^]]+\] (?P<message>.*)$") +AUTH_CONN_RE = re.compile(r"SYSTEM: new connection from \[(?P<ip>[^]]+)\].* ptr (?P<desc>0x[0-9a-fA-F]+)") +AUTH_LOGIN_RE = re.compile(r"InputAuth::Login : (?P<login>[^(]+)\(\d+\) desc (?P<desc>0x[0-9a-fA-F]+)") +AUTH_INVALID_LOGIN_RE = re.compile(r"InputAuth::Login : IS_NOT_VALID_LOGIN_STRING\((?P<login>[^)]+)\) desc (?P<desc>0x[0-9a-fA-F]+)") +AUTH_START_RE = re.compile(r"(?:AUTH_LOGIN_DIRECT|QID_AUTH_LOGIN): START \d+ (?P<desc>0x[0-9a-fA-F]+)") +AUTH_SUCCESS_DIRECT_RE = re.compile(r"AUTH_LOGIN_DIRECT: SUCCESS (?P<login>.+)$") +AUTH_FAILURE_RE = re.compile(r"^(NOID|WRONGPWD|NOTAVAIL|ALREADY)$") +SMOKE_LOGIN_PREFIXES = ("smk", "smkhc", "smkdel", "smkfull", "smkneg", "smoke_", "csmk") + + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Operational CLI for the Debian Metin runtime") subparsers = parser.add_subparsers(dest="command", required=True) @@ -30,6 +46,11 @@ def parse_args() -> argparse.Namespace: subparsers.add_parser("units", help="List managed systemd units") + summary_parser = subparsers.add_parser("summary", help="Show an operational summary")
+ summary_parser.add_argument("--hours", type=int, default=24, help="Auth activity window in hours") + summary_parser.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins in auth summary") + summary_parser.add_argument("--json", action="store_true", help="Print raw JSON") + status_parser = subparsers.add_parser("status", help="Show current unit state") status_parser.add_argument("target", nargs="?", default="all", help="stack, db, auth, game, channel:<n>, instance:<n>") @@ -56,6 +77,12 @@ def parse_args() -> argparse.Namespace: incident_collect.add_argument("--since", default="-30 minutes", help="journalctl --since value") incident_collect.add_argument("--include-cores", action="store_true", help="Copy matching core files into the bundle") + auth_failures = subparsers.add_parser("auth-failures", help="Show recent auth failures from auth syslog") + auth_failures.add_argument("--hours", type=int, default=24, help="How many hours back to inspect") + auth_failures.add_argument("--limit", type=int, default=20, help="Maximum failures to show") + auth_failures.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins") + auth_failures.add_argument("--json", action="store_true", help="Print raw JSON") + wait_ready = subparsers.add_parser("wait-ready", help="Wait until the runtime passes the login-ready probe") wait_ready.add_argument("--timeout", type=int, default=120, help="Maximum seconds to wait") wait_ready.add_argument("--interval", type=float, default=5.0, help="Seconds between healthcheck attempts") @@ -80,6 +107,22 @@ def run(command: list[str], require_root: bool = False, capture_output: bool = F ) + +def run_as_repo_owner(command: list[str], repo_path: Path, capture_output: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]: + effective_command = list(command) + if repo_path.exists(): + owner_uid = repo_path.stat().st_uid + if os.geteuid() == 0 and owner_uid != 0: + owner_name = 
pwd.getpwuid(owner_uid).pw_name + effective_command = ["sudo", "-u", owner_name, *effective_command] + + return subprocess.run( + effective_command, + check=check, + capture_output=capture_output, + text=True, + ) + + def get_unit_state(unit: str) -> tuple[str, str, str]: active = run(["systemctl", "is-active", unit], capture_output=True, check=False).stdout.strip() or "unknown" enabled = run(["systemctl", "is-enabled", unit], capture_output=True, check=False).stdout.strip() or "unknown" @@ -144,6 +187,144 @@ def iter_core_files() -> list[Path]: return [path for path in sorted(RUNTIME_ROOT.glob("channels/**/core*")) if path.is_file()] +def count_incident_bundles() -> int: + if not INCIDENT_ROOT.exists(): + return 0 + return sum(1 for path in INCIDENT_ROOT.iterdir() if path.is_dir()) + + +def git_summary(repo_path: Path) -> dict[str, object]: + summary: dict[str, object] = {"path": str(repo_path), "present": repo_path.exists()} + if not repo_path.exists(): + return summary + + head = run_as_repo_owner(["git", "-C", str(repo_path), "rev-parse", "--short", "HEAD"], repo_path, capture_output=True, check=False) + branch = run_as_repo_owner(["git", "-C", str(repo_path), "rev-parse", "--abbrev-ref", "HEAD"], repo_path, capture_output=True, check=False) + status = run_as_repo_owner(["git", "-C", str(repo_path), "status", "--short"], repo_path, capture_output=True, check=False) + + summary.update( + { + "head": head.stdout.strip() or "unknown", + "branch": branch.stdout.strip() or "unknown", + "dirty": bool(status.stdout.strip()), + "status_count": len([line for line in status.stdout.splitlines() if line.strip()]), + } + ) + return summary + + +def is_smoke_login(login: str) -> bool: + lowered = login.lower() + return lowered.startswith(SMOKE_LOGIN_PREFIXES) + + +def parse_auth_timestamp(value: str) -> dt.datetime: + return dt.datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f") + + +def load_auth_activity(hours: int) -> list[dict[str, object]]: + if not 
AUTH_SYSLOG_PATH.exists(): + return [] + + cutoff = dt.datetime.now() - dt.timedelta(hours=hours) + desc_ips: dict[str, str] = {} + pending_by_desc: dict[str, dict[str, object]] = {} + active_desc: str | None = None + events: list[dict[str, object]] = [] + + with AUTH_SYSLOG_PATH.open("r", encoding="utf-8", errors="replace") as handle: + for raw_line in handle: + line = raw_line.replace("\x00", "").rstrip("\n") + match = AUTH_LOG_LINE_RE.match(line) + if not match: + continue + + timestamp = parse_auth_timestamp(match.group("timestamp")) + if timestamp < cutoff: + continue + + message = match.group("message").strip() + + conn_match = AUTH_CONN_RE.search(message) + if conn_match: + desc_ips[conn_match.group("desc")] = conn_match.group("ip") + continue + + invalid_login_match = AUTH_INVALID_LOGIN_RE.match(message) + if invalid_login_match: + login = invalid_login_match.group("login") + events.append( + { + "time": timestamp, + "login": login, + "ip": desc_ips.get(invalid_login_match.group("desc"), "-"), + "status": "failure", + "reason": "INVALID_LOGIN_STRING", + "smoke": is_smoke_login(login), + } + ) + continue + + login_match = AUTH_LOGIN_RE.match(message) + if login_match and " key " not in message: + login = login_match.group("login").strip() + desc = login_match.group("desc") + pending_by_desc[desc] = { + "time": timestamp, + "login": login, + "ip": desc_ips.get(desc, "-"), + "smoke": is_smoke_login(login), + } + continue + + start_match = AUTH_START_RE.match(message) + if start_match: + active_desc = start_match.group("desc") + continue + + failure_match = AUTH_FAILURE_RE.match(message) + if failure_match and active_desc and active_desc in pending_by_desc: + entry = pending_by_desc.pop(active_desc) + entry.update({"status": "failure", "reason": failure_match.group(1)}) + events.append(entry) + active_desc = None + continue + + success_direct_match = AUTH_SUCCESS_DIRECT_RE.match(message) + if success_direct_match and active_desc and active_desc in 
pending_by_desc: + entry = pending_by_desc.pop(active_desc) + entry.update({"status": "success", "reason": "SUCCESS"}) + events.append(entry) + active_desc = None + continue + + if message.startswith("QID_AUTH_LOGIN: SUCCESS") and active_desc and active_desc in pending_by_desc: + entry = pending_by_desc.pop(active_desc) + entry.update({"status": "success", "reason": "SUCCESS"}) + events.append(entry) + active_desc = None + + return events + + +def summarize_auth_activity(hours: int, include_smoke: bool) -> dict[str, object]: + events = load_auth_activity(hours) + filtered = [event for event in events if include_smoke or not event["smoke"]] + successes = [event for event in filtered if event["status"] == "success"] + failures = [event for event in filtered if event["status"] == "failure"] + reasons = collections.Counter(str(event["reason"]) for event in failures) + + return { + "window_hours": hours, + "include_smoke": include_smoke, + "success_count": len(successes), + "failure_count": len(failures), + "failure_reasons": dict(reasons), + "latest_success": successes[-1] if successes else None, + "latest_failure": failures[-1] if failures else None, + } + + def live_ports() -> set[int]: if shutil.which("ss") is None: return set() @@ -201,6 +382,107 @@ def print_units() -> int: return 0 +def print_summary(hours: int, include_smoke: bool, as_json: bool) -> int: + units = [ + channel_inventory.STACK_UNIT, + channel_inventory.DB_UNIT, + channel_inventory.DB_READY_UNIT, + channel_inventory.AUTH_UNIT, + *channel_inventory.get_game_units(), + ] + unit_rows = [] + for unit in units: + active, sub_state, enabled = get_unit_state(unit) + unit_rows.append({"unit": unit, "active": active, "sub": sub_state, "enabled": enabled}) + + game_units = [row for row in unit_rows if row["unit"].startswith("metin-game@")] + enabled_game_units = [row for row in game_units if row["enabled"] == "enabled"] + game_active = sum(1 for row in enabled_game_units if row["active"] == "active") + 
listening = live_ports() + port_rows = iter_port_rows() + auth_summary = summarize_auth_activity(hours, include_smoke) + repos = { + "m2dev-server": git_summary(REPO_ROOT), + "m2dev-server-src": git_summary(SOURCE_REPO_ROOT), + } + incident_count = count_incident_bundles() + core_count = len(iter_core_files()) + + payload = { + "repos": repos, + "units": unit_rows, + "game_active": game_active, + "game_enabled": len(enabled_game_units), + "game_declared": len(game_units), + "ports": [ + { + **row, + "live": int(row["port"]) in listening, + } + for row in port_rows + ], + "auth": auth_summary, + "core_count": core_count, + "incident_count": incident_count, + } + + if as_json: + print(json.dumps(payload, indent=2, default=str)) + return 0 + + repo_rows = [] + for name, summary in repos.items(): + repo_rows.append( + [ + name, + str(summary.get("branch", "unknown")), + str(summary.get("head", "unknown")), + "yes" if summary.get("dirty") else "no", + ] + ) + + public_ports = [row for row in payload["ports"] if row["visibility"] == "public"] + public_port_rows = [ + [row["name"], row["port"], row["p2p_port"], "yes" if row["live"] else "no"] + for row in public_ports + ] + + print("Repos") + print_table(["repo", "branch", "head", "dirty"], repo_rows) + print() + + print("Runtime") + print_table(["unit", "active", "sub", "enabled"], [[row["unit"], row["active"], row["sub"], row["enabled"]] for row in unit_rows[:4]]) + print(f"game instances active: {game_active}/{len(enabled_game_units)} enabled ({len(game_units)} declared)") + print(f"core files: {core_count}") + print(f"incident bundles: {incident_count}") + print() + + print("Public Ports") + print_table(["name", "port", "p2p", "live"], public_port_rows) + print() + + print(f"Auth ({hours}h)") + print(f"successes: {auth_summary['success_count']}") + print(f"failures: {auth_summary['failure_count']}") + if auth_summary["failure_reasons"]: + reason_line = ", ".join(f"{reason}={count}" for reason, count in 
sorted(auth_summary["failure_reasons"].items())) + print(f"failure reasons: {reason_line}") + latest_success = auth_summary["latest_success"] + if latest_success: + print( + f"latest success: {latest_success['time'].strftime('%Y-%m-%d %H:%M:%S')} " + f"{latest_success['login']} from {latest_success['ip']}" + ) + latest_failure = auth_summary["latest_failure"] + if latest_failure: + print( + f"latest failure: {latest_failure['time'].strftime('%Y-%m-%d %H:%M:%S')} " + f"{latest_failure['login']} from {latest_failure['ip']} reason={latest_failure['reason']}" + ) + return 0 + + def resolve_target_units(target: str) -> list[str]: normalized = target.strip().lower() @@ -317,6 +599,49 @@ def run_logs(target: str, lines: int, follow: bool) -> int: return 0 +def print_auth_failures(hours: int, limit: int, include_smoke: bool, as_json: bool) -> int: + events = load_auth_activity(hours) + failures = [event for event in events if event["status"] == "failure" and (include_smoke or not event["smoke"])] + failures = failures[-limit:] + reason_counts = collections.Counter(str(event["reason"]) for event in failures) + + payload = { + "window_hours": hours, + "limit": limit, + "include_smoke": include_smoke, + "count": len(failures), + "reasons": dict(reason_counts), + "entries": [ + { + "time": event["time"].strftime("%Y-%m-%d %H:%M:%S"), + "login": event["login"], + "ip": event["ip"], + "reason": event["reason"], + } + for event in failures + ], + } + + if as_json: + print(json.dumps(payload, indent=2)) + return 0 + + if not failures: + print(f"No auth failures in the last {hours}h.") + return 0 + + if reason_counts: + print(", ".join(f"{reason}={count}" for reason, count in sorted(reason_counts.items()))) + print() + + rows = [ + [event["time"].strftime("%Y-%m-%d %H:%M:%S"), str(event["login"]), str(event["ip"]), str(event["reason"])] + for event in failures + ] + print_table(["time", "login", "ip", "reason"], rows) + return 0 + + def run_healthcheck(mode: str) -> int: if not 
HEALTHCHECK_PATH.exists(): raise SystemExit(f"Missing healthcheck wrapper: {HEALTHCHECK_PATH}") @@ -369,6 +694,8 @@ def main() -> int: return print_inventory(args.json) if args.command == "units": return print_units() + if args.command == "summary": + return print_summary(args.hours, args.include_smoke, args.json) if args.command == "status": return print_status(args.target) if args.command == "ports": @@ -377,6 +704,8 @@ def main() -> int: return print_cores(args.json) if args.command == "incidents": return print_incidents(args.limit) + if args.command == "auth-failures": + return print_auth_failures(args.hours, args.limit, args.include_smoke, args.json) if args.command in {"start", "stop", "restart"}: return run_unit_action(args.command, args.target) if args.command == "logs": diff --git a/docs/server-management.md b/docs/server-management.md index e2c68b6..dac697a 100644 --- a/docs/server-management.md +++ b/docs/server-management.md @@ -32,10 +32,12 @@ The Debian deployment installs: `metinctl` is a lightweight operational CLI for: +- showing an operational summary - viewing inventory - listing managed units - checking service status - listing declared ports +- listing recent auth failures - restarting the whole stack or specific channels/instances - viewing logs - listing core files in the runtime tree @@ -57,12 +59,30 @@ Show current unit state: metinctl status ``` +Show a quick operational summary: + +```bash +metinctl summary +``` + Show declared ports and whether they are currently listening: ```bash metinctl ports --live ``` +Show recent real auth failures and skip smoke-test logins: + +```bash +metinctl auth-failures +``` + +Include smoke-test failures too: + +```bash +metinctl auth-failures --include-smoke +``` + Restart only channel 1 cores: ```bash