Add GM smoke compare workflow for pack profiling
Some checks failed
build / Windows Build (push) Has been cancelled

This commit is contained in:
server
2026-04-15 17:35:02 +02:00
parent db7ae1f841
commit b353339bd8
2 changed files with 766 additions and 0 deletions

View File

@@ -67,6 +67,44 @@ To run a full `pck` vs `m2p` comparison in one go:
The script captures both runs back-to-back and writes a combined compare report
into the same output directory.
## Real game-flow smoke compare
Startup-only runs are useful for bootstrap regressions, but they do not show the
real hot path once the client reaches `login`, `loading`, and `game`.
For that case, use the CH99 GM smoke compare wrapper:
```bash
python3 scripts/compare-pack-profile-gm-smoke.py \
--left-label pck-only \
--left-runtime-root /path/to/runtime-pck \
--right-label secure-mixed \
--right-runtime-root /path/to/runtime-m2p \
--master-key /path/to/master.key \
--sign-pubkey /path/to/signing.pub \
--account-login admin
```
What it does:
- copies the built client into a temporary workspace outside the repository
- stages each runtime into that workspace
- temporarily updates the selected GM account password and map position
- auto-logins through the special GM smoke channel (`11991` by default)
- enters game, performs one deterministic GM warp, archives `pack_profile.txt`
- restores the account password and the character map/position afterward
- deletes the temporary workspace unless `--keep-workspaces` is used
Archived outputs per run:
- raw report: `<out-dir>/<label>.pack_profile.txt`
- parsed summary: `<out-dir>/<label>.summary.txt`
- headless trace: `<out-dir>/<label>.headless_gm_teleport_trace.txt`
- startup trace when present: `<out-dir>/<label>.startup_trace.txt`
This flow is the current best approximation of a real client loading path on the
Linux-hosted Wine setup because it records phase markers beyond pure startup.
You can also summarize a single run:
```bash

View File

@@ -0,0 +1,728 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
import secrets
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path
DEFAULT_SERVER_HOST = "173.249.9.66"
DEFAULT_AUTH_PORT = 11000
DEFAULT_CHANNEL_PORT = 11991
DEFAULT_MAP_NAME = "gm_guild_build"
DEFAULT_MAP_INDEX = 200
DEFAULT_GLOBAL_X = 96000
DEFAULT_GLOBAL_Y = 12800
ACCEPTABLE_RUN_CODES = {0, 124, 137}
@dataclass
class PlayerState:
player_id: int
name: str
map_index: int
x: int
y: int
@dataclass
class AccountState:
account_id: int
password_hash: str
player: PlayerState
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Run a real login -> game -> GM warp pack profile comparison under "
"Wine using two runtime roots."
),
)
parser.add_argument(
"--left-runtime-root",
type=Path,
required=True,
help="Runtime root for the first capture.",
)
parser.add_argument(
"--right-runtime-root",
type=Path,
required=True,
help="Runtime root for the second capture.",
)
parser.add_argument(
"--left-label",
default="left",
help="Label for the first capture.",
)
parser.add_argument(
"--right-label",
default="right",
help="Label for the second capture.",
)
parser.add_argument(
"--master-key",
type=Path,
required=True,
help="Path to the runtime master key hex file.",
)
parser.add_argument(
"--sign-pubkey",
type=Path,
required=True,
help="Path to the signing public key hex file.",
)
parser.add_argument(
"--key-id",
default="1",
help="Runtime key_id for the secure pack loader.",
)
parser.add_argument(
"--account-login",
default="admin",
help="GM account used for the smoke flow.",
)
parser.add_argument(
"--slot",
type=int,
default=0,
help="Character slot used for auto-select.",
)
parser.add_argument(
"--timeout",
type=int,
default=90,
help="Headless client timeout in seconds per capture.",
)
parser.add_argument(
"--server-host",
default=DEFAULT_SERVER_HOST,
help="Server host used in loginInfo.py.",
)
parser.add_argument(
"--auth-port",
type=int,
default=DEFAULT_AUTH_PORT,
help="Auth port used in loginInfo.py.",
)
parser.add_argument(
"--channel-port",
type=int,
default=DEFAULT_CHANNEL_PORT,
help="Channel port used in loginInfo.py.",
)
parser.add_argument(
"--map-name",
default=DEFAULT_MAP_NAME,
help="Headless GM target map name.",
)
parser.add_argument(
"--map-index",
type=int,
default=DEFAULT_MAP_INDEX,
help="Server-side map index used to place the GM character before login.",
)
parser.add_argument(
"--global-x",
type=int,
default=DEFAULT_GLOBAL_X,
help="Global X coordinate used for the initial load and GM warp target.",
)
parser.add_argument(
"--global-y",
type=int,
default=DEFAULT_GLOBAL_Y,
help="Global Y coordinate used for the initial load and GM warp target.",
)
parser.add_argument(
"--command-delay",
type=float,
default=3.0,
help="Delay before the headless GM sends the first /warp command.",
)
parser.add_argument(
"--settle-delay",
type=float,
default=1.0,
help="Delay between warp arrival and the next GM command.",
)
parser.add_argument(
"--warp-timeout",
type=float,
default=15.0,
help="Timeout per GM warp step in seconds.",
)
parser.add_argument(
"--out-dir",
type=Path,
default=None,
help="Directory for archived reports and compare output.",
)
parser.add_argument(
"--work-root",
type=Path,
default=None,
help="Directory where temporary workspaces are created.",
)
parser.add_argument(
"--keep-workspaces",
action="store_true",
help="Keep temporary workspaces for debugging instead of deleting them.",
)
parser.add_argument(
"source_build_bin_dir",
nargs="?",
default=None,
help="Source build/bin directory to copy out of the repository.",
)
return parser.parse_args()
def sanitize_label(value: str) -> str:
safe = []
prev_dash = False
for ch in value.lower():
keep = ch.isalnum() or ch in "._-"
if keep:
if ch == "-" and prev_dash:
continue
safe.append(ch)
prev_dash = ch == "-"
else:
if not prev_dash:
safe.append("-")
prev_dash = True
result = "".join(safe).strip("-")
return result or "capture"
def require_tool(name: str) -> None:
if shutil.which(name) is None:
raise SystemExit(f"{name} not found in PATH")
def run_checked(args: list[str], *, cwd: Path | None = None, env: dict[str, str] | None = None) -> subprocess.CompletedProcess[str]:
completed = subprocess.run(
args,
cwd=str(cwd) if cwd else None,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if completed.returncode != 0:
detail = completed.stderr.strip() or completed.stdout.strip()
raise RuntimeError(f"command failed ({completed.returncode}): {' '.join(args)}\n{detail}")
return completed
def run_mysql(database: str, sql: str) -> str:
completed = subprocess.run(
["mysql", "-B", "-N", database, "-e", sql],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if completed.returncode != 0:
detail = completed.stderr.strip() or completed.stdout.strip()
raise RuntimeError(f"mysql failed: {detail}")
return completed.stdout.strip()
def sql_quote(value: str) -> str:
return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
def load_account_state(account_login: str, slot: int) -> AccountState:
if slot < 0 or slot > 3:
raise SystemExit(f"unsupported slot index: {slot}")
account_id_text = run_mysql(
"account",
f"SELECT id FROM account WHERE login={sql_quote(account_login)} LIMIT 1;",
)
if not account_id_text:
raise SystemExit(f"account not found: {account_login}")
account_id = int(account_id_text)
password_hash = run_mysql(
"account",
f"SELECT password FROM account WHERE login={sql_quote(account_login)} LIMIT 1;",
)
if not password_hash:
raise SystemExit(f"account password hash not found: {account_login}")
pid_text = run_mysql(
"player",
f"SELECT pid{slot + 1} FROM player_index WHERE id={account_id} LIMIT 1;",
)
if not pid_text or pid_text == "0":
raise SystemExit(f"slot {slot} has no character for account: {account_login}")
player_id = int(pid_text)
player_row = run_mysql(
"player",
f"SELECT id, name, map_index, x, y FROM player WHERE id={player_id} LIMIT 1;",
)
if not player_row:
raise SystemExit(f"player row not found for id={player_id}")
parts = player_row.split("\t")
if len(parts) != 5:
raise SystemExit(f"unexpected player row format for id={player_id}: {player_row}")
return AccountState(
account_id=account_id,
password_hash=password_hash,
player=PlayerState(
player_id=int(parts[0]),
name=parts[1],
map_index=int(parts[2]),
x=int(parts[3]),
y=int(parts[4]),
),
)
def restore_account_state(account_login: str, state: AccountState) -> None:
run_mysql(
"account",
(
"UPDATE account SET password=%s WHERE login=%s;"
% (sql_quote(state.password_hash), sql_quote(account_login))
),
)
run_mysql(
"player",
(
"UPDATE player SET map_index=%d, x=%d, y=%d WHERE id=%d;"
% (
state.player.map_index,
state.player.x,
state.player.y,
state.player.player_id,
)
),
)
def verify_account_state(account_login: str, slot: int, expected: AccountState) -> None:
current = load_account_state(account_login, slot)
if current.password_hash != expected.password_hash:
raise RuntimeError(
f"account password hash did not restore for {account_login}"
)
if (
current.player.map_index != expected.player.map_index
or current.player.x != expected.player.x
or current.player.y != expected.player.y
or current.player.player_id != expected.player.player_id
):
raise RuntimeError(
"player state did not restore for "
f"{account_login}: expected map={expected.player.map_index} "
f"x={expected.player.x} y={expected.player.y}, got "
f"map={current.player.map_index} x={current.player.x} y={current.player.y}"
)
def set_temp_account_state(
account_login: str,
player_id: int,
*,
temp_password: str,
map_index: int,
global_x: int,
global_y: int,
) -> None:
run_mysql(
"account",
(
"UPDATE account SET password=PASSWORD(%s) WHERE login=%s;"
% (sql_quote(temp_password), sql_quote(account_login))
),
)
run_mysql(
"player",
(
"UPDATE player SET map_index=%d, x=%d, y=%d WHERE id=%d;"
% (map_index, global_x, global_y, player_id)
),
)
def remove_previous_logs(build_bin_dir: Path) -> None:
log_dir = build_bin_dir / "log"
log_dir.mkdir(parents=True, exist_ok=True)
for path in log_dir.glob("*.txt"):
path.unlink(missing_ok=True)
for path in [build_bin_dir / "syserr.txt", build_bin_dir / "ErrorLog.txt", build_bin_dir / "loginInfo.py"]:
path.unlink(missing_ok=True)
def write_login_info(
build_bin_dir: Path,
*,
account_login: str,
temp_password: str,
slot: int,
server_host: str,
auth_port: int,
channel_port: int,
) -> None:
login_info = "\n".join(
[
f"addr={server_host!r}",
f"port={channel_port}",
f"account_addr={server_host!r}",
f"account_port={auth_port}",
f"id={account_login!r}",
f"pwd={temp_password!r}",
f"slot={slot}",
"autoLogin=1",
"autoSelect=1",
"",
]
)
(build_bin_dir / "loginInfo.py").write_text(login_info, encoding="utf-8")
def copy_source_bin(source_build_bin_dir: Path, workspace_bin_dir: Path) -> None:
run_checked(
[
"rsync",
"-a",
"--delete",
"--exclude",
"config",
"--exclude",
"pack",
"--exclude",
"mark",
"--exclude",
"log",
"--exclude",
"BGM",
"--exclude",
"bgm",
"--exclude",
"locale",
"--exclude",
"sound",
f"{source_build_bin_dir}/",
str(workspace_bin_dir),
]
)
def copy_if_exists(src: Path, dst: Path) -> None:
if src.exists():
shutil.copy2(src, dst)
def prepare_runtime_wrapper(runtime_root: Path, wrapper_root: Path) -> None:
if not (runtime_root / "config").exists():
raise SystemExit(f"runtime config missing: {runtime_root / 'config'}")
if not (runtime_root / "pack").exists():
raise SystemExit(f"runtime pack missing: {runtime_root / 'pack'}")
wrapper_root.mkdir(parents=True, exist_ok=True)
def link_entry(name: str, target: Path) -> None:
link_path = wrapper_root / name
if link_path.exists() or link_path.is_symlink():
link_path.unlink()
os.symlink(target, link_path)
link_entry("config", runtime_root / "config")
link_entry("pack", runtime_root / "pack")
optional_entries = {
"mark": runtime_root / "mark",
"locale": runtime_root / "locale",
"sound": runtime_root / "sound",
}
bgm_target = runtime_root / "BGM"
if not bgm_target.exists():
bgm_target = runtime_root / "bgm"
if bgm_target.exists():
optional_entries["BGM"] = bgm_target
for name, target in optional_entries.items():
if target.exists():
link_entry(name, target)
def capture_run(
*,
label: str,
runtime_root: Path,
source_build_bin_dir: Path,
out_dir: Path,
work_root: Path | None,
keep_workspace: bool,
master_key_hex: str,
sign_pubkey_hex: str,
key_id: str,
account_login: str,
slot: int,
timeout_seconds: int,
server_host: str,
auth_port: int,
channel_port: int,
map_name: str,
map_index: int,
global_x: int,
global_y: int,
command_delay: float,
settle_delay: float,
warp_timeout: float,
stage_script: Path,
run_script: Path,
parser_script: Path,
) -> tuple[Path, Path | None]:
workspace_path = Path(
tempfile.mkdtemp(
prefix=f"pack-profile-gm-smoke-{sanitize_label(label)}.",
dir=str(work_root) if work_root else None,
)
)
workspace_bin_dir = workspace_path / "bin"
workspace_runtime_dir = workspace_path / "runtime"
workspace_bin_dir.mkdir(parents=True, exist_ok=True)
account_state = load_account_state(account_login, slot)
temp_password = ("Tmp" + secrets.token_hex(6))[:16]
safe_label = sanitize_label(label)
raw_out = out_dir / f"{safe_label}.pack_profile.txt"
summary_out = out_dir / f"{safe_label}.summary.txt"
trace_out = out_dir / f"{safe_label}.headless_gm_teleport_trace.txt"
startup_out = out_dir / f"{safe_label}.startup_trace.txt"
syserr_out = out_dir / f"{safe_label}.syserr.txt"
try:
copy_source_bin(source_build_bin_dir, workspace_bin_dir)
remove_previous_logs(workspace_bin_dir)
prepare_runtime_wrapper(runtime_root, workspace_runtime_dir)
run_checked(["bash", str(stage_script), str(workspace_runtime_dir), str(workspace_bin_dir)])
write_login_info(
workspace_bin_dir,
account_login=account_login,
temp_password=temp_password,
slot=slot,
server_host=server_host,
auth_port=auth_port,
channel_port=channel_port,
)
set_temp_account_state(
account_login,
account_state.player.player_id,
temp_password=temp_password,
map_index=map_index,
global_x=global_x,
global_y=global_y,
)
env = os.environ.copy()
env.update(
{
"M2PACK_PROFILE": "1",
"M2PACK_MASTER_KEY_HEX": master_key_hex,
"M2PACK_SIGN_PUBKEY_HEX": sign_pubkey_hex,
"M2PACK_KEY_ID": key_id,
"M2_TIMEOUT": str(timeout_seconds),
"M2_HEADLESS_SCENARIO": "gm_teleport",
"M2_HEADLESS_WARP_STEPS": f"{map_name},{global_x},{global_y}",
"M2_HEADLESS_COMMAND_DELAY": str(command_delay),
"M2_HEADLESS_SETTLE_DELAY": str(settle_delay),
"M2_HEADLESS_WARP_TIMEOUT": str(warp_timeout),
}
)
env.setdefault("WINEDEBUG", "-all")
completed = subprocess.run(
["bash", str(run_script), str(workspace_bin_dir)],
cwd=str(workspace_bin_dir),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
report_path = workspace_bin_dir / "log" / "pack_profile.txt"
if not report_path.is_file():
detail = completed.stderr.strip() or completed.stdout.strip()
raise RuntimeError(f"pack profile report not generated for {label}: {detail}")
trace_path = workspace_bin_dir / "log" / "headless_gm_teleport_trace.txt"
startup_path = workspace_bin_dir / "log" / "startup_trace.txt"
syserr_path = workspace_bin_dir / "syserr.txt"
shutil.copy2(report_path, raw_out)
copy_if_exists(trace_path, trace_out)
copy_if_exists(startup_path, startup_out)
copy_if_exists(syserr_path, syserr_out)
summary = run_checked([sys.executable, str(parser_script), str(raw_out)])
summary_out.write_text(summary.stdout, encoding="utf-8")
trace_text = trace_path.read_text(encoding="utf-8", errors="ignore") if trace_path.exists() else ""
scenario_ok = "Scenario success current_map=" in trace_text
if completed.returncode not in ACCEPTABLE_RUN_CODES:
detail = completed.stderr.strip() or completed.stdout.strip()
raise RuntimeError(
f"client run for {label} exited with {completed.returncode}: {detail}"
)
if not scenario_ok:
raise RuntimeError(
f"GM smoke scenario for {label} did not finish successfully; "
f"see {trace_out if trace_out.exists() else trace_path}"
)
return raw_out, (workspace_path if keep_workspace else None)
finally:
try:
restore_account_state(account_login, account_state)
verify_account_state(account_login, slot, account_state)
finally:
if not keep_workspace:
shutil.rmtree(workspace_path, ignore_errors=True)
def main() -> int:
args = parse_args()
require_tool("mysql")
require_tool("rsync")
require_tool("python3")
script_dir = Path(__file__).resolve().parent
repo_root = script_dir.parent
source_build_bin_dir = (
Path(args.source_build_bin_dir).resolve()
if args.source_build_bin_dir
else (repo_root / "build-mingw64-lld" / "bin").resolve()
)
if not (source_build_bin_dir / "Metin2_RelWithDebInfo.exe").is_file():
raise SystemExit(f"client exe not found: {source_build_bin_dir / 'Metin2_RelWithDebInfo.exe'}")
stage_script = script_dir / "stage-linux-runtime.sh"
run_script = script_dir / "run-wine-headless.sh"
parser_script = script_dir / "pack-profile-report.py"
for path, kind in [
(stage_script, "runtime stage script"),
(run_script, "headless run script"),
(parser_script, "pack profile parser"),
]:
if not path.exists():
raise SystemExit(f"{kind} not found: {path}")
out_dir = (
args.out_dir.resolve()
if args.out_dir
else (source_build_bin_dir / "log" / "pack-profile-gm-smoke" / next(tempfile._get_candidate_names())).resolve()
)
out_dir.mkdir(parents=True, exist_ok=True)
work_root = args.work_root.resolve() if args.work_root else None
if work_root:
work_root.mkdir(parents=True, exist_ok=True)
master_key_hex = args.master_key.read_text(encoding="utf-8").strip()
sign_pubkey_hex = args.sign_pubkey.read_text(encoding="utf-8").strip()
if not master_key_hex:
raise SystemExit(f"master key file is empty: {args.master_key}")
if not sign_pubkey_hex:
raise SystemExit(f"sign pubkey file is empty: {args.sign_pubkey}")
left_report, left_workspace = capture_run(
label=args.left_label,
runtime_root=args.left_runtime_root.resolve(),
source_build_bin_dir=source_build_bin_dir,
out_dir=out_dir,
work_root=work_root,
keep_workspace=args.keep_workspaces,
master_key_hex=master_key_hex,
sign_pubkey_hex=sign_pubkey_hex,
key_id=args.key_id,
account_login=args.account_login,
slot=args.slot,
timeout_seconds=args.timeout,
server_host=args.server_host,
auth_port=args.auth_port,
channel_port=args.channel_port,
map_name=args.map_name,
map_index=args.map_index,
global_x=args.global_x,
global_y=args.global_y,
command_delay=args.command_delay,
settle_delay=args.settle_delay,
warp_timeout=args.warp_timeout,
stage_script=stage_script,
run_script=run_script,
parser_script=parser_script,
)
right_report, right_workspace = capture_run(
label=args.right_label,
runtime_root=args.right_runtime_root.resolve(),
source_build_bin_dir=source_build_bin_dir,
out_dir=out_dir,
work_root=work_root,
keep_workspace=args.keep_workspaces,
master_key_hex=master_key_hex,
sign_pubkey_hex=sign_pubkey_hex,
key_id=args.key_id,
account_login=args.account_login,
slot=args.slot,
timeout_seconds=args.timeout,
server_host=args.server_host,
auth_port=args.auth_port,
channel_port=args.channel_port,
map_name=args.map_name,
map_index=args.map_index,
global_x=args.global_x,
global_y=args.global_y,
command_delay=args.command_delay,
settle_delay=args.settle_delay,
warp_timeout=args.warp_timeout,
stage_script=stage_script,
run_script=run_script,
parser_script=parser_script,
)
compare = run_checked(
[
sys.executable,
str(parser_script),
f"{args.left_label}={left_report}",
f"{args.right_label}={right_report}",
]
)
compare_path = out_dir / (
f"compare-{sanitize_label(args.left_label)}-vs-{sanitize_label(args.right_label)}.txt"
)
compare_path.write_text(compare.stdout, encoding="utf-8")
print(compare.stdout, end="")
print()
print(f"saved compare report: {compare_path}")
if left_workspace:
print(f"kept workspace: {left_workspace}")
if right_workspace:
print(f"kept workspace: {right_workspace}")
return 0
if __name__ == "__main__":
sys.exit(main())