#!/usr/bin/env python3 from __future__ import annotations import json import os import shutil import subprocess from pathlib import Path from typing import Any from mcp.server.fastmcp import FastMCP REPO_ROOT = Path(__file__).resolve().parent DEFAULT_BINARY = REPO_ROOT / "build" / "m2pack" ENV_BINARY = "M2PACK_BINARY" mcp = FastMCP("m2pack-secure") def _resolve_binary() -> Path: env_value = os.environ.get(ENV_BINARY) if env_value: candidate = Path(env_value).expanduser() if candidate.is_file(): return candidate if DEFAULT_BINARY.is_file(): return DEFAULT_BINARY which = shutil.which("m2pack") if which: return Path(which) raise FileNotFoundError( f"m2pack binary not found. Build {DEFAULT_BINARY} or set {ENV_BINARY}." ) def _run_cli(*args: str) -> dict[str, Any]: binary = _resolve_binary() cmd = [str(binary), *args, "--json"] proc = subprocess.run( cmd, cwd=REPO_ROOT, capture_output=True, text=True, check=False, ) if proc.returncode != 0: detail = proc.stderr.strip() or proc.stdout.strip() or f"exit code {proc.returncode}" raise RuntimeError(f"m2pack failed: {detail}") stdout = proc.stdout.strip() if not stdout: return {"ok": True} try: return json.loads(stdout) except json.JSONDecodeError as exc: raise RuntimeError(f"m2pack returned non-JSON output: {stdout}") from exc @mcp.tool() def pack_keygen(out_dir: str) -> dict[str, Any]: """Generate a new master content key and Ed25519 signing keypair.""" return _run_cli("keygen", "--out-dir", out_dir) @mcp.tool() def pack_build( input_dir: str, output_archive: str, key_file: str, signing_secret_key_file: str, key_id: int = 1, ) -> dict[str, Any]: """Build an .m2p archive from a source directory.""" return _run_cli( "build", "--input", input_dir, "--output", output_archive, "--key", key_file, "--sign-secret-key", signing_secret_key_file, "--key-id", str(key_id), ) @mcp.tool() def pack_diff(left: str, right: str) -> dict[str, Any]: """Diff two directories and/or .m2p archives using normalized paths and plaintext hashes.""" return _run_cli("diff", "--left", left, "--right", right) @mcp.tool() def pack_list(archive: str) -> dict[str, Any]: """List entries in an .m2p archive.""" return _run_cli("list", "--archive", archive) @mcp.tool() def pack_verify( archive: str, public_key_file: str | None = None, key_file: str | None = None, ) -> dict[str, Any]: """Verify archive manifest integrity and optionally decrypt all entries.""" args = ["verify", "--archive", archive] if public_key_file: args.extend(["--public-key", public_key_file]) if key_file: args.extend(["--key", key_file]) return _run_cli(*args) @mcp.tool() def pack_extract( archive: str, output_dir: str, key_file: str, ) -> dict[str, Any]: """Extract an .m2p archive into a directory.""" return _run_cli( "extract", "--archive", archive, "--output", output_dir, "--key", key_file, ) @mcp.tool() def pack_export_client_config( key_file: str, public_key_file: str, output_header: str, key_id: int = 1, ) -> dict[str, Any]: """Generate M2PackKeys.h for the Windows client tree.""" return _run_cli( "export-client-config", "--key", key_file, "--public-key", public_key_file, "--key-id", str(key_id), "--output", output_header, ) @mcp.tool() def pack_export_runtime_key( key_file: str, public_key_file: str, output_file: str, key_id: int = 1, format: str = "json", ) -> dict[str, Any]: """Generate a launcher runtime key payload in json or blob form.""" return _run_cli( "export-runtime-key", "--key", key_file, "--public-key", public_key_file, "--key-id", str(key_id), "--format", format, "--output", output_file, ) @mcp.tool() def pack_binary_info() -> dict[str, Any]: """Report which m2pack binary the server will execute.""" binary = _resolve_binary() return { "ok": True, "binary": str(binary), "repo_root": str(REPO_ROOT), } def main() -> None: mcp.run() if __name__ == "__main__": main()