Merge branch 'claude/phase-3-mcp'

Add Phase 3 metin-release-mcp — a thin MCP stdio server wrapping the
Phase 1 CLI as 8 release_* tools. Zero business logic: every tool
spawns metin-release --json, parses stdout, returns the envelope
verbatim. Tool schemas mirror the argparse signatures and are covered
by a parametrised test so drift fails loudly. Keeps existing Phase 1
code untouched.
This commit is contained in:
Jan Nedbal
2026-04-14 19:36:18 +02:00
15 changed files with 1184 additions and 0 deletions

View File

@@ -7,6 +7,27 @@ follow semantic versioning: major for incompatible CLI or JSON contract changes,
minor for new subcommands or additive flags, patch for fixes that preserve the minor for new subcommands or additive flags, patch for fixes that preserve the
contract. contract.
## [Unreleased]
### Added
- `metin_release_mcp` package and `metin-release-mcp` console script — a
thin Model Context Protocol stdio server that wraps the Phase 1
`release …` subcommands as eight MCP tools (`release_inspect`,
`release_build_manifest`, `release_sign`, `release_diff_remote`,
`release_upload_blobs`, `release_promote`, `release_verify_public`,
`release_publish`). Each tool's input schema mirrors the CLI's
argparse signature 1:1; the wrapper shells out to
`metin-release <subcommand> --json …` and returns the parsed envelope
verbatim with zero duplicated business logic.
- `[mcp]` optional dependency group pulling in the official `mcp`
Python SDK.
- `docs/mcp.md` — MCP server usage guide.
- `tests/mcp/` — 45 new tests covering the CLI runner (success path,
error-envelope passthrough, unparseable output, missing binary),
tool schemas (mirror checks against each command's argparse), and
dispatch translation (booleans, optionals, paths with spaces).
## [0.1.0] - 2026-04-14 ## [0.1.0] - 2026-04-14
First Phase 1 drop. Implements the minimum asset release path the First Phase 1 drop. Implements the minimum asset release path the

View File

@@ -31,3 +31,15 @@ Add `--json` to get a machine-parseable envelope on stdout. Exit codes:
| 4 | reserved (ERP sync, Phase 2+) | | 4 | reserved (ERP sync, Phase 2+) |
See `docs/cli.md` for the full command reference. See `docs/cli.md` for the full command reference.
## MCP server
The `metin-release-mcp` console script (Phase 3) exposes each Phase 1
subcommand as an MCP tool over stdio. Install with the `mcp` extra:
```
pip install -e .[mcp]
metin-release-mcp --help
```
See `docs/mcp.md` for tool list, client wiring, and error handling.

120
docs/mcp.md Normal file
View File

@@ -0,0 +1,120 @@
# metin-release-mcp
Thin [Model Context Protocol](https://modelcontextprotocol.io/) server that
wraps the Phase 1 `metin-release` CLI. Each `release …` subcommand is exposed
as an MCP tool. The server contains no release business logic: it shells out
to the real CLI with `--json` and returns the parsed envelope verbatim.
## Install
The server ships as an optional extra alongside the main CLI:
```
pip install -e '.[mcp]'
```
This installs the `mcp` Python SDK and adds a `metin-release-mcp` console
script plus a `python -m metin_release_mcp` module entry.
## Running
The server speaks MCP over stdio, so you wire it into an MCP-capable client
(Claude Desktop, Claude Code, etc.) as a stdio command. Example client entry:
```json
{
"mcpServers": {
"metin-release": {
"command": "metin-release-mcp",
"env": {
"METIN_RELEASE_BINARY": "/usr/local/bin/metin-release"
}
}
}
}
```
You can also poke at it directly:
- `metin-release-mcp --help` — list registered tools
- `metin-release-mcp --list-tools` — dump the full tool JSON schemas
- `metin-release-mcp --version`
## Tools
| Tool | CLI subcommand |
|---|---|
| `release_inspect` | `metin-release release inspect` |
| `release_build_manifest` | `metin-release release build-manifest` |
| `release_sign` | `metin-release release sign` |
| `release_diff_remote` | `metin-release release diff-remote` |
| `release_upload_blobs` | `metin-release release upload-blobs` |
| `release_promote` | `metin-release release promote` |
| `release_verify_public` | `metin-release release verify-public` |
| `release_publish` | `metin-release release publish` |
Tool input keys match CLI flag names with `_` instead of `-`
(`--base-url``base_url`, `--dry-run``dry_run`). Boolean fields
correspond to argparse `store_true` flags: pass `true` to set the flag,
omit or pass `false` to leave it off.
### Example invocation
```json
{
"name": "release_inspect",
"arguments": {
"source": "/srv/metin/client"
}
}
```
The server runs `metin-release release inspect --json --source /srv/metin/client`
and returns the full JSON envelope:
```json
{
"ok": true,
"command": "release inspect",
"status": "inspected",
"stats": {
"source_path": "/srv/metin/client",
"file_count": 9166,
"total_bytes": 3523473920,
"launcher_present": true,
"main_exe_present": true
}
}
```
## CLI resolution
On every tool call the server resolves the `metin-release` binary in this order:
1. `METIN_RELEASE_BINARY` environment variable, if set and non-empty
2. `shutil.which("metin-release")` against `PATH`
If neither resolves, the tool call returns a wrapper-level error envelope
(`error.code = "cli_not_found"`).
## Error handling
The server never invents its own release-level errors. There are three paths:
- **Success** — CLI exits 0 with a valid JSON envelope → envelope returned as-is
- **CLI-level failure** — CLI exits non-zero with an `{"ok": false, "error": …}`
envelope → that envelope is returned as-is, plus the CLI's stderr is
attached as a diagnostic text block
- **Wrapper failure** — binary missing, unparseable stdout, unknown tool,
invalid input → synthetic envelope with one of
`cli_not_found`, `cli_unparseable_output`, `unknown_tool`,
`invalid_tool_input`
## Environment variables
| Variable | Purpose |
|---|---|
| `METIN_RELEASE_BINARY` | Override the `metin-release` binary path. |
Any env vars the wrapped CLI honours (`METIN_RELEASE_MAKE_MANIFEST`,
`METIN_RELEASE_SIGN_MANIFEST`) are inherited by the subprocess unchanged.

View File

@@ -22,9 +22,13 @@ dev = [
"pytest>=8", "pytest>=8",
"pytest-mock>=3", "pytest-mock>=3",
] ]
mcp = [
"mcp>=1.0",
]
[project.scripts] [project.scripts]
metin-release = "metin_release.cli:main" metin-release = "metin_release.cli:main"
metin-release-mcp = "metin_release_mcp.server:main"
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]

View File

@@ -0,0 +1,9 @@
"""Thin MCP server wrapping the metin-release CLI.
Exposes each Phase 1 ``release …`` subcommand as an MCP tool. The server
does no business logic of its own: it maps tool input dicts to CLI flags,
spawns ``metin-release <subcommand> --json …`` and returns the parsed JSON
envelope as the tool result.
"""
__version__ = "0.1.0"

View File

@@ -0,0 +1,5 @@
"""Module entry: ``python -m metin_release_mcp``."""
from .server import main
raise SystemExit(main())

View File

@@ -0,0 +1,95 @@
"""Spawns the metin-release CLI and parses its JSON envelope.
This module is the *only* place that knows how to talk to the real
CLI. The rest of the wrapper treats its output as opaque JSON.
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
from dataclasses import dataclass
from typing import Any
from .errors import CliNotFoundError, CliUnparseableOutputError
@dataclass
class CliResult:
"""Parsed outcome of one CLI invocation.
``envelope`` is the parsed JSON dict emitted on stdout. ``stderr``
is the raw stderr text, surfaced for diagnostics. ``returncode`` is
the CLI exit code.
"""
envelope: dict[str, Any]
stderr: str
returncode: int
@property
def ok(self) -> bool:
return self.returncode == 0 and bool(self.envelope.get("ok", False))
def resolve_binary(env: dict[str, str] | None = None) -> str:
"""Locate the ``metin-release`` executable.
Preference order:
1. ``METIN_RELEASE_BINARY`` env var, if set and non-empty
2. :func:`shutil.which` on ``metin-release``
"""
environ = env if env is not None else os.environ
override = environ.get("METIN_RELEASE_BINARY", "").strip()
if override:
return override
found = shutil.which("metin-release")
if not found:
raise CliNotFoundError(
"metin-release binary not found; set METIN_RELEASE_BINARY or install the CLI."
)
return found
def run_cli(
argv_tail: list[str],
*,
binary: str | None = None,
runner: Any = None,
) -> CliResult:
"""Spawn the CLI with ``argv_tail`` and parse its JSON stdout.
``runner`` is an injection seam for tests — if provided, it's called
instead of :func:`subprocess.run` with the same arguments, and must
return an object with ``stdout``, ``stderr``, ``returncode``.
"""
bin_path = binary or resolve_binary()
cmd = [bin_path, *argv_tail]
spawn = runner or subprocess.run
try:
proc = spawn(cmd, capture_output=True, text=True, check=False)
except FileNotFoundError as exc:
raise CliNotFoundError(f"cannot spawn metin-release: {exc}") from exc
stdout = (getattr(proc, "stdout", "") or "").strip()
stderr = getattr(proc, "stderr", "") or ""
rc = int(getattr(proc, "returncode", 1))
if not stdout:
raise CliUnparseableOutputError(
f"metin-release produced no stdout (rc={rc}); stderr={stderr.strip()!r}"
)
try:
envelope = json.loads(stdout)
except json.JSONDecodeError as exc:
raise CliUnparseableOutputError(
f"metin-release stdout was not JSON: {exc}; first 200 chars={stdout[:200]!r}"
) from exc
if not isinstance(envelope, dict):
raise CliUnparseableOutputError(
f"metin-release JSON was not an object: {type(envelope).__name__}"
)
return CliResult(envelope=envelope, stderr=stderr, returncode=rc)

View File

@@ -0,0 +1,45 @@
"""Error shapes surfaced by the MCP wrapper.
The wrapper never invents its own error codes. It either propagates the
``{"ok": false, "error": {...}}`` envelope the CLI produced, or wraps a
wrapper-level failure (binary missing, unparseable output, unknown tool)
in one of the classes below.
"""
from __future__ import annotations
class McpWrapperError(Exception):
"""Base class for errors raised by the MCP wrapper itself.
These are *wrapper* problems (no CLI binary, junk on stdout, bad tool
name). CLI errors are passed through untouched as JSON envelopes.
"""
code: str = "mcp_wrapper_error"
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
def to_envelope(self, *, stderr: str | None = None) -> dict:
err: dict = {"code": self.code, "message": self.message}
if stderr:
err["stderr"] = stderr
return {"ok": False, "error": err}
class CliNotFoundError(McpWrapperError):
code = "cli_not_found"
class CliUnparseableOutputError(McpWrapperError):
code = "cli_unparseable_output"
class UnknownToolError(McpWrapperError):
code = "unknown_tool"
class InvalidToolInputError(McpWrapperError):
code = "invalid_tool_input"

View File

@@ -0,0 +1,172 @@
"""MCP stdio server entry point.
Registers each :data:`~metin_release_mcp.tool_defs.TOOL_SPECS` entry as
an MCP tool. On invocation the handler:
1. Validates input via the tool's JSON schema (enforced by the SDK) and
by :func:`~metin_release_mcp.tool_defs.build_cli_args`.
2. Shells out to ``metin-release <subcommand> --json …`` via
:func:`~metin_release_mcp.cli_runner.run_cli`.
3. Returns the parsed JSON envelope as the MCP tool result.
The server contains **no release business logic**. If the CLI returns a
non-zero exit or an ``ok=false`` envelope, the wrapper passes that
envelope back to the caller verbatim, tagging it with the CLI stderr as
diagnostic text.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import sys
from typing import Any
from . import __version__
from .errors import (
CliNotFoundError,
CliUnparseableOutputError,
InvalidToolInputError,
McpWrapperError,
UnknownToolError,
)
from .cli_runner import run_cli
from .tool_defs import TOOL_SPECS, TOOLS_BY_NAME, build_cli_args, json_schema
def _build_mcp_server(): # pragma: no cover - imported lazily for tests
from mcp.server import Server
from mcp.types import TextContent, Tool
server = Server("metin-release-mcp", version=__version__)
@server.list_tools()
async def _list_tools() -> list[Tool]:
return [
Tool(
name=spec.name,
description=spec.description,
inputSchema=json_schema(spec),
)
for spec in TOOL_SPECS
]
@server.call_tool()
async def _call_tool(name: str, arguments: dict[str, Any] | None):
envelope, stderr = dispatch(name, arguments or {})
text = json.dumps(envelope, indent=2, sort_keys=False)
content = [TextContent(type="text", text=text)]
if stderr.strip():
content.append(
TextContent(type="text", text=f"metin-release stderr:\n{stderr}")
)
return content, envelope
return server
def dispatch(tool_name: str, payload: dict[str, Any]) -> tuple[dict[str, Any], str]:
"""Dispatch a tool call synchronously.
Returns ``(envelope, stderr)``. On wrapper errors the envelope is
shaped like the CLI's own ``{"ok": false, "error": {...}}`` error
envelope so callers can treat both error sources the same way.
"""
spec = TOOLS_BY_NAME.get(tool_name)
if spec is None:
raise UnknownToolError(
f"unknown tool: {tool_name!r} (known: {sorted(TOOLS_BY_NAME)})"
)
try:
argv_tail = build_cli_args(spec, payload)
except InvalidToolInputError as exc:
return exc.to_envelope(), ""
try:
result = run_cli(argv_tail)
except (CliNotFoundError, CliUnparseableOutputError) as exc:
return exc.to_envelope(stderr=None), ""
return result.envelope, result.stderr
def _print_help() -> None:
lines = [
f"metin-release-mcp {__version__}",
"",
"Thin MCP server wrapping the metin-release CLI over stdio.",
"",
"Usage: metin-release-mcp [--help | --version | --list-tools]",
"",
"With no arguments, runs an MCP server on stdio and registers these tools:",
"",
]
for spec in TOOL_SPECS:
lines.append(f" {spec.name:<26} {spec.description}")
lines.append("")
lines.append("Environment:")
lines.append(
" METIN_RELEASE_BINARY Path to metin-release CLI (else resolved via PATH)"
)
print("\n".join(lines))
def _print_tools() -> None:
out = [
{"name": s.name, "description": s.description, "inputSchema": json_schema(s)}
for s in TOOL_SPECS
]
json.dump(out, sys.stdout, indent=2)
sys.stdout.write("\n")
async def _run_stdio() -> None: # pragma: no cover - exercised only end-to-end
from mcp.server.stdio import stdio_server
server = _build_mcp_server()
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="metin-release-mcp",
description="Thin MCP server wrapping the metin-release CLI.",
add_help=False,
)
parser.add_argument("-h", "--help", action="store_true")
parser.add_argument("--version", action="store_true")
parser.add_argument(
"--list-tools",
action="store_true",
help="Print registered tool schemas as JSON and exit.",
)
args = parser.parse_args(argv)
if args.help:
_print_help()
return 0
if args.version:
print(f"metin-release-mcp {__version__}")
return 0
if args.list_tools:
_print_tools()
return 0
try:
asyncio.run(_run_stdio())
except McpWrapperError as exc:
print(json.dumps(exc.to_envelope()), file=sys.stderr)
return 1
except KeyboardInterrupt: # pragma: no cover
return 130
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())

View File

@@ -0,0 +1,232 @@
"""Declarative tool specs for the Phase 3 MCP wrapper.
Each tool corresponds 1:1 to a ``metin-release release …`` subcommand. A
single :class:`ToolSpec` list drives three things:
1. The MCP ``list_tools`` response (via :func:`json_schema`)
2. Input validation for ``call_tool``
3. The CLI flag list emitted by :mod:`cli_runner`
Keeping all three in one place is what lets us guarantee the schema
matches the real argparse signature without duplicating it.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal
FieldType = Literal["string", "integer", "number", "boolean", "path", "array"]
@dataclass(frozen=True)
class FieldSpec:
"""One tool input field.
``name`` is the MCP-facing key (snake_case). ``flag`` is the argparse
flag name; if omitted it's derived from ``name`` by replacing ``_``
with ``-`` and prefixing ``--``. ``kind`` maps to JSON Schema types:
``path`` is a string that the CLI will read as a filesystem path,
``boolean`` is a store_true flag (passed with no value when truthy
and omitted when falsy).
"""
name: str
kind: FieldType
required: bool = False
description: str = ""
flag: str | None = None
default: Any = None
@property
def cli_flag(self) -> str:
return self.flag or f"--{self.name.replace('_', '-')}"
@property
def json_type(self) -> str:
if self.kind == "path":
return "string"
if self.kind == "array":
return "array"
return self.kind
@dataclass(frozen=True)
class ToolSpec:
name: str
subcommand: tuple[str, ...]
description: str
fields: tuple[FieldSpec, ...] = field(default_factory=tuple)
# ---------------------------------------------------------------------------
# Tool catalogue — must stay 1:1 with metin_release.commands.*.add_parser
# ---------------------------------------------------------------------------
_RELEASE = ("release",)
TOOL_SPECS: tuple[ToolSpec, ...] = (
ToolSpec(
name="release_inspect",
subcommand=_RELEASE + ("inspect",),
description="Scan a client source root and report file/byte counts plus launcher/main-exe presence.",
fields=(
FieldSpec("source", "path", required=True, description="Client root directory."),
),
),
ToolSpec(
name="release_build_manifest",
subcommand=_RELEASE + ("build-manifest",),
description="Build a signed-ready manifest.json for a source tree.",
fields=(
FieldSpec("source", "path", required=True, description="Client source root."),
FieldSpec("version", "string", required=True, description="Release version, e.g. 2026.04.14-1."),
FieldSpec("out", "path", required=True, description="Output manifest.json path."),
FieldSpec("previous", "string", description="Previous release version, if any."),
FieldSpec("notes", "path", description="Path to a release-notes file."),
FieldSpec("launcher", "string", description="Launcher filename (default Metin2Launcher.exe)."),
FieldSpec("created_at", "string", description="Override created_at for reproducible test runs."),
),
),
ToolSpec(
name="release_sign",
subcommand=_RELEASE + ("sign",),
description="Sign a manifest.json with an Ed25519 private key (mode-600 enforced).",
fields=(
FieldSpec("manifest", "path", required=True, description="Path to manifest.json."),
FieldSpec("key", "path", required=True, description="Absolute path to raw 32-byte private key."),
FieldSpec("out", "path", description="Signature output path (default: <manifest>.sig)."),
),
),
ToolSpec(
name="release_diff_remote",
subcommand=_RELEASE + ("diff-remote",),
description="HEAD every manifest blob hash against a base URL and report the missing set.",
fields=(
FieldSpec("manifest", "path", required=True, description="Path to manifest.json."),
FieldSpec("base_url", "string", required=True, description="Remote base URL."),
FieldSpec("timeout", "number", description="Per-request timeout in seconds (default 10)."),
),
),
ToolSpec(
name="release_upload_blobs",
subcommand=_RELEASE + ("upload-blobs",),
description="Rsync a release directory (excluding manifest) to the target.",
fields=(
FieldSpec("release_dir", "path", required=True, description="Local release output directory."),
FieldSpec("rsync_target", "string", required=True, description="rsync destination."),
FieldSpec("dry_run", "boolean", description="Run rsync --dry-run."),
FieldSpec("yes", "boolean", description="Skip interactive confirmation."),
),
),
ToolSpec(
name="release_promote",
subcommand=_RELEASE + ("promote",),
description="Promote a staged release by pushing manifest.json + manifest.json.sig.",
fields=(
FieldSpec("release_dir", "path", required=True, description="Local release directory."),
FieldSpec("rsync_target", "string", required=True, description="rsync destination top-level."),
FieldSpec("yes", "boolean", description="Skip interactive confirmation."),
FieldSpec("dry_run", "boolean", description="Run rsync --dry-run."),
),
),
ToolSpec(
name="release_verify_public",
subcommand=_RELEASE + ("verify-public",),
description="Fetch manifest.json + signature from a base URL and Ed25519-verify.",
fields=(
FieldSpec("base_url", "string", required=True, description="Remote base URL."),
FieldSpec("public_key", "string", required=True, description="Ed25519 public key: hex or path."),
FieldSpec("sample_blobs", "integer", description="GET and hash-check N random blobs."),
FieldSpec("timeout", "number", description="Per-request timeout in seconds (default 15)."),
),
),
ToolSpec(
name="release_publish",
subcommand=_RELEASE + ("publish",),
description="End-to-end release: build-manifest -> sign -> upload-blobs -> promote -> verify-public.",
fields=(
FieldSpec("source", "path", required=True, description="Client source root."),
FieldSpec("version", "string", required=True, description="Release version."),
FieldSpec("out", "path", required=True, description="Release output directory."),
FieldSpec("key", "path", required=True, description="Signing key path (mode 600)."),
FieldSpec("rsync_target", "string", required=True, description="rsync target for blobs + manifest."),
FieldSpec("base_url", "string", required=True, description="Public base URL for verification."),
FieldSpec("public_key", "string", required=True, description="Public key (hex or file)."),
FieldSpec("previous", "string", description="Previous release version."),
FieldSpec("notes", "path", description="Release notes file."),
FieldSpec("launcher", "string", description="Launcher filename."),
FieldSpec("created_at", "string", description="Override manifest created_at."),
FieldSpec("sample_blobs", "integer", description="verify-public sample count."),
FieldSpec("yes", "boolean", description="Skip interactive confirmation."),
FieldSpec("force", "boolean", description="Allow non-empty output directory."),
FieldSpec("dry_run_upload", "boolean", description="rsync --dry-run for upload + promote."),
),
),
)
TOOLS_BY_NAME: dict[str, ToolSpec] = {t.name: t for t in TOOL_SPECS}
def json_schema(spec: ToolSpec) -> dict[str, Any]:
"""Build a JSON Schema for one tool's input object."""
props: dict[str, Any] = {}
required: list[str] = []
for f in spec.fields:
prop: dict[str, Any] = {"type": f.json_type}
if f.description:
prop["description"] = f.description
props[f.name] = prop
if f.required:
required.append(f.name)
schema: dict[str, Any] = {
"type": "object",
"properties": props,
"additionalProperties": False,
}
if required:
schema["required"] = required
return schema
def build_cli_args(spec: ToolSpec, payload: dict[str, Any]) -> list[str]:
"""Translate a tool input dict into a ``metin-release`` argv tail.
Output is ``[*subcommand, "--json", *flags]``. Unknown keys raise
:class:`~metin_release_mcp.errors.InvalidToolInputError`. Required
fields that are missing do the same.
"""
from .errors import InvalidToolInputError
known = {f.name for f in spec.fields}
unknown = set(payload) - known
if unknown:
raise InvalidToolInputError(
f"tool {spec.name} got unknown fields: {sorted(unknown)}"
)
argv: list[str] = [*spec.subcommand, "--json"]
for f in spec.fields:
if f.name not in payload or payload[f.name] is None:
if f.required:
raise InvalidToolInputError(
f"tool {spec.name} missing required field: {f.name}"
)
continue
value = payload[f.name]
if f.kind == "boolean":
if bool(value):
argv.append(f.cli_flag)
continue
if f.kind == "array":
if not isinstance(value, list):
raise InvalidToolInputError(
f"field {f.name} must be an array"
)
for item in value:
argv.extend([f.cli_flag, str(item)])
continue
argv.extend([f.cli_flag, str(value)])
return argv

0
tests/mcp/__init__.py Normal file
View File

32
tests/mcp/conftest.py Normal file
View File

@@ -0,0 +1,32 @@
"""Fixtures for the MCP wrapper test suite."""
from __future__ import annotations
import json
from dataclasses import dataclass
@dataclass
class FakeProc:
stdout: str
stderr: str = ""
returncode: int = 0
def make_runner(envelope: dict | None = None, *, stdout: str | None = None, stderr: str = "", returncode: int = 0):
"""Return a callable compatible with :func:`subprocess.run`.
Records the most recent ``cmd`` on ``runner.calls``.
"""
calls: list[list[str]] = []
def runner(cmd, capture_output=True, text=True, check=False): # noqa: ARG001
calls.append(list(cmd))
if stdout is not None:
s = stdout
else:
s = json.dumps(envelope if envelope is not None else {"ok": True, "command": "x", "status": "ok"})
return FakeProc(stdout=s, stderr=stderr, returncode=returncode)
runner.calls = calls # type: ignore[attr-defined]
return runner

View File

@@ -0,0 +1,86 @@
"""Tests for :mod:`metin_release_mcp.cli_runner`."""
from __future__ import annotations
import json
import pytest
from metin_release_mcp import cli_runner
from metin_release_mcp.errors import CliNotFoundError, CliUnparseableOutputError
from .conftest import make_runner
def test_run_cli_success_parses_envelope():
runner = make_runner({"ok": True, "command": "release inspect", "status": "inspected", "stats": {"file_count": 3}})
result = cli_runner.run_cli(
["release", "inspect", "--json", "--source", "/tmp/x"],
binary="/fake/metin-release",
runner=runner,
)
assert result.ok is True
assert result.envelope["status"] == "inspected"
assert result.envelope["stats"]["file_count"] == 3
assert result.returncode == 0
assert runner.calls[0][0] == "/fake/metin-release"
assert "--json" in runner.calls[0]
def test_run_cli_error_envelope_passed_through():
err = {
"ok": False,
"command": "release sign",
"status": "failed",
"error": {"code": "key_permission", "message": "key must be mode 600"},
}
runner = make_runner(err, stderr="error: key_permission\n", returncode=3)
result = cli_runner.run_cli(["release", "sign"], binary="/fake/mr", runner=runner)
assert result.ok is False
assert result.returncode == 3
assert result.envelope["error"]["code"] == "key_permission"
assert "key_permission" in result.stderr
def test_run_cli_unparseable_output_raises():
runner = make_runner(stdout="not-json-at-all", returncode=0)
with pytest.raises(CliUnparseableOutputError):
cli_runner.run_cli(["release", "inspect"], binary="/fake/mr", runner=runner)
def test_run_cli_empty_stdout_raises():
runner = make_runner(stdout=" \n", stderr="boom", returncode=2)
with pytest.raises(CliUnparseableOutputError):
cli_runner.run_cli(["release", "inspect"], binary="/fake/mr", runner=runner)
def test_run_cli_non_object_json_raises():
runner = make_runner(stdout=json.dumps([1, 2, 3]))
with pytest.raises(CliUnparseableOutputError):
cli_runner.run_cli(["release", "inspect"], binary="/fake/mr", runner=runner)
def test_run_cli_file_not_found_raises():
def boom(*args, **kwargs):
raise FileNotFoundError("no such file")
with pytest.raises(CliNotFoundError):
cli_runner.run_cli(["release", "inspect"], binary="/does/not/exist", runner=boom)
def test_resolve_binary_env_override(monkeypatch):
monkeypatch.setenv("METIN_RELEASE_BINARY", "/custom/path/metin-release")
assert cli_runner.resolve_binary() == "/custom/path/metin-release"
def test_resolve_binary_which_fallback(monkeypatch):
monkeypatch.delenv("METIN_RELEASE_BINARY", raising=False)
monkeypatch.setattr(cli_runner.shutil, "which", lambda name: "/usr/bin/metin-release")
assert cli_runner.resolve_binary() == "/usr/bin/metin-release"
def test_resolve_binary_missing_raises(monkeypatch):
monkeypatch.delenv("METIN_RELEASE_BINARY", raising=False)
monkeypatch.setattr(cli_runner.shutil, "which", lambda name: None)
with pytest.raises(CliNotFoundError):
cli_runner.resolve_binary()

View File

@@ -0,0 +1,211 @@
"""Dispatch tests — input dict to CLI argv translation."""
from __future__ import annotations
import json
import pytest
from metin_release_mcp import cli_runner, server
from metin_release_mcp.tool_defs import TOOLS_BY_NAME, build_cli_args
from .conftest import make_runner
def test_inspect_maps_source_to_flag():
spec = TOOLS_BY_NAME["release_inspect"]
argv = build_cli_args(spec, {"source": "/tmp/client"})
assert argv == ["release", "inspect", "--json", "--source", "/tmp/client"]
def test_build_manifest_full_flags():
spec = TOOLS_BY_NAME["release_build_manifest"]
argv = build_cli_args(
spec,
{
"source": "/src",
"version": "2026.04.14-1",
"out": "/out/manifest.json",
"previous": "2026.04.13-1",
"notes": "/notes.md",
"launcher": "Metin2Launcher.exe",
"created_at": "2026-04-14T00:00:00Z",
},
)
# Required args present in order declared
assert argv[0:3] == ["release", "build-manifest", "--json"]
assert "--source" in argv and argv[argv.index("--source") + 1] == "/src"
assert "--version" in argv and argv[argv.index("--version") + 1] == "2026.04.14-1"
assert "--out" in argv and argv[argv.index("--out") + 1] == "/out/manifest.json"
assert "--previous" in argv and argv[argv.index("--previous") + 1] == "2026.04.13-1"
assert "--created-at" in argv
assert argv[argv.index("--created-at") + 1] == "2026-04-14T00:00:00Z"
def test_build_manifest_omits_missing_optionals():
spec = TOOLS_BY_NAME["release_build_manifest"]
argv = build_cli_args(
spec,
{"source": "/src", "version": "v1", "out": "/m.json"},
)
assert "--previous" not in argv
assert "--notes" not in argv
assert "--launcher" not in argv
assert "--created-at" not in argv
def test_none_valued_optional_is_omitted():
spec = TOOLS_BY_NAME["release_build_manifest"]
argv = build_cli_args(
spec,
{"source": "/s", "version": "v", "out": "/m", "previous": None, "notes": None},
)
assert "--previous" not in argv
assert "--notes" not in argv
def test_boolean_flag_true_adds_flag_without_value():
spec = TOOLS_BY_NAME["release_upload_blobs"]
argv = build_cli_args(
spec,
{
"release_dir": "/rel",
"rsync_target": "user@host:/srv/updates",
"dry_run": True,
"yes": True,
},
)
assert "--dry-run" in argv
assert "--yes" in argv
# No value should follow --dry-run
i = argv.index("--dry-run")
# next element either end of list or another flag
assert i == len(argv) - 1 or argv[i + 1].startswith("--")
def test_boolean_flag_false_omits_flag():
spec = TOOLS_BY_NAME["release_upload_blobs"]
argv = build_cli_args(
spec,
{"release_dir": "/rel", "rsync_target": "tgt", "dry_run": False, "yes": False},
)
assert "--dry-run" not in argv
assert "--yes" not in argv
def test_path_with_spaces_passes_through_as_single_argv_element():
spec = TOOLS_BY_NAME["release_inspect"]
argv = build_cli_args(spec, {"source": "/tmp/has spaces/client root"})
# argv is a list — no shell involved, so spaces stay in one element
assert "/tmp/has spaces/client root" in argv
assert argv.count("--source") == 1
def test_diff_remote_numeric_timeout_serialised_as_string():
spec = TOOLS_BY_NAME["release_diff_remote"]
argv = build_cli_args(
spec,
{"manifest": "/m.json", "base_url": "https://x/", "timeout": 7.5},
)
assert argv[argv.index("--timeout") + 1] == "7.5"
def test_publish_mixes_booleans_and_strings():
spec = TOOLS_BY_NAME["release_publish"]
argv = build_cli_args(
spec,
{
"source": "/src",
"version": "v1",
"out": "/out",
"key": "/k",
"rsync_target": "t",
"base_url": "https://x/",
"public_key": "deadbeef",
"force": True,
"dry_run_upload": True,
"yes": False,
},
)
assert "--force" in argv
assert "--dry-run-upload" in argv
assert "--yes" not in argv
# ---------------------------------------------------------------------------
# server.dispatch integration against a stubbed CLI runner
# ---------------------------------------------------------------------------
@pytest.fixture
def stub_runner(monkeypatch):
def install(envelope=None, *, stdout=None, stderr="", returncode=0):
runner = make_runner(envelope, stdout=stdout, stderr=stderr, returncode=returncode)
monkeypatch.setattr(
server,
"run_cli",
lambda argv_tail: cli_runner.run_cli(
argv_tail, binary="/fake/metin-release", runner=runner
),
)
return runner
return install
def test_dispatch_success_returns_envelope(stub_runner):
runner = stub_runner(
{"ok": True, "command": "release inspect", "status": "inspected", "stats": {"file_count": 42}}
)
envelope, stderr = server.dispatch("release_inspect", {"source": "/tmp/x"})
assert envelope["ok"] is True
assert envelope["stats"]["file_count"] == 42
# the runner saw --json
assert "--json" in runner.calls[0]
assert "--source" in runner.calls[0]
assert "/tmp/x" in runner.calls[0]
def test_dispatch_cli_error_envelope_passes_through(stub_runner):
err = {
"ok": False,
"command": "release sign",
"status": "failed",
"error": {"code": "key_permission", "message": "mode 644"},
}
stub_runner(err, stderr="error\n", returncode=3)
envelope, stderr = server.dispatch(
"release_sign", {"manifest": "/m.json", "key": "/k"}
)
assert envelope == err
assert "error" in stderr
def test_dispatch_cli_binary_missing_returns_wrapper_error(monkeypatch):
def boom(argv_tail):
from metin_release_mcp.errors import CliNotFoundError
raise CliNotFoundError("metin-release binary not found")
monkeypatch.setattr(server, "run_cli", boom)
envelope, stderr = server.dispatch("release_inspect", {"source": "/x"})
assert envelope["ok"] is False
assert envelope["error"]["code"] == "cli_not_found"
def test_dispatch_invalid_input_returns_wrapper_error():
envelope, _ = server.dispatch("release_inspect", {})
assert envelope["ok"] is False
assert envelope["error"]["code"] == "invalid_tool_input"
def test_dispatch_unparseable_output_returns_wrapper_error(monkeypatch):
def boom(argv_tail):
from metin_release_mcp.errors import CliUnparseableOutputError
raise CliUnparseableOutputError("not json")
monkeypatch.setattr(server, "run_cli", boom)
envelope, _ = server.dispatch("release_inspect", {"source": "/x"})
assert envelope["ok"] is False
assert envelope["error"]["code"] == "cli_unparseable_output"

View File

@@ -0,0 +1,140 @@
"""Schema tests — each MCP tool must mirror its CLI subcommand exactly."""
from __future__ import annotations
import argparse
import pytest
from metin_release.commands import (
build_manifest,
diff_remote,
inspect,
promote,
publish,
sign,
upload_blobs,
verify_public,
)
from metin_release_mcp.errors import UnknownToolError
from metin_release_mcp.server import dispatch
from metin_release_mcp.tool_defs import (
TOOL_SPECS,
TOOLS_BY_NAME,
build_cli_args,
json_schema,
)
EXPECTED_TOOL_NAMES = {
"release_inspect",
"release_build_manifest",
"release_sign",
"release_diff_remote",
"release_upload_blobs",
"release_promote",
"release_verify_public",
"release_publish",
}
def test_tool_catalogue_is_exactly_the_phase_one_set():
assert {t.name for t in TOOL_SPECS} == EXPECTED_TOOL_NAMES
assert len(TOOL_SPECS) == 8
def test_every_tool_has_release_subcommand_and_description():
for spec in TOOL_SPECS:
assert spec.subcommand[0] == "release"
assert spec.description.strip()
schema = json_schema(spec)
assert schema["type"] == "object"
assert schema["additionalProperties"] is False
def _argparse_flags_for(add_parser_fn) -> dict[str, dict]:
"""Introspect a command's argparse signature.
Returns a mapping from flag name (e.g. ``--source``) to a dict with
``required`` and ``kind`` ("boolean" for store_true, else "value").
"""
parser = argparse.ArgumentParser()
sub = parser.add_subparsers()
sp = add_parser_fn(sub)
out: dict[str, dict] = {}
for action in sp._actions:
if not action.option_strings:
continue
if action.dest in ("help",):
continue
flag = action.option_strings[0]
is_bool = isinstance(action, argparse._StoreTrueAction)
out[flag] = {"required": action.required, "kind": "boolean" if is_bool else "value"}
return out
_COMMAND_MODULES = {
"release_inspect": inspect,
"release_build_manifest": build_manifest,
"release_sign": sign,
"release_diff_remote": diff_remote,
"release_upload_blobs": upload_blobs,
"release_promote": promote,
"release_verify_public": verify_public,
"release_publish": publish,
}
@pytest.mark.parametrize("tool_name", sorted(EXPECTED_TOOL_NAMES))
def test_schema_mirrors_argparse(tool_name):
spec = TOOLS_BY_NAME[tool_name]
mod = _COMMAND_MODULES[tool_name]
argparse_flags = _argparse_flags_for(mod.add_parser)
spec_flags = {f.cli_flag: f for f in spec.fields}
assert set(spec_flags) == set(argparse_flags), (
f"{tool_name}: spec flags {set(spec_flags)} != argparse flags {set(argparse_flags)}"
)
for flag, info in argparse_flags.items():
field = spec_flags[flag]
assert field.required == info["required"], f"{tool_name} {flag} required mismatch"
if info["kind"] == "boolean":
assert field.kind == "boolean", f"{tool_name} {flag} expected boolean"
else:
assert field.kind != "boolean", f"{tool_name} {flag} should not be boolean"
@pytest.mark.parametrize("tool_name", sorted(EXPECTED_TOOL_NAMES))
def test_required_fields_in_json_schema(tool_name):
spec = TOOLS_BY_NAME[tool_name]
schema = json_schema(spec)
required_in_spec = {f.name for f in spec.fields if f.required}
required_in_schema = set(schema.get("required", []))
assert required_in_schema == required_in_spec
def test_unknown_tool_rejected_by_dispatch():
with pytest.raises(UnknownToolError):
dispatch("nope_not_a_tool", {})
def test_unknown_tool_name_not_in_catalogue():
assert "release_rollback" not in TOOLS_BY_NAME
assert "erp_reserve" not in TOOLS_BY_NAME
assert "m2pack_build" not in TOOLS_BY_NAME
def test_build_cli_args_rejects_missing_required():
from metin_release_mcp.errors import InvalidToolInputError
spec = TOOLS_BY_NAME["release_inspect"]
with pytest.raises(InvalidToolInputError):
build_cli_args(spec, {})
def test_build_cli_args_rejects_unknown_fields():
from metin_release_mcp.errors import InvalidToolInputError
spec = TOOLS_BY_NAME["release_inspect"]
with pytest.raises(InvalidToolInputError):
build_cli_args(spec, {"source": "/x", "nonsense": 1})