diff --git a/src/metin_release/commands/_m2pack_runner.py b/src/metin_release/commands/_m2pack_runner.py new file mode 100644 index 0000000..018d63a --- /dev/null +++ b/src/metin_release/commands/_m2pack_runner.py @@ -0,0 +1,78 @@ +"""Shared helper to invoke the real m2pack CLI and translate its JSON. + +m2pack-secure emits its own JSON envelopes with ``--json``. Their shape is +not identical to our :class:`~metin_release.result.Result` envelope, so we +wrap the raw dict under ``data["m2pack"]`` and promote a few well-known +fields (artifact paths, counts) where it makes sense per command. + +If m2pack exits non-zero or prints non-JSON, we raise +:class:`~metin_release.errors.SubprocessError` so the CLI exits with code +1 and a readable error envelope. +""" + +from __future__ import annotations + +import json +import subprocess +from pathlib import Path +from typing import Any + +from ..errors import SubprocessError +from ..log import get_logger +from ..m2pack_binary import resolve_m2pack_binary + + +def run_m2pack(subcommand: str, argv: list[str]) -> dict[str, Any]: + """Invoke ``m2pack [argv...] --json`` and return parsed JSON. + + Raises :class:`SubprocessError` (exit code 1) on any failure: binary + missing, non-zero exit, empty stdout, or non-JSON stdout. Missing + binary is handled inside :func:`resolve_m2pack_binary` by raising a + :class:`~metin_release.errors.ValidationError` which the dispatcher + already converts into the standard error envelope. + """ + log = get_logger() + binary: Path = resolve_m2pack_binary() + cmd = [str(binary), subcommand, *argv, "--json"] + log.debug("running m2pack: %s", " ".join(cmd)) + try: + proc = subprocess.run( + cmd, capture_output=True, text=True, check=False + ) + except OSError as exc: + raise SubprocessError( + f"failed to spawn m2pack: {exc}", + error_code="m2pack_spawn_failed", + ) from exc + + stdout = (proc.stdout or "").strip() + stderr = (proc.stderr or "").strip() + + if proc.returncode != 0: + detail = stderr or stdout or f"exit code {proc.returncode}" + raise SubprocessError( + f"m2pack {subcommand} failed (rc={proc.returncode}): {detail}", + error_code="m2pack_failed", + ) + + if not stdout: + raise SubprocessError( + f"m2pack {subcommand} produced no stdout; stderr={stderr!r}", + error_code="m2pack_empty_output", + ) + + try: + parsed = json.loads(stdout) + except json.JSONDecodeError as exc: + raise SubprocessError( + f"m2pack {subcommand} returned non-JSON output: {exc}; " + f"first 200 chars={stdout[:200]!r}", + error_code="m2pack_invalid_json", + ) from exc + + if not isinstance(parsed, dict): + raise SubprocessError( + f"m2pack {subcommand} JSON was not an object: {type(parsed).__name__}", + error_code="m2pack_invalid_json", + ) + return parsed diff --git a/src/metin_release/commands/m2pack_build.py b/src/metin_release/commands/m2pack_build.py new file mode 100644 index 0000000..9eb28d5 --- /dev/null +++ b/src/metin_release/commands/m2pack_build.py @@ -0,0 +1,47 @@ +"""m2pack build: wrap ``m2pack build`` to produce a signed .m2p archive.""" + +from __future__ import annotations + +import argparse +from pathlib import Path + +from ..result import Result +from ._m2pack_runner import run_m2pack + + +def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser: + p = sub.add_parser("build", help="Build a signed .m2p archive from a source directory.") + p.add_argument("--input", required=True, type=Path, help="Client asset source directory.") + p.add_argument("--output", required=True, type=Path, help="Output .m2p archive path.") + p.add_argument("--key", required=True, type=Path, help="Master content key file.") + p.add_argument( + "--sign-secret-key", + required=True, + type=Path, + help="Ed25519 signing secret key file.", + ) + p.add_argument("--key-id", type=int, help="Content key id (default 1).") + return p + + +def run(ctx, args: argparse.Namespace) -> Result: + argv = [ + "--input", str(args.input), + "--output", str(args.output), + "--key", str(args.key), + "--sign-secret-key", str(args.sign_secret_key), + ] + if args.key_id is not None: + argv.extend(["--key-id", str(args.key_id)]) + + raw = run_m2pack("build", argv) + status = "built" if raw.get("ok", True) else "failed" + return Result( + command="m2pack build", + ok=bool(raw.get("ok", True)), + status=status, + data={ + "artifacts": {"archive_path": str(args.output)}, + "m2pack": raw, + }, + ) diff --git a/src/metin_release/commands/m2pack_verify.py b/src/metin_release/commands/m2pack_verify.py new file mode 100644 index 0000000..c34d11c --- /dev/null +++ b/src/metin_release/commands/m2pack_verify.py @@ -0,0 +1,45 @@ +"""m2pack verify: wrap ``m2pack verify`` to validate manifest + optional decrypt.""" + +from __future__ import annotations + +import argparse +from pathlib import Path + +from ..result import Result +from ._m2pack_runner import run_m2pack + + +def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser: + p = sub.add_parser("verify", help="Verify an .m2p archive (manifest + signature).") + p.add_argument("--archive", required=True, type=Path, help="Path to .m2p archive.") + p.add_argument( + "--public-key", + type=Path, + help="Ed25519 public key file (for signature verification).", + ) + p.add_argument( + "--key", + type=Path, + help="Master content key file (enables full-decrypt verification).", + ) + return p + + +def run(ctx, args: argparse.Namespace) -> Result: + argv = ["--archive", str(args.archive)] + if args.public_key is not None: + argv.extend(["--public-key", str(args.public_key)]) + if args.key is not None: + argv.extend(["--key", str(args.key)]) + + raw = run_m2pack("verify", argv) + ok = bool(raw.get("ok", True)) + return Result( + command="m2pack verify", + ok=ok, + status="verified" if ok else "failed", + data={ + "artifacts": {"archive_path": str(args.archive)}, + "m2pack": raw, + }, + )