ops: add summary and auth failure views

This commit is contained in:
server
2026-04-14 16:00:14 +02:00
parent 4fccf13e09
commit 825cfbc19b
2 changed files with 349 additions and 0 deletions

View File

@@ -2,8 +2,12 @@
from __future__ import annotations
import argparse
import collections
import datetime as dt
import json
import os
import pwd
import re
import shutil
import subprocess
import sys
@@ -15,12 +19,24 @@ RUNTIME_ROOT = Path("{{RUNTIME_ROOT}}")
# Paths to the privileged helper scripts installed alongside this CLI.
HEALTHCHECK_PATH = Path("/usr/local/sbin/metin-login-healthcheck")
INCIDENT_COLLECTOR_PATH = Path("/usr/local/sbin/metin-collect-incident")
# Directory where metin-collect-incident drops one bundle directory per incident.
INCIDENT_ROOT = Path("/var/lib/metin/incidents")
# Auth daemon syslog, consumed by load_auth_activity().
AUTH_SYSLOG_PATH = RUNTIME_ROOT / "channels" / "auth" / "syslog.log"
# Sibling checkout of the server sources (two levels up from the runtime root).
SOURCE_REPO_ROOT = RUNTIME_ROOT.parent.parent / "repos" / "m2dev-server-src"
# Make the repo-local channel_inventory module importable.
sys.path.insert(0, str(REPO_ROOT))
import channel_inventory
# One auth-syslog line: "[YYYY-MM-DD HH:MM:SS.mmm] [tag] message".
AUTH_LOG_LINE_RE = re.compile(r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[[^]]+\] (?P<message>.*)$")
# "new connection" line: yields client IP plus the descriptor pointer used to
# correlate later login/outcome lines for the same connection.
AUTH_CONN_RE = re.compile(r"SYSTEM: new connection from \[(?P<ip>[^]]+)\].* ptr (?P<desc>0x[0-9a-fA-F]+)")
# Login attempt line (login name + descriptor pointer).
AUTH_LOGIN_RE = re.compile(r"InputAuth::Login : (?P<login>[^(]+)\(\d+\) desc (?P<desc>0x[0-9a-fA-F]+)")
# Login rejected immediately because the login string itself is invalid.
AUTH_INVALID_LOGIN_RE = re.compile(r"InputAuth::Login : IS_NOT_VALID_LOGIN_STRING\((?P<login>[^)]+)\) desc (?P<desc>0x[0-9a-fA-F]+)")
# Start of an auth transaction (direct or queued variant) with its descriptor.
AUTH_START_RE = re.compile(r"(?:AUTH_LOGIN_DIRECT|QID_AUTH_LOGIN): START \d+ (?P<desc>0x[0-9a-fA-F]+)")
# Successful direct login outcome line.
AUTH_SUCCESS_DIRECT_RE = re.compile(r"AUTH_LOGIN_DIRECT: SUCCESS (?P<login>.+)$")
# Bare failure-code lines emitted after a START line (no descriptor of their own).
AUTH_FAILURE_RE = re.compile(r"^(NOID|WRONGPWD|NOTAVAIL|ALREADY)$")
# Login-name prefixes (lowercased) that identify smoke-test accounts.
SMOKE_LOGIN_PREFIXES = ("smk", "smkhc", "smkdel", "smkfull", "smkneg", "smoke_", "csmk")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Operational CLI for the Debian Metin runtime")
subparsers = parser.add_subparsers(dest="command", required=True)
@@ -30,6 +46,11 @@ def parse_args() -> argparse.Namespace:
subparsers.add_parser("units", help="List managed systemd units")
summary_parser = subparsers.add_parser("summary", help="Show an operational summary")
summary_parser.add_argument("--hours", type=int, default=24, help="Auth activity window in hours")
summary_parser.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins in auth summary")
summary_parser.add_argument("--json", action="store_true", help="Print raw JSON")
status_parser = subparsers.add_parser("status", help="Show current unit state")
status_parser.add_argument("target", nargs="?", default="all", help="stack, db, auth, game, channel:<id>, instance:<name>")
@@ -56,6 +77,12 @@ def parse_args() -> argparse.Namespace:
incident_collect.add_argument("--since", default="-30 minutes", help="journalctl --since value")
incident_collect.add_argument("--include-cores", action="store_true", help="Copy matching core files into the bundle")
auth_failures = subparsers.add_parser("auth-failures", help="Show recent auth failures from auth syslog")
auth_failures.add_argument("--hours", type=int, default=24, help="How many hours back to inspect")
auth_failures.add_argument("--limit", type=int, default=20, help="Maximum failures to show")
auth_failures.add_argument("--include-smoke", action="store_true", help="Include smoke-test logins")
auth_failures.add_argument("--json", action="store_true", help="Print raw JSON")
wait_ready = subparsers.add_parser("wait-ready", help="Wait until the runtime passes the login-ready probe")
wait_ready.add_argument("--timeout", type=int, default=120, help="Maximum seconds to wait")
wait_ready.add_argument("--interval", type=float, default=5.0, help="Seconds between healthcheck attempts")
@@ -80,6 +107,22 @@ def run(command: list[str], require_root: bool = False, capture_output: bool = F
)
def run_as_repo_owner(command: list[str], repo_path: Path, capture_output: bool = False, check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run *command*, re-executing it as the owner of *repo_path* when needed.

    If we are running as root and the repository is owned by a non-root user,
    the command is wrapped in ``sudo -u <owner>`` so the invocation does not
    leave root-owned files inside the repo.  Otherwise the command is run
    unchanged.  Output is always captured as text when *capture_output* is set.
    """
    cmd = list(command)
    if repo_path.exists():
        owner_uid = repo_path.stat().st_uid
        # Only drop privileges when we actually have them and the target
        # owner is not root itself.
        if os.geteuid() == 0 and owner_uid != 0:
            owner = pwd.getpwuid(owner_uid).pw_name
            cmd = ["sudo", "-u", owner, *cmd]
    return subprocess.run(cmd, check=check, capture_output=capture_output, text=True)
def get_unit_state(unit: str) -> tuple[str, str, str]:
active = run(["systemctl", "is-active", unit], capture_output=True, check=False).stdout.strip() or "unknown"
enabled = run(["systemctl", "is-enabled", unit], capture_output=True, check=False).stdout.strip() or "unknown"
@@ -144,6 +187,144 @@ def iter_core_files() -> list[Path]:
return [path for path in sorted(RUNTIME_ROOT.glob("channels/**/core*")) if path.is_file()]
def count_incident_bundles() -> int:
    """Count incident bundle directories under INCIDENT_ROOT (0 when absent)."""
    if INCIDENT_ROOT.exists():
        return len([entry for entry in INCIDENT_ROOT.iterdir() if entry.is_dir()])
    return 0
def git_summary(repo_path: Path) -> dict[str, object]:
    """Return a small status snapshot of the git checkout at *repo_path*.

    Always contains "path" and "present"; when the path exists it also gains
    "head" (short hash), "branch", "dirty" (bool) and "status_count" (number
    of non-blank ``git status --short`` lines).  Git failures degrade to
    "unknown" rather than raising.
    """
    info: dict[str, object] = {"path": str(repo_path), "present": repo_path.exists()}
    if not repo_path.exists():
        return info

    def _git(*args: str) -> str:
        # Run a git subcommand as the repo owner; never raise on failure.
        completed = run_as_repo_owner(["git", "-C", str(repo_path), *args], repo_path, capture_output=True, check=False)
        return completed.stdout

    head_out = _git("rev-parse", "--short", "HEAD")
    branch_out = _git("rev-parse", "--abbrev-ref", "HEAD")
    status_out = _git("status", "--short")
    info["head"] = head_out.strip() or "unknown"
    info["branch"] = branch_out.strip() or "unknown"
    info["dirty"] = bool(status_out.strip())
    info["status_count"] = len([line for line in status_out.splitlines() if line.strip()])
    return info
def is_smoke_login(login: str) -> bool:
    """Return True when *login* (case-insensitively) uses a smoke-test prefix."""
    return login.lower().startswith(SMOKE_LOGIN_PREFIXES)
def parse_auth_timestamp(value: str) -> dt.datetime:
    """Parse an auth-syslog timestamp such as '2026-04-14 16:00:14.123'."""
    # Naive local time, matching the format captured by AUTH_LOG_LINE_RE.
    fmt = "%Y-%m-%d %H:%M:%S.%f"
    return dt.datetime.strptime(value, fmt)
def load_auth_activity(hours: int) -> list[dict[str, object]]:
    """Parse AUTH_SYSLOG_PATH and return auth events from the last *hours* hours.

    Each event dict carries: "time" (datetime), "login" (str), "ip" (str),
    "status" ("success" or "failure"), "reason" (str) and "smoke" (bool).
    Connection, login and outcome lines are correlated through the descriptor
    pointer the auth daemon prints; outcome lines (SUCCESS / failure codes)
    carry no descriptor, so they are attributed to the descriptor of the most
    recent START line.  Returns [] when the syslog does not exist.
    """
    if not AUTH_SYSLOG_PATH.exists():
        return []
    # Lines older than this are skipped entirely.
    cutoff = dt.datetime.now() - dt.timedelta(hours=hours)
    # Descriptor pointer -> client IP, learned from "new connection" lines.
    desc_ips: dict[str, str] = {}
    # Descriptor pointer -> login event still waiting for its outcome line.
    pending_by_desc: dict[str, dict[str, object]] = {}
    # Descriptor of the most recently seen START line (see docstring).
    active_desc: str | None = None
    events: list[dict[str, object]] = []
    with AUTH_SYSLOG_PATH.open("r", encoding="utf-8", errors="replace") as handle:
        for raw_line in handle:
            # The daemon occasionally writes NUL bytes; strip them first.
            line = raw_line.replace("\x00", "").rstrip("\n")
            match = AUTH_LOG_LINE_RE.match(line)
            if not match:
                continue
            timestamp = parse_auth_timestamp(match.group("timestamp"))
            if timestamp < cutoff:
                continue
            message = match.group("message").strip()
            conn_match = AUTH_CONN_RE.search(message)
            if conn_match:
                # Remember which IP opened this descriptor.
                desc_ips[conn_match.group("desc")] = conn_match.group("ip")
                continue
            invalid_login_match = AUTH_INVALID_LOGIN_RE.match(message)
            if invalid_login_match:
                # Invalid login strings are rejected immediately, so emit a
                # failure event right away instead of waiting for an outcome.
                login = invalid_login_match.group("login")
                events.append(
                    {
                        "time": timestamp,
                        "login": login,
                        "ip": desc_ips.get(invalid_login_match.group("desc"), "-"),
                        "status": "failure",
                        "reason": "INVALID_LOGIN_STRING",
                        "smoke": is_smoke_login(login),
                    }
                )
                continue
            login_match = AUTH_LOGIN_RE.match(message)
            if login_match and " key " not in message:
                # Skip the second " key " variant of the Login line so each
                # attempt is recorded only once.
                login = login_match.group("login").strip()
                desc = login_match.group("desc")
                pending_by_desc[desc] = {
                    "time": timestamp,
                    "login": login,
                    "ip": desc_ips.get(desc, "-"),
                    "smoke": is_smoke_login(login),
                }
                continue
            start_match = AUTH_START_RE.match(message)
            if start_match:
                active_desc = start_match.group("desc")
                continue
            failure_match = AUTH_FAILURE_RE.match(message)
            if failure_match and active_desc and active_desc in pending_by_desc:
                # Bare failure code: close out the pending attempt.
                entry = pending_by_desc.pop(active_desc)
                entry.update({"status": "failure", "reason": failure_match.group(1)})
                events.append(entry)
                active_desc = None
                continue
            success_direct_match = AUTH_SUCCESS_DIRECT_RE.match(message)
            if success_direct_match and active_desc and active_desc in pending_by_desc:
                entry = pending_by_desc.pop(active_desc)
                entry.update({"status": "success", "reason": "SUCCESS"})
                events.append(entry)
                active_desc = None
                continue
            # Queued-auth success variant of the outcome line.
            if message.startswith("QID_AUTH_LOGIN: SUCCESS") and active_desc and active_desc in pending_by_desc:
                entry = pending_by_desc.pop(active_desc)
                entry.update({"status": "success", "reason": "SUCCESS"})
                events.append(entry)
                active_desc = None
    return events
def summarize_auth_activity(hours: int, include_smoke: bool) -> dict[str, object]:
    """Aggregate auth events into success/failure counts and latest entries.

    Smoke-test logins are excluded unless *include_smoke* is set.  The latest
    entries are the most recent matching events, or None when there are none.
    """
    successes: list[dict[str, object]] = []
    failures: list[dict[str, object]] = []
    for event in load_auth_activity(hours):
        if event["smoke"] and not include_smoke:
            continue
        if event["status"] == "success":
            successes.append(event)
        elif event["status"] == "failure":
            failures.append(event)
    reason_counts = collections.Counter(str(event["reason"]) for event in failures)
    return {
        "window_hours": hours,
        "include_smoke": include_smoke,
        "success_count": len(successes),
        "failure_count": len(failures),
        "failure_reasons": dict(reason_counts),
        "latest_success": successes[-1] if successes else None,
        "latest_failure": failures[-1] if failures else None,
    }
def live_ports() -> set[int]:
if shutil.which("ss") is None:
return set()
@@ -201,6 +382,107 @@ def print_units() -> int:
return 0
def print_summary(hours: int, include_smoke: bool, as_json: bool) -> int:
    """Print an operational summary (repos, units, ports, auth activity).

    With *as_json* the full payload is dumped as JSON; otherwise a set of
    human-readable tables and one-line stats is printed.  Always returns 0.
    """
    # Core units first (stack, db, db-ready, auth), then one unit per game
    # channel instance.
    units = [
        channel_inventory.STACK_UNIT,
        channel_inventory.DB_UNIT,
        channel_inventory.DB_READY_UNIT,
        channel_inventory.AUTH_UNIT,
        *channel_inventory.get_game_units(),
    ]
    unit_rows = []
    for unit in units:
        active, sub_state, enabled = get_unit_state(unit)
        unit_rows.append({"unit": unit, "active": active, "sub": sub_state, "enabled": enabled})
    # Game-instance counters: declared vs enabled vs actually active.
    game_units = [row for row in unit_rows if row["unit"].startswith("metin-game@")]
    enabled_game_units = [row for row in game_units if row["enabled"] == "enabled"]
    game_active = sum(1 for row in enabled_game_units if row["active"] == "active")
    listening = live_ports()
    port_rows = iter_port_rows()
    auth_summary = summarize_auth_activity(hours, include_smoke)
    repos = {
        "m2dev-server": git_summary(REPO_ROOT),
        "m2dev-server-src": git_summary(SOURCE_REPO_ROOT),
    }
    incident_count = count_incident_bundles()
    core_count = len(iter_core_files())
    payload = {
        "repos": repos,
        "units": unit_rows,
        "game_active": game_active,
        "game_enabled": len(enabled_game_units),
        "game_declared": len(game_units),
        "ports": [
            {
                **row,
                # A declared port is "live" when something is listening on it.
                "live": int(row["port"]) in listening,
            }
            for row in port_rows
        ],
        "auth": auth_summary,
        "core_count": core_count,
        "incident_count": incident_count,
    }
    if as_json:
        # default=str keeps the datetime objects in auth_summary serializable.
        print(json.dumps(payload, indent=2, default=str))
        return 0
    repo_rows = []
    for name, summary in repos.items():
        repo_rows.append(
            [
                name,
                str(summary.get("branch", "unknown")),
                str(summary.get("head", "unknown")),
                "yes" if summary.get("dirty") else "no",
            ]
        )
    # NOTE(review): "visibility" is assumed to be a key provided by
    # iter_port_rows() — confirm against its definition.
    public_ports = [row for row in payload["ports"] if row["visibility"] == "public"]
    public_port_rows = [
        [row["name"], row["port"], row["p2p_port"], "yes" if row["live"] else "no"]
        for row in public_ports
    ]
    print("Repos")
    print_table(["repo", "branch", "head", "dirty"], repo_rows)
    print()
    print("Runtime")
    # Only the first four rows are the core units; game instances are
    # summarized on the following line instead of listed individually.
    print_table(["unit", "active", "sub", "enabled"], [[row["unit"], row["active"], row["sub"], row["enabled"]] for row in unit_rows[:4]])
    print(f"game instances active: {game_active}/{len(enabled_game_units)} enabled ({len(game_units)} declared)")
    print(f"core files: {core_count}")
    print(f"incident bundles: {incident_count}")
    print()
    print("Public Ports")
    print_table(["name", "port", "p2p", "live"], public_port_rows)
    print()
    print(f"Auth ({hours}h)")
    print(f"successes: {auth_summary['success_count']}")
    print(f"failures: {auth_summary['failure_count']}")
    if auth_summary["failure_reasons"]:
        reason_line = ", ".join(f"{reason}={count}" for reason, count in sorted(auth_summary["failure_reasons"].items()))
        print(f"failure reasons: {reason_line}")
    latest_success = auth_summary["latest_success"]
    if latest_success:
        print(
            f"latest success: {latest_success['time'].strftime('%Y-%m-%d %H:%M:%S')} "
            f"{latest_success['login']} from {latest_success['ip']}"
        )
    latest_failure = auth_summary["latest_failure"]
    if latest_failure:
        print(
            f"latest failure: {latest_failure['time'].strftime('%Y-%m-%d %H:%M:%S')} "
            f"{latest_failure['login']} from {latest_failure['ip']} reason={latest_failure['reason']}"
        )
    return 0
def resolve_target_units(target: str) -> list[str]:
normalized = target.strip().lower()
@@ -317,6 +599,49 @@ def run_logs(target: str, lines: int, follow: bool) -> int:
return 0
def print_auth_failures(hours: int, limit: int, include_smoke: bool, as_json: bool) -> int:
    """Render recent auth failures as JSON or as a table.

    Shows at most *limit* of the newest failures within the *hours* window,
    excluding smoke-test logins unless *include_smoke* is set.  Returns 0.
    """
    recent = [
        event
        for event in load_auth_activity(hours)
        if event["status"] == "failure" and (include_smoke or not event["smoke"])
    ]
    # Keep the newest entries.  NB: a limit of 0 slices to the full list.
    recent = recent[-limit:]
    reasons = collections.Counter(str(event["reason"]) for event in recent)
    payload = {
        "window_hours": hours,
        "limit": limit,
        "include_smoke": include_smoke,
        "count": len(recent),
        "reasons": dict(reasons),
        "entries": [
            {
                "time": event["time"].strftime("%Y-%m-%d %H:%M:%S"),
                "login": event["login"],
                "ip": event["ip"],
                "reason": event["reason"],
            }
            for event in recent
        ],
    }
    if as_json:
        print(json.dumps(payload, indent=2))
        return 0
    if not recent:
        print(f"No auth failures in the last {hours}h.")
        return 0
    if reasons:
        # One-line reason histogram above the table.
        print(", ".join(f"{reason}={count}" for reason, count in sorted(reasons.items())))
        print()
    table_rows = [
        [event["time"].strftime("%Y-%m-%d %H:%M:%S"), str(event["login"]), str(event["ip"]), str(event["reason"])]
        for event in recent
    ]
    print_table(["time", "login", "ip", "reason"], table_rows)
    return 0
def run_healthcheck(mode: str) -> int:
if not HEALTHCHECK_PATH.exists():
raise SystemExit(f"Missing healthcheck wrapper: {HEALTHCHECK_PATH}")
@@ -369,6 +694,8 @@ def main() -> int:
return print_inventory(args.json)
if args.command == "units":
return print_units()
if args.command == "summary":
return print_summary(args.hours, args.include_smoke, args.json)
if args.command == "status":
return print_status(args.target)
if args.command == "ports":
@@ -377,6 +704,8 @@ def main() -> int:
return print_cores(args.json)
if args.command == "incidents":
return print_incidents(args.limit)
if args.command == "auth-failures":
return print_auth_failures(args.hours, args.limit, args.include_smoke, args.json)
if args.command in {"start", "stop", "restart"}:
return run_unit_action(args.command, args.target)
if args.command == "logs":