Files
m2pack-secure/scripts/gm_teleport_smoke_wine.py
server 9bbcb67351
Some checks failed
ci / headless-e2e (push) Has been cancelled
runtime-self-hosted / runtime-ci (push) Has been cancelled
Add Wine smoke runners and clear runtime baseline
2026-04-14 21:22:50 +02:00

576 lines
20 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import secrets
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
# Two levels above this file: the script's directory is parents[0], so
# parents[1] is the checkout that holds build/.
REPO_ROOT = Path(__file__).resolve().parents[1]

# Packer binary used when the M2PACK_BINARY environment variable is not set.
DEFAULT_BINARY = REPO_ROOT / "build" / "m2pack"

# Factor applied to an atlas entry's width/height when computing a map centre.
# NOTE(review): presumably the edge length of one terrain cell in global
# coordinate units - confirm against the client sources.
TERRAIN_SIZE = 25600

# The generated temporary password is truncated to this many characters.
PASS_MAX_NUM = 16

# Default value of --account-login.
DEFAULT_ACCOUNT_LOGIN = "admin"

# Maps visited, in order, when no --map option is given.
DEFAULT_MAPS = [
    "metin2_map_a1",
    "metin2_map_a3",
    "metin2_map_trent",
    "metin2_map_trent02",
]

# Pack basenames activated as .m2p when no --pack option is given.
DEFAULT_PACKS = [
    "root",
    "patch1",
    "patch2",
    "season3_eu",
    "metin2_patch_snow",
    "metin2_patch_snow_dungeon",
    "metin2_patch_etc_costume1",
    "metin2_patch_pet1",
    "metin2_patch_pet2",
    "metin2_patch_ramadan_costume",
    "metin2_patch_flame",
    "metin2_patch_flame_dungeon",
    "locale",
    "uiscript",
    "uiloading",
    "ETC",
    "item",
    "effect",
    "icon",
    "property",
    "terrain",
    "tree",
    "zone",
    "outdoora1",
    "outdoora2",
    "outdoorb1",
    "outdoorc1",
    "outdoorsnow1",
    "pc",
    "pc2",
    "guild",
    "npc",
    "monster2",
    "sound",
    "sound_m",
    "sound2",
    "monster",
    "npc2",
    "textureset",
    "outdoora3",
    "outdoorb3",
    "outdoorc3",
    "outdoordesert1",
    "outdoorflame1",
    "outdoorfielddungeon1",
]
@dataclass
class AtlasEntry:
    """One parsed row of atlasinfo.txt: map name, base position and size."""

    name: str
    base_x: int
    base_y: int
    width: int
    height: int

    @property
    def center_x(self) -> int:
        """Return base_x plus half of the map's width scaled by TERRAIN_SIZE."""
        half_extent = (self.width * TERRAIN_SIZE) // 2
        return self.base_x + half_extent

    @property
    def center_y(self) -> int:
        """Return base_y plus half of the map's height scaled by TERRAIN_SIZE."""
        half_extent = (self.height * TERRAIN_SIZE) // 2
        return self.base_y + half_extent
@dataclass
class MapStep:
    """A single warp target: map name plus global x/y coordinates."""

    name: str
    global_x: int
    global_y: int

    @property
    def meter_x(self) -> int:
        """Return global_x floor-divided by 100."""
        whole, _remainder = divmod(self.global_x, 100)
        return whole

    @property
    def meter_y(self) -> int:
        """Return global_y floor-divided by 100."""
        whole, _remainder = divmod(self.global_y, 100)
        return whole
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        The parsed namespace. ``maps`` and ``packs`` are lists that stay
        empty when the corresponding option is never given.
    """
    cli = argparse.ArgumentParser(
        description="Run a headless Wine GM teleport smoke scenario with real login and map warps."
    )

    # The five mandatory filesystem inputs share the same shape, so they are
    # registered from a table; the order is kept so --help output is unchanged.
    required_paths = (
        ("--runtime-root", "Client runtime root containing assets/, config/, and pack/."),
        ("--client-repo", "Path to m2dev-client-src checkout."),
        ("--content-repo", "Path to m2dev-client asset checkout used to rebuild root.m2p."),
        ("--master-key", "Path to m2pack runtime master key file."),
        ("--sign-secret-key", "Path to m2pack signing secret key used to rebuild root.m2p."),
    )
    for flag, help_text in required_paths:
        cli.add_argument(flag, type=Path, required=True, help=help_text)

    cli.add_argument(
        "--key-id",
        type=str,
        default="1",
        help="Runtime key_id to inject into the client and archive build.",
    )
    cli.add_argument("--timeout", type=int, default=90, help="Wine run timeout in seconds.")

    # Connection overrides: None means "use whatever serverinfo.py provides".
    cli.add_argument(
        "--server-host",
        type=str,
        default=None,
        help="Server host override. Defaults to runtime assets/root/serverinfo.py.",
    )
    cli.add_argument(
        "--auth-port",
        type=int,
        default=None,
        help="Auth port override. Defaults to runtime assets/root/serverinfo.py.",
    )
    cli.add_argument(
        "--channel-port",
        type=int,
        default=None,
        help="Channel port override. Defaults to runtime assets/root/serverinfo.py.",
    )

    cli.add_argument(
        "--account-login",
        type=str,
        default=DEFAULT_ACCOUNT_LOGIN,
        help="Login name for the GM account.",
    )
    cli.add_argument("--slot", type=int, default=0, help="Character slot to auto-select.")

    # Scenario pacing knobs.
    cli.add_argument(
        "--command-delay",
        type=float,
        default=5.0,
        help="Delay before sending the first GM command after entering game.",
    )
    cli.add_argument(
        "--settle-delay",
        type=float,
        default=2.0,
        help="Delay between successful warp arrival and the next warp command.",
    )
    cli.add_argument(
        "--warp-timeout",
        type=float,
        default=20.0,
        help="Timeout per warp command in seconds.",
    )

    # Repeatable options accumulate into lists.
    cli.add_argument(
        "--map",
        dest="maps",
        action="append",
        default=[],
        help="Atlas map name to visit. Repeat for multiple maps.",
    )
    cli.add_argument(
        "--pack",
        dest="packs",
        action="append",
        default=[],
        help="Pack basename to activate as .m2p. Repeat for multiple packs.",
    )
    cli.add_argument("--json", action="store_true", help="Emit machine-readable JSON result.")
    return cli.parse_args()
def resolve_binary() -> Path:
    """Locate the m2pack executable.

    The M2PACK_BINARY environment variable wins when it is set to a
    non-empty value; otherwise DEFAULT_BINARY is used.

    Raises:
        SystemExit: if the chosen path is not an existing regular file.
    """
    override = os.environ.get("M2PACK_BINARY")
    candidate = Path(override) if override else DEFAULT_BINARY
    if candidate.is_file():
        return candidate
    raise SystemExit(f"m2pack binary not found: {candidate}")
def tail_lines(path: Path, count: int = 10) -> list[str]:
    """Return up to the last *count* lines of a text file.

    The file is decoded as UTF-8 with undecodable bytes dropped, and leading
    and trailing whitespace of the whole text is stripped before splitting.

    Args:
        path: File to read.
        count: Maximum number of trailing lines to return.

    Returns:
        The trailing lines, or an empty list when the file does not exist or
        *count* is not positive.
    """
    # A slice of [-0:] would return every line and a negative count would
    # drop leading lines instead; neither is a "tail", so bail out early.
    if count <= 0:
        return []
    if not path.exists():
        return []
    text = path.read_text(encoding="utf-8", errors="ignore")
    return text.strip().splitlines()[-count:]
def sql_quote(value: str) -> str:
    """Wrap *value* in single quotes for inlining into a SQL statement.

    Every backslash and every single quote in *value* gets a backslash
    prefix; all other characters pass through untouched.
    """
    escaped = "".join("\\" + ch if ch in "\\'" else ch for ch in value)
    return f"'{escaped}'"
def run_mysql(sql: str, database: str = "account") -> str:
    """Run one SQL string through the ``mysql`` command-line client.

    Args:
        sql: Statement text handed to the client with ``-e``.
        database: Database name passed on the command line.

    Returns:
        The client's standard output with surrounding whitespace removed.

    Raises:
        RuntimeError: if the client exits with a non-zero status; the message
            carries stderr, or stdout when stderr is empty.
    """
    # -B: batch (tab-separated) output, -N: no column-name header row.
    command = ["mysql", "-B", "-N", database, "-e", sql]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout.strip()
    detail = proc.stderr.strip() or proc.stdout.strip()
    raise RuntimeError(f"mysql failed: {detail}")
def load_character_state(account_login: str, slot: int) -> dict[str, object] | None:
    """Read the stored map and position of the character in an account slot.

    Args:
        account_login: Login name looked up in the ``account`` table.
        slot: Zero-based character slot; only 0 through 3 are accepted.

    Returns:
        A dict with ``id``, ``name``, ``map_index``, ``x`` and ``y``, or
        ``None`` when the account, the slot's player id, or the player row
        cannot be found (or the row does not have exactly five columns).

    Raises:
        SystemExit: if *slot* is outside 0..3.
    """
    if slot > 3 or slot < 0:
        raise SystemExit(f"unsupported character slot: {slot}")

    account_id = run_mysql(
        f"SELECT id FROM account WHERE login={sql_quote(account_login)} LIMIT 1;"
    )
    if not account_id:
        return None

    # player_index stores one pid column per slot, numbered from 1.
    pid = run_mysql(
        f"SELECT pid{slot + 1} FROM player_index WHERE id={account_id} LIMIT 1;",
        database="player",
    )
    if pid in ("", "0"):
        return None

    row = run_mysql(
        f"SELECT id, name, map_index, x, y FROM player WHERE id={pid} LIMIT 1;",
        database="player",
    )
    if not row:
        return None

    # Batch-mode output separates columns with tabs.
    columns = row.split("\t")
    if len(columns) != 5:
        return None
    player_id, player_name, map_index, pos_x, pos_y = columns
    return {
        "id": int(player_id),
        "name": player_name,
        "map_index": int(map_index),
        "x": int(pos_x),
        "y": int(pos_y),
    }
def restore_character_state(state: dict[str, object]) -> None:
    """Write a previously captured map index and position back to the player row.

    Args:
        state: Mapping with integer ``map_index``, ``x``, ``y`` and ``id``
            entries, as produced by load_character_state().
    """
    # Mapping-style %-formatting pulls the four fields straight out of the
    # state dict; extra keys such as "name" are simply ignored.
    statement = (
        "UPDATE player SET map_index=%(map_index)d, x=%(x)d, y=%(y)d WHERE id=%(id)d;" % state
    )
    run_mysql(statement, database="player")
def activate_pack(pack_dir: Path, pack_name: str, moved: list[tuple[Path, Path]]) -> None:
    """Switch one pack over to its .m2p archive for the duration of the run.

    A parked ``<name>.m2p.offsingle`` is renamed to ``<name>.m2p`` and a
    ``<name>.pck`` is renamed to ``<name>.pck.testbak``. Each rename that
    actually happens is appended to *moved* as ``(current_path,
    original_path)`` so that restore_moves() can undo it.

    Args:
        pack_dir: Directory holding the pack files.
        pack_name: Pack basename without extension.
        moved: Undo log, mutated in place.
    """
    renames = (
        (pack_dir / f"{pack_name}.m2p.offsingle", pack_dir / f"{pack_name}.m2p"),
        (pack_dir / f"{pack_name}.pck", pack_dir / f"{pack_name}.pck.testbak"),
    )
    for original, renamed in renames:
        if original.exists():
            original.rename(renamed)
            moved.append((renamed, original))
def restore_moves(moved: list[tuple[Path, Path]]) -> None:
    """Undo recorded renames, newest first.

    Args:
        moved: ``(current_path, original_path)`` pairs in the order the
            renames were performed. Pairs whose current path no longer
            exists are skipped.
    """
    for current, original in moved[::-1]:
        if not current.exists():
            continue
        current.rename(original)
def load_server_defaults(runtime_root: Path) -> tuple[str, int, int]:
    """Determine the default server host, auth port and channel port.

    Values come from ``SERVER_IP``, ``PORT_AUTH`` and ``PORT_CH1`` in
    ``<runtime_root>/assets/root/serverinfo.py`` when that file exists; any
    name it does not define, or a missing file, falls back to the built-in
    values below.

    Returns:
        ``(host, auth_port, channel_port)``.
    """
    fallback_host = "173.249.9.66"
    fallback_auth_port = 11000
    fallback_channel_port = 11011

    script_path = runtime_root / "assets" / "root" / "serverinfo.py"
    if not script_path.is_file():
        return fallback_host, fallback_auth_port, fallback_channel_port

    # NOTE(review): this executes serverinfo.py as Python inside this process.
    # Only point --runtime-root at a trusted tree; any error raised by that
    # script propagates to the caller.
    script_source = script_path.read_text(encoding="utf-8", errors="ignore")
    scope: dict[str, object] = {}
    exec(compile(script_source, str(script_path), "exec"), scope)

    return (
        str(scope.get("SERVER_IP", fallback_host)),
        int(scope.get("PORT_AUTH", fallback_auth_port)),
        int(scope.get("PORT_CH1", fallback_channel_port)),
    )
def load_atlas_entry(runtime_root: Path, map_name: str) -> AtlasEntry:
    """Find *map_name* in ``<runtime_root>/assets/root/atlasinfo.txt``.

    Each usable line has at least five whitespace-separated fields: name,
    base x, base y, width, height. Shorter lines are ignored and the first
    line whose name matches is used.

    Raises:
        SystemExit: if the atlas file is missing, the matching line has a
            non-integer numeric field, or no line matches.
    """
    atlas_path = runtime_root / "assets" / "root" / "atlasinfo.txt"
    if not atlas_path.is_file():
        raise SystemExit(f"atlasinfo not found: {atlas_path}")

    atlas_text = atlas_path.read_text(encoding="utf-8", errors="ignore")
    for raw_line in atlas_text.splitlines():
        fields = raw_line.split()
        if len(fields) < 5 or fields[0] != map_name:
            continue
        try:
            base_x, base_y, width, height = (int(token) for token in fields[1:5])
        except ValueError as exc:
            raise SystemExit(f"invalid atlas entry for {map_name}: {raw_line}") from exc
        return AtlasEntry(
            name=fields[0],
            base_x=base_x,
            base_y=base_y,
            width=width,
            height=height,
        )

    raise SystemExit(f"map not found in atlasinfo.txt: {map_name}")
def load_steps(runtime_root: Path, map_names: list[str]) -> list[MapStep]:
    """Turn map names into warp steps aimed at each map's atlas centre.

    Args:
        runtime_root: Runtime tree whose atlasinfo.txt is consulted.
        map_names: Maps to visit, in order.

    Returns:
        One MapStep per name, in the same order.
    """

    def step_for(map_name: str) -> MapStep:
        entry = load_atlas_entry(runtime_root, map_name)
        return MapStep(name=map_name, global_x=entry.center_x, global_y=entry.center_y)

    return [step_for(map_name) for map_name in map_names]
def serialize_steps(steps: list[MapStep]) -> str:
    """Encode warp steps as ``name,x,y`` records joined by ``|``.

    An empty list yields an empty string.
    """
    records = []
    for step in steps:
        records.append("{},{},{}".format(step.name, step.global_x, step.global_y))
    return "|".join(records)
def rebuild_root_pack(
    binary: Path,
    content_repo: Path,
    pack_dir: Path,
    master_key: Path,
    sign_secret_key: Path,
    key_id: str,
) -> dict[str, object]:
    """Rebuild ``root.m2p`` in *pack_dir* from ``<content_repo>/assets/root``.

    The archive is built into a temporary file inside *pack_dir* and moved
    over ``root.m2p`` only after the packer exits successfully; the temporary
    file is removed on every exit path.

    Returns:
        A dict with ``ok``, the last ten stdout/stderr lines of the packer,
        and ``json`` (the packer's stdout parsed as JSON, or ``None`` when it
        is empty or not valid JSON).

    Raises:
        SystemExit: if the content root directory does not exist.
        RuntimeError: if the packer exits with a non-zero status.
    """
    source_root = content_repo / "assets" / "root"
    if not source_root.is_dir():
        raise SystemExit(f"content root assets not found: {source_root}")
    target_root = pack_dir / "root.m2p"

    # mkstemp is used only to reserve a unique name next to the target; the
    # descriptor is closed right away and the packer writes to the path.
    handle, scratch_name = tempfile.mkstemp(
        prefix="root-gm-teleport-", suffix=".m2p", dir=str(pack_dir)
    )
    os.close(handle)
    scratch_archive = Path(scratch_name)

    command = [
        str(binary),
        "build",
        "--input",
        str(source_root),
        "--output",
        str(scratch_archive),
        "--key",
        str(master_key),
        "--sign-secret-key",
        str(sign_secret_key),
        "--key-id",
        str(key_id),
        "--json",
    ]
    try:
        proc = subprocess.run(command, cwd=REPO_ROOT, capture_output=True, text=True)
        if proc.returncode != 0:
            detail = proc.stderr.strip() or proc.stdout.strip()
            raise RuntimeError(f"m2pack build failed: {detail}")

        stdout_text = proc.stdout.strip()
        stderr_text = proc.stderr.strip()
        try:
            build_json = json.loads(stdout_text) if stdout_text else None
        except json.JSONDecodeError:
            # Non-JSON packer output is tolerated; the raw tail is still returned.
            build_json = None

        scratch_archive.replace(target_root)
        return {
            "ok": True,
            "stdout_tail": stdout_text.splitlines()[-10:],
            "stderr_tail": stderr_text.splitlines()[-10:],
            "json": build_json,
        }
    finally:
        # No-op after a successful replace(); removes the scratch file otherwise.
        scratch_archive.unlink(missing_ok=True)
def write_login_info(
    login_path: Path,
    server_host: str,
    auth_port: int,
    channel_port: int,
    account_login: str,
    temp_password: str,
    slot: int,
) -> tuple[Path | None, list[str]]:
    """Write an auto-login ``loginInfo.py`` at *login_path*.

    An existing file is first renamed to ``<name>.gmteleportbak``.

    Returns:
        ``(backup_path, cleanup)`` where *backup_path* is the renamed
        original (``None`` if there was none) and *cleanup* lists the backup
        path as a string when one was created.
    """
    cleanup: list[str] = []
    backup_path = None
    if login_path.exists():
        backup_path = login_path.with_name(login_path.name + ".gmteleportbak")
        login_path.rename(backup_path)
        cleanup.append(str(backup_path))

    # String values are written as Python literals via repr(); numbers as-is.
    settings = (
        ("addr", repr(server_host)),
        ("port", format(channel_port)),
        ("account_addr", repr(server_host)),
        ("account_port", format(auth_port)),
        ("id", repr(account_login)),
        ("pwd", repr(temp_password)),
        ("slot", format(slot)),
        ("autoLogin", "1"),
        ("autoSelect", "1"),
    )
    payload_lines = [f"{key}={value}" for key, value in settings]
    login_path.write_text("\n".join(payload_lines) + "\n", encoding="utf-8")
    return backup_path, cleanup
def restore_login_info(login_path: Path, backup_path: Path | None) -> None:
    """Delete the generated login file and put the backed-up original back.

    Args:
        login_path: File written by write_login_info(); removed if present.
        backup_path: Backup to rename back to *login_path*, or ``None`` when
            no original existed. A backup that has vanished is ignored.
    """
    login_path.unlink(missing_ok=True)
    if backup_path is None:
        return
    if backup_path.exists():
        backup_path.rename(login_path)
def main() -> int:
    """Run the GM teleport smoke scenario end to end.

    Steps, in order: validate the runtime and client trees, capture the
    character's stored position, rebuild ``root.m2p``, activate the requested
    packs, clear old client logs, swap the account password for a random
    temporary one, write an auto-login ``loginInfo.py``, launch the headless
    Wine runner, and evaluate the scenario trace. The ``finally`` block then
    restores the character position, the password hash, ``loginInfo.py`` and
    the pack files.

    Returns:
        0 when the trace contains the success marker and an arrival marker
        for every step, otherwise 1.

    Raises:
        SystemExit: for missing prerequisites (pack dir, runner script,
            client binary, account).
        RuntimeError: if any cleanup action fails; the message joins all
            cleanup errors.
    """
    args = parse_args()
    runtime_root = args.runtime_root.resolve()
    client_repo = args.client_repo.resolve()
    content_repo = args.content_repo.resolve()
    pack_dir = runtime_root / "pack"
    run_script = client_repo / "scripts" / "run-wine-headless.sh"
    build_bin = client_repo / "build-mingw64-lld" / "bin"
    log_dir = build_bin / "log"
    binary = resolve_binary()

    if not pack_dir.is_dir():
        raise SystemExit(f"runtime pack dir not found: {pack_dir}")
    if not run_script.is_file():
        raise SystemExit(f"wine runner not found: {run_script}")
    if not (build_bin / "Metin2_RelWithDebInfo.exe").is_file():
        raise SystemExit(f"client binary not found: {build_bin / 'Metin2_RelWithDebInfo.exe'}")

    # Command-line overrides win; otherwise fall back to serverinfo.py values.
    default_host, default_auth_port, default_channel_port = load_server_defaults(runtime_root)
    server_host = args.server_host or default_host
    auth_port = args.auth_port or default_auth_port
    channel_port = args.channel_port or default_channel_port
    map_names = args.maps or list(DEFAULT_MAPS)
    packs = args.packs or list(DEFAULT_PACKS)
    steps = load_steps(runtime_root, map_names)

    temp_password = ("Tmp" + secrets.token_hex(6))[:PASS_MAX_NUM]
    login_path = build_bin / "loginInfo.py"
    backup_login_path: Path | None = None
    # Set only once write_login_info() has returned. Cleanup must not touch
    # loginInfo.py before that point: restore_login_info() deletes the file
    # first, which would destroy a pre-existing loginInfo.py that was never
    # backed up if an earlier step (pack build, account lookup, ...) failed.
    login_info_written = False
    moved: list[tuple[Path, Path]] = []
    original_password_hash = ""
    build_result: dict[str, object] | None = None
    original_character_state = load_character_state(args.account_login, args.slot)

    try:
        build_result = rebuild_root_pack(
            binary=binary,
            content_repo=content_repo,
            pack_dir=pack_dir,
            master_key=args.master_key.resolve(),
            sign_secret_key=args.sign_secret_key.resolve(),
            key_id=args.key_id,
        )
        for pack_name in packs:
            activate_pack(pack_dir, pack_name, moved)

        # Start from clean logs so the markers read below belong to this run.
        log_dir.mkdir(parents=True, exist_ok=True)
        for file_path in log_dir.glob("*.txt"):
            file_path.unlink(missing_ok=True)
        for file_path in [build_bin / "syserr.txt", build_bin / "ErrorLog.txt"]:
            file_path.unlink(missing_ok=True)

        # Remember the stored hash before overwriting it with the temp password.
        original_password_hash = run_mysql(
            f"SELECT password FROM account WHERE login={sql_quote(args.account_login)} LIMIT 1;"
        )
        if not original_password_hash:
            raise SystemExit(f"account not found or missing password hash: {args.account_login}")
        run_mysql(
            f"UPDATE account SET password=PASSWORD({sql_quote(temp_password)}) WHERE login={sql_quote(args.account_login)};"
        )

        backup_login_path, _cleanup = write_login_info(
            login_path=login_path,
            server_host=server_host,
            auth_port=auth_port,
            channel_port=channel_port,
            account_login=args.account_login,
            temp_password=temp_password,
            slot=args.slot,
        )
        login_info_written = True

        # The scenario is configured entirely through environment variables.
        env = os.environ.copy()
        env["M2PACK_MASTER_KEY_HEX"] = args.master_key.read_text(encoding="utf-8").strip()
        env["M2PACK_KEY_ID"] = args.key_id
        env["M2_TIMEOUT"] = str(args.timeout)
        env["M2_HEADLESS_SCENARIO"] = "gm_teleport"
        env["M2_HEADLESS_WARP_STEPS"] = serialize_steps(steps)
        env["M2_HEADLESS_COMMAND_DELAY"] = str(args.command_delay)
        env["M2_HEADLESS_SETTLE_DELAY"] = str(args.settle_delay)
        env["M2_HEADLESS_WARP_TIMEOUT"] = str(args.warp_timeout)
        env.setdefault("WINEDEBUG", "-all")

        completed = subprocess.run(
            [str(run_script), str(build_bin)],
            cwd=client_repo,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        headless_trace_path = log_dir / "headless_gm_teleport_trace.txt"
        headless_trace_text = (
            headless_trace_path.read_text(encoding="utf-8", errors="ignore")
            if headless_trace_path.exists()
            else ""
        )
        headless_trace_lines = headless_trace_text.strip().splitlines() if headless_trace_text.strip() else []
        prototype_trace = tail_lines(log_dir / "prototype_trace.txt")
        system_trace = tail_lines(log_dir / "system_py_trace.txt")
        syserr_tail = tail_lines(build_bin / "syserr.txt")

        # An arrival counts only when a trace line equals the marker exactly
        # (list membership, not a prefix match like the markers below).
        arrivals = {
            step.name: f"Warp arrived index={index} map={step.name}" in headless_trace_lines
            for index, step in enumerate(steps)
        }
        success_marker = any(line.startswith("Scenario success current_map=") for line in headless_trace_lines)
        timeout_marker = any(line.startswith("Warp timeout ") for line in headless_trace_lines)
        mismatch_marker = any(line.startswith("Warp open mismatch ") for line in headless_trace_lines)

        result = {
            "ok": success_marker and all(arrivals.values()),
            "returncode": completed.returncode,
            "server_host": server_host,
            "auth_port": auth_port,
            "channel_port": channel_port,
            "account_login": args.account_login,
            "slot": args.slot,
            "packs": packs,
            "maps": [step.name for step in steps],
            "steps": [
                {
                    "map_name": step.name,
                    "global_x": step.global_x,
                    "global_y": step.global_y,
                    "meter_x": step.meter_x,
                    "meter_y": step.meter_y,
                }
                for step in steps
            ],
            "markers": {
                "arrivals": arrivals,
                "success": success_marker,
                "timeout": timeout_marker,
                "mismatch": mismatch_marker,
            },
            "build_root_pack": build_result,
            "headless_trace_tail": headless_trace_lines[-30:],
            "prototype_tail": prototype_trace,
            "system_tail": system_trace,
            "syserr_tail": syserr_tail,
            "stdout_tail": completed.stdout.strip().splitlines()[-10:] if completed.stdout.strip() else [],
            "stderr_tail": completed.stderr.strip().splitlines()[-10:] if completed.stderr.strip() else [],
        }

        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"gm_teleport ok={result['ok']} returncode={completed.returncode} maps={','.join(result['maps'])}")
            for line in result["headless_trace_tail"]:
                print(f" {line}")
        return 0 if result["ok"] else 1
    finally:
        # Each cleanup action is attempted independently so one failure does
        # not prevent the others; failures are collected and raised together.
        cleanup_errors: list[str] = []
        if original_character_state:
            try:
                # Short pause before writing the stored position back.
                time.sleep(1.0)
                restore_character_state(original_character_state)
            except Exception as exc:
                cleanup_errors.append(f"restore character failed: {exc}")
        if original_password_hash:
            try:
                run_mysql(
                    f"UPDATE account SET password={sql_quote(original_password_hash)} WHERE login={sql_quote(args.account_login)};"
                )
            except Exception as exc:
                cleanup_errors.append(f"restore password failed: {exc}")
        if login_info_written:
            try:
                restore_login_info(login_path, backup_login_path)
            except Exception as exc:
                cleanup_errors.append(f"restore loginInfo failed: {exc}")
        try:
            restore_moves(moved)
        except Exception as exc:
            cleanup_errors.append(f"restore pack moves failed: {exc}")
        if cleanup_errors:
            raise RuntimeError("; ".join(cleanup_errors))
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())