#!/usr/bin/env python3
# Run a real login -> game -> GM warp pack profile comparison under Wine
# using two runtime roots.
#
# For each runtime root the script stages a scratch copy of the client
# build, temporarily rewrites the GM account password and the character
# position in MySQL, runs the headless client, archives the resulting
# pack profile and traces, and finally restores the database rows.  The
# two pack profile reports are then handed to the report parser for a
# side-by-side comparison.
from __future__ import annotations

import argparse
import os
import secrets
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path


DEFAULT_SERVER_HOST = "173.249.9.66"
DEFAULT_AUTH_PORT = 11000
DEFAULT_CHANNEL_PORT = 11991
DEFAULT_MAP_NAME = "gm_guild_build"
DEFAULT_MAP_INDEX = 200
DEFAULT_GLOBAL_X = 96000
DEFAULT_GLOBAL_Y = 12800
# Exit codes of the headless run script that do not fail a capture by
# themselves; the scenario trace is what decides success.
ACCEPTABLE_RUN_CODES = {0, 124, 137}


@dataclass
class PlayerState:
    """Snapshot of the columns read from one ``player`` table row."""

    player_id: int
    name: str
    map_index: int
    x: int
    y: int


@dataclass
class AccountState:
    """Snapshot of an account's password hash plus its selected character."""

    account_id: int
    password_hash: str
    player: PlayerState


def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description=(
            "Run a real login -> game -> GM warp pack profile comparison under "
            "Wine using two runtime roots."
        ),
    )
    parser.add_argument(
        "--left-runtime-root",
        type=Path,
        required=True,
        help="Runtime root for the first capture.",
    )
    parser.add_argument(
        "--right-runtime-root",
        type=Path,
        required=True,
        help="Runtime root for the second capture.",
    )
    parser.add_argument(
        "--left-label",
        default="left",
        help="Label for the first capture.",
    )
    parser.add_argument(
        "--right-label",
        default="right",
        help="Label for the second capture.",
    )
    parser.add_argument(
        "--master-key",
        type=Path,
        required=True,
        help="Path to the runtime master key hex file.",
    )
    parser.add_argument(
        "--sign-pubkey",
        type=Path,
        required=True,
        help="Path to the signing public key hex file.",
    )
    parser.add_argument(
        "--key-id",
        default="1",
        help="Runtime key_id for the secure pack loader.",
    )
    parser.add_argument(
        "--account-login",
        default="admin",
        help="GM account used for the smoke flow.",
    )
    parser.add_argument(
        "--slot",
        type=int,
        default=0,
        help="Character slot used for auto-select.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=90,
        help="Headless client timeout in seconds per capture.",
    )
    parser.add_argument(
        "--server-host",
        default=DEFAULT_SERVER_HOST,
        help="Server host used in loginInfo.py.",
    )
    parser.add_argument(
        "--auth-port",
        type=int,
        default=DEFAULT_AUTH_PORT,
        help="Auth port used in loginInfo.py.",
    )
    parser.add_argument(
        "--channel-port",
        type=int,
        default=DEFAULT_CHANNEL_PORT,
        help="Channel port used in loginInfo.py.",
    )
    parser.add_argument(
        "--map-name",
        default=DEFAULT_MAP_NAME,
        help="Headless GM target map name.",
    )
    parser.add_argument(
        "--map-index",
        type=int,
        default=DEFAULT_MAP_INDEX,
        help="Server-side map index used to place the GM character before login.",
    )
    parser.add_argument(
        "--global-x",
        type=int,
        default=DEFAULT_GLOBAL_X,
        help="Global X coordinate used for the initial load and GM warp target.",
    )
    parser.add_argument(
        "--global-y",
        type=int,
        default=DEFAULT_GLOBAL_Y,
        help="Global Y coordinate used for the initial load and GM warp target.",
    )
    parser.add_argument(
        "--command-delay",
        type=float,
        default=3.0,
        help="Delay before the headless GM sends the first /warp command.",
    )
    parser.add_argument(
        "--settle-delay",
        type=float,
        default=1.0,
        help="Delay between warp arrival and the next GM command.",
    )
    parser.add_argument(
        "--warp-timeout",
        type=float,
        default=15.0,
        help="Timeout per GM warp step in seconds.",
    )
    parser.add_argument(
        "--out-dir",
        type=Path,
        default=None,
        help="Directory for archived reports and compare output.",
    )
    parser.add_argument(
        "--work-root",
        type=Path,
        default=None,
        help="Directory where temporary workspaces are created.",
    )
    parser.add_argument(
        "--keep-workspaces",
        action="store_true",
        help="Keep temporary workspaces for debugging instead of deleting them.",
    )
    parser.add_argument(
        "source_build_bin_dir",
        nargs="?",
        default=None,
        help="Source build/bin directory to copy out of the repository.",
    )
    return parser.parse_args()


def sanitize_label(value: str) -> str:
    """Turn a free-form label into a lowercase, filename-safe token.

    Alphanumerics and ``.``, ``_``, ``-`` are kept; every other run of
    characters collapses into a single ``-``.  Leading and trailing
    dashes are stripped, and ``"capture"`` is returned when nothing is
    left.
    """
    safe: list[str] = []
    prev_dash = False
    for ch in value.lower():
        if ch.isalnum() or ch in "._-":
            if ch == "-" and prev_dash:
                # Never emit two dashes in a row.
                continue
            safe.append(ch)
            prev_dash = ch == "-"
        else:
            if not prev_dash:
                safe.append("-")
            prev_dash = True
    result = "".join(safe).strip("-")
    return result or "capture"


def require_tool(name: str) -> None:
    """Exit with a message when *name* cannot be found on ``PATH``."""
    if shutil.which(name) is None:
        raise SystemExit(f"{name} not found in PATH")


def run_checked(
    args: list[str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
) -> subprocess.CompletedProcess[str]:
    """Run a command, capturing text output.

    Raises:
        RuntimeError: when the command exits non-zero; the message
            carries stderr (or stdout when stderr is empty).
    """
    completed = subprocess.run(
        args,
        cwd=str(cwd) if cwd else None,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if completed.returncode != 0:
        detail = completed.stderr.strip() or completed.stdout.strip()
        raise RuntimeError(f"command failed ({completed.returncode}): {' '.join(args)}\n{detail}")
    return completed


def run_mysql(database: str, sql: str) -> str:
    """Execute *sql* against *database* via the ``mysql`` CLI.

    The client is run with ``-B -N`` and the stripped stdout is
    returned.

    Raises:
        RuntimeError: when the client exits non-zero.
    """
    completed = subprocess.run(
        ["mysql", "-B", "-N", database, "-e", sql],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if completed.returncode != 0:
        detail = completed.stderr.strip() or completed.stdout.strip()
        raise RuntimeError(f"mysql failed: {detail}")
    return completed.stdout.strip()


def sql_quote(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Backslashes are escaped first so the backslash added in front of
    each quote is not doubled afterwards.
    """
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def load_account_state(account_login: str, slot: int) -> AccountState:
    """Read the account's password hash and the character in *slot*.

    Raises:
        SystemExit: for an unsupported slot, a missing account or
            password hash, an empty slot, or a missing/malformed player
            row.
        RuntimeError: when a mysql invocation fails.
    """
    if slot < 0 or slot > 3:
        raise SystemExit(f"unsupported slot index: {slot}")

    account_id_text = run_mysql(
        "account",
        f"SELECT id FROM account WHERE login={sql_quote(account_login)} LIMIT 1;",
    )
    if not account_id_text:
        raise SystemExit(f"account not found: {account_login}")
    account_id = int(account_id_text)

    password_hash = run_mysql(
        "account",
        f"SELECT password FROM account WHERE login={sql_quote(account_login)} LIMIT 1;",
    )
    if not password_hash:
        raise SystemExit(f"account password hash not found: {account_login}")

    pid_text = run_mysql(
        "player",
        f"SELECT pid{slot + 1} FROM player_index WHERE id={account_id} LIMIT 1;",
    )
    # In batch mode the mysql client prints SQL NULL as the text "NULL";
    # treat that like an empty slot instead of crashing in int().
    if not pid_text or pid_text in ("0", "NULL"):
        raise SystemExit(f"slot {slot} has no character for account: {account_login}")
    player_id = int(pid_text)

    player_row = run_mysql(
        "player",
        f"SELECT id, name, map_index, x, y FROM player WHERE id={player_id} LIMIT 1;",
    )
    if not player_row:
        raise SystemExit(f"player row not found for id={player_id}")
    parts = player_row.split("\t")
    if len(parts) != 5:
        raise SystemExit(f"unexpected player row format for id={player_id}: {player_row}")

    return AccountState(
        account_id=account_id,
        password_hash=password_hash,
        player=PlayerState(
            player_id=int(parts[0]),
            name=parts[1],
            map_index=int(parts[2]),
            x=int(parts[3]),
            y=int(parts[4]),
        ),
    )


def restore_account_state(account_login: str, state: AccountState) -> None:
    """Write the saved password hash and player position back to MySQL."""
    run_mysql(
        "account",
        (
            "UPDATE account SET password=%s WHERE login=%s;"
            % (sql_quote(state.password_hash), sql_quote(account_login))
        ),
    )
    run_mysql(
        "player",
        (
            "UPDATE player SET map_index=%d, x=%d, y=%d WHERE id=%d;"
            % (
                state.player.map_index,
                state.player.x,
                state.player.y,
                state.player.player_id,
            )
        ),
    )


def verify_account_state(account_login: str, slot: int, expected: AccountState) -> None:
    """Re-read the account and confirm it matches *expected*.

    Raises:
        RuntimeError: when the password hash or the player's id/map/
            position differ from the saved snapshot.
    """
    current = load_account_state(account_login, slot)
    if current.password_hash != expected.password_hash:
        raise RuntimeError(
            f"account password hash did not restore for {account_login}"
        )
    if (
        current.player.map_index != expected.player.map_index
        or current.player.x != expected.player.x
        or current.player.y != expected.player.y
        or current.player.player_id != expected.player.player_id
    ):
        raise RuntimeError(
            "player state did not restore for "
            f"{account_login}: expected map={expected.player.map_index} "
            f"x={expected.player.x} y={expected.player.y}, got "
            f"map={current.player.map_index} x={current.player.x} y={current.player.y}"
        )


def set_temp_account_state(
    account_login: str,
    player_id: int,
    *,
    temp_password: str,
    map_index: int,
    global_x: int,
    global_y: int,
) -> None:
    """Set a temporary password and move the character to the start position."""
    run_mysql(
        "account",
        (
            "UPDATE account SET password=PASSWORD(%s) WHERE login=%s;"
            % (sql_quote(temp_password), sql_quote(account_login))
        ),
    )
    run_mysql(
        "player",
        (
            "UPDATE player SET map_index=%d, x=%d, y=%d WHERE id=%d;"
            % (map_index, global_x, global_y, player_id)
        ),
    )


def remove_previous_logs(build_bin_dir: Path) -> None:
    """Ensure ``log/`` exists and delete stale logs and ``loginInfo.py``."""
    log_dir = build_bin_dir / "log"
    log_dir.mkdir(parents=True, exist_ok=True)
    for path in log_dir.glob("*.txt"):
        path.unlink(missing_ok=True)
    for path in [
        build_bin_dir / "syserr.txt",
        build_bin_dir / "ErrorLog.txt",
        build_bin_dir / "loginInfo.py",
    ]:
        path.unlink(missing_ok=True)


def write_login_info(
    build_bin_dir: Path,
    *,
    account_login: str,
    temp_password: str,
    slot: int,
    server_host: str,
    auth_port: int,
    channel_port: int,
) -> None:
    """Write ``loginInfo.py`` with auto-login settings into *build_bin_dir*."""
    login_info = "\n".join(
        [
            f"addr={server_host!r}",
            f"port={channel_port}",
            f"account_addr={server_host!r}",
            f"account_port={auth_port}",
            f"id={account_login!r}",
            f"pwd={temp_password!r}",
            f"slot={slot}",
            "autoLogin=1",
            "autoSelect=1",
            "",
        ]
    )
    (build_bin_dir / "loginInfo.py").write_text(login_info, encoding="utf-8")


def copy_source_bin(source_build_bin_dir: Path, workspace_bin_dir: Path) -> None:
    """Mirror the build ``bin`` directory into the workspace with rsync.

    Runtime asset directories are excluded; they are provided separately
    through the runtime wrapper.
    """
    run_checked(
        [
            "rsync",
            "-a",
            "--delete",
            "--exclude",
            "config",
            "--exclude",
            "pack",
            "--exclude",
            "mark",
            "--exclude",
            "log",
            "--exclude",
            "BGM",
            "--exclude",
            "bgm",
            "--exclude",
            "locale",
            "--exclude",
            "sound",
            f"{source_build_bin_dir}/",
            str(workspace_bin_dir),
        ]
    )


def copy_if_exists(src: Path, dst: Path) -> None:
    """Copy *src* to *dst* (with metadata) when *src* exists."""
    if src.exists():
        shutil.copy2(src, dst)


def prepare_runtime_wrapper(runtime_root: Path, wrapper_root: Path) -> None:
    """Create *wrapper_root* holding symlinks into *runtime_root*.

    ``config`` and ``pack`` are mandatory; ``mark``, ``locale``,
    ``sound`` and ``BGM`` (falling back to ``bgm``) are linked only when
    present.

    Raises:
        SystemExit: when ``config`` or ``pack`` is missing.
    """
    if not (runtime_root / "config").exists():
        raise SystemExit(f"runtime config missing: {runtime_root / 'config'}")
    if not (runtime_root / "pack").exists():
        raise SystemExit(f"runtime pack missing: {runtime_root / 'pack'}")

    wrapper_root.mkdir(parents=True, exist_ok=True)

    def link_entry(name: str, target: Path) -> None:
        link_path = wrapper_root / name
        # is_symlink() also catches dangling links, for which exists()
        # reports False.
        if link_path.exists() or link_path.is_symlink():
            link_path.unlink()
        os.symlink(target, link_path)

    link_entry("config", runtime_root / "config")
    link_entry("pack", runtime_root / "pack")

    optional_entries = {
        "mark": runtime_root / "mark",
        "locale": runtime_root / "locale",
        "sound": runtime_root / "sound",
    }
    bgm_target = runtime_root / "BGM"
    if not bgm_target.exists():
        bgm_target = runtime_root / "bgm"
    if bgm_target.exists():
        optional_entries["BGM"] = bgm_target

    for name, target in optional_entries.items():
        if target.exists():
            link_entry(name, target)


def capture_run(
    *,
    label: str,
    runtime_root: Path,
    source_build_bin_dir: Path,
    out_dir: Path,
    work_root: Path | None,
    keep_workspace: bool,
    master_key_hex: str,
    sign_pubkey_hex: str,
    key_id: str,
    account_login: str,
    slot: int,
    timeout_seconds: int,
    server_host: str,
    auth_port: int,
    channel_port: int,
    map_name: str,
    map_index: int,
    global_x: int,
    global_y: int,
    command_delay: float,
    settle_delay: float,
    warp_timeout: float,
    stage_script: Path,
    run_script: Path,
    parser_script: Path,
) -> tuple[Path, Path | None]:
    """Run one headless GM warp capture and archive its reports.

    Returns:
        ``(raw_report_path, workspace_path_or_None)``; the workspace
        path is returned only when *keep_workspace* is true.

    Raises:
        SystemExit: when the account/slot cannot be loaded or the
            runtime root lacks ``config``/``pack``.
        RuntimeError: when a helper command fails, no pack profile is
            produced, the client exits with an unexpected code, the
            scenario trace lacks the success marker, or the database
            state cannot be restored.
    """
    # Snapshot the account first: if this fails there is nothing to
    # restore and no temporary workspace has been created yet.
    account_state = load_account_state(account_login, slot)

    safe_label = sanitize_label(label)
    workspace_path = Path(
        tempfile.mkdtemp(
            prefix=f"pack-profile-gm-smoke-{safe_label}.",
            dir=str(work_root) if work_root else None,
        )
    )
    workspace_bin_dir = workspace_path / "bin"
    workspace_runtime_dir = workspace_path / "runtime"

    temp_password = ("Tmp" + secrets.token_hex(6))[:16]
    raw_out = out_dir / f"{safe_label}.pack_profile.txt"
    summary_out = out_dir / f"{safe_label}.summary.txt"
    trace_out = out_dir / f"{safe_label}.headless_gm_teleport_trace.txt"
    startup_out = out_dir / f"{safe_label}.startup_trace.txt"
    syserr_out = out_dir / f"{safe_label}.syserr.txt"

    try:
        workspace_bin_dir.mkdir(parents=True, exist_ok=True)
        copy_source_bin(source_build_bin_dir, workspace_bin_dir)
        remove_previous_logs(workspace_bin_dir)
        prepare_runtime_wrapper(runtime_root, workspace_runtime_dir)
        run_checked(["bash", str(stage_script), str(workspace_runtime_dir), str(workspace_bin_dir)])
        write_login_info(
            workspace_bin_dir,
            account_login=account_login,
            temp_password=temp_password,
            slot=slot,
            server_host=server_host,
            auth_port=auth_port,
            channel_port=channel_port,
        )
        set_temp_account_state(
            account_login,
            account_state.player.player_id,
            temp_password=temp_password,
            map_index=map_index,
            global_x=global_x,
            global_y=global_y,
        )

        env = os.environ.copy()
        env.update(
            {
                "M2PACK_PROFILE": "1",
                "M2PACK_MASTER_KEY_HEX": master_key_hex,
                "M2PACK_SIGN_PUBKEY_HEX": sign_pubkey_hex,
                "M2PACK_KEY_ID": key_id,
                "M2_TIMEOUT": str(timeout_seconds),
                "M2_HEADLESS_SCENARIO": "gm_teleport",
                "M2_HEADLESS_WARP_STEPS": f"{map_name},{global_x},{global_y}",
                "M2_HEADLESS_COMMAND_DELAY": str(command_delay),
                "M2_HEADLESS_SETTLE_DELAY": str(settle_delay),
                "M2_HEADLESS_WARP_TIMEOUT": str(warp_timeout),
            }
        )
        env.setdefault("WINEDEBUG", "-all")

        # Not run_checked(): the exit code is judged further down, after
        # the reports have been archived for inspection.
        completed = subprocess.run(
            ["bash", str(run_script), str(workspace_bin_dir)],
            cwd=str(workspace_bin_dir),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        report_path = workspace_bin_dir / "log" / "pack_profile.txt"
        if not report_path.is_file():
            detail = completed.stderr.strip() or completed.stdout.strip()
            raise RuntimeError(f"pack profile report not generated for {label}: {detail}")

        trace_path = workspace_bin_dir / "log" / "headless_gm_teleport_trace.txt"
        startup_path = workspace_bin_dir / "log" / "startup_trace.txt"
        syserr_path = workspace_bin_dir / "syserr.txt"

        shutil.copy2(report_path, raw_out)
        copy_if_exists(trace_path, trace_out)
        copy_if_exists(startup_path, startup_out)
        copy_if_exists(syserr_path, syserr_out)

        summary = run_checked([sys.executable, str(parser_script), str(raw_out)])
        summary_out.write_text(summary.stdout, encoding="utf-8")

        trace_text = (
            trace_path.read_text(encoding="utf-8", errors="ignore")
            if trace_path.exists()
            else ""
        )
        scenario_ok = "Scenario success current_map=" in trace_text

        if completed.returncode not in ACCEPTABLE_RUN_CODES:
            detail = completed.stderr.strip() or completed.stdout.strip()
            raise RuntimeError(
                f"client run for {label} exited with {completed.returncode}: {detail}"
            )
        if not scenario_ok:
            raise RuntimeError(
                f"GM smoke scenario for {label} did not finish successfully; "
                f"see {trace_out if trace_out.exists() else trace_path}"
            )

        return raw_out, (workspace_path if keep_workspace else None)
    finally:
        # Always put the real password and position back, then clean up
        # the workspace even if the restore itself fails.
        try:
            restore_account_state(account_login, account_state)
            verify_account_state(account_login, slot, account_state)
        finally:
            if not keep_workspace:
                shutil.rmtree(workspace_path, ignore_errors=True)


def main() -> int:
    """Run both captures, write the comparison report, and return 0."""
    args = parse_args()

    require_tool("mysql")
    require_tool("rsync")
    require_tool("python3")
    # The stage and run scripts are invoked through bash.
    require_tool("bash")

    script_dir = Path(__file__).resolve().parent
    repo_root = script_dir.parent
    source_build_bin_dir = (
        Path(args.source_build_bin_dir).resolve()
        if args.source_build_bin_dir
        else (repo_root / "build-mingw64-lld" / "bin").resolve()
    )
    if not (source_build_bin_dir / "Metin2_RelWithDebInfo.exe").is_file():
        raise SystemExit(f"client exe not found: {source_build_bin_dir / 'Metin2_RelWithDebInfo.exe'}")

    stage_script = script_dir / "stage-linux-runtime.sh"
    run_script = script_dir / "run-wine-headless.sh"
    parser_script = script_dir / "pack-profile-report.py"
    for path, kind in [
        (stage_script, "runtime stage script"),
        (run_script, "headless run script"),
        (parser_script, "pack profile parser"),
    ]:
        if not path.exists():
            raise SystemExit(f"{kind} not found: {path}")

    # A random suffix keeps default output directories from colliding;
    # secrets.token_hex is public API, unlike tempfile's private name
    # generator.
    out_dir = (
        args.out_dir.resolve()
        if args.out_dir
        else (
            source_build_bin_dir / "log" / "pack-profile-gm-smoke" / secrets.token_hex(4)
        ).resolve()
    )
    out_dir.mkdir(parents=True, exist_ok=True)

    work_root = args.work_root.resolve() if args.work_root else None
    if work_root:
        work_root.mkdir(parents=True, exist_ok=True)

    master_key_hex = args.master_key.read_text(encoding="utf-8").strip()
    sign_pubkey_hex = args.sign_pubkey.read_text(encoding="utf-8").strip()
    if not master_key_hex:
        raise SystemExit(f"master key file is empty: {args.master_key}")
    if not sign_pubkey_hex:
        raise SystemExit(f"sign pubkey file is empty: {args.sign_pubkey}")

    # Everything except the label and runtime root is identical for the
    # two captures; sharing one dict keeps them from drifting apart.
    shared_kwargs = dict(
        source_build_bin_dir=source_build_bin_dir,
        out_dir=out_dir,
        work_root=work_root,
        keep_workspace=args.keep_workspaces,
        master_key_hex=master_key_hex,
        sign_pubkey_hex=sign_pubkey_hex,
        key_id=args.key_id,
        account_login=args.account_login,
        slot=args.slot,
        timeout_seconds=args.timeout,
        server_host=args.server_host,
        auth_port=args.auth_port,
        channel_port=args.channel_port,
        map_name=args.map_name,
        map_index=args.map_index,
        global_x=args.global_x,
        global_y=args.global_y,
        command_delay=args.command_delay,
        settle_delay=args.settle_delay,
        warp_timeout=args.warp_timeout,
        stage_script=stage_script,
        run_script=run_script,
        parser_script=parser_script,
    )

    left_report, left_workspace = capture_run(
        label=args.left_label,
        runtime_root=args.left_runtime_root.resolve(),
        **shared_kwargs,
    )
    right_report, right_workspace = capture_run(
        label=args.right_label,
        runtime_root=args.right_runtime_root.resolve(),
        **shared_kwargs,
    )

    compare = run_checked(
        [
            sys.executable,
            str(parser_script),
            f"{args.left_label}={left_report}",
            f"{args.right_label}={right_report}",
        ]
    )
    compare_path = out_dir / (
        f"compare-{sanitize_label(args.left_label)}-vs-{sanitize_label(args.right_label)}.txt"
    )
    compare_path.write_text(compare.stdout, encoding="utf-8")
    print(compare.stdout, end="")
    print()
    print(f"saved compare report: {compare_path}")
    if left_workspace:
        print(f"kept workspace: {left_workspace}")
    if right_workspace:
        print(f"kept workspace: {right_workspace}")
    return 0


if __name__ == "__main__":
    sys.exit(main())