576 lines
20 KiB
Python
576 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import secrets
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
|
|
# Two levels up from this file: parents[0] is the script's directory,
# parents[1] is the directory that contains it.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Fallback m2pack binary used when the M2PACK_BINARY environment variable is unset.
DEFAULT_BINARY = REPO_ROOT / "build" / "m2pack"
# Factor applied to an atlas entry's width/height when computing the map center.
TERRAIN_SIZE = 25600
# Maximum length the generated temporary password is truncated to in main().
PASS_MAX_NUM = 16
# Default value of the --account-login option.
DEFAULT_ACCOUNT_LOGIN = "admin"
# Maps visited when no --map option is given.
DEFAULT_MAPS = [
    "metin2_map_a1",
    "metin2_map_a3",
    "metin2_map_trent",
    "metin2_map_trent02",
]
# Pack basenames activated (as <name>.m2p) when no --pack option is given.
DEFAULT_PACKS = [
    "root",
    "patch1",
    "patch2",
    "season3_eu",
    "metin2_patch_snow",
    "metin2_patch_snow_dungeon",
    "metin2_patch_etc_costume1",
    "metin2_patch_pet1",
    "metin2_patch_pet2",
    "metin2_patch_ramadan_costume",
    "metin2_patch_flame",
    "metin2_patch_flame_dungeon",
    "locale",
    "uiscript",
    "uiloading",
    "ETC",
    "item",
    "effect",
    "icon",
    "property",
    "terrain",
    "tree",
    "zone",
    "outdoora1",
    "outdoora2",
    "outdoorb1",
    "outdoorc1",
    "outdoorsnow1",
    "pc",
    "pc2",
    "guild",
    "npc",
    "monster2",
    "sound",
    "sound_m",
    "sound2",
    "monster",
    "npc2",
    "textureset",
    "outdoora3",
    "outdoorb3",
    "outdoorc3",
    "outdoordesert1",
    "outdoorflame1",
    "outdoorfielddungeon1",
]
|
|
|
|
|
|
@dataclass
class AtlasEntry:
    """A single map record parsed from atlasinfo.txt.

    Holds the map name, its base coordinates, and its width/height. The
    ``center_*`` properties scale width/height by ``TERRAIN_SIZE`` to find
    the midpoint of the map relative to its base coordinates.
    """

    name: str
    base_x: int
    base_y: int
    width: int
    height: int

    @property
    def center_x(self) -> int:
        """Return the base X plus half of the scaled map width."""
        half_span = (self.width * TERRAIN_SIZE) // 2
        return self.base_x + half_span

    @property
    def center_y(self) -> int:
        """Return the base Y plus half of the scaled map height."""
        half_span = (self.height * TERRAIN_SIZE) // 2
        return self.base_y + half_span
|
|
|
|
|
|
@dataclass
class MapStep:
    """A warp target: a map name plus the global coordinates to warp to."""

    name: str
    global_x: int
    global_y: int

    @property
    def meter_x(self) -> int:
        """Return ``global_x`` floor-divided by 100."""
        quotient, _remainder = divmod(self.global_x, 100)
        return quotient

    @property
    def meter_y(self) -> int:
        """Return ``global_y`` floor-divided by 100."""
        quotient, _remainder = divmod(self.global_y, 100)
        return quotient
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``.

    Options are registered in a fixed order (required paths, scalar
    settings, repeatable lists, then the JSON switch) so ``--help`` output
    keeps the same layout.

    Returns:
        The parsed argument namespace.
    """
    parser = argparse.ArgumentParser(
        description="Run a headless Wine GM teleport smoke scenario with real login and map warps."
    )

    # Mandatory filesystem locations; all are parsed as Path objects.
    required_paths = (
        ("--runtime-root", "Client runtime root containing assets/, config/, and pack/."),
        ("--client-repo", "Path to m2dev-client-src checkout."),
        ("--content-repo", "Path to m2dev-client asset checkout used to rebuild root.m2p."),
        ("--master-key", "Path to m2pack runtime master key file."),
        ("--sign-secret-key", "Path to m2pack signing secret key used to rebuild root.m2p."),
    )
    for flag, help_text in required_paths:
        parser.add_argument(flag, type=Path, required=True, help=help_text)

    # Optional scalar settings: (flag, converter, default, help).
    scalar_options = (
        ("--key-id", str, "1", "Runtime key_id to inject into the client and archive build."),
        ("--timeout", int, 90, "Wine run timeout in seconds."),
        ("--server-host", str, None, "Server host override. Defaults to runtime assets/root/serverinfo.py."),
        ("--auth-port", int, None, "Auth port override. Defaults to runtime assets/root/serverinfo.py."),
        ("--channel-port", int, None, "Channel port override. Defaults to runtime assets/root/serverinfo.py."),
        ("--account-login", str, DEFAULT_ACCOUNT_LOGIN, "Login name for the GM account."),
        ("--slot", int, 0, "Character slot to auto-select."),
        ("--command-delay", float, 5.0, "Delay before sending the first GM command after entering game."),
        ("--settle-delay", float, 2.0, "Delay between successful warp arrival and the next warp command."),
        ("--warp-timeout", float, 20.0, "Timeout per warp command in seconds."),
    )
    for flag, converter, default_value, help_text in scalar_options:
        parser.add_argument(flag, type=converter, default=default_value, help=help_text)

    # Repeatable options that accumulate into a list: (flag, dest, help).
    repeatable_options = (
        ("--map", "maps", "Atlas map name to visit. Repeat for multiple maps."),
        ("--pack", "packs", "Pack basename to activate as .m2p. Repeat for multiple packs."),
    )
    for flag, destination, help_text in repeatable_options:
        # A fresh list per option so the two defaults are never shared.
        parser.add_argument(flag, dest=destination, action="append", default=[], help=help_text)

    parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON result.")
    return parser.parse_args()
|
|
|
|
|
|
def resolve_binary() -> Path:
    """Locate the m2pack executable.

    The ``M2PACK_BINARY`` environment variable wins when it is set to a
    non-empty value; otherwise ``DEFAULT_BINARY`` is used.

    Returns:
        Path to an existing regular file.

    Raises:
        SystemExit: If the chosen path is not a file.
    """
    override = os.environ.get("M2PACK_BINARY")
    candidate = Path(override) if override else DEFAULT_BINARY
    if candidate.is_file():
        return candidate
    raise SystemExit(f"m2pack binary not found: {candidate}")
|
|
|
|
|
|
def tail_lines(path: Path, count: int = 10) -> list[str]:
    """Return up to the last *count* lines of a text file.

    The file is decoded as UTF-8 with undecodable bytes dropped, and
    leading/trailing whitespace of the whole text is stripped before it is
    split into lines.

    Args:
        path: File to read.
        count: Maximum number of trailing lines to return.

    Returns:
        The trailing lines, or an empty list when the file does not exist
        or *count* is not positive.
    """
    if count <= 0:
        # lines[-0:] is the entire list and a negative count would slice off
        # leading lines instead of selecting a tail, so handle both here.
        return []
    try:
        text = path.read_text(encoding="utf-8", errors="ignore")
    except FileNotFoundError:
        # Reading directly avoids a window between an exists() check and the read.
        return []
    return text.strip().splitlines()[-count:]
|
|
|
|
|
|
def sql_quote(value: str) -> str:
    """Wrap *value* in single quotes for embedding in a SQL statement.

    Each backslash is doubled and each single quote is prefixed with a
    backslash. No other characters are altered.
    """
    # One pass over the input: every character is mapped independently, so
    # backslashes introduced for quotes are never escaped a second time.
    escapes = str.maketrans({"\\": "\\\\", "'": "\\'"})
    return f"'{value.translate(escapes)}'"
|
|
|
|
|
|
def run_mysql(sql: str, database: str = "account") -> str:
    """Execute *sql* through the ``mysql`` command-line client.

    The client is invoked with ``-B -N`` against *database* and the
    statement is passed via ``-e``.

    Args:
        sql: Statement text handed to the client.
        database: Database name to run against.

    Returns:
        The client's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the client exits with a non-zero status. The
            message carries stderr, or stdout when stderr is empty.
    """
    command = ["mysql", "-B", "-N", database, "-e", sql]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout.strip()

    detail = proc.stderr.strip() or proc.stdout.strip()
    raise RuntimeError(f"mysql failed: {detail}")
|
|
|
|
|
|
def load_character_state(account_login: str, slot: int) -> dict[str, object] | None:
    """Fetch the saved location of the character in *slot* of an account.

    Three lookups are chained: account id by login, player id from the
    ``pid<slot + 1>`` column of ``player_index``, then the player row.

    Args:
        account_login: Login name to look up in the ``account`` table.
        slot: Zero-based character slot; only 0 through 3 are accepted.

    Returns:
        A dict with ``id``, ``name``, ``map_index``, ``x`` and ``y``, or
        ``None`` when any lookup comes back empty, the slot holds player id
        ``0``, or the player row does not have exactly five fields.

    Raises:
        SystemExit: If *slot* is outside 0..3.
    """
    if not 0 <= slot <= 3:
        raise SystemExit(f"unsupported character slot: {slot}")

    account_query = f"SELECT id FROM account WHERE login={sql_quote(account_login)} LIMIT 1;"
    account_id = run_mysql(account_query)
    if not account_id:
        return None

    player_id = run_mysql(
        f"SELECT pid{slot + 1} FROM player_index WHERE id={account_id} LIMIT 1;",
        database="player",
    )
    if player_id in ("", "0"):
        return None

    record = run_mysql(
        f"SELECT id, name, map_index, x, y FROM player WHERE id={player_id} LIMIT 1;",
        database="player",
    )
    # The client output is one tab-separated row; anything else is treated as missing.
    fields = record.split("\t") if record else []
    if len(fields) != 5:
        return None

    raw_id, player_name, raw_map_index, raw_x, raw_y = fields
    return {
        "id": int(raw_id),
        "name": player_name,
        "map_index": int(raw_map_index),
        "x": int(raw_x),
        "y": int(raw_y),
    }
|
|
|
|
|
|
def restore_character_state(state: dict[str, object]) -> None:
    """Write a previously captured character location back to the database.

    Args:
        state: Mapping with integer ``map_index``, ``x``, ``y`` and ``id``
            entries, as produced by ``load_character_state``.
    """
    values = (state["map_index"], state["x"], state["y"], state["id"])
    # %d formatting rejects non-numeric values instead of splicing them into the SQL.
    statement = "UPDATE player SET map_index=%d, x=%d, y=%d WHERE id=%d;" % values
    run_mysql(statement, database="player")
|
|
|
|
|
|
def activate_pack(pack_dir: Path, pack_name: str, moved: list[tuple[Path, Path]]) -> None:
    """Enable the ``.m2p`` variant of a pack and hide its ``.pck`` variant.

    ``<name>.m2p.offsingle`` is renamed to ``<name>.m2p`` and ``<name>.pck``
    is renamed to ``<name>.pck.testbak``; each rename happens only if the
    source exists.

    Args:
        pack_dir: Directory holding the pack files.
        pack_name: Pack basename without extension.
        moved: Journal that receives a ``(current_path, original_path)``
            tuple for every rename performed, so it can be undone later.
    """
    renames = (
        (pack_dir / f"{pack_name}.m2p.offsingle", pack_dir / f"{pack_name}.m2p"),
        (pack_dir / f"{pack_name}.pck", pack_dir / f"{pack_name}.pck.testbak"),
    )
    for original, target in renames:
        if original.exists():
            original.rename(target)
            moved.append((target, original))
|
|
|
|
|
|
def restore_moves(moved: list[tuple[Path, Path]]) -> None:
    """Undo recorded renames, most recent first.

    Args:
        moved: ``(current_path, original_path)`` tuples. Entries whose
            current path no longer exists are skipped.
    """
    for current, original in moved[::-1]:
        if not current.exists():
            continue
        current.rename(original)
|
|
|
|
|
|
def load_server_defaults(runtime_root: Path) -> tuple[str, int, int]:
    """Read server host and ports from the runtime's ``serverinfo.py``.

    Args:
        runtime_root: Runtime root; the file is expected at
            ``assets/root/serverinfo.py`` beneath it.

    Returns:
        ``(host, auth_port, channel_port)`` taken from the module-level
        names ``SERVER_IP``, ``PORT_AUTH`` and ``PORT_CH1``. Built-in
        fallbacks are used for any name that is missing, and for all three
        when the file does not exist.
    """
    fallback_host = "173.249.9.66"
    fallback_auth_port = 11000
    fallback_channel_port = 11011

    info_file = runtime_root / "assets" / "root" / "serverinfo.py"
    if not info_file.is_file():
        return fallback_host, fallback_auth_port, fallback_channel_port

    source = info_file.read_text(encoding="utf-8", errors="ignore")
    scope: dict[str, object] = {}
    # NOTE(review): this executes the file's contents as Python. Only point
    # --runtime-root at a runtime tree you trust.
    exec(compile(source, str(info_file), "exec"), scope)

    return (
        str(scope.get("SERVER_IP", fallback_host)),
        int(scope.get("PORT_AUTH", fallback_auth_port)),
        int(scope.get("PORT_CH1", fallback_channel_port)),
    )
|
|
|
|
|
|
def load_atlas_entry(runtime_root: Path, map_name: str) -> AtlasEntry:
    """Find *map_name* in the runtime's ``atlasinfo.txt``.

    Lines are split on whitespace. Lines with fewer than five fields are
    ignored, and the first line whose first field equals *map_name* is used.

    Args:
        runtime_root: Runtime root; the atlas is read from
            ``assets/root/atlasinfo.txt`` beneath it.
        map_name: Exact map name to search for.

    Returns:
        The entry built from the matching line's first five fields.

    Raises:
        SystemExit: If the atlas file is missing, the matching line has
            non-integer numeric fields, or no line matches.
    """
    atlas_file = runtime_root / "assets" / "root" / "atlasinfo.txt"
    if not atlas_file.is_file():
        raise SystemExit(f"atlasinfo not found: {atlas_file}")

    content = atlas_file.read_text(encoding="utf-8", errors="ignore")
    for line in content.splitlines():
        fields = line.split()
        if len(fields) < 5 or fields[0] != map_name:
            continue
        try:
            base_x, base_y, width, height = (int(field) for field in fields[1:5])
        except ValueError as exc:
            raise SystemExit(f"invalid atlas entry for {map_name}: {line}") from exc
        return AtlasEntry(name=fields[0], base_x=base_x, base_y=base_y, width=width, height=height)

    raise SystemExit(f"map not found in atlasinfo.txt: {map_name}")
|
|
|
|
|
|
def load_steps(runtime_root: Path, map_names: list[str]) -> list[MapStep]:
    """Turn map names into warp steps aimed at each map's center.

    Args:
        runtime_root: Runtime root passed through to ``load_atlas_entry``.
        map_names: Map names to resolve, in visiting order.

    Returns:
        One ``MapStep`` per name, in the same order.
    """

    def _step_for(map_name):
        entry = load_atlas_entry(runtime_root, map_name)
        return MapStep(name=map_name, global_x=entry.center_x, global_y=entry.center_y)

    return [_step_for(map_name) for map_name in map_names]
|
|
|
|
|
|
def serialize_steps(steps: list[MapStep]) -> str:
    """Encode steps as ``name,x,y`` records joined by ``|``.

    An empty list yields an empty string.
    """

    def _encode(step):
        return f"{step.name},{step.global_x},{step.global_y}"

    return "|".join(map(_encode, steps))
|
|
|
|
|
|
def rebuild_root_pack(
    binary: Path,
    content_repo: Path,
    pack_dir: Path,
    master_key: Path,
    sign_secret_key: Path,
    key_id: str,
) -> dict[str, object]:
    """Build a fresh ``root.m2p`` from the content repo and install it.

    The archive is built into a temporary file inside *pack_dir* and only
    moved over ``root.m2p`` once the build command has succeeded.

    Args:
        binary: m2pack executable to run.
        content_repo: Checkout whose ``assets/root`` directory is packed.
        pack_dir: Directory that receives ``root.m2p``.
        master_key: Key file passed as ``--key``.
        sign_secret_key: Key file passed as ``--sign-secret-key``.
        key_id: Value passed as ``--key-id``.

    Returns:
        A dict with ``ok`` (always ``True``), the last ten stdout and stderr
        lines, and ``json`` holding the parsed stdout or ``None`` when
        stdout is empty or not valid JSON.

    Raises:
        SystemExit: If ``assets/root`` is missing from *content_repo*.
        RuntimeError: If the build command exits with a non-zero status.
    """
    source_root = content_repo / "assets" / "root"
    if not source_root.is_dir():
        raise SystemExit(f"content root assets not found: {source_root}")

    handle, scratch_name = tempfile.mkstemp(prefix="root-gm-teleport-", suffix=".m2p", dir=str(pack_dir))
    # Only the reserved name is needed; the build tool writes the file itself.
    os.close(handle)
    scratch = Path(scratch_name)

    command = [
        str(binary),
        "build",
        "--input",
        str(source_root),
        "--output",
        str(scratch),
        "--key",
        str(master_key),
        "--sign-secret-key",
        str(sign_secret_key),
        "--key-id",
        str(key_id),
        "--json",
    ]

    try:
        proc = subprocess.run(command, cwd=REPO_ROOT, capture_output=True, text=True)
        out_text = proc.stdout.strip()
        err_text = proc.stderr.strip()
        if proc.returncode != 0:
            raise RuntimeError(f"m2pack build failed: {err_text or out_text}")

        parsed = None
        if out_text:
            try:
                parsed = json.loads(out_text)
            except json.JSONDecodeError:
                # Non-JSON output is tolerated; the tails below still carry it.
                parsed = None

        scratch.replace(pack_dir / "root.m2p")
        return {
            "ok": True,
            "stdout_tail": out_text.splitlines()[-10:],
            "stderr_tail": err_text.splitlines()[-10:],
            "json": parsed,
        }
    finally:
        # After a successful replace the scratch path is already gone.
        scratch.unlink(missing_ok=True)
|
|
|
|
|
|
def write_login_info(
    login_path: Path,
    server_host: str,
    auth_port: int,
    channel_port: int,
    account_login: str,
    temp_password: str,
    slot: int,
) -> tuple[Path | None, list[str]]:
    """Write an auto-login ``loginInfo.py``, backing up any existing one.

    An existing file at *login_path* is first renamed to the same name with
    a ``.gmteleportbak`` suffix. String values are written using their
    Python ``repr``.

    Args:
        login_path: Destination of the generated file.
        server_host: Written as both ``addr`` and ``account_addr``.
        auth_port: Written as ``account_port``.
        channel_port: Written as ``port``.
        account_login: Written as ``id``.
        temp_password: Written as ``pwd``.
        slot: Written as ``slot``.

    Returns:
        ``(backup_path, cleanup)`` where ``backup_path`` is the backup
        location or ``None`` if nothing was backed up, and ``cleanup`` lists
        the backup path as a string when one was created.
    """
    backup_path = None
    cleanup: list[str] = []
    if login_path.exists():
        backup_path = login_path.with_name(login_path.name + ".gmteleportbak")
        login_path.rename(backup_path)
        cleanup.append(str(backup_path))

    settings = (
        ("addr", repr(server_host)),
        ("port", channel_port),
        ("account_addr", repr(server_host)),
        ("account_port", auth_port),
        ("id", repr(account_login)),
        ("pwd", repr(temp_password)),
        ("slot", slot),
        ("autoLogin", 1),
        ("autoSelect", 1),
    )
    # One "key=value" assignment per line, each terminated by a newline.
    payload = "".join(f"{key}={value}\n" for key, value in settings)
    login_path.write_text(payload, encoding="utf-8")
    return backup_path, cleanup
|
|
|
|
|
|
def restore_login_info(login_path: Path, backup_path: Path | None) -> None:
    """Remove the generated login file and put the backup back, if any.

    Args:
        login_path: Generated file to delete; a missing file is not an error.
        backup_path: Backup to rename onto *login_path*, or ``None``. A
            backup path that no longer exists is ignored.
    """
    login_path.unlink(missing_ok=True)
    if backup_path is None or not backup_path.exists():
        return
    backup_path.rename(login_path)
|
|
|
|
|
|
def main() -> int:
    """Run the GM teleport smoke scenario end to end.

    Steps, in order: validate paths, resolve server settings and warp
    steps, capture the character's current location, rebuild ``root.m2p``,
    activate packs, clear old logs, swap the account password for a
    temporary one, write ``loginInfo.py``, run the headless Wine client, and
    evaluate the trace log. The ``finally`` block then restores the
    character location, the password hash, ``loginInfo.py`` and the pack
    file renames.

    Returns:
        0 when the trace contains the success marker and an arrival marker
        for every step, otherwise 1.

    Raises:
        SystemExit: On missing paths/binaries or when the account has no
            password hash.
        RuntimeError: If any cleanup action fails; this is raised from the
            ``finally`` block and therefore replaces the normal return value.
    """
    args = parse_args()

    runtime_root = args.runtime_root.resolve()
    client_repo = args.client_repo.resolve()
    content_repo = args.content_repo.resolve()
    pack_dir = runtime_root / "pack"
    run_script = client_repo / "scripts" / "run-wine-headless.sh"
    build_bin = client_repo / "build-mingw64-lld" / "bin"
    log_dir = build_bin / "log"
    binary = resolve_binary()

    # Fail fast before touching the database or the filesystem.
    if not pack_dir.is_dir():
        raise SystemExit(f"runtime pack dir not found: {pack_dir}")
    if not run_script.is_file():
        raise SystemExit(f"wine runner not found: {run_script}")
    if not (build_bin / "Metin2_RelWithDebInfo.exe").is_file():
        raise SystemExit(f"client binary not found: {build_bin / 'Metin2_RelWithDebInfo.exe'}")

    default_host, default_auth_port, default_channel_port = load_server_defaults(runtime_root)
    # `or` means an empty host or a port of 0 on the command line falls back to the default.
    server_host = args.server_host or default_host
    auth_port = args.auth_port or default_auth_port
    channel_port = args.channel_port or default_channel_port
    map_names = args.maps or list(DEFAULT_MAPS)
    packs = args.packs or list(DEFAULT_PACKS)
    steps = load_steps(runtime_root, map_names)

    # "Tmp" + 12 hex digits, truncated to PASS_MAX_NUM characters.
    temp_password = ("Tmp" + secrets.token_hex(6))[:PASS_MAX_NUM]
    login_path = build_bin / "loginInfo.py"
    # State consulted by the cleanup in `finally`; each value stays falsy/empty
    # until the corresponding change has actually been made.
    backup_login_path: Path | None = None
    moved: list[tuple[Path, Path]] = []
    original_password_hash = ""
    build_result: dict[str, object] | None = None
    # Captured before the run so the character can be put back afterwards.
    original_character_state = load_character_state(args.account_login, args.slot)

    try:
        build_result = rebuild_root_pack(
            binary=binary,
            content_repo=content_repo,
            pack_dir=pack_dir,
            master_key=args.master_key.resolve(),
            sign_secret_key=args.sign_secret_key.resolve(),
            key_id=args.key_id,
        )

        for pack_name in packs:
            activate_pack(pack_dir, pack_name, moved)

        # Remove logs from earlier runs so the markers read below belong to this run.
        log_dir.mkdir(parents=True, exist_ok=True)
        for file_path in log_dir.glob("*.txt"):
            file_path.unlink(missing_ok=True)
        for file_path in [build_bin / "syserr.txt", build_bin / "ErrorLog.txt"]:
            file_path.unlink(missing_ok=True)

        # Save the stored password value first so it can be written back verbatim.
        original_password_hash = run_mysql(
            f"SELECT password FROM account WHERE login={sql_quote(args.account_login)} LIMIT 1;"
        )
        if not original_password_hash:
            raise SystemExit(f"account not found or missing password hash: {args.account_login}")

        # Set the temporary password; the hash is computed in SQL via PASSWORD().
        run_mysql(
            f"UPDATE account SET password=PASSWORD({sql_quote(temp_password)}) WHERE login={sql_quote(args.account_login)};"
        )

        backup_login_path, _cleanup = write_login_info(
            login_path=login_path,
            server_host=server_host,
            auth_port=auth_port,
            channel_port=channel_port,
            account_login=args.account_login,
            temp_password=temp_password,
            slot=args.slot,
        )

        # Scenario parameters are handed to the runner script via environment variables.
        env = os.environ.copy()
        env["M2PACK_MASTER_KEY_HEX"] = args.master_key.read_text(encoding="utf-8").strip()
        env["M2PACK_KEY_ID"] = args.key_id
        env["M2_TIMEOUT"] = str(args.timeout)
        env["M2_HEADLESS_SCENARIO"] = "gm_teleport"
        env["M2_HEADLESS_WARP_STEPS"] = serialize_steps(steps)
        env["M2_HEADLESS_COMMAND_DELAY"] = str(args.command_delay)
        env["M2_HEADLESS_SETTLE_DELAY"] = str(args.settle_delay)
        env["M2_HEADLESS_WARP_TIMEOUT"] = str(args.warp_timeout)
        # Respect a caller-provided WINEDEBUG; otherwise silence Wine debug output.
        env.setdefault("WINEDEBUG", "-all")

        # No timeout is set here; args.timeout is only forwarded through M2_TIMEOUT.
        completed = subprocess.run(
            [str(run_script), str(build_bin)],
            cwd=client_repo,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        headless_trace_path = log_dir / "headless_gm_teleport_trace.txt"
        headless_trace_text = headless_trace_path.read_text(encoding="utf-8", errors="ignore") if headless_trace_path.exists() else ""
        headless_trace_lines = headless_trace_text.strip().splitlines() if headless_trace_text.strip() else []
        prototype_trace = tail_lines(log_dir / "prototype_trace.txt")
        system_trace = tail_lines(log_dir / "system_py_trace.txt")
        syserr_tail = tail_lines(build_bin / "syserr.txt")

        # A step counts as arrived only if the trace has a line exactly equal to its marker.
        # Keyed by map name, so a map listed twice keeps only the last result.
        arrivals = {}
        for index, step in enumerate(steps):
            marker = f"Warp arrived index={index} map={step.name}"
            arrivals[step.name] = marker in headless_trace_lines

        success_marker = any(line.startswith("Scenario success current_map=") for line in headless_trace_lines)
        timeout_marker = any(line.startswith("Warp timeout ") for line in headless_trace_lines)
        mismatch_marker = any(line.startswith("Warp open mismatch ") for line in headless_trace_lines)

        result = {
            # The client's return code is reported but does not affect "ok".
            "ok": success_marker and all(arrivals.values()),
            "returncode": completed.returncode,
            "server_host": server_host,
            "auth_port": auth_port,
            "channel_port": channel_port,
            "account_login": args.account_login,
            "slot": args.slot,
            "packs": packs,
            "maps": [step.name for step in steps],
            "steps": [
                {
                    "map_name": step.name,
                    "global_x": step.global_x,
                    "global_y": step.global_y,
                    "meter_x": step.meter_x,
                    "meter_y": step.meter_y,
                }
                for step in steps
            ],
            "markers": {
                "arrivals": arrivals,
                "success": success_marker,
                "timeout": timeout_marker,
                "mismatch": mismatch_marker,
            },
            "build_root_pack": build_result,
            "headless_trace_tail": headless_trace_lines[-30:],
            "prototype_tail": prototype_trace,
            "system_tail": system_trace,
            "syserr_tail": syserr_tail,
            "stdout_tail": completed.stdout.strip().splitlines()[-10:] if completed.stdout.strip() else [],
            "stderr_tail": completed.stderr.strip().splitlines()[-10:] if completed.stderr.strip() else [],
        }

        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"gm_teleport ok={result['ok']} returncode={completed.returncode} maps={','.join(result['maps'])}")
            for line in result["headless_trace_tail"]:
                print(f" {line}")

        return 0 if result["ok"] else 1
    finally:
        # Every restore step runs even if an earlier one fails; failures are
        # collected and reported together at the end.
        cleanup_errors: list[str] = []

        if original_character_state:
            try:
                # Short pause before rewriting the character's saved position.
                time.sleep(1.0)
                restore_character_state(original_character_state)
            except Exception as exc:
                cleanup_errors.append(f"restore character failed: {exc}")

        # Non-empty only if the original value was read, i.e. the password may have been changed.
        if original_password_hash:
            try:
                run_mysql(
                    f"UPDATE account SET password={sql_quote(original_password_hash)} WHERE login={sql_quote(args.account_login)};"
                )
            except Exception as exc:
                cleanup_errors.append(f"restore password failed: {exc}")

        try:
            restore_login_info(login_path, backup_login_path)
        except Exception as exc:
            cleanup_errors.append(f"restore loginInfo failed: {exc}")

        try:
            restore_moves(moved)
        except Exception as exc:
            cleanup_errors.append(f"restore pack moves failed: {exc}")

        # Raising inside `finally` overrides the return value computed above.
        if cleanup_errors:
            raise RuntimeError("; ".join(cleanup_errors))
|
|
|
|
|
|
# Run the scenario only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|