cli: implement m2pack build and verify wrappers
Shell out to m2pack-secure with --json, parse its envelope, and translate into the standard metin-release Result envelope under data.m2pack. Non-zero exit and non-JSON output map to SubprocessError with m2pack_failed / m2pack_invalid_json / m2pack_empty_output codes.
This commit is contained in:
78
src/metin_release/commands/_m2pack_runner.py
Normal file
78
src/metin_release/commands/_m2pack_runner.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""Shared helper to invoke the real m2pack CLI and translate its JSON.
|
||||
|
||||
m2pack-secure emits its own JSON envelopes with ``--json``. Their shape is
|
||||
not identical to our :class:`~metin_release.result.Result` envelope, so we
|
||||
wrap the raw dict under ``data["m2pack"]`` and promote a few well-known
|
||||
fields (artifact paths, counts) where it makes sense per command.
|
||||
|
||||
If m2pack exits non-zero or prints non-JSON, we raise
|
||||
:class:`~metin_release.errors.SubprocessError` so the CLI exits with code
|
||||
1 and a readable error envelope.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from ..errors import SubprocessError
|
||||
from ..log import get_logger
|
||||
from ..m2pack_binary import resolve_m2pack_binary
|
||||
|
||||
|
||||
def run_m2pack(subcommand: str, argv: list[str]) -> dict[str, Any]:
|
||||
"""Invoke ``m2pack <subcommand> [argv...] --json`` and return parsed JSON.
|
||||
|
||||
Raises :class:`SubprocessError` (exit code 1) on any failure: binary
|
||||
missing, non-zero exit, empty stdout, or non-JSON stdout. Missing
|
||||
binary is handled inside :func:`resolve_m2pack_binary` by raising a
|
||||
:class:`~metin_release.errors.ValidationError` which the dispatcher
|
||||
already converts into the standard error envelope.
|
||||
"""
|
||||
log = get_logger()
|
||||
binary: Path = resolve_m2pack_binary()
|
||||
cmd = [str(binary), subcommand, *argv, "--json"]
|
||||
log.debug("running m2pack: %s", " ".join(cmd))
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
cmd, capture_output=True, text=True, check=False
|
||||
)
|
||||
except OSError as exc:
|
||||
raise SubprocessError(
|
||||
f"failed to spawn m2pack: {exc}",
|
||||
error_code="m2pack_spawn_failed",
|
||||
) from exc
|
||||
|
||||
stdout = (proc.stdout or "").strip()
|
||||
stderr = (proc.stderr or "").strip()
|
||||
|
||||
if proc.returncode != 0:
|
||||
detail = stderr or stdout or f"exit code {proc.returncode}"
|
||||
raise SubprocessError(
|
||||
f"m2pack {subcommand} failed (rc={proc.returncode}): {detail}",
|
||||
error_code="m2pack_failed",
|
||||
)
|
||||
|
||||
if not stdout:
|
||||
raise SubprocessError(
|
||||
f"m2pack {subcommand} produced no stdout; stderr={stderr!r}",
|
||||
error_code="m2pack_empty_output",
|
||||
)
|
||||
|
||||
try:
|
||||
parsed = json.loads(stdout)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise SubprocessError(
|
||||
f"m2pack {subcommand} returned non-JSON output: {exc}; "
|
||||
f"first 200 chars={stdout[:200]!r}",
|
||||
error_code="m2pack_invalid_json",
|
||||
) from exc
|
||||
|
||||
if not isinstance(parsed, dict):
|
||||
raise SubprocessError(
|
||||
f"m2pack {subcommand} JSON was not an object: {type(parsed).__name__}",
|
||||
error_code="m2pack_invalid_json",
|
||||
)
|
||||
return parsed
|
||||
47
src/metin_release/commands/m2pack_build.py
Normal file
47
src/metin_release/commands/m2pack_build.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""m2pack build: wrap ``m2pack build`` to produce a signed .m2p archive."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from ..result import Result
|
||||
from ._m2pack_runner import run_m2pack
|
||||
|
||||
|
||||
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
p = sub.add_parser("build", help="Build a signed .m2p archive from a source directory.")
|
||||
p.add_argument("--input", required=True, type=Path, help="Client asset source directory.")
|
||||
p.add_argument("--output", required=True, type=Path, help="Output .m2p archive path.")
|
||||
p.add_argument("--key", required=True, type=Path, help="Master content key file.")
|
||||
p.add_argument(
|
||||
"--sign-secret-key",
|
||||
required=True,
|
||||
type=Path,
|
||||
help="Ed25519 signing secret key file.",
|
||||
)
|
||||
p.add_argument("--key-id", type=int, help="Content key id (default 1).")
|
||||
return p
|
||||
|
||||
|
||||
def run(ctx, args: argparse.Namespace) -> Result:
|
||||
argv = [
|
||||
"--input", str(args.input),
|
||||
"--output", str(args.output),
|
||||
"--key", str(args.key),
|
||||
"--sign-secret-key", str(args.sign_secret_key),
|
||||
]
|
||||
if args.key_id is not None:
|
||||
argv.extend(["--key-id", str(args.key_id)])
|
||||
|
||||
raw = run_m2pack("build", argv)
|
||||
status = "built" if raw.get("ok", True) else "failed"
|
||||
return Result(
|
||||
command="m2pack build",
|
||||
ok=bool(raw.get("ok", True)),
|
||||
status=status,
|
||||
data={
|
||||
"artifacts": {"archive_path": str(args.output)},
|
||||
"m2pack": raw,
|
||||
},
|
||||
)
|
||||
45
src/metin_release/commands/m2pack_verify.py
Normal file
45
src/metin_release/commands/m2pack_verify.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""m2pack verify: wrap ``m2pack verify`` to validate manifest + optional decrypt."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from ..result import Result
|
||||
from ._m2pack_runner import run_m2pack
|
||||
|
||||
|
||||
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
|
||||
p = sub.add_parser("verify", help="Verify an .m2p archive (manifest + signature).")
|
||||
p.add_argument("--archive", required=True, type=Path, help="Path to .m2p archive.")
|
||||
p.add_argument(
|
||||
"--public-key",
|
||||
type=Path,
|
||||
help="Ed25519 public key file (for signature verification).",
|
||||
)
|
||||
p.add_argument(
|
||||
"--key",
|
||||
type=Path,
|
||||
help="Master content key file (enables full-decrypt verification).",
|
||||
)
|
||||
return p
|
||||
|
||||
|
||||
def run(ctx, args: argparse.Namespace) -> Result:
|
||||
argv = ["--archive", str(args.archive)]
|
||||
if args.public_key is not None:
|
||||
argv.extend(["--public-key", str(args.public_key)])
|
||||
if args.key is not None:
|
||||
argv.extend(["--key", str(args.key)])
|
||||
|
||||
raw = run_m2pack("verify", argv)
|
||||
ok = bool(raw.get("ok", True))
|
||||
return Result(
|
||||
command="m2pack verify",
|
||||
ok=ok,
|
||||
status="verified" if ok else "failed",
|
||||
data={
|
||||
"artifacts": {"archive_path": str(args.archive)},
|
||||
"m2pack": raw,
|
||||
},
|
||||
)
|
||||
Reference in New Issue
Block a user