172 lines
3.9 KiB
Python
172 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
|
|
# Directory that contains this script; symlinks are resolved first.
REPO_ROOT = Path(__file__).resolve().parents[0]

# Location checked for a locally built binary: <REPO_ROOT>/build/m2pack.
DEFAULT_BINARY = REPO_ROOT.joinpath("build", "m2pack")

# Name of the environment variable that may point at an alternative binary.
ENV_BINARY = "M2PACK_BINARY"
|
|
|
|
# Server instance named "m2pack-secure". The @mcp.tool() decorators in this
# file register the pack_* functions on it, and main() starts it via mcp.run().
# NOTE(review): FastMCP presumably publishes each tool's docstring as its
# client-visible description -- confirm before rewording those docstrings.
mcp = FastMCP("m2pack-secure")
|
|
|
|
|
|
def _resolve_binary() -> Path:
    """Locate the m2pack executable this server should invoke.

    Lookup order (first match wins):

    1. The path stored in the environment variable named by ``ENV_BINARY``
       (``~`` expanded), provided it is an existing file.
    2. ``DEFAULT_BINARY``, provided it is an existing file.
    3. Whatever ``shutil.which("m2pack")`` finds on ``PATH``.

    Returns:
        Path to the selected executable.

    Raises:
        FileNotFoundError: If none of the three lookups yields a binary.
    """
    file_candidates: list[Path] = []

    override = os.environ.get(ENV_BINARY)
    if override:
        # NOTE: an override that is set but does not name an existing file is
        # skipped silently; the lookup simply moves on to the next candidate.
        file_candidates.append(Path(override).expanduser())
    file_candidates.append(DEFAULT_BINARY)

    for path in file_candidates:
        if path.is_file():
            return path

    on_path = shutil.which("m2pack")
    if on_path:
        return Path(on_path)

    raise FileNotFoundError(
        f"m2pack binary not found. Build {DEFAULT_BINARY} or set {ENV_BINARY}."
    )
|
|
|
|
|
|
def _run_cli(*args: str) -> dict[str, Any]:
    """Run the m2pack CLI with ``--json`` appended and return its decoded output.

    The command is executed without a shell, with ``REPO_ROOT`` as the working
    directory, and with stdout/stderr captured as text.

    Args:
        *args: Sub-command and options, each passed as its own argv element.

    Returns:
        The value decoded from the JSON the CLI printed on stdout, or
        ``{"ok": True}`` when the command succeeded but printed nothing.

    Raises:
        FileNotFoundError: If ``_resolve_binary`` cannot find the executable.
        RuntimeError: If the CLI exits with a non-zero status (message carries
            stderr, else stdout, else the exit code) or prints non-JSON output.
    """
    binary = _resolve_binary()
    cmd = [str(binary), *args, "--json"]

    proc = subprocess.run(
        cmd,
        cwd=REPO_ROOT,
        # Do not let the child inherit our stdin: when the server is driven
        # over the stdio transport (FastMCP's default for run()), stdin is the
        # protocol stream, and a child that reads from it would swallow client
        # messages or block waiting for input.
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        check=False,  # the exit status is inspected below for a richer error
    )

    if proc.returncode != 0:
        # Prefer stderr, then stdout, then the bare exit code as the detail.
        detail = proc.stderr.strip() or proc.stdout.strip() or f"exit code {proc.returncode}"
        raise RuntimeError(f"m2pack failed: {detail}")

    stdout = proc.stdout.strip()
    if not stdout:
        # Successful command with no output: report a minimal success payload.
        return {"ok": True}

    try:
        return json.loads(stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"m2pack returned non-JSON output: {stdout}") from exc
|
|
|
|
|
|
@mcp.tool()
def pack_keygen(out_dir: str) -> dict[str, Any]:
    """Generate a new master content key and Ed25519 signing keypair."""
    # out_dir is forwarded unchanged as the value of --out-dir (presumably the
    # directory the key files are written to -- confirm against the CLI).
    # The return value is whatever _run_cli decodes from the CLI's output;
    # _run_cli raises RuntimeError when the command fails.
    argv = ["keygen", "--out-dir", out_dir]
    return _run_cli(*argv)
|
|
|
|
|
|
@mcp.tool()
def pack_build(
    input_dir: str,
    output_archive: str,
    key_file: str,
    signing_secret_key_file: str,
) -> dict[str, Any]:
    """Build an .m2p archive from a source directory."""
    # Each parameter is forwarded unchanged as the value of one CLI option:
    #   input_dir               -> --input
    #   output_archive          -> --output
    #   key_file                -> --key
    #   signing_secret_key_file -> --sign-secret-key
    # The return value is whatever _run_cli decodes from the CLI's output;
    # _run_cli raises RuntimeError when the command fails.
    argv = [
        "build",
        "--input", input_dir,
        "--output", output_archive,
        "--key", key_file,
        "--sign-secret-key", signing_secret_key_file,
    ]
    return _run_cli(*argv)
|
|
|
|
|
|
@mcp.tool()
def pack_diff(left: str, right: str) -> dict[str, Any]:
    """Diff two directories and/or .m2p archives using normalized paths and plaintext hashes."""
    # left and right are forwarded unchanged as the values of --left / --right.
    # The return value is whatever _run_cli decodes from the CLI's output;
    # _run_cli raises RuntimeError when the command fails.
    argv = ["diff", "--left", left, "--right", right]
    return _run_cli(*argv)
|
|
|
|
|
|
@mcp.tool()
def pack_list(archive: str) -> dict[str, Any]:
    """List entries in an .m2p archive."""
    # archive is forwarded unchanged as the value of --archive.
    # The return value is whatever _run_cli decodes from the CLI's output;
    # _run_cli raises RuntimeError when the command fails.
    argv = ["list", "--archive", archive]
    return _run_cli(*argv)
|
|
|
|
|
|
@mcp.tool()
def pack_verify(
    archive: str,
    public_key_file: str | None = None,
    key_file: str | None = None,
) -> dict[str, Any]:
    """Verify archive manifest integrity and optionally decrypt all entries."""
    # archive is always passed as --archive. The two optional parameters are
    # only added to the command line when they are truthy, so both None and
    # an empty string mean "omit this option".
    # The return value is whatever _run_cli decodes from the CLI's output;
    # _run_cli raises RuntimeError when the command fails.
    optional_flags = (
        ("--public-key", public_key_file),
        ("--key", key_file),
    )

    argv = ["verify", "--archive", archive]
    for flag, value in optional_flags:
        if value:
            argv += [flag, value]
    return _run_cli(*argv)
|
|
|
|
|
|
@mcp.tool()
def pack_extract(
    archive: str,
    output_dir: str,
    key_file: str,
) -> dict[str, Any]:
    """Extract an .m2p archive into a directory."""
    # Each parameter is forwarded unchanged as the value of one CLI option:
    #   archive    -> --archive
    #   output_dir -> --output
    #   key_file   -> --key
    # The return value is whatever _run_cli decodes from the CLI's output;
    # _run_cli raises RuntimeError when the command fails.
    argv = [
        "extract",
        "--archive", archive,
        "--output", output_dir,
        "--key", key_file,
    ]
    return _run_cli(*argv)
|
|
|
|
|
|
@mcp.tool()
def pack_export_client_config(
    key_file: str,
    public_key_file: str,
    output_header: str,
) -> dict[str, Any]:
    """Generate M2PackKeys.h for the Windows client tree."""
    # Each parameter is forwarded unchanged as the value of one CLI option:
    #   key_file        -> --key
    #   public_key_file -> --public-key
    #   output_header   -> --output
    # The return value is whatever _run_cli decodes from the CLI's output;
    # _run_cli raises RuntimeError when the command fails.
    argv = [
        "export-client-config",
        "--key", key_file,
        "--public-key", public_key_file,
        "--output", output_header,
    ]
    return _run_cli(*argv)
|
|
|
|
|
|
@mcp.tool()
def pack_binary_info() -> dict[str, Any]:
    """Report which m2pack binary the server will execute."""
    # Does not run the CLI; it only performs the same lookup _run_cli uses.
    # _resolve_binary raises FileNotFoundError when no binary can be found.
    # Result keys: "ok" (always True here), "binary" (resolved path as a
    # string) and "repo_root" (REPO_ROOT as a string).
    resolved = _resolve_binary()
    return dict(
        ok=True,
        binary=str(resolved),
        repo_root=str(REPO_ROOT),
    )
|
|
|
|
|
|
def main() -> None:
    """Script entry point: start the MCP server with ``mcp.run()`` defaults."""
    mcp.run()
|
|
|
|
|
|
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|