#!/usr/bin/env python3 from __future__ import annotations import argparse import json import os import shutil import socket import subprocess import sys from datetime import datetime, timezone from pathlib import Path REPO_ROOT = Path("{{REPO_ROOT}}") RUNTIME_ROOT = Path("{{RUNTIME_ROOT}}") INCIDENT_ROOT_DEFAULT = Path("/var/lib/metin/incidents") sys.path.insert(0, str(REPO_ROOT)) import channel_inventory def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Collect a Metin runtime incident bundle") parser.add_argument("--tag", default="manual", help="Short incident tag used in the bundle directory name") parser.add_argument("--since", default="-30 minutes", help="journalctl --since value") parser.add_argument("--output-root", default=str(INCIDENT_ROOT_DEFAULT), help="Incident bundle root directory") parser.add_argument("--include-cores", action="store_true", help="Copy matching core files into the bundle") return parser.parse_args() def ensure_root() -> None: if os.geteuid() != 0: raise SystemExit("Run as root.") def sanitize_tag(value: str) -> str: filtered = "".join(char if char.isalnum() or char in {"-", "_"} else "-" for char in value.strip()) return filtered or "manual" def run(command: list[str], check: bool = True) -> subprocess.CompletedProcess[str]: return subprocess.run(command, check=check, capture_output=True, text=True) def write_text(path: Path, content: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") def write_command_output(bundle_dir: Path, filename: str, command: list[str], check: bool = False) -> None: completed = run(command, check=check) content = f"$ {' '.join(command)}\n\n" if completed.stdout: content += completed.stdout if completed.stderr: content += "\n[stderr]\n" + completed.stderr write_text(bundle_dir / filename, content) def copy_log_tails(bundle_dir: Path) -> None: logs_dir = bundle_dir / "logs" for path in sorted(RUNTIME_ROOT.glob("channels/**/syslog.log")) + sorted(RUNTIME_ROOT.glob("channels/**/syserr.log")): if not path.is_file(): continue relative = path.relative_to(RUNTIME_ROOT) destination = logs_dir / relative destination.parent.mkdir(parents=True, exist_ok=True) completed = run(["tail", "-n", "400", str(path)], check=False) content = f"# tail -n 400 {path}\n\n" if completed.stdout: content += completed.stdout if completed.stderr: content += "\n[stderr]\n" + completed.stderr destination.write_text(content, encoding="utf-8") def find_core_files() -> list[Path]: matches: list[Path] = [] for path in sorted(RUNTIME_ROOT.glob("channels/**/core*")): if path.is_file(): matches.append(path) return matches def write_core_metadata(bundle_dir: Path, core_files: list[Path]) -> None: rows = [] for path in core_files: stat = path.stat() rows.append( { "path": str(path), "size_bytes": stat.st_size, "mtime": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), } ) write_text(bundle_dir / "core-files.json", json.dumps(rows, indent=2)) def copy_core_files(bundle_dir: Path, core_files: list[Path]) -> None: cores_dir = bundle_dir / "cores" for path in core_files: relative = path.relative_to(RUNTIME_ROOT) destination = cores_dir / relative destination.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(path, destination) def git_summary(repo_path: Path) -> dict[str, object]: summary: dict[str, object] = {"path": str(repo_path), "present": repo_path.exists()} if not repo_path.exists(): return summary head = run(["git", "-C", str(repo_path), "rev-parse", "HEAD"], check=False) status = run(["git", "-C", str(repo_path), "status", "--short"], check=False) branch = run(["git", "-C", str(repo_path), "rev-parse", "--abbrev-ref", "HEAD"], check=False) summary.update( { "head": head.stdout.strip(), "branch": branch.stdout.strip(), "dirty": bool(status.stdout.strip()), "status": status.stdout.splitlines(), } ) return summary def main() -> int: args = parse_args() ensure_root() timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") tag = sanitize_tag(args.tag) output_root = Path(args.output_root) bundle_dir = output_root / f"{timestamp}-{tag}" bundle_dir.mkdir(parents=True, exist_ok=False) os.chmod(bundle_dir, 0o700) units = [ channel_inventory.STACK_UNIT, channel_inventory.DB_UNIT, channel_inventory.DB_READY_UNIT, channel_inventory.AUTH_UNIT, *channel_inventory.get_game_units(), ] source_repo = RUNTIME_ROOT.parent.parent / "repos" / "m2dev-server-src" runtime_repo = REPO_ROOT meta = { "created_at": datetime.now(timezone.utc).isoformat(), "hostname": socket.gethostname(), "runtime_root": str(RUNTIME_ROOT), "output_root": str(output_root), "tag": tag, "since": args.since, "repos": { "m2dev-server": git_summary(runtime_repo), "m2dev-server-src": git_summary(source_repo), }, } write_text(bundle_dir / "meta.json", json.dumps(meta, indent=2)) write_command_output(bundle_dir, "uname.txt", ["uname", "-a"]) write_command_output(bundle_dir, "df.txt", ["df", "-h"]) write_command_output(bundle_dir, "free.txt", ["free", "-h"], check=False) write_command_output(bundle_dir, "ports.txt", ["ss", "-ltnp"], check=False) write_command_output(bundle_dir, "systemctl-status.txt", ["systemctl", "status", "--no-pager", *units], check=False) journal_dir = bundle_dir / "journal" for unit in units: safe_name = unit.replace("@", "_").replace(".", "_") write_command_output( journal_dir, f"{safe_name}.log", ["journalctl", "--no-pager", "--since", args.since, "-u", unit], check=False, ) copy_log_tails(bundle_dir) core_files = find_core_files() write_core_metadata(bundle_dir, core_files) if args.include_cores and core_files: copy_core_files(bundle_dir, core_files) print(bundle_dir) return 0 if __name__ == "__main__": raise SystemExit(main())