# NOTE(review): the following lines are file-browser snapshot metadata that was
# pasted above the script; commented out so the file remains valid Python.
#   Source: m2dev-server/deploy/systemd/bin/metinctl.in
#   Snapshot: 2026-04-14 17:01:50 +02:00 — 1263 lines, 47 KiB, Python
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import collections
import datetime as dt
import json
import os
import pwd
import re
import shutil
import subprocess
import sys
import time
from pathlib import Path
# Deploy-time paths: the {{...}} placeholders are substituted when this .in
# template is rendered into the installed script.
REPO_ROOT = Path("{{REPO_ROOT}}")
RUNTIME_ROOT = Path("{{RUNTIME_ROOT}}")
# Helper binaries installed alongside metinctl by the deploy tooling.
HEALTHCHECK_PATH = Path("/usr/local/sbin/metin-login-healthcheck")
INCIDENT_COLLECTOR_PATH = Path("/usr/local/sbin/metin-collect-incident")
CORE_BACKTRACE_PATH = Path("/usr/local/sbin/metin-core-backtrace")
INCIDENT_ROOT = Path("/var/lib/metin/incidents")
AUTH_SYSLOG_PATH = RUNTIME_ROOT / "channels" / "auth" / "syslog.log"
# Sibling checkout of the server sources; layout assumption:
# <root>/repos/m2dev-server-src next to <root>/<runtime parent>/<runtime>.
SOURCE_REPO_ROOT = RUNTIME_ROOT.parent.parent / "repos" / "m2dev-server-src"
SYSERR_GLOB = "channels/**/syserr.log"
# Make the deploy repo importable so channel_inventory resolves below.
sys.path.insert(0, str(REPO_ROOT))
import channel_inventory
# Log-line parsers. The AUTH_* regexes drive the login state machine in
# load_auth_activity; GENERIC_LOG_LINE_RE matches the shared
# "[timestamp] [level] message" syserr line format.
AUTH_LOG_LINE_RE = re.compile(r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[[^]]+\] (?P<message>.*)$")
GENERIC_LOG_LINE_RE = re.compile(r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[(?P<level>[^]]+)\] (?P<message>.*)$")
AUTH_CONN_RE = re.compile(r"SYSTEM: new connection from \[(?P<ip>[^]]+)\].* ptr (?P<desc>0x[0-9a-fA-F]+)")
AUTH_LOGIN_RE = re.compile(r"InputAuth::Login : (?P<login>[^(]+)\(\d+\) desc (?P<desc>0x[0-9a-fA-F]+)")
AUTH_INVALID_LOGIN_RE = re.compile(r"InputAuth::Login : IS_NOT_VALID_LOGIN_STRING\((?P<login>[^)]+)\) desc (?P<desc>0x[0-9a-fA-F]+)")
AUTH_START_RE = re.compile(r"(?:AUTH_LOGIN_DIRECT|QID_AUTH_LOGIN): START \d+ (?P<desc>0x[0-9a-fA-F]+)")
AUTH_SUCCESS_DIRECT_RE = re.compile(r"AUTH_LOGIN_DIRECT: SUCCESS (?P<login>.+)$")
AUTH_FAILURE_RE = re.compile(r"^(NOID|WRONGPWD|NOTAVAIL|ALREADY)$")
# Account-name prefixes created by automated smoke tests; filtered out of
# auth reporting unless --include-smoke is given.
SMOKE_LOGIN_PREFIXES = ("smk", "smkhc", "smkdel", "smkfull", "smkneg", "smoke_", "csmk")
def parse_args() -> argparse.Namespace:
    """Declare the full metinctl subcommand surface and parse argv.

    Returns the parsed namespace; ``command`` always holds the chosen
    subcommand because the subparser group is required.
    """
    parser = argparse.ArgumentParser(description="Operational CLI for the Debian Metin runtime")
    subparsers = parser.add_subparsers(dest="command", required=True)
    # Inventory / unit listing.
    inventory_parser = subparsers.add_parser("inventory", help="Show declared channel inventory")
    inventory_parser.add_argument("--json", action="store_true", help="Print raw JSON")
    subparsers.add_parser("units", help="List managed systemd units")
    # One-stop operational summary.
    summary_parser = subparsers.add_parser("summary", help="Show an operational summary")
    summary_parser.add_argument("--hours", type=int, default=24, help="Auth activity window in hours")
    summary_parser.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins in auth summary")
    summary_parser.add_argument("--json", action="store_true", help="Print raw JSON")
    # Auth-log inspection commands.
    auth_activity = subparsers.add_parser("auth-activity", help="Show recent auth success/failure activity")
    auth_activity.add_argument("--hours", type=int, default=24, help="How many hours back to inspect")
    auth_activity.add_argument("--limit", type=int, default=30, help="Maximum events to show")
    auth_activity.add_argument("--status", choices=("all", "success", "failure"), default="all", help="Filter by auth result")
    auth_activity.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins")
    auth_activity.add_argument("--json", action="store_true", help="Print raw JSON")
    auth_ips = subparsers.add_parser("auth-ips", help="Summarize auth activity by source IP")
    auth_ips.add_argument("--hours", type=int, default=24, help="How many hours back to inspect")
    auth_ips.add_argument("--limit", type=int, default=20, help="Maximum IPs to show")
    auth_ips.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins")
    auth_ips.add_argument("--json", action="store_true", help="Print raw JSON")
    # syserr inspection commands.
    recent_errors = subparsers.add_parser("recent-errors", help="Show recent syserr entries across runtime components")
    recent_errors.add_argument("--hours", type=int, default=24, help="How many hours back to inspect")
    recent_errors.add_argument("--limit", type=int, default=30, help="Maximum errors to show")
    recent_errors.add_argument("--json", action="store_true", help="Print raw JSON")
    error_summary = subparsers.add_parser("error-summary", help="Summarize recurring syserr entries")
    error_summary.add_argument("--hours", type=int, default=24, help="How many hours back to inspect")
    error_summary.add_argument("--limit", type=int, default=20, help="Maximum grouped errors to show")
    error_summary.add_argument("--json", action="store_true", help="Print raw JSON")
    # DB-backed session inspection (loginlog2).
    sessions = subparsers.add_parser("sessions", help="Show recent login sessions from loginlog2")
    sessions.add_argument("--hours", type=int, default=24, help="How many hours back to inspect")
    sessions.add_argument("--limit", type=int, default=20, help="Maximum sessions to show")
    sessions.add_argument("--active-only", action="store_true", help="Show only sessions without logout_time")
    sessions.add_argument("--include-orphans", action="store_true", help="Include rows whose account login no longer exists")
    sessions.add_argument("--json", action="store_true", help="Print raw JSON")
    session_audit = subparsers.add_parser("session-audit", help="Show stale open sessions without logout")
    session_audit.add_argument("--hours", type=int, default=72, help="How many hours back to inspect")
    session_audit.add_argument("--stale-minutes", type=int, default=30, help="Minimum age for an open session to be considered stale")
    session_audit.add_argument("--limit", type=int, default=20, help="Maximum sessions to show")
    session_audit.add_argument("--include-orphans", action="store_true", help="Include rows whose account login no longer exists")
    session_audit.add_argument("--json", action="store_true", help="Print raw JSON")
    # systemd unit state and control.
    status_parser = subparsers.add_parser("status", help="Show current unit state")
    status_parser.add_argument("target", nargs="?", default="all", help="stack, db, auth, game, channel:<id>, instance:<name>")
    ports_parser = subparsers.add_parser("ports", help="Show declared listener ports")
    ports_parser.add_argument("--live", action="store_true", help="Also show whether the port is currently listening")
    # start/stop/restart share an identical target argument.
    for action in ("start", "stop", "restart"):
        action_parser = subparsers.add_parser(action, help=f"{action.title()} a managed target")
        action_parser.add_argument("target", help="stack, db, auth, game, channel:<id>, instance:<name>")
    logs_parser = subparsers.add_parser("logs", help="Show journalctl logs for a managed target")
    logs_parser.add_argument("target", help="stack, db, auth, game, channel:<id>, instance:<name>")
    logs_parser.add_argument("-n", "--lines", type=int, default=100, help="Number of journal lines")
    logs_parser.add_argument("-f", "--follow", action="store_true", help="Follow the journal")
    # Crash forensics.
    cores_parser = subparsers.add_parser("cores", help="List core files under the runtime tree")
    cores_parser.add_argument("--json", action="store_true", help="Print raw JSON")
    incidents_parser = subparsers.add_parser("incidents", help="List collected incident bundles")
    incidents_parser.add_argument("--limit", type=int, default=10, help="Maximum number of bundles to show")
    incident_collect = subparsers.add_parser("incident-collect", help="Collect an incident bundle")
    incident_collect.add_argument("--tag", default="manual", help="Short incident tag")
    incident_collect.add_argument("--since", default="-30 minutes", help="journalctl --since value")
    incident_collect.add_argument("--include-cores", action="store_true", help="Copy matching core files into the bundle")
    backtrace = subparsers.add_parser("backtrace", help="Generate a backtrace for the newest or selected core file")
    backtrace.add_argument("--core", help="Core file path. Defaults to the newest core in the runtime tree.")
    backtrace.add_argument("--exe", help="Executable path override.")
    auth_failures = subparsers.add_parser("auth-failures", help="Show recent auth failures from auth syslog")
    auth_failures.add_argument("--hours", type=int, default=24, help="How many hours back to inspect")
    auth_failures.add_argument("--limit", type=int, default=20, help="Maximum failures to show")
    auth_failures.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins")
    auth_failures.add_argument("--json", action="store_true", help="Print raw JSON")
    # Health probes.
    wait_ready = subparsers.add_parser("wait-ready", help="Wait until the runtime passes the login-ready probe")
    wait_ready.add_argument("--timeout", type=int, default=120, help="Maximum seconds to wait")
    wait_ready.add_argument("--interval", type=float, default=5.0, help="Seconds between healthcheck attempts")
    healthcheck = subparsers.add_parser("healthcheck", help="Run the root-only headless healthcheck")
    healthcheck.add_argument("--mode", choices=("ready", "full"), default="full", help="Healthcheck depth")
    return parser.parse_args()
def build_command(command: list[str], require_root: bool = False) -> list[str]:
    """Prefix *command* with sudo when root is required but we are not root."""
    needs_sudo = require_root and os.geteuid() != 0
    return ["sudo", *command] if needs_sudo else command
def run(command: list[str], require_root: bool = False, capture_output: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]:
    """Execute *command* (sudo-wrapped when needed), always in text mode."""
    final_command = build_command(command, require_root=require_root)
    return subprocess.run(final_command, check=check, capture_output=capture_output, text=True)
def run_as_repo_owner(command: list[str], repo_path: Path, capture_output: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run *command*, dropping root to the owner of *repo_path* when possible.

    When we are root and the repo belongs to a non-root user, the command is
    wrapped in ``sudo -u <owner>`` so it runs with the owner's identity.
    A missing repo path falls through to a plain invocation.
    """
    final_command = list(command)
    if repo_path.exists():
        owner_uid = repo_path.stat().st_uid
        if owner_uid != 0 and os.geteuid() == 0:
            owner_name = pwd.getpwuid(owner_uid).pw_name
            final_command = ["sudo", "-u", owner_name, *final_command]
    return subprocess.run(final_command, check=check, capture_output=capture_output, text=True)
def get_unit_state(unit: str) -> tuple[str, str, str]:
    """Return (active, sub_state, enabled) for *unit*, with safe fallbacks."""
    def probe(command: list[str], fallback: str) -> str:
        # check=False: systemctl exits non-zero for inactive/disabled units.
        return run(command, capture_output=True, check=False).stdout.strip() or fallback
    active = probe(["systemctl", "is-active", unit], "unknown")
    enabled = probe(["systemctl", "is-enabled", unit], "unknown")
    sub_state = probe(["systemctl", "show", unit, "--property=SubState", "--value"], "-")
    return active, sub_state, enabled
def print_table(headers: list[str], rows: list[list[str]]) -> None:
    """Print *rows* as a left-aligned table with a dashed separator line."""
    widths = [len(header) for header in headers]
    for row in rows:
        for column, cell in enumerate(row):
            if len(cell) > widths[column]:
                widths[column] = len(cell)

    def render(cells: list[str]) -> str:
        return " ".join(cell.ljust(widths[column]) for column, cell in enumerate(cells))

    print(render(headers))
    print(render(["-" * width for width in widths]))
    for row in rows:
        print(render(row))
def iter_port_rows() -> list[dict[str, str]]:
    """Build one row per declared listener: db, auth, then every game core.

    All values are pre-stringified for direct use with print_table; the db
    row uses "-" for p2p_port because it has no peer-to-peer listener.
    """
    rows = [
        {
            "scope": "db",
            "name": "db",
            "port": str(channel_inventory.get_db()["port"]),
            "p2p_port": "-",
            "unit": channel_inventory.DB_UNIT,
            "visibility": "internal",
        },
        {
            "scope": "auth",
            "name": "auth",
            "port": str(channel_inventory.get_auth()["port"]),
            "p2p_port": str(channel_inventory.get_auth()["p2p_port"]),
            "unit": channel_inventory.AUTH_UNIT,
            "visibility": "public",
        },
    ]
    for channel in channel_inventory.iter_channels():
        channel_id = int(channel["id"])
        visibility = "public" if channel.get("public") else "internal"
        # Emit cores in ascending core-id order for stable output.
        for core in sorted(channel["cores"], key=lambda item: int(item["id"])):
            core_id = int(core["id"])
            instance = channel_inventory.instance_name(channel_id, core_id)
            rows.append(
                {
                    "scope": f"channel:{channel_id}",
                    "name": instance,
                    "port": str(core["port"]),
                    "p2p_port": str(core["p2p_port"]),
                    "unit": channel_inventory.game_unit(instance),
                    "visibility": visibility,
                }
            )
    return rows
def iter_core_files() -> list[Path]:
    """All regular core* files under the runtime channels tree, sorted by path."""
    candidates = sorted(RUNTIME_ROOT.glob("channels/**/core*"))
    return [candidate for candidate in candidates if candidate.is_file()]
def iter_syserr_files() -> list[Path]:
    """All regular syserr.log files under the runtime tree, sorted by path."""
    regular_files = (match for match in RUNTIME_ROOT.glob(SYSERR_GLOB) if match.is_file())
    return sorted(regular_files)
def count_incident_bundles() -> int:
    """Number of incident bundle directories, or 0 when the root is absent."""
    if not INCIDENT_ROOT.exists():
        return 0
    bundle_dirs = [entry for entry in INCIDENT_ROOT.iterdir() if entry.is_dir()]
    return len(bundle_dirs)
def git_summary(repo_path: Path) -> dict[str, object]:
    """Collect head/branch/dirty state for *repo_path*, running git as its owner.

    Returns only {"path", "present"} when the repo directory does not exist.
    All git calls are best-effort (check=False); missing output maps to
    "unknown".
    """
    info: dict[str, object] = {"path": str(repo_path), "present": repo_path.exists()}
    if not repo_path.exists():
        return info

    def git(*args: str) -> subprocess.CompletedProcess[str]:
        return run_as_repo_owner(["git", "-C", str(repo_path), *args], repo_path, capture_output=True, check=False)

    head = git("rev-parse", "--short", "HEAD")
    branch = git("rev-parse", "--abbrev-ref", "HEAD")
    status = git("status", "--short")
    changed_lines = [line for line in status.stdout.splitlines() if line.strip()]
    info["head"] = head.stdout.strip() or "unknown"
    info["branch"] = branch.stdout.strip() or "unknown"
    info["dirty"] = bool(status.stdout.strip())
    info["status_count"] = len(changed_lines)
    return info
def is_smoke_login(login: str) -> bool:
    """True when *login* carries a known smoke-test prefix (case-insensitive)."""
    return login.lower().startswith(SMOKE_LOGIN_PREFIXES)
def parse_auth_timestamp(value: str) -> dt.datetime:
    """Parse a 'YYYY-MM-DD HH:MM:SS.mmm' log timestamp into a naive datetime."""
    log_format = "%Y-%m-%d %H:%M:%S.%f"
    return dt.datetime.strptime(value, log_format)
def load_auth_activity(hours: int) -> list[dict[str, object]]:
    """Parse the auth syslog into login success/failure events.

    Implements a small state machine over the log stream:
      * "new connection" lines record the desc -> source IP mapping;
      * Login lines create a pending event keyed by desc;
      * START lines mark which desc the next result line belongs to;
      * bare result lines (SUCCESS / NOID / WRONGPWD / ...) resolve the
        pending event for the active desc.

    Only lines newer than *hours* ago are kept. Events are returned in log
    order; pending logins that never resolve are silently dropped.
    """
    if not AUTH_SYSLOG_PATH.exists():
        return []
    cutoff = dt.datetime.now() - dt.timedelta(hours=hours)
    desc_ips: dict[str, str] = {}  # connection desc -> source IP
    pending_by_desc: dict[str, dict[str, object]] = {}  # desc -> unresolved login event
    active_desc: str | None = None  # desc named by the most recent START line
    events: list[dict[str, object]] = []
    with AUTH_SYSLOG_PATH.open("r", encoding="utf-8", errors="replace") as handle:
        for raw_line in handle:
            # The log occasionally contains NUL bytes; strip before matching.
            line = raw_line.replace("\x00", "").rstrip("\n")
            match = AUTH_LOG_LINE_RE.match(line)
            if not match:
                continue
            timestamp = parse_auth_timestamp(match.group("timestamp"))
            if timestamp < cutoff:
                continue
            message = match.group("message").strip()
            conn_match = AUTH_CONN_RE.search(message)
            if conn_match:
                desc_ips[conn_match.group("desc")] = conn_match.group("ip")
                continue
            # Rejected before auth: the login string itself was invalid.
            invalid_login_match = AUTH_INVALID_LOGIN_RE.match(message)
            if invalid_login_match:
                login = invalid_login_match.group("login")
                events.append(
                    {
                        "time": timestamp,
                        "login": login,
                        "ip": desc_ips.get(invalid_login_match.group("desc"), "-"),
                        "status": "failure",
                        "reason": "INVALID_LOGIN_STRING",
                        "smoke": is_smoke_login(login),
                    }
                )
                continue
            login_match = AUTH_LOGIN_RE.match(message)
            # " key " marks a second Login log variant; skipping it avoids
            # creating duplicate pending entries for the same attempt.
            if login_match and " key " not in message:
                login = login_match.group("login").strip()
                desc = login_match.group("desc")
                pending_by_desc[desc] = {
                    "time": timestamp,
                    "login": login,
                    "ip": desc_ips.get(desc, "-"),
                    "smoke": is_smoke_login(login),
                }
                continue
            start_match = AUTH_START_RE.match(message)
            if start_match:
                active_desc = start_match.group("desc")
                continue
            failure_match = AUTH_FAILURE_RE.match(message)
            if failure_match and active_desc and active_desc in pending_by_desc:
                entry = pending_by_desc.pop(active_desc)
                entry.update({"status": "failure", "reason": failure_match.group(1)})
                events.append(entry)
                active_desc = None
                continue
            success_direct_match = AUTH_SUCCESS_DIRECT_RE.match(message)
            if success_direct_match and active_desc and active_desc in pending_by_desc:
                entry = pending_by_desc.pop(active_desc)
                entry.update({"status": "success", "reason": "SUCCESS"})
                events.append(entry)
                active_desc = None
                continue
            if message.startswith("QID_AUTH_LOGIN: SUCCESS") and active_desc and active_desc in pending_by_desc:
                entry = pending_by_desc.pop(active_desc)
                entry.update({"status": "success", "reason": "SUCCESS"})
                events.append(entry)
                active_desc = None
    return events
def summarize_auth_activity(hours: int, include_smoke: bool) -> dict[str, object]:
    """Aggregate auth events into success/failure counts and latest samples."""
    visible = [event for event in load_auth_activity(hours) if include_smoke or not event["smoke"]]
    successes = [event for event in visible if event["status"] == "success"]
    failures = [event for event in visible if event["status"] == "failure"]
    failure_reasons = collections.Counter(str(event["reason"]) for event in failures)
    return {
        "window_hours": hours,
        "include_smoke": include_smoke,
        "success_count": len(successes),
        "failure_count": len(failures),
        "failure_reasons": dict(failure_reasons),
        "latest_success": successes[-1] if successes else None,
        "latest_failure": failures[-1] if failures else None,
    }
def filter_auth_events(hours: int, include_smoke: bool, status: str) -> list[dict[str, object]]:
    """Return auth events filtered by smoke flag and, unless 'all', by status."""
    selected = [event for event in load_auth_activity(hours) if include_smoke or not event["smoke"]]
    if status == "all":
        return selected
    return [event for event in selected if event["status"] == status]
def load_syserr_entries(hours: int) -> list[dict[str, object]]:
    """Collect error-level syserr entries from every runtime syserr.log.

    Lines matching GENERIC_LOG_LINE_RE start a new entry; non-matching,
    non-blank lines are treated as continuations and appended (" | "-joined)
    to the previous entry. Note: the continuation mutates the dict that was
    already appended to *entries* — the shared reference is intentional.
    Entries of all levels are gathered first, then filtered to "error" only.
    """
    cutoff = dt.datetime.now() - dt.timedelta(hours=hours)
    entries: list[dict[str, object]] = []
    for path in iter_syserr_files():
        last_entry: dict[str, object] | None = None
        with path.open("r", encoding="utf-8", errors="replace") as handle:
            for raw_line in handle:
                line = raw_line.replace("\x00", "").rstrip("\n")
                match = GENERIC_LOG_LINE_RE.match(line)
                if match:
                    timestamp = parse_auth_timestamp(match.group("timestamp"))
                    if timestamp < cutoff:
                        # Too old; also drop continuation context so later
                        # stray lines do not attach to an out-of-window entry.
                        last_entry = None
                        continue
                    last_entry = {
                        "time": timestamp,
                        "level": match.group("level"),
                        "source": str(path.relative_to(RUNTIME_ROOT)),
                        "message": match.group("message").strip(),
                    }
                    entries.append(last_entry)
                    continue
                if last_entry is not None and line.strip():
                    last_entry["message"] = f"{last_entry['message']} | {line.strip()}"
    return [entry for entry in entries if str(entry["level"]).lower() == "error"]
def run_mariadb_query(query: str) -> list[list[str]]:
    """Run *query* via the mariadb CLI (-N -B: no headers, tab-separated).

    Returns non-blank output lines split on tabs; requires root (socket auth).
    """
    completed = run(["mariadb", "-N", "-B", "-e", query], require_root=True, capture_output=True)
    return [line.split("\t") for line in completed.stdout.splitlines() if line.strip()]
def fetch_recent_sessions(hours: int, limit: int, active_only: bool, include_orphans: bool) -> list[dict[str, str]]:
    """Read recent log.loginlog2 rows joined against account.account.

    *hours* and *limit* are int()-coerced before interpolation, which keeps
    this hand-built SQL safe for these numeric inputs. "Orphans" are login
    rows whose account record no longer exists (a.login IS NULL after the
    LEFT JOIN).
    """
    where_clauses = [f"l.login_time >= NOW() - INTERVAL {int(hours)} HOUR"]
    if active_only:
        where_clauses.append("l.logout_time IS NULL")
    if not include_orphans:
        where_clauses.append("a.login IS NOT NULL")
    query = f"""
        SELECT
            DATE_FORMAT(l.login_time, '%Y-%m-%d %H:%i:%s'),
            COALESCE(DATE_FORMAT(l.logout_time, '%Y-%m-%d %H:%i:%s'), ''),
            l.type,
            COALESCE(a.login, ''),
            l.account_id,
            l.pid,
            COALESCE(INET_NTOA(l.ip), ''),
            COALESCE(l.client_version, '')
        FROM log.loginlog2 l
        LEFT JOIN account.account a ON a.id = l.account_id
        WHERE {' AND '.join(where_clauses)}
        ORDER BY l.id DESC
        LIMIT {int(limit)}
    """.strip()
    entries: list[dict[str, str]] = []
    for row in run_mariadb_query(query):
        # Pad short rows so the unpack below never raises.
        while len(row) < 8:
            row.append("")
        login_time, logout_time, raw_type, login, account_id, pid, ip, client_version = row[:8]
        entries.append(
            {
                "login_time": login_time,
                "logout_time": logout_time,
                "raw_type": raw_type,
                # Empty logout_time (NULL in SQL) marks a still-open session.
                "session_state": "open" if not logout_time else "closed",
                "login": login or f"<missing:{account_id}>",
                "account_id": account_id,
                "pid": pid,
                "ip": ip or "-",
                "client_version": client_version or "-",
            }
        )
    return entries
def fetch_stale_sessions(hours: int, stale_minutes: int, limit: int, include_orphans: bool) -> list[dict[str, str]]:
    """Read open (no logout_time) sessions older than *stale_minutes*.

    Numeric parameters are int()-coerced before SQL interpolation. Orphan
    rows (deleted accounts) are excluded unless *include_orphans* is set.
    """
    where_clauses = [
        f"l.login_time >= NOW() - INTERVAL {int(hours)} HOUR",
        "l.logout_time IS NULL",
        f"TIMESTAMPDIFF(MINUTE, l.login_time, NOW()) >= {int(stale_minutes)}",
    ]
    if not include_orphans:
        where_clauses.append("a.login IS NOT NULL")
    query = f"""
        SELECT
            DATE_FORMAT(l.login_time, '%Y-%m-%d %H:%i:%s'),
            l.type,
            COALESCE(a.login, ''),
            l.account_id,
            l.pid,
            COALESCE(INET_NTOA(l.ip), ''),
            TIMESTAMPDIFF(MINUTE, l.login_time, NOW())
        FROM log.loginlog2 l
        LEFT JOIN account.account a ON a.id = l.account_id
        WHERE {' AND '.join(where_clauses)}
        ORDER BY l.login_time DESC
        LIMIT {int(limit)}
    """.strip()
    entries: list[dict[str, str]] = []
    for row in run_mariadb_query(query):
        # Pad short rows so the unpack below never raises.
        while len(row) < 7:
            row.append("")
        login_time, raw_type, login, account_id, pid, ip, age_minutes = row[:7]
        entries.append(
            {
                "login_time": login_time,
                "raw_type": raw_type,
                "login": login or f"<missing:{account_id}>",
                "account_id": account_id,
                "pid": pid,
                "ip": ip or "-",
                "age_minutes": age_minutes or "0",
            }
        )
    return entries
def count_stale_sessions(hours: int, stale_minutes: int, include_orphans: bool) -> int:
    """Count open sessions older than *stale_minutes* within the window.

    Same filters as fetch_stale_sessions; returns 0 when the query yields
    nothing or the count cannot be parsed as an int.
    """
    where_clauses = [
        f"l.login_time >= NOW() - INTERVAL {int(hours)} HOUR",
        "l.logout_time IS NULL",
        f"TIMESTAMPDIFF(MINUTE, l.login_time, NOW()) >= {int(stale_minutes)}",
    ]
    if not include_orphans:
        where_clauses.append("a.login IS NOT NULL")
    query = f"""
        SELECT COUNT(*)
        FROM log.loginlog2 l
        LEFT JOIN account.account a ON a.id = l.account_id
        WHERE {' AND '.join(where_clauses)}
    """.strip()
    rows = run_mariadb_query(query)
    if not rows or not rows[0]:
        return 0
    try:
        return int(rows[0][0])
    except ValueError:
        return 0
def live_ports() -> set[int]:
    """Best-effort set of locally listening TCP ports via ``ss -ltnH``.

    Returns an empty set when ``ss`` is unavailable, and degrades the same
    way when it fails — callers (summary/ports) should show "no live data"
    rather than crash.
    """
    if shutil.which("ss") is None:
        return set()
    # check=False: a failing ss (e.g. restricted container, kernel without
    # netlink access) must not raise CalledProcessError and abort the CLI;
    # an empty stdout simply yields an empty port set.
    completed = run(["ss", "-ltnH"], capture_output=True, check=False)
    ports: set[int] = set()
    for line in completed.stdout.splitlines():
        fields = line.split()
        if len(fields) < 4:
            continue
        local = fields[3]  # e.g. "0.0.0.0:13000" or "[::]:13000"
        if ":" not in local:
            continue
        try:
            # rsplit keeps IPv6 addresses intact; the tail is the port.
            ports.add(int(local.rsplit(":", 1)[1]))
        except ValueError:
            continue
    return ports
def print_inventory(as_json: bool) -> int:
    """Print the declared channel inventory as raw JSON or a readable table."""
    if as_json:
        print(json.dumps(channel_inventory.load_inventory(), indent=2))
        return 0
    table_rows: list[list[str]] = []
    for channel in channel_inventory.iter_channels():
        ordered_cores = sorted(channel["cores"], key=lambda core: int(core["id"]))
        core_ports = ",".join(str(core["port"]) for core in ordered_cores)
        table_rows.append(
            [
                str(int(channel["id"])),
                channel["name"],
                "yes" if channel.get("public") else "no",
                "yes" if channel.get("client_visible") else "no",
                str(len(channel["cores"])),
                core_ports,
            ]
        )
    print_table(["channel", "name", "public", "visible", "cores", "ports"], table_rows)
    return 0
def print_units() -> int:
    """Print the kind/unit table for every managed systemd unit."""
    table_rows = [
        ["stack", channel_inventory.STACK_UNIT],
        ["db", channel_inventory.DB_UNIT],
        ["db-ready", channel_inventory.DB_READY_UNIT],
        ["auth", channel_inventory.AUTH_UNIT],
    ]
    table_rows.extend(["game", unit] for unit in channel_inventory.get_game_units())
    print_table(["kind", "unit"], table_rows)
    return 0
def print_summary(hours: int, include_smoke: bool, as_json: bool) -> int:
    """Print the one-stop operational summary (or its JSON payload).

    Combines systemd unit state, declared/live ports, auth activity, syserr
    errors, stale DB sessions, git state of both repos, and core/incident
    counts into a single report.
    """
    units = [
        channel_inventory.STACK_UNIT,
        channel_inventory.DB_UNIT,
        channel_inventory.DB_READY_UNIT,
        channel_inventory.AUTH_UNIT,
        *channel_inventory.get_game_units(),
    ]
    unit_rows = []
    for unit in units:
        active, sub_state, enabled = get_unit_state(unit)
        unit_rows.append({"unit": unit, "active": active, "sub": sub_state, "enabled": enabled})
    # Game instances are the templated metin-game@ units; only enabled ones
    # count toward the active/enabled ratio shown below.
    game_units = [row for row in unit_rows if row["unit"].startswith("metin-game@")]
    enabled_game_units = [row for row in game_units if row["enabled"] == "enabled"]
    game_active = sum(1 for row in enabled_game_units if row["active"] == "active")
    listening = live_ports()
    port_rows = iter_port_rows()
    auth_summary = summarize_auth_activity(hours, include_smoke)
    recent_errors = load_syserr_entries(hours)
    # Stale = open >30 min; orphan count is derived as total - user sessions.
    stale_session_count = count_stale_sessions(hours=max(hours, 1), stale_minutes=30, include_orphans=False)
    stale_total_count = count_stale_sessions(hours=max(hours, 1), stale_minutes=30, include_orphans=True)
    stale_orphan_count = max(stale_total_count - stale_session_count, 0)
    repos = {
        "m2dev-server": git_summary(REPO_ROOT),
        "m2dev-server-src": git_summary(SOURCE_REPO_ROOT),
    }
    incident_count = count_incident_bundles()
    core_count = len(iter_core_files())
    payload = {
        "repos": repos,
        "units": unit_rows,
        "game_active": game_active,
        "game_enabled": len(enabled_game_units),
        "game_declared": len(game_units),
        "ports": [
            {
                **row,
                "live": int(row["port"]) in listening,
            }
            for row in port_rows
        ],
        "auth": auth_summary,
        "recent_error_count": len(recent_errors),
        # Syserr entries arrive in file order, so the last one is the newest.
        "latest_error": {
            "time": recent_errors[-1]["time"].strftime("%Y-%m-%d %H:%M:%S"),
            "source": recent_errors[-1]["source"],
            "message": recent_errors[-1]["message"],
        } if recent_errors else None,
        "stale_open_sessions": {
            "user_count": stale_session_count,
            "orphan_count": stale_orphan_count,
            "total_count": stale_total_count,
        },
        "core_count": core_count,
        "incident_count": incident_count,
    }
    if as_json:
        # default=str covers the datetime objects nested inside auth_summary.
        print(json.dumps(payload, indent=2, default=str))
        return 0
    repo_rows = []
    for name, summary in repos.items():
        repo_rows.append(
            [
                name,
                str(summary.get("branch", "unknown")),
                str(summary.get("head", "unknown")),
                "yes" if summary.get("dirty") else "no",
            ]
        )
    public_ports = [row for row in payload["ports"] if row["visibility"] == "public"]
    public_port_rows = [
        [row["name"], row["port"], row["p2p_port"], "yes" if row["live"] else "no"]
        for row in public_ports
    ]
    print("Repos")
    print_table(["repo", "branch", "head", "dirty"], repo_rows)
    print()
    print("Runtime")
    # unit_rows[:4] == stack, db, db-ready, auth; game units are summarized
    # as a ratio instead of listed individually.
    print_table(["unit", "active", "sub", "enabled"], [[row["unit"], row["active"], row["sub"], row["enabled"]] for row in unit_rows[:4]])
    print(f"game instances active: {game_active}/{len(enabled_game_units)} enabled ({len(game_units)} declared)")
    print(f"core files: {core_count}")
    print(f"incident bundles: {incident_count}")
    print(f"recent syserr errors ({hours}h): {len(recent_errors)}")
    print(f"stale open sessions (>30m): {stale_session_count} user, {stale_orphan_count} orphan")
    print()
    print("Public Ports")
    print_table(["name", "port", "p2p", "live"], public_port_rows)
    print()
    print(f"Auth ({hours}h)")
    print(f"successes: {auth_summary['success_count']}")
    print(f"failures: {auth_summary['failure_count']}")
    if auth_summary["failure_reasons"]:
        reason_line = ", ".join(f"{reason}={count}" for reason, count in sorted(auth_summary["failure_reasons"].items()))
        print(f"failure reasons: {reason_line}")
    latest_success = auth_summary["latest_success"]
    if latest_success:
        print(
            f"latest success: {latest_success['time'].strftime('%Y-%m-%d %H:%M:%S')} "
            f"{latest_success['login']} from {latest_success['ip']}"
        )
    latest_failure = auth_summary["latest_failure"]
    if latest_failure:
        print(
            f"latest failure: {latest_failure['time'].strftime('%Y-%m-%d %H:%M:%S')} "
            f"{latest_failure['login']} from {latest_failure['ip']} reason={latest_failure['reason']}"
        )
    latest_error = payload["latest_error"]
    if latest_error:
        # latest_error["time"] is already a formatted string (see payload).
        print(
            f"latest error: {latest_error['time']} "
            f"{latest_error['source']} {latest_error['message']}"
        )
    return 0
def print_auth_activity(hours: int, limit: int, status: str, include_smoke: bool, as_json: bool) -> int:
    """Print recent auth events as JSON or a table; the newest *limit* win."""
    recent = filter_auth_events(hours, include_smoke, status)[-limit:]
    payload = {
        "window_hours": hours,
        "limit": limit,
        "status": status,
        "include_smoke": include_smoke,
        "count": len(recent),
        "entries": [
            {
                "time": item["time"].strftime("%Y-%m-%d %H:%M:%S"),
                "status": item["status"],
                "login": item["login"],
                "ip": item["ip"],
                "reason": item["reason"],
            }
            for item in recent
        ],
    }
    if as_json:
        print(json.dumps(payload, indent=2))
        return 0
    if not recent:
        print(f"No auth activity in the last {hours}h for status={status}.")
        return 0
    table_rows = []
    for item in recent:
        table_rows.append(
            [
                item["time"].strftime("%Y-%m-%d %H:%M:%S"),
                str(item["status"]),
                str(item["login"]),
                str(item["ip"]),
                str(item["reason"]),
            ]
        )
    print_table(["time", "status", "login", "ip", "reason"], table_rows)
    return 0
def print_auth_ips(hours: int, limit: int, include_smoke: bool, as_json: bool) -> int:
    """Aggregate auth events per source IP and print the top entries.

    Sorted by failure count, then success count, then last-seen time (all
    descending), so the noisiest and most recent IPs come first.
    """
    events = filter_auth_events(hours, include_smoke, "all")
    grouped: dict[str, dict[str, object]] = {}
    for event in events:
        ip = str(event["ip"])
        # One accumulator bucket per IP, created lazily on first sight.
        bucket = grouped.setdefault(
            ip,
            {
                "ip": ip,
                "success_count": 0,
                "failure_count": 0,
                "last_seen": event["time"],
                "last_login": str(event["login"]),
                "last_reason": str(event["reason"]),
            },
        )
        if event["status"] == "success":
            bucket["success_count"] = int(bucket["success_count"]) + 1
        else:
            bucket["failure_count"] = int(bucket["failure_count"]) + 1
        # Track the most recent login/reason observed for this IP.
        if event["time"] >= bucket["last_seen"]:
            bucket["last_seen"] = event["time"]
            bucket["last_login"] = str(event["login"])
            bucket["last_reason"] = str(event["reason"])
    rows = sorted(
        grouped.values(),
        key=lambda item: (
            int(item["failure_count"]),
            int(item["success_count"]),
            item["last_seen"],
        ),
        reverse=True,
    )[:limit]
    payload = {
        "window_hours": hours,
        "limit": limit,
        "include_smoke": include_smoke,
        "count": len(rows),
        "entries": [
            {
                "ip": str(row["ip"]),
                "success_count": int(row["success_count"]),
                "failure_count": int(row["failure_count"]),
                "last_seen": row["last_seen"].strftime("%Y-%m-%d %H:%M:%S"),
                "last_login": str(row["last_login"]),
                "last_reason": str(row["last_reason"]),
            }
            for row in rows
        ],
    }
    if as_json:
        print(json.dumps(payload, indent=2))
        return 0
    if not rows:
        print(f"No auth IP activity in the last {hours}h.")
        return 0
    table_rows = [
        [
            str(row["ip"]),
            str(row["success_count"]),
            str(row["failure_count"]),
            row["last_seen"].strftime("%Y-%m-%d %H:%M:%S"),
            str(row["last_login"]),
            str(row["last_reason"]),
        ]
        for row in rows
    ]
    print_table(["ip", "success", "failure", "last_seen", "last_login", "last_reason"], table_rows)
    return 0
def print_recent_errors(hours: int, limit: int, as_json: bool) -> int:
    """Print the newest syserr error entries as JSON or a table."""
    recent = load_syserr_entries(hours)[-limit:]
    payload = {
        "window_hours": hours,
        "limit": limit,
        "count": len(recent),
        "entries": [
            {
                "time": item["time"].strftime("%Y-%m-%d %H:%M:%S"),
                "source": str(item["source"]),
                "message": str(item["message"]),
            }
            for item in recent
        ],
    }
    if as_json:
        print(json.dumps(payload, indent=2))
        return 0
    if not recent:
        print(f"No syserr entries in the last {hours}h.")
        return 0
    table_rows = []
    for item in recent:
        table_rows.append([item["time"].strftime("%Y-%m-%d %H:%M:%S"), str(item["source"]), str(item["message"])])
    print_table(["time", "source", "message"], table_rows)
    return 0
def print_error_summary(hours: int, limit: int, as_json: bool) -> int:
    """Group syserr errors by (source, message) and print recurring ones.

    Ordered by occurrence count then last-seen time, both descending.
    """
    entries = load_syserr_entries(hours)
    grouped: dict[tuple[str, str], dict[str, object]] = {}
    for entry in entries:
        # Identical message from the same file counts as one recurring error.
        key = (str(entry["source"]), str(entry["message"]))
        bucket = grouped.setdefault(
            key,
            {
                "source": str(entry["source"]),
                "message": str(entry["message"]),
                "count": 0,
                "last_seen": entry["time"],
            },
        )
        bucket["count"] = int(bucket["count"]) + 1
        if entry["time"] >= bucket["last_seen"]:
            bucket["last_seen"] = entry["time"]
    rows = sorted(
        grouped.values(),
        key=lambda item: (int(item["count"]), item["last_seen"]),
        reverse=True,
    )[:limit]
    payload = {
        "window_hours": hours,
        "limit": limit,
        "count": len(rows),
        "entries": [
            {
                "source": str(row["source"]),
                "count": int(row["count"]),
                "last_seen": row["last_seen"].strftime("%Y-%m-%d %H:%M:%S"),
                "message": str(row["message"]),
            }
            for row in rows
        ],
    }
    if as_json:
        print(json.dumps(payload, indent=2))
        return 0
    if not rows:
        print(f"No syserr summary entries in the last {hours}h.")
        return 0
    table_rows = [
        [str(row["count"]), row["last_seen"].strftime("%Y-%m-%d %H:%M:%S"), str(row["source"]), str(row["message"])]
        for row in rows
    ]
    print_table(["count", "last_seen", "source", "message"], table_rows)
    return 0
def resolve_target_units(target: str) -> list[str]:
    """Map a CLI target string to the systemd unit names it covers.

    Accepted forms: all/stack/server, db, db-ready/db_ready, auth,
    game/games, channel:<id>, instance:<name>, or a raw channel*_core*
    instance name.

    Raises SystemExit with a readable message for unknown targets and for a
    non-numeric channel id (previously this leaked a ValueError traceback).
    """
    normalized = target.strip().lower()
    if normalized in {"all", "stack", "server"}:
        return [channel_inventory.STACK_UNIT]
    if normalized == "db":
        return [channel_inventory.DB_UNIT]
    if normalized in {"db-ready", "db_ready"}:
        return [channel_inventory.DB_READY_UNIT]
    if normalized == "auth":
        return [channel_inventory.AUTH_UNIT]
    if normalized in {"game", "games"}:
        return channel_inventory.get_game_units()
    if normalized.startswith("channel:"):
        raw_channel_id = normalized.split(":", 1)[1]
        try:
            channel_id = int(raw_channel_id)
        except ValueError:
            raise SystemExit(f"Invalid channel id in target: {target}") from None
        return channel_inventory.get_game_units([channel_id])
    if normalized.startswith("instance:"):
        # Use the original (non-lowercased) target to preserve instance casing.
        return [channel_inventory.game_unit(target.split(":", 1)[1])]
    if normalized.startswith("channel") and "_core" in normalized:
        return [channel_inventory.game_unit(target)]
    raise SystemExit(f"Unknown target: {target}")
def print_status(target: str) -> int:
    """Print active/sub/enabled state for every unit matching *target*."""
    if target == "all":
        units = [
            channel_inventory.STACK_UNIT,
            channel_inventory.DB_UNIT,
            channel_inventory.DB_READY_UNIT,
            channel_inventory.AUTH_UNIT,
        ]
        units.extend(channel_inventory.get_game_units())
    else:
        units = resolve_target_units(target)
    table_rows: list[list[str]] = []
    for unit in units:
        active, sub_state, enabled = get_unit_state(unit)
        table_rows.append([unit, active, sub_state, enabled])
    print_table(["unit", "active", "sub", "enabled"], table_rows)
    return 0
def print_ports(show_live: bool) -> int:
    """Print the declared listener table; --live adds a yes/no listening column."""
    listening = live_ports() if show_live else set()
    headers = ["scope", "name", "port", "p2p", "visibility", "unit"]
    if show_live:
        headers = [*headers, "live"]
    table_rows: list[list[str]] = []
    for entry in iter_port_rows():
        cells = [entry["scope"], entry["name"], entry["port"], entry["p2p_port"], entry["visibility"], entry["unit"]]
        if show_live:
            cells.append("yes" if int(entry["port"]) in listening else "no")
        table_rows.append(cells)
    print_table(headers, table_rows)
    return 0
def print_cores(as_json: bool) -> int:
    """List core dump files under the runtime tree, as a table or JSON."""
    entries = []
    for core_path in iter_core_files():
        info = core_path.stat()
        entries.append(
            {
                "path": str(core_path),
                "relative_path": str(core_path.relative_to(RUNTIME_ROOT)),
                "size_bytes": info.st_size,
                "mtime_epoch": int(info.st_mtime),
            }
        )
    if as_json:
        print(json.dumps(entries, indent=2))
        return 0
    if not entries:
        print("No core files found under the runtime tree.")
        return 0
    rows = [
        [item["relative_path"], str(item["size_bytes"]), str(item["mtime_epoch"])]
        for item in entries
    ]
    print_table(["path", "size_bytes", "mtime_epoch"], rows)
    return 0
def print_incidents(limit: int) -> int:
    """Show up to `limit` incident bundle directories, reverse-sorted by name."""
    if not INCIDENT_ROOT.exists():
        print(f"No incident directory: {INCIDENT_ROOT}")
        return 0
    directories = [entry for entry in INCIDENT_ROOT.iterdir() if entry.is_dir()]
    bundles = sorted(directories, reverse=True)[:limit]
    if not bundles:
        print(f"No incident bundles in {INCIDENT_ROOT}")
        return 0
    print_table(["bundle", "path"], [[bundle.name, str(bundle)] for bundle in bundles])
    return 0
def run_unit_action(action: str, target: str) -> int:
    """Apply a systemctl verb (start/stop/restart) to the resolved units."""
    run(["systemctl", action, *resolve_target_units(target)], require_root=True)
    return 0
def run_logs(target: str, lines: int, follow: bool) -> int:
    """Show journalctl output for the target's units; follow mode tails live."""
    unit_args: list[str] = []
    for unit in resolve_target_units(target):
        unit_args += ["-u", unit]
    if follow:
        command = ["journalctl", f"-n{lines}", "-f", *unit_args]
    else:
        # Non-follow mode disables the pager so output can be piped/captured.
        command = ["journalctl", "--no-pager", f"-n{lines}", *unit_args]
    run(command, require_root=True)
    return 0
def print_auth_failures(hours: int, limit: int, include_smoke: bool, as_json: bool) -> int:
    """Report recent auth failures, optionally filtering out smoke-test logins."""
    events = load_auth_activity(hours)
    failures = [
        event
        for event in events
        if event["status"] == "failure" and (include_smoke or not event["smoke"])
    ][-limit:]
    # Reason counts are computed over the limited window, matching "count".
    reason_counts = collections.Counter(str(event["reason"]) for event in failures)
    entries = [
        {
            "time": event["time"].strftime("%Y-%m-%d %H:%M:%S"),
            "login": event["login"],
            "ip": event["ip"],
            "reason": event["reason"],
        }
        for event in failures
    ]
    payload = {
        "window_hours": hours,
        "limit": limit,
        "include_smoke": include_smoke,
        "count": len(failures),
        "reasons": dict(reason_counts),
        "entries": entries,
    }
    if as_json:
        print(json.dumps(payload, indent=2))
        return 0
    if not failures:
        print(f"No auth failures in the last {hours}h.")
        return 0
    if reason_counts:
        summary = ", ".join(f"{reason}={count}" for reason, count in sorted(reason_counts.items()))
        print(summary)
        print()
    rows = [
        [entry["time"], str(entry["login"]), str(entry["ip"]), str(entry["reason"])]
        for entry in entries
    ]
    print_table(["time", "login", "ip", "reason"], rows)
    return 0
def print_sessions(hours: int, limit: int, active_only: bool, include_orphans: bool, as_json: bool) -> int:
    """List recent sessions returned by fetch_recent_sessions, as table or JSON."""
    entries = fetch_recent_sessions(hours, limit, active_only, include_orphans)
    if as_json:
        payload = {
            "window_hours": hours,
            "limit": limit,
            "active_only": active_only,
            "include_orphans": include_orphans,
            "count": len(entries),
            "entries": entries,
        }
        print(json.dumps(payload, indent=2))
        return 0
    if not entries:
        print(f"No sessions in the last {hours}h.")
        return 0
    rows = []
    for entry in entries:
        rows.append(
            [
                entry["login_time"],
                entry["logout_time"] or "-",  # open sessions have no logout yet
                entry["session_state"],
                entry["login"],
                entry["account_id"],
                entry["pid"],
                entry["ip"],
            ]
        )
    print_table(["login_time", "logout_time", "state", "login", "account", "pid", "ip"], rows)
    return 0
def print_session_audit(hours: int, stale_minutes: int, limit: int, include_orphans: bool, as_json: bool) -> int:
    """Report open sessions older than the stale threshold within the window."""
    entries = fetch_stale_sessions(hours, stale_minutes, limit, include_orphans)
    if as_json:
        payload = {
            "window_hours": hours,
            "stale_minutes": stale_minutes,
            "limit": limit,
            "include_orphans": include_orphans,
            "count": len(entries),
            "entries": entries,
        }
        print(json.dumps(payload, indent=2))
        return 0
    if not entries:
        print(f"No stale open sessions older than {stale_minutes} minutes in the last {hours}h.")
        return 0
    fields = ("login_time", "age_minutes", "login", "account_id", "pid", "ip", "raw_type")
    rows = [[entry[field] for field in fields] for entry in entries]
    print_table(["login_time", "age_min", "login", "account", "pid", "ip", "raw_type"], rows)
    return 0
def run_healthcheck(mode: str) -> int:
    """Invoke the external login healthcheck wrapper with the given --mode."""
    if not HEALTHCHECK_PATH.exists():
        raise SystemExit(f"Missing healthcheck wrapper: {HEALTHCHECK_PATH}")
    command = [str(HEALTHCHECK_PATH), "--mode", mode]
    run(command, require_root=True)
    return 0
def run_wait_ready(timeout_seconds: int, interval_seconds: float) -> int:
    """Poll the healthcheck in ``--mode ready`` until success or timeout.

    Always performs at least one attempt, even when ``timeout_seconds`` is
    zero or negative, so the failure message reports the exit code of a real
    healthcheck run rather than the 1 placeholder.

    Raises:
        SystemExit: if the wrapper is missing, or the deadline passes without
            a successful check.
    """
    if not HEALTHCHECK_PATH.exists():
        raise SystemExit(f"Missing healthcheck wrapper: {HEALTHCHECK_PATH}")
    deadline = time.time() + timeout_seconds
    attempt = 0
    last_returncode = 1
    while True:
        attempt += 1
        print(f"Healthcheck attempt {attempt}...")
        completed = subprocess.run(
            build_command([str(HEALTHCHECK_PATH), "--mode", "ready"], require_root=True),
            check=False,
            text=True,
        )
        if completed.returncode == 0:
            return 0
        last_returncode = completed.returncode
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval_seconds, remaining))
    raise SystemExit(f"Timed out waiting for login-ready state. Last healthcheck exit code: {last_returncode}")
def run_incident_collect(tag: str, since: str, include_cores: bool) -> int:
    """Run the incident collector with a tag, time window, and optional cores."""
    if not INCIDENT_COLLECTOR_PATH.exists():
        raise SystemExit(f"Missing incident collector: {INCIDENT_COLLECTOR_PATH}")
    command = [str(INCIDENT_COLLECTOR_PATH), "--tag", tag, "--since", since]
    if include_cores:
        command += ["--include-cores"]
    run(command, require_root=True)
    return 0
def run_backtrace(core: str | None, exe: str | None) -> int:
    """Invoke the core backtrace helper, forwarding optional --core/--exe."""
    if not CORE_BACKTRACE_PATH.exists():
        raise SystemExit(f"Missing core backtrace helper: {CORE_BACKTRACE_PATH}")
    command = [str(CORE_BACKTRACE_PATH)]
    for flag, value in (("--core", core), ("--exe", exe)):
        if value:
            command += [flag, value]
    run(command, require_root=True)
    return 0
def main() -> int:
    """CLI entry point: parse arguments and dispatch to the subcommand handler."""
    args = parse_args()
    # The unit-action verbs share one handler keyed on the verb itself.
    if args.command in {"start", "stop", "restart"}:
        return run_unit_action(args.command, args.target)
    dispatch = {
        "inventory": lambda: print_inventory(args.json),
        "units": print_units,
        "summary": lambda: print_summary(args.hours, args.include_smoke, args.json),
        "auth-activity": lambda: print_auth_activity(args.hours, args.limit, args.status, args.include_smoke, args.json),
        "auth-ips": lambda: print_auth_ips(args.hours, args.limit, args.include_smoke, args.json),
        "recent-errors": lambda: print_recent_errors(args.hours, args.limit, args.json),
        "error-summary": lambda: print_error_summary(args.hours, args.limit, args.json),
        "status": lambda: print_status(args.target),
        "ports": lambda: print_ports(args.live),
        "cores": lambda: print_cores(args.json),
        "incidents": lambda: print_incidents(args.limit),
        "auth-failures": lambda: print_auth_failures(args.hours, args.limit, args.include_smoke, args.json),
        "sessions": lambda: print_sessions(args.hours, args.limit, args.active_only, args.include_orphans, args.json),
        "session-audit": lambda: print_session_audit(args.hours, args.stale_minutes, args.limit, args.include_orphans, args.json),
        "logs": lambda: run_logs(args.target, args.lines, args.follow),
        "incident-collect": lambda: run_incident_collect(args.tag, args.since, args.include_cores),
        "backtrace": lambda: run_backtrace(args.core, args.exe),
        "healthcheck": lambda: run_healthcheck(args.mode),
        "wait-ready": lambda: run_wait_ready(args.timeout, args.interval),
    }
    try:
        handler = dispatch[args.command]
    except KeyError:
        raise SystemExit(f"Unsupported command: {args.command}") from None
    return handler()
if __name__ == "__main__":
    sys.exit(main())  # exit with main()'s return code