20 Commits

Author SHA1 Message Date
Jan Nedbal
5c32224ff0 Merge branch 'claude/phase-4-m2pack'
Add Phase 4 m2pack CLI + MCP wrapping. New top-level 'metin-release
m2pack' command group with build, verify, diff, and export-runtime-key
subcommands that shell out to the real m2pack-secure binary (resolved
via M2PACK_BINARY env or PATH). Each wrapper translates m2pack's own
JSON envelope into the canonical Result shape, keeping the raw m2pack
output under data.m2pack for callers that need the untranslated version.
The MCP server exposes the same four as m2pack_* tools via the existing
ToolSpec catalogue; all 12 tools (8 release_* + 4 m2pack_*) appear in
--list-tools and pass the parametrised schema mirror test.

93 tests green (71 -> 93), no new dependencies.
2026-04-14 22:33:13 +02:00
Jan Nedbal
22f67a0589 docs: document m2pack subcommands
Add a Phase 4 'm2pack commands' section to docs/cli.md with each
subcommand's flags and a pointer at the m2pack-secure repo for
installation. Update README.md with a short m2pack paragraph and
append the Phase 4 entry to the Unreleased CHANGELOG section.
2026-04-14 22:31:16 +02:00
Jan Nedbal
1201ec50d2 tests: cover m2pack cli and mcp additions
Add discovery-helper tests (env var override, PATH fallback, missing
binary) and command tests that point M2PACK_BINARY at a Python stub
script echoing canned JSON per subcommand. Cover success paths,
non-zero exit, non-JSON output, JSON-array output, and missing binary.
Extend the MCP schema mirror test to cover all 12 tools and add
dispatch tests for the new m2pack_* argv translation.
2026-04-14 22:31:11 +02:00
Jan Nedbal
c4c65e2fe7 mcp: expose m2pack_* tools in the mcp server
Add four ToolSpec entries (m2pack_build, m2pack_verify, m2pack_diff,
m2pack_export_runtime_key) mirroring the CLI argparse signatures.
server.py auto-enumerates TOOL_SPECS so no wiring change needed.
2026-04-14 22:31:05 +02:00
Jan Nedbal
a289cd7c25 cli: add m2pack command group to dispatcher
Register the m2pack subparser and wire build/verify/diff/export-runtime-key
into _COMMAND_MAP alongside the existing release group.
2026-04-14 22:31:00 +02:00
Jan Nedbal
197c5ba8a2 cli: implement m2pack diff and export-runtime-key wrappers
diff promotes m2pack's added/removed/changed/unchanged counts into
data.stats when m2pack reports them as lists or ints. export-runtime-key
follows the real binary's --key/--public-key/--output/--key-id/--format
surface (not the original plan's --pack/--master-key).
2026-04-14 22:30:56 +02:00
Jan Nedbal
ae0cbb7e9b cli: implement m2pack build and verify wrappers
Shell out to m2pack-secure with --json, parse its envelope, and
translate into the standard metin-release Result envelope under
data.m2pack. Non-zero exit and non-JSON output map to SubprocessError
with m2pack_failed / m2pack_invalid_json / m2pack_empty_output codes.
2026-04-14 22:30:51 +02:00
Jan Nedbal
70d20f0f18 cli: add m2pack binary discovery helper
Resolve the m2pack-secure binary via M2PACK_BINARY env var or PATH,
raising ValidationError(m2pack_not_found) when neither works.
2026-04-14 22:30:45 +02:00
Jan Nedbal
50ea80d64b Merge branch 'claude/phase-3-mcp'
Add Phase 3 metin-release-mcp — a thin MCP stdio server wrapping the
Phase 1 CLI as 8 release_* tools. Zero business logic: every tool
spawns metin-release --json, parses stdout, returns the envelope
verbatim. Tool schemas mirror the argparse signatures and are covered
by a parametrised test so drift fails loudly. Keeps existing Phase 1
code untouched.
2026-04-14 19:36:18 +02:00
Jan Nedbal
b2283b0c3f docs: document mcp server usage 2026-04-14 19:33:48 +02:00
Jan Nedbal
a8ae85c0a4 tests: add mcp wrapper test suite 2026-04-14 19:33:41 +02:00
Jan Nedbal
79df538226 mcp: add server stdio entry and --help 2026-04-14 19:33:33 +02:00
Jan Nedbal
7117d25a9a mcp: register release_* tools mapping 1:1 to cli subcommands 2026-04-14 19:33:25 +02:00
Jan Nedbal
860face1d1 mcp: add cli runner that spawns metin-release and parses json 2026-04-14 19:33:18 +02:00
Jan Nedbal
d55291e75e mcp: scaffold metin_release_mcp package and pyproject entry 2026-04-14 19:33:00 +02:00
Jan Nedbal
6e71ddb635 docs: add changelog with phase 1 notes
Seed CHANGELOG.md in Keep a Changelog format, document every Phase 1
subcommand plus the --json flag placement fix as the initial 0.1.0
entry. Future minors and patches add sections above this one.
2026-04-14 19:21:55 +02:00
Jan Nedbal
362bd6ae7c cli: accept --json/-v/-q on every subcommand, not only top level
Before this change, only the top-level parser defined --json, -v and -q.
Argparse processes arguments left-to-right and hands off to the subparser
after seeing the subcommand name, so the idiomatic

    metin-release release inspect --source X --json

failed with 'unrecognized arguments: --json'. Users had to write

    metin-release --json release inspect --source X

which is the opposite of what every modern CLI does. Attach a shared
--json/-v/-q flag set to every subparser via a small helper. Same dest
means the last occurrence on the command line wins, which is the intuitive
behaviour. Both placements are now accepted; tests unchanged.
2026-04-14 19:04:55 +02:00
Jan Nedbal
e4f91c9cd0 docs: add phase 1 cli reference 2026-04-14 18:59:50 +02:00
Jan Nedbal
d2dd2c88b6 tests: cover phase 1 commands end to end
Pytest suite with a tiny_client fixture, an ephemeral Ed25519 keypair
fixture, and a threaded HTTPServer helper. Exercises cli dispatch,
inspect (including excluded-path handling), build-manifest and sign
against the real m2dev-client scripts, diff-remote via a local server,
and the full release publish composite against a local rsync target.
2026-04-14 18:59:50 +02:00
Jan Nedbal
e70fc300e2 cli: scaffold phase 1 asset release commands
Add metin-release CLI with argparse dispatcher, result envelope and
error hierarchy, and the full Phase 1 release subcommand set:

  release inspect         - scan source root
  release build-manifest  - wraps make-manifest.py
  release sign            - wraps sign-manifest.py, enforces mode 600
  release diff-remote     - HEAD each blob hash against a base URL
  release upload-blobs    - rsync release dir minus manifest
  release promote         - rsync manifest.json + signature
  release verify-public   - GET + Ed25519 verify, optional blob sampling
  release publish         - composite of the above with per-stage timings

Respects --json / --verbose / --quiet. Exit codes follow the plan
(1 validation, 2 remote, 3 integrity, 4 reserved for ERP).
2026-04-14 18:59:50 +02:00
57 changed files with 4193 additions and 1 deletions

6
.gitignore vendored
View File

@@ -101,6 +101,12 @@ ipython_config.py
# commonly ignored for libraries.
#uv.lock
# project-local
/out/
/staging/
/.venv/
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more

74
CHANGELOG.md Normal file
View File

@@ -0,0 +1,74 @@
# Changelog
All notable changes to `metin-release-cli` are documented here.
Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). Versions
follow semantic versioning: major for incompatible CLI or JSON contract changes,
minor for new subcommands or additive flags, patch for fixes that preserve the
contract.
## [Unreleased]
### Added
- `metin-release m2pack <build|verify|diff|export-runtime-key>` subcommands that wrap the m2pack-secure binary.
- `m2pack_build`, `m2pack_verify`, `m2pack_diff`, `m2pack_export_runtime_key` MCP tools mirroring the CLI surface.
- `src/metin_release/m2pack_binary.py` binary-discovery helper using `M2PACK_BINARY` env var or `PATH`.
- `metin_release_mcp` package and `metin-release-mcp` console script — a
thin Model Context Protocol stdio server that wraps the Phase 1
`release …` subcommands as eight MCP tools (`release_inspect`,
`release_build_manifest`, `release_sign`, `release_diff_remote`,
`release_upload_blobs`, `release_promote`, `release_verify_public`,
`release_publish`). Each tool's input schema mirrors the CLI's
argparse signature 1:1; the wrapper shells out to
`metin-release <subcommand> --json …` and returns the parsed envelope
verbatim with zero duplicated business logic.
- `[mcp]` optional dependency group pulling in the official `mcp`
Python SDK.
- `docs/mcp.md` — MCP server usage guide.
- `tests/mcp/` — 45 new tests covering the CLI runner (success path,
error-envelope passthrough, unparseable output, missing binary),
tool schemas (mirror checks against each command's argparse), and
dispatch translation (booleans, optionals, paths with spaces).
## [0.1.0] - 2026-04-14
First Phase 1 drop. Implements the minimum asset release path the
`metin-release-cli-plan.md` calls for — everything the launcher contract needs
to publish a signed manifest and content-addressed blob tree.
### Added
- `metin-release` command, `python -m metin_release` module entry, and
`metin-release --version` resolving via `importlib.metadata`
- Top-level and per-subcommand `--json`, `-v`, `-q` flags (both placements work)
- Structured result envelope and exit-code contract (`0/1/2/3/4`) from the plan
- `release inspect` — scan a client source root and report counts + flags
- `release build-manifest` — wrap `make-manifest.py` and report manifest stats
- `release sign` — wrap `sign-manifest.py` and verify 64-byte output
- `release diff-remote` — HEAD blob URLs and report the missing set
- `release upload-blobs` — rsync push blobs + archive to a target, with
`--dry-run` and `--yes`, excluding `manifest.json*` so promote stays atomic
- `release promote` — upload only `manifest.json` and `manifest.json.sig`
- `release verify-public` — fetch + Ed25519-verify the published manifest,
optional `--sample-blobs N` sha256 re-check
- `release publish` — composite of build-manifest → sign → upload-blobs →
promote → verify-public with per-stage timings and short-circuit on failure
- Error hierarchy in `errors.py` mapping each exception class to the right
exit code and a stable `error_code` string for machine parsing
- pytest suite with 18 tests covering dispatch, inspect, build-manifest, sign,
diff-remote (via a local HTTPServer fixture), and the full publish composite
against a local rsync target
- `docs/cli.md` man-page-style reference
### Notes
- Runtime deps pinned to `cryptography` and `requests` only. Dev deps are
`pytest` + `pytest-mock`.
- `make-manifest.py` and `sign-manifest.py` are discovered by convention at
`scripts/make-manifest.py` / `scripts/sign-manifest.py` relative to the CLI
repo, or overridden via `METIN_RELEASE_MAKE_MANIFEST` and
`METIN_RELEASE_SIGN_MANIFEST` env vars.
- Phase 2 (`erp …` subcommands), Phase 3 (MCP wrapper), Phase 4 (`m2pack …`
subcommands) and Phase 5 (`launcher publish`) are explicitly out of scope for
this release and will land in future minors.

View File

@@ -1,3 +1,59 @@
# metin-release-cli
Python CLI for orchestrating Metin2 client releases (Phase 1 of metin-release-cli-plan)
Python CLI that orchestrates Metin2 client releases — builds manifests, signs
them, uploads content-addressed blobs, and promotes the new release atomically.
Phase 1 wraps `make-manifest.py` and `sign-manifest.py` from the `m2dev-client`
repo and adds remote diff, upload, promote, and public verification.
## Install
```
pip install -e .[dev]
```
## Usage
```
metin-release --help
metin-release release inspect --source /path/to/client
metin-release release publish --source ... --version 2026.04.14-1 ...
```
Add `--json` to get a machine-parseable envelope on stdout. Exit codes:
| Code | Meaning |
|------|--------------------------------------|
| 0 | success |
| 1 | operator / validation error |
| 2 | remote or network error |
| 3 | signing or integrity error |
| 4 | reserved (ERP sync, Phase 2+) |
See `docs/cli.md` for the full command reference.
## m2pack commands
Phase 4 adds a `metin-release m2pack …` command group that wraps the
[`m2pack-secure`](https://gitea.jakubkadlec.dev/metin-server/m2pack-secure)
binary for building, verifying, diffing, and runtime-key-exporting
signed `.m2p` archives. The binary is not bundled — build it from the
m2pack-secure repo and either put it on `PATH` or set `M2PACK_BINARY`
to an absolute path.
```
metin-release m2pack build --input ... --output a.m2p --key ck --sign-secret-key sk
metin-release m2pack verify --archive a.m2p --public-key pk
```
## MCP server
The `metin-release-mcp` console script (Phase 3) exposes each Phase 1
subcommand as an MCP tool over stdio. Install with the `mcp` extra:
```
pip install -e .[mcp]
metin-release-mcp --help
```
See `docs/mcp.md` for tool list, client wiring, and error handling.

220
docs/cli.md Normal file
View File

@@ -0,0 +1,220 @@
# metin-release — CLI reference
Phase 1 `release …` commands and Phase 4 `m2pack …` commands. All
subcommands share the top-level flags.
## Top-level flags
| Flag | Description |
|---|---|
| `--version` | Print package version and exit. |
| `--json` | Emit only the JSON envelope on stdout (stderr may still carry logs). |
| `-v`, `--verbose` | Verbose stderr logging. |
| `-q`, `--quiet` | Suppress stderr logging entirely. |
## Output envelope
Every command writes a JSON envelope on stdout:
```json
{
"ok": true,
"command": "release inspect",
"status": "inspected",
"stats": { "...": "..." }
}
```
On failure:
```json
{
"ok": false,
"command": "release inspect",
"status": "failed",
"error": { "code": "source_not_found", "message": "..." }
}
```
## Exit codes
| Code | Meaning |
|---|---|
| 0 | Success |
| 1 | Operator or validation error |
| 2 | Remote / network error |
| 3 | Signing or integrity error |
| 4 | Reserved (ERP sync, Phase 2+) |
## `release inspect`
Scan a client source root and report file counts plus launcher/main-exe detection.
```
metin-release release inspect --source /path/to/client
```
No writes. JSON stats: `source_path`, `file_count`, `total_bytes`, `launcher_present`, `main_exe_present`.
## `release build-manifest`
Wraps `make-manifest.py` to produce `manifest.json` for a source tree.
```
metin-release release build-manifest \
--source /path/to/client \
--version 2026.04.14-1 \
--out /tmp/release/manifest.json \
[--previous 2026.04.13-3] \
[--notes release-notes.md] \
[--launcher Metin2Launcher.exe] \
[--created-at 2026-04-14T12:00:00Z]
```
Override the wrapped script path via the `METIN_RELEASE_MAKE_MANIFEST` env var.
## `release sign`
Wraps `sign-manifest.py`. Requires an absolute path to a chmod-600 raw 32-byte Ed25519 key.
```
metin-release release sign \
--manifest /tmp/release/manifest.json \
--key /home/you/.config/metin/launcher-signing-key \
[--out /tmp/release/manifest.json.sig]
```
Override the wrapped script path via `METIN_RELEASE_SIGN_MANIFEST`.
## `release diff-remote`
HEADs every unique blob hash from a manifest against `<base-url>/files/<hh>/<hash>`.
```
metin-release release diff-remote \
--manifest /tmp/release/manifest.json \
--base-url https://updates.example.com
```
## `release upload-blobs`
rsyncs the release directory (excluding `manifest.json` and `.sig`) to a target.
```
metin-release release upload-blobs \
--release-dir /tmp/release \
--rsync-target user@host:/var/www/updates/ \
[--dry-run] [--yes]
```
## `release promote`
Pushes only `manifest.json` + `manifest.json.sig` to the target top-level — makes the new release live.
```
metin-release release promote \
--release-dir /tmp/release \
--rsync-target user@host:/var/www/updates/ \
[--dry-run] [--yes]
```
## `release verify-public`
GETs `manifest.json` + `manifest.json.sig` from a public URL and verifies the Ed25519 signature. Optionally spot-checks random blobs with `--sample-blobs N`.
```
metin-release release verify-public \
--base-url https://updates.example.com \
--public-key <hex-or-path-to-hex-file> \
[--sample-blobs 5]
```
## `release publish`
Composite: build-manifest → sign → stage blob tree → upload-blobs → promote → verify-public. Short-circuits on the first failure; the JSON envelope includes a `stages` array with `{name, status, duration_ms, error?}` per step.
```
metin-release release publish \
--source /path/to/client \
--version 2026.04.14-1 \
--out /tmp/release \
--key /home/you/.config/metin/launcher-signing-key \
--rsync-target user@host:/var/www/updates/ \
--base-url https://updates.example.com \
--public-key <hex-or-path-to-hex-file> \
[--previous ...] [--notes ...] [--launcher ...] \
[--created-at ...] [--sample-blobs N] \
[--yes] [--force] [--dry-run-upload]
```
## m2pack commands
Phase 4 subcommands wrap the `m2pack-secure` binary and translate its
JSON envelopes into the standard metin-release result envelope. The
binary is **not** shipped with this CLI — build it from
[`metin-server/m2pack-secure`](https://gitea.jakubkadlec.dev/metin-server/m2pack-secure)
and either put it on `PATH` or point at it via the `M2PACK_BINARY`
environment variable.
All m2pack commands pass through `--json` to the real tool, so the
raw m2pack envelope is always available under `data.m2pack`. When
m2pack exits non-zero or emits non-JSON output the wrapper raises a
subprocess error with `m2pack_failed` / `m2pack_invalid_json` /
`m2pack_empty_output` error codes.
### `m2pack build`
Build a signed `.m2p` archive from a client asset directory.
```
metin-release m2pack build \
--input /path/to/client-assets \
--output /path/to/out.m2p \
--key /path/to/content.key \
--sign-secret-key /path/to/signing.sk \
[--key-id N]
```
### `m2pack verify`
Verify an `.m2p` archive's signature (and optionally full-decrypt it).
```
metin-release m2pack verify \
--archive /path/to/a.m2p \
[--public-key /path/to/signing.pub] \
[--key /path/to/content.key]
```
Passing `--key` enables full-decrypt verification; omitting it only
checks manifest structure and signature.
### `m2pack diff`
Diff two directories and/or `.m2p` archives. Either side can be a
directory or an archive; m2pack figures it out.
```
metin-release m2pack diff --left /old --right /new.m2p
```
The wrapper promotes m2pack's added/removed/changed/unchanged counts
into `data.stats` when available.
### `m2pack export-runtime-key`
Export a launcher runtime-key payload (json or raw blob) from a master
content key + signing public key. Used to seed the launcher's bundled
runtime-key file during release workflows.
```
metin-release m2pack export-runtime-key \
--key /path/to/content.key \
--public-key /path/to/signing.pub \
--output /path/to/runtime-key.json \
[--key-id N] [--format json|blob]
```
See `docs/key-rotation.md` in `m2pack-secure` for when to re-export
runtime keys.

120
docs/mcp.md Normal file
View File

@@ -0,0 +1,120 @@
# metin-release-mcp
Thin [Model Context Protocol](https://modelcontextprotocol.io/) server that
wraps the Phase 1 `metin-release` CLI. Each `release …` subcommand is exposed
as an MCP tool. The server contains no release business logic: it shells out
to the real CLI with `--json` and returns the parsed envelope verbatim.
## Install
The server ships as an optional extra alongside the main CLI:
```
pip install -e '.[mcp]'
```
This installs the `mcp` Python SDK and adds a `metin-release-mcp` console
script plus a `python -m metin_release_mcp` module entry.
## Running
The server speaks MCP over stdio, so you wire it into an MCP-capable client
(Claude Desktop, Claude Code, etc.) as a stdio command. Example client entry:
```json
{
"mcpServers": {
"metin-release": {
"command": "metin-release-mcp",
"env": {
"METIN_RELEASE_BINARY": "/usr/local/bin/metin-release"
}
}
}
}
```
You can also poke at it directly:
- `metin-release-mcp --help` — list registered tools
- `metin-release-mcp --list-tools` — dump the full tool JSON schemas
- `metin-release-mcp --version`
## Tools
| Tool | CLI subcommand |
|---|---|
| `release_inspect` | `metin-release release inspect` |
| `release_build_manifest` | `metin-release release build-manifest` |
| `release_sign` | `metin-release release sign` |
| `release_diff_remote` | `metin-release release diff-remote` |
| `release_upload_blobs` | `metin-release release upload-blobs` |
| `release_promote` | `metin-release release promote` |
| `release_verify_public` | `metin-release release verify-public` |
| `release_publish` | `metin-release release publish` |
Tool input keys match CLI flag names with `_` instead of `-`
(`--base-url``base_url`, `--dry-run``dry_run`). Boolean fields
correspond to argparse `store_true` flags: pass `true` to set the flag,
omit or pass `false` to leave it off.
### Example invocation
```json
{
"name": "release_inspect",
"arguments": {
"source": "/srv/metin/client"
}
}
```
The server runs `metin-release release inspect --json --source /srv/metin/client`
and returns the full JSON envelope:
```json
{
"ok": true,
"command": "release inspect",
"status": "inspected",
"stats": {
"source_path": "/srv/metin/client",
"file_count": 9166,
"total_bytes": 3523473920,
"launcher_present": true,
"main_exe_present": true
}
}
```
## CLI resolution
On every tool call the server resolves the `metin-release` binary in this order:
1. `METIN_RELEASE_BINARY` environment variable, if set and non-empty
2. `shutil.which("metin-release")` against `PATH`
If neither resolves, the tool call returns a wrapper-level error envelope
(`error.code = "cli_not_found"`).
## Error handling
The server never invents its own release-level errors. There are three paths:
- **Success** — CLI exits 0 with a valid JSON envelope → envelope returned as-is
- **CLI-level failure** — CLI exits non-zero with an `{"ok": false, "error": …}`
envelope → that envelope is returned as-is, plus the CLI's stderr is
attached as a diagnostic text block
- **Wrapper failure** — binary missing, unparseable stdout, unknown tool,
invalid input → synthetic envelope with one of
`cli_not_found`, `cli_unparseable_output`, `unknown_tool`,
`invalid_tool_input`
## Environment variables
| Variable | Purpose |
|---|---|
| `METIN_RELEASE_BINARY` | Override the `metin-release` binary path. |
Any env vars the wrapped CLI honours (`METIN_RELEASE_MAKE_MANIFEST`,
`METIN_RELEASE_SIGN_MANIFEST`) are inherited by the subprocess unchanged.

46
pyproject.toml Normal file
View File

@@ -0,0 +1,46 @@
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"
[project]
name = "metin-release-cli"
version = "0.1.0"
description = "Orchestration CLI for Metin2 client releases"
readme = "README.md"
license = { file = "LICENSE" }
authors = [
{ name = "Jan Nedbal", email = "jan.nedbal@apertia.cz" },
]
requires-python = ">=3.11"
dependencies = [
"cryptography>=41",
"requests>=2.31",
]
[project.optional-dependencies]
dev = [
"pytest>=8",
"pytest-mock>=3",
]
mcp = [
"mcp>=1.0",
]
[project.scripts]
metin-release = "metin_release.cli:main"
metin-release-mcp = "metin_release_mcp.server:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-q"
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "B", "UP"]
ignore = ["E501"]

View File

@@ -0,0 +1,15 @@
"""metin-release: orchestration CLI for Metin2 client releases."""
from __future__ import annotations
try:
from importlib.metadata import PackageNotFoundError, version as _pkg_version
except ImportError: # pragma: no cover
from importlib_metadata import PackageNotFoundError, version as _pkg_version # type: ignore
try:
__version__ = _pkg_version("metin-release-cli")
except PackageNotFoundError: # pragma: no cover - running from source tree
__version__ = "0.0.0+local"
__all__ = ["__version__"]

View File

@@ -0,0 +1,6 @@
from __future__ import annotations
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())

155
src/metin_release/cli.py Normal file
View File

@@ -0,0 +1,155 @@
"""Argparse dispatcher for metin-release."""
from __future__ import annotations
import argparse
import sys
from typing import Callable
from . import __version__
from .commands import (
build_manifest,
diff_remote,
inspect,
m2pack_build,
m2pack_diff,
m2pack_export_runtime_key,
m2pack_verify,
promote,
publish,
sign,
upload_blobs,
verify_public,
)
from .errors import ReleaseError
from .log import configure_logging, get_logger
from .result import Result, write_result
from .workspace import Context
CommandFn = Callable[[Context, argparse.Namespace], Result]
def _add_common_flags(p: argparse.ArgumentParser) -> None:
"""Attach --json / -v / -q to a parser.
These flags are accepted both at the top level and on each subcommand so
that users can write either `metin-release --json release inspect ...` or
the more common `metin-release release inspect ... --json`. The shared
`dest` means whichever occurrence argparse sees last wins, which is the
intuitive "last flag on the command line" behaviour.
"""
p.add_argument("--json", action="store_true", help="Emit only JSON on stdout.")
p.add_argument("-v", "--verbose", action="store_true", help="Verbose stderr logging.")
p.add_argument("-q", "--quiet", action="store_true", help="Suppress stderr logging.")
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="metin-release",
description="Orchestration CLI for Metin2 client releases.",
)
parser.add_argument("--version", action="version", version=f"metin-release {__version__}")
_add_common_flags(parser)
sub = parser.add_subparsers(dest="group", metavar="<group>")
sub.required = True
release = sub.add_parser("release", help="Asset release commands.")
rsub = release.add_subparsers(dest="cmd", metavar="<command>")
rsub.required = True
for mod in (
inspect,
build_manifest,
sign,
diff_remote,
upload_blobs,
promote,
verify_public,
publish,
):
sp = mod.add_parser(rsub)
_add_common_flags(sp)
m2pack = sub.add_parser("m2pack", help="m2pack-secure archive commands.")
msub = m2pack.add_subparsers(dest="cmd", metavar="<command>")
msub.required = True
for mod in (
m2pack_build,
m2pack_verify,
m2pack_diff,
m2pack_export_runtime_key,
):
sp = mod.add_parser(msub)
_add_common_flags(sp)
return parser
_COMMAND_MAP: dict[tuple[str, str], tuple[str, CommandFn]] = {
("release", "inspect"): ("release inspect", inspect.run),
("release", "build-manifest"): ("release build-manifest", build_manifest.run),
("release", "sign"): ("release sign", sign.run),
("release", "diff-remote"): ("release diff-remote", diff_remote.run),
("release", "upload-blobs"): ("release upload-blobs", upload_blobs.run),
("release", "promote"): ("release promote", promote.run),
("release", "verify-public"): ("release verify-public", verify_public.run),
("release", "publish"): ("release publish", publish.run),
("m2pack", "build"): ("m2pack build", m2pack_build.run),
("m2pack", "verify"): ("m2pack verify", m2pack_verify.run),
("m2pack", "diff"): ("m2pack diff", m2pack_diff.run),
("m2pack", "export-runtime-key"): (
"m2pack export-runtime-key",
m2pack_export_runtime_key.run,
),
}
def main(argv: list[str] | None = None) -> int:
parser = _build_parser()
args = parser.parse_args(argv)
configure_logging(verbose=args.verbose, quiet=args.quiet)
logger = get_logger()
ctx = Context(json_mode=args.json, verbose=args.verbose, quiet=args.quiet)
key = (args.group, args.cmd)
if key not in _COMMAND_MAP:
parser.error(f"unknown command: {args.group} {args.cmd}")
return 1 # unreachable; parser.error exits 2
command_name, fn = _COMMAND_MAP[key]
try:
result = fn(ctx, args)
except ReleaseError as exc:
logger.error("%s: %s", exc.error_code, exc.message)
result = Result(
command=command_name,
ok=False,
status="failed",
version=getattr(args, "version", None) if hasattr(args, "version") else None,
error_code=exc.error_code,
error_message=exc.message,
)
write_result(result, json_mode=ctx.json_mode)
return exc.exit_code
except KeyboardInterrupt:
print("interrupted", file=sys.stderr)
return 130
write_result(
result,
json_mode=ctx.json_mode,
human_summary=_human_summary(result),
)
return 0
def _human_summary(result: Result) -> str:
parts = [f"[{result.command}] {result.status}"]
if result.version:
parts.append(f"version={result.version}")
return " ".join(parts)

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from . import (
build_manifest,
diff_remote,
inspect,
m2pack_build,
m2pack_diff,
m2pack_export_runtime_key,
m2pack_verify,
promote,
publish,
sign,
upload_blobs,
verify_public,
)
__all__ = [
"build_manifest",
"diff_remote",
"inspect",
"m2pack_build",
"m2pack_diff",
"m2pack_export_runtime_key",
"m2pack_verify",
"promote",
"publish",
"sign",
"upload_blobs",
"verify_public",
]

View File

@@ -0,0 +1,78 @@
"""Shared helper to invoke the real m2pack CLI and translate its JSON.
m2pack-secure emits its own JSON envelopes with ``--json``. Their shape is
not identical to our :class:`~metin_release.result.Result` envelope, so we
wrap the raw dict under ``data["m2pack"]`` and promote a few well-known
fields (artifact paths, counts) where it makes sense per command.
If m2pack exits non-zero or prints non-JSON, we raise
:class:`~metin_release.errors.SubprocessError` so the CLI exits with code
1 and a readable error envelope.
"""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import Any
from ..errors import SubprocessError
from ..log import get_logger
from ..m2pack_binary import resolve_m2pack_binary
def run_m2pack(subcommand: str, argv: list[str]) -> dict[str, Any]:
"""Invoke ``m2pack <subcommand> [argv...] --json`` and return parsed JSON.
Raises :class:`SubprocessError` (exit code 1) on any failure: binary
missing, non-zero exit, empty stdout, or non-JSON stdout. Missing
binary is handled inside :func:`resolve_m2pack_binary` by raising a
:class:`~metin_release.errors.ValidationError` which the dispatcher
already converts into the standard error envelope.
"""
log = get_logger()
binary: Path = resolve_m2pack_binary()
cmd = [str(binary), subcommand, *argv, "--json"]
log.debug("running m2pack: %s", " ".join(cmd))
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, check=False
)
except OSError as exc:
raise SubprocessError(
f"failed to spawn m2pack: {exc}",
error_code="m2pack_spawn_failed",
) from exc
stdout = (proc.stdout or "").strip()
stderr = (proc.stderr or "").strip()
if proc.returncode != 0:
detail = stderr or stdout or f"exit code {proc.returncode}"
raise SubprocessError(
f"m2pack {subcommand} failed (rc={proc.returncode}): {detail}",
error_code="m2pack_failed",
)
if not stdout:
raise SubprocessError(
f"m2pack {subcommand} produced no stdout; stderr={stderr!r}",
error_code="m2pack_empty_output",
)
try:
parsed = json.loads(stdout)
except json.JSONDecodeError as exc:
raise SubprocessError(
f"m2pack {subcommand} returned non-JSON output: {exc}; "
f"first 200 chars={stdout[:200]!r}",
error_code="m2pack_invalid_json",
) from exc
if not isinstance(parsed, dict):
raise SubprocessError(
f"m2pack {subcommand} JSON was not an object: {type(parsed).__name__}",
error_code="m2pack_invalid_json",
)
return parsed

View File

@@ -0,0 +1,103 @@
"""release build-manifest: wraps make-manifest.py."""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import tempfile
from pathlib import Path
from ..errors import SourceNotFoundError, SubprocessError, ValidationError
from ..log import get_logger
from ..result import Result
from ..workspace import Context, ensure_dir, resolve_source
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("build-manifest", help="Build a signed-ready manifest.json for a source tree.")
p.add_argument("--source", required=True, type=Path, help="Client source root.")
p.add_argument("--version", required=True, help="Release version, e.g. 2026.04.14-1.")
p.add_argument("--out", required=True, type=Path, help="Output manifest.json path.")
p.add_argument("--previous", help="Previous release version, if any.")
p.add_argument("--notes", type=Path, help="Path to a file containing release notes.")
p.add_argument("--launcher", default="Metin2Launcher.exe", help="Launcher filename.")
p.add_argument("--created-at", help="Override created_at (for reproducible test runs).")
return p
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
source = resolve_source(args.source)
if not source.is_dir():
raise SourceNotFoundError(f"source is not a directory: {source}")
script = ctx.make_manifest_script
if not script.is_file():
raise ValidationError(f"make-manifest.py not found at {script} (set METIN_RELEASE_MAKE_MANIFEST)")
out_path = Path(args.out).expanduser().resolve()
ensure_dir(out_path.parent)
cmd = [
sys.executable,
str(script),
"--source", str(source),
"--version", args.version,
"--out", str(out_path),
"--launcher", args.launcher,
]
if args.previous:
cmd += ["--previous", args.previous]
if args.created_at:
cmd += ["--created-at", args.created_at]
notes_text = None
if args.notes:
notes_path = Path(args.notes).expanduser().resolve()
if not notes_path.is_file():
raise ValidationError(f"notes file not found: {notes_path}")
notes_text = notes_path.read_text(encoding="utf-8")
cmd += ["--notes", notes_text]
log.debug("spawn %s", cmd)
try:
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
except FileNotFoundError as exc:
raise SubprocessError(f"cannot spawn make-manifest.py: {exc}") from exc
if proc.returncode != 0:
raise SubprocessError(
f"make-manifest.py exited {proc.returncode}: {proc.stderr.strip() or proc.stdout.strip()}"
)
if proc.stderr.strip():
log.info("make-manifest: %s", proc.stderr.strip())
if not out_path.is_file():
raise SubprocessError(f"manifest was not written to {out_path}")
manifest = json.loads(out_path.read_text(encoding="utf-8"))
files = manifest.get("files", [])
launcher = manifest.get("launcher", {})
total_bytes = launcher.get("size", 0) + sum(int(f.get("size", 0)) for f in files)
unique_blobs = {launcher.get("sha256")} | {f.get("sha256") for f in files}
unique_blobs.discard(None)
return Result(
command="release build-manifest",
version=manifest.get("version"),
status="manifest_built",
data={
"artifacts": {
"manifest_path": str(out_path),
},
"stats": {
"file_count": len(files) + 1,
"blob_count": len(unique_blobs),
"total_bytes": total_bytes,
"version": manifest.get("version"),
"created_at": manifest.get("created_at"),
},
},
)

View File

@@ -0,0 +1,83 @@
"""release diff-remote: HEAD every blob hash against a base URL."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import requests
from ..errors import NetworkError, ValidationError
from ..log import get_logger
from ..result import Result
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("diff-remote", help="Check which manifest blobs are missing on the remote.")
p.add_argument("--manifest", required=True, type=Path, help="Path to manifest.json.")
p.add_argument("--base-url", required=True, help="Remote base URL, e.g. https://updates.example/")
p.add_argument("--timeout", type=float, default=10.0, help="Per-request timeout in seconds.")
return p
def _blob_hashes(manifest: dict) -> list[str]:
hashes: set[str] = set()
launcher = manifest.get("launcher") or {}
if launcher.get("sha256"):
hashes.add(launcher["sha256"])
for f in manifest.get("files", []):
if f.get("sha256"):
hashes.add(f["sha256"])
return sorted(hashes)
def _check_blob(session: requests.Session, base_url: str, h: str, timeout: float) -> bool:
url = f"{base_url.rstrip('/')}/files/{h[:2]}/{h}"
try:
r = session.head(url, timeout=timeout, allow_redirects=True)
except requests.RequestException as exc:
raise NetworkError(f"HEAD {url} failed: {exc}") from exc
if r.status_code == 200:
return True
if r.status_code == 404:
return False
raise NetworkError(f"HEAD {url} returned unexpected status {r.status_code}")
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
manifest_path = Path(args.manifest).expanduser().resolve()
if not manifest_path.is_file():
raise ValidationError(f"manifest not found: {manifest_path}")
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
hashes = _blob_hashes(manifest)
missing: list[str] = []
with requests.Session() as session:
for h in hashes:
if not _check_blob(session, args.base_url, h, args.timeout):
missing.append(h)
log.info(
"diff-remote: %d blobs total, %d missing on %s",
len(hashes),
len(missing),
args.base_url,
)
return Result(
command="release diff-remote",
version=manifest.get("version"),
status="diffed",
data={
"remote": {
"base_url": args.base_url,
},
"stats": {
"manifest_blob_count": len(hashes),
"missing_blob_count": len(missing),
"missing_blobs": missing,
},
},
)

View File

@@ -0,0 +1,78 @@
"""release inspect: scan a source root, count files, detect launcher + main exe."""
from __future__ import annotations
import argparse
from pathlib import Path
from ..errors import SourceNotFoundError
from ..log import get_logger
from ..result import Result
from ..workspace import Context, resolve_source
EXCLUDE_DIRS = {".git", ".vs", ".updates", "log", "__pycache__"}
EXCLUDE_FILES = {".gitignore", ".gitattributes", "desktop.ini", "Thumbs.db", ".DS_Store"}
EXCLUDE_SUFFIXES = {".pdb", ".ilk", ".old", ".log", ".dxvk-cache", ".swp", ".tmp"}
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("inspect", help="Scan a client source root and report stats.")
p.add_argument("--source", required=True, type=Path, help="Client root directory.")
return p
def _should_skip(rel: Path) -> bool:
for part in rel.parts:
if part in EXCLUDE_DIRS:
return True
if rel.name in EXCLUDE_FILES:
return True
if rel.suffix in EXCLUDE_SUFFIXES:
return True
return False
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
source = resolve_source(args.source)
if not source.is_dir():
raise SourceNotFoundError(f"source is not a directory: {source}")
file_count = 0
total_bytes = 0
launcher_present = False
main_exe_present = False
for path in source.rglob("*"):
if not path.is_file():
continue
rel = path.relative_to(source)
if _should_skip(rel):
continue
file_count += 1
try:
total_bytes += path.stat().st_size
except OSError:
log.warning("failed to stat %s", rel)
name = rel.as_posix()
if name == "Metin2Launcher.exe":
launcher_present = True
elif name == "Metin2.exe":
main_exe_present = True
log.info("inspected %s: %d files, %d bytes", source, file_count, total_bytes)
return Result(
command="release inspect",
status="inspected",
data={
"stats": {
"source_path": str(source),
"file_count": file_count,
"total_bytes": total_bytes,
"launcher_present": launcher_present,
"main_exe_present": main_exe_present,
}
},
)

View File

@@ -0,0 +1,47 @@
"""m2pack build: wrap ``m2pack build`` to produce a signed .m2p archive."""
from __future__ import annotations
import argparse
from pathlib import Path
from ..result import Result
from ._m2pack_runner import run_m2pack
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("build", help="Build a signed .m2p archive from a source directory.")
p.add_argument("--input", required=True, type=Path, help="Client asset source directory.")
p.add_argument("--output", required=True, type=Path, help="Output .m2p archive path.")
p.add_argument("--key", required=True, type=Path, help="Master content key file.")
p.add_argument(
"--sign-secret-key",
required=True,
type=Path,
help="Ed25519 signing secret key file.",
)
p.add_argument("--key-id", type=int, help="Content key id (default 1).")
return p
def run(ctx, args: argparse.Namespace) -> Result:
argv = [
"--input", str(args.input),
"--output", str(args.output),
"--key", str(args.key),
"--sign-secret-key", str(args.sign_secret_key),
]
if args.key_id is not None:
argv.extend(["--key-id", str(args.key_id)])
raw = run_m2pack("build", argv)
status = "built" if raw.get("ok", True) else "failed"
return Result(
command="m2pack build",
ok=bool(raw.get("ok", True)),
status=status,
data={
"artifacts": {"archive_path": str(args.output)},
"m2pack": raw,
},
)

View File

@@ -0,0 +1,45 @@
"""m2pack diff: wrap ``m2pack diff`` to compare directories and/or .m2p archives."""
from __future__ import annotations
import argparse
from pathlib import Path
from ..result import Result
from ._m2pack_runner import run_m2pack
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser(
"diff",
help="Diff two directories and/or .m2p archives (left vs right).",
)
p.add_argument("--left", required=True, type=Path, help="Left side: directory or .m2p archive.")
p.add_argument("--right", required=True, type=Path, help="Right side: directory or .m2p archive.")
return p
def run(ctx, args: argparse.Namespace) -> Result:
argv = ["--left", str(args.left), "--right", str(args.right)]
raw = run_m2pack("diff", argv)
# Best-effort promotion of the diff counts m2pack reports. Fall back
# gracefully when the key is missing or the shape differs.
stats: dict[str, object] = {}
for key in ("added", "removed", "changed", "unchanged"):
value = raw.get(key)
if isinstance(value, int):
stats[f"{key}_count"] = value
elif isinstance(value, list):
stats[f"{key}_count"] = len(value)
ok = bool(raw.get("ok", True))
return Result(
command="m2pack diff",
ok=ok,
status="diffed" if ok else "failed",
data={
"stats": stats,
"m2pack": raw,
},
)

View File

@@ -0,0 +1,57 @@
"""m2pack export-runtime-key: wrap ``m2pack export-runtime-key``.
Emits a launcher runtime-key payload in either ``json`` or ``blob`` form
from the master content key + signing public key. The plan originally
described this as ``--pack --master-key --out``; the real m2pack CLI
uses ``--key --public-key --key-id --format --output``, so we follow the
real tool.
"""
from __future__ import annotations
import argparse
from pathlib import Path
from ..result import Result
from ._m2pack_runner import run_m2pack
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser(
"export-runtime-key",
help="Export a launcher runtime-key payload (json or blob form).",
)
p.add_argument("--key", required=True, type=Path, help="Master content key file.")
p.add_argument("--public-key", required=True, type=Path, help="Ed25519 public key file.")
p.add_argument("--output", required=True, type=Path, help="Output payload path.")
p.add_argument("--key-id", type=int, help="Content key id (default 1).")
p.add_argument(
"--format",
choices=("json", "blob"),
help="Payload format (default json).",
)
return p
def run(ctx, args: argparse.Namespace) -> Result:
argv = [
"--key", str(args.key),
"--public-key", str(args.public_key),
"--output", str(args.output),
]
if args.key_id is not None:
argv.extend(["--key-id", str(args.key_id)])
if args.format is not None:
argv.extend(["--format", args.format])
raw = run_m2pack("export-runtime-key", argv)
ok = bool(raw.get("ok", True))
return Result(
command="m2pack export-runtime-key",
ok=ok,
status="exported" if ok else "failed",
data={
"artifacts": {"runtime_key_path": str(args.output)},
"m2pack": raw,
},
)

View File

@@ -0,0 +1,45 @@
"""m2pack verify: wrap ``m2pack verify`` to validate manifest + optional decrypt."""
from __future__ import annotations
import argparse
from pathlib import Path
from ..result import Result
from ._m2pack_runner import run_m2pack
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("verify", help="Verify an .m2p archive (manifest + signature).")
p.add_argument("--archive", required=True, type=Path, help="Path to .m2p archive.")
p.add_argument(
"--public-key",
type=Path,
help="Ed25519 public key file (for signature verification).",
)
p.add_argument(
"--key",
type=Path,
help="Master content key file (enables full-decrypt verification).",
)
return p
def run(ctx, args: argparse.Namespace) -> Result:
argv = ["--archive", str(args.archive)]
if args.public_key is not None:
argv.extend(["--public-key", str(args.public_key)])
if args.key is not None:
argv.extend(["--key", str(args.key)])
raw = run_m2pack("verify", argv)
ok = bool(raw.get("ok", True))
return Result(
command="m2pack verify",
ok=ok,
status="verified" if ok else "failed",
data={
"artifacts": {"archive_path": str(args.archive)},
"m2pack": raw,
},
)

View File

@@ -0,0 +1,52 @@
"""release promote: upload manifest.json + manifest.json.sig (small, fast)."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from ..errors import ValidationError
from ..log import get_logger
from ..result import Result
from ..storage import rsync as rsync_backend
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("promote", help="Promote a staged release by pushing manifest + signature.")
p.add_argument("--release-dir", required=True, type=Path, help="Local release directory containing manifest.json(.sig).")
p.add_argument("--rsync-target", required=True, help="rsync destination top-level.")
p.add_argument("--yes", action="store_true", help="Skip interactive confirmation.")
p.add_argument("--dry-run", action="store_true", help="Run rsync --dry-run.")
return p
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
release_dir = Path(args.release_dir).expanduser().resolve()
manifest_path = release_dir / "manifest.json"
if not manifest_path.is_file():
raise ValidationError(f"manifest.json missing in {release_dir}")
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
version = manifest.get("version")
rr = rsync_backend.push_manifest(release_dir, args.rsync_target, dry_run=args.dry_run)
log.info("promote: version=%s rsync exit=%d", version, rr.returncode)
return Result(
command="release promote",
version=version,
status="promoted" if not args.dry_run else "dry_run",
data={
"remote": {
"rsync_target": args.rsync_target,
"dry_run": args.dry_run,
},
"stats": {
"release_dir": str(release_dir),
"version": version,
},
},
)

View File

@@ -0,0 +1,186 @@
"""release publish: composite of build-manifest -> sign -> upload-blobs -> promote -> verify-public."""
from __future__ import annotations
import argparse
import json
import shutil
import subprocess
import sys
import time
from pathlib import Path
from typing import Callable
from ..errors import ReleaseError, ValidationError
from ..log import get_logger
from ..result import Result
from ..workspace import Context, ensure_dir, resolve_source
from . import build_manifest, promote, sign, upload_blobs, verify_public
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("publish", help="End-to-end: build, sign, upload, promote, verify.")
p.add_argument("--source", required=True, type=Path, help="Client source root.")
p.add_argument("--version", required=True, help="Release version.")
p.add_argument("--out", required=True, type=Path, help="Release output directory.")
p.add_argument("--key", required=True, type=Path, help="Signing key path (mode 600).")
p.add_argument("--rsync-target", required=True, help="rsync target for blobs + manifest.")
p.add_argument("--base-url", required=True, help="Public base URL for verification.")
p.add_argument("--public-key", required=True, help="Public key (hex or file).")
p.add_argument("--previous", help="Previous release version.")
p.add_argument("--notes", type=Path, help="Release notes file.")
p.add_argument("--launcher", default="Metin2Launcher.exe")
p.add_argument("--created-at", help="Override manifest created_at.")
p.add_argument("--sample-blobs", type=int, default=0)
p.add_argument("--yes", action="store_true")
p.add_argument("--force", action="store_true", help="Allow non-empty --out directory.")
p.add_argument("--dry-run-upload", action="store_true", help="rsync --dry-run for upload and promote.")
return p
def _build_blob_tree(source: Path, manifest: dict, out_dir: Path) -> dict:
files_dir = ensure_dir(out_dir / "files")
entries = []
launcher = manifest.get("launcher")
if launcher:
entries.append(launcher)
entries.extend(manifest.get("files", []))
seen: set[str] = set()
bytes_written = 0
for entry in entries:
h = entry["sha256"]
rel = entry["path"]
src = source / rel
if not src.is_file():
raise ValidationError(f"file in manifest missing from source: {rel}")
if h in seen:
continue
seen.add(h)
dst_dir = files_dir / h[:2]
dst_dir.mkdir(parents=True, exist_ok=True)
dst = dst_dir / h
if dst.exists():
continue
try:
dst.hardlink_to(src)
except OSError:
shutil.copy2(src, dst)
bytes_written += dst.stat().st_size
return {"unique_blobs": len(seen), "bytes_written": bytes_written}
def _run_stage(name: str, fn: Callable[[], Result], stages: list[dict]) -> Result:
start = time.monotonic()
try:
result = fn()
except ReleaseError as exc:
stages.append(
{
"name": name,
"status": "failed",
"duration_ms": int((time.monotonic() - start) * 1000),
"error": {"code": exc.error_code, "message": exc.message},
}
)
raise
stages.append(
{
"name": name,
"status": result.status,
"duration_ms": int((time.monotonic() - start) * 1000),
}
)
return result
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
source = resolve_source(args.source)
out_dir = Path(args.out).expanduser().resolve()
ensure_dir(out_dir)
if any(out_dir.iterdir()) and not args.force:
raise ValidationError(f"output directory {out_dir} is non-empty (use --force to overwrite)")
stages: list[dict] = []
manifest_path = out_dir / "manifest.json"
# Stage 1: build manifest
bm_args = argparse.Namespace(
source=source,
version=args.version,
out=manifest_path,
previous=args.previous,
notes=args.notes,
launcher=args.launcher,
created_at=args.created_at,
)
bm_result = _run_stage("build-manifest", lambda: build_manifest.run(ctx, bm_args), stages)
# Stage 2: sign
sig_path = out_dir / "manifest.json.sig"
sn_args = argparse.Namespace(manifest=manifest_path, key=args.key, out=sig_path)
_run_stage("sign", lambda: sign.run(ctx, sn_args), stages)
# Stage 2.5: build blob tree (inline step, not its own public subcommand)
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
def _blob_stage() -> Result:
stats = _build_blob_tree(source, manifest, out_dir)
return Result(command="release publish", status="blobs_staged", data={"stats": stats})
_run_stage("stage-blobs", _blob_stage, stages)
# Archive historical manifest
manifests_dir = ensure_dir(out_dir / "manifests")
(manifests_dir / f"{args.version}.json").write_bytes(manifest_path.read_bytes())
(manifests_dir / f"{args.version}.json.sig").write_bytes(sig_path.read_bytes())
# Stage 3: upload blobs
ub_args = argparse.Namespace(
release_dir=out_dir,
rsync_target=args.rsync_target,
dry_run=args.dry_run_upload,
yes=args.yes,
)
_run_stage("upload-blobs", lambda: upload_blobs.run(ctx, ub_args), stages)
# Stage 4: promote
pr_args = argparse.Namespace(
release_dir=out_dir,
rsync_target=args.rsync_target,
yes=args.yes,
dry_run=args.dry_run_upload,
)
_run_stage("promote", lambda: promote.run(ctx, pr_args), stages)
# Stage 5: verify-public
vp_args = argparse.Namespace(
base_url=args.base_url,
public_key=args.public_key,
sample_blobs=args.sample_blobs,
timeout=15.0,
)
vp_result = _run_stage("verify-public", lambda: verify_public.run(ctx, vp_args), stages)
log.info("publish: version=%s stages=%d", args.version, len(stages))
return Result(
command="release publish",
version=args.version,
status="published",
data={
"artifacts": {
"release_dir": str(out_dir),
"manifest_path": str(manifest_path),
"signature_path": str(sig_path),
},
"stats": {
**bm_result.data.get("stats", {}),
"verify": vp_result.data.get("stats", {}),
},
"stages": stages,
},
)

View File

@@ -0,0 +1,104 @@
"""release sign: wraps sign-manifest.py with mode-600 enforcement."""
from __future__ import annotations
import argparse
import os
import stat
import subprocess
import sys
from pathlib import Path
from ..errors import (
IntegrityError,
KeyPermissionError,
SubprocessError,
ValidationError,
)
from ..log import get_logger
from ..result import Result
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("sign", help="Sign a manifest.json with an Ed25519 key.")
p.add_argument("--manifest", required=True, type=Path, help="Path to manifest.json.")
p.add_argument("--key", required=True, type=Path, help="Absolute path to raw 32-byte private key.")
p.add_argument("--out", type=Path, help="Signature output path (default: <manifest>.sig).")
return p
def _enforce_key_mode(key_path: Path) -> None:
if not key_path.is_absolute():
raise ValidationError(f"--key must be an absolute path, got {key_path}")
if not key_path.is_file():
raise ValidationError(f"signing key not found: {key_path}")
mode = stat.S_IMODE(key_path.stat().st_mode)
if mode != 0o600:
raise KeyPermissionError(
f"signing key {key_path} must be mode 600, got {oct(mode)}"
)
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
manifest_path = Path(args.manifest).expanduser().resolve()
if not manifest_path.is_file():
raise ValidationError(f"manifest not found: {manifest_path}")
key_path = Path(args.key).expanduser()
_enforce_key_mode(key_path)
script = ctx.sign_manifest_script
if not script.is_file():
raise ValidationError(
f"sign-manifest.py not found at {script} (set METIN_RELEASE_SIGN_MANIFEST)"
)
out_path = (
Path(args.out).expanduser().resolve()
if args.out
else manifest_path.with_suffix(manifest_path.suffix + ".sig")
)
cmd = [
sys.executable,
str(script),
"--manifest", str(manifest_path),
"--key", str(key_path),
"--out", str(out_path),
]
log.debug("spawn %s", cmd)
try:
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
except FileNotFoundError as exc:
raise SubprocessError(f"cannot spawn sign-manifest.py: {exc}") from exc
if proc.returncode != 0:
raise SubprocessError(
f"sign-manifest.py exited {proc.returncode}: {proc.stderr.strip() or proc.stdout.strip()}"
)
if proc.stderr.strip():
log.info("sign-manifest: %s", proc.stderr.strip())
if not out_path.is_file():
raise IntegrityError(f"signature was not written to {out_path}")
sig_bytes = out_path.read_bytes()
if len(sig_bytes) != 64:
raise IntegrityError(
f"signature at {out_path} is {len(sig_bytes)} bytes, expected 64"
)
return Result(
command="release sign",
status="signed",
data={
"artifacts": {
"manifest_path": str(manifest_path),
"signature_path": str(out_path),
},
"stats": {
"signature_bytes": len(sig_bytes),
},
},
)

View File

@@ -0,0 +1,46 @@
"""release upload-blobs: rsync the blob tree (excluding manifest) to target."""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from ..log import get_logger
from ..result import Result
from ..storage import rsync as rsync_backend
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("upload-blobs", help="Upload release directory (except manifest) via rsync.")
p.add_argument("--release-dir", required=True, type=Path, help="Local release output directory.")
p.add_argument("--rsync-target", required=True, help="rsync destination (path or user@host:/path).")
p.add_argument("--dry-run", action="store_true", help="Run rsync --dry-run.")
p.add_argument("--yes", action="store_true", help="Skip interactive confirmation.")
return p
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
release_dir = Path(args.release_dir).expanduser().resolve()
if not args.yes and not args.dry_run and sys.stdin.isatty():
log.warning("about to rsync %s -> %s (use --yes to skip prompt)", release_dir, args.rsync_target)
rr = rsync_backend.push_blobs(release_dir, args.rsync_target, dry_run=args.dry_run)
log.info("upload-blobs: rsync exit=%d", rr.returncode)
return Result(
command="release upload-blobs",
status="uploaded" if not args.dry_run else "dry_run",
data={
"remote": {
"rsync_target": args.rsync_target,
"dry_run": args.dry_run,
},
"stats": {
"release_dir": str(release_dir),
},
},
)

View File

@@ -0,0 +1,126 @@
"""release verify-public: download manifest + sig and verify Ed25519 signature."""
from __future__ import annotations
import argparse
import json
import random
from pathlib import Path
import requests
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from ..errors import NetworkError, SignatureError, ValidationError
from ..hashing import sha256_bytes
from ..log import get_logger
from ..result import Result
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("verify-public", help="GET manifest.json from base URL and verify signature.")
p.add_argument("--base-url", required=True, help="Remote base URL.")
p.add_argument(
"--public-key",
required=True,
help="Ed25519 public key: hex string or path to a file containing hex.",
)
p.add_argument("--sample-blobs", type=int, default=0, help="GET and hash-check N random blobs.")
p.add_argument("--timeout", type=float, default=15.0)
return p
def _load_public_key(value: str) -> Ed25519PublicKey:
candidate = Path(value).expanduser()
hex_text: str
if candidate.is_file():
hex_text = candidate.read_text(encoding="utf-8").strip()
else:
hex_text = value.strip()
try:
raw = bytes.fromhex(hex_text)
except ValueError as exc:
raise ValidationError(f"public key is not valid hex: {exc}") from exc
if len(raw) != 32:
raise ValidationError(f"public key must be 32 bytes, got {len(raw)}")
return Ed25519PublicKey.from_public_bytes(raw)
def _get(session: requests.Session, url: str, timeout: float) -> bytes:
try:
r = session.get(url, timeout=timeout)
except requests.RequestException as exc:
raise NetworkError(f"GET {url} failed: {exc}") from exc
if r.status_code != 200:
raise NetworkError(f"GET {url} returned {r.status_code}")
return r.content
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
public_key = _load_public_key(args.public_key)
base = args.base_url.rstrip("/")
with requests.Session() as session:
manifest_bytes = _get(session, f"{base}/manifest.json", args.timeout)
sig_bytes = _get(session, f"{base}/manifest.json.sig", args.timeout)
if len(sig_bytes) != 64:
raise SignatureError(f"remote signature is {len(sig_bytes)} bytes, expected 64")
try:
public_key.verify(sig_bytes, manifest_bytes)
except InvalidSignature as exc:
raise SignatureError("manifest signature verification failed") from exc
manifest = json.loads(manifest_bytes.decode("utf-8"))
version = manifest.get("version")
created_at = manifest.get("created_at")
sampled_failures: list[str] = []
sampled_count = 0
if args.sample_blobs > 0:
files = list(manifest.get("files", []))
launcher = manifest.get("launcher")
if launcher:
files.append(launcher)
pool = [f for f in files if f.get("sha256")]
k = min(args.sample_blobs, len(pool))
sample = random.sample(pool, k) if k else []
for entry in sample:
h = entry["sha256"]
url = f"{base}/files/{h[:2]}/{h}"
try:
blob = _get(session, url, args.timeout)
except NetworkError as exc:
log.warning("sample blob fetch failed: %s", exc)
sampled_failures.append(h)
sampled_count += 1
continue
actual = sha256_bytes(blob)
sampled_count += 1
if actual != h:
sampled_failures.append(h)
log.info(
"verify-public: version=%s signature_valid=true sampled=%d failures=%d",
version,
sampled_count,
len(sampled_failures),
)
return Result(
command="release verify-public",
version=version,
status="verified",
data={
"remote": {"base_url": args.base_url},
"stats": {
"manifest_version": version,
"manifest_created_at": created_at,
"signature_valid": True,
"sampled_blob_count": sampled_count,
"sampled_blob_failures": sampled_failures,
},
},
)

View File

@@ -0,0 +1,77 @@
"""Error hierarchy. Each exception carries an exit_code and an error_code.
Exit code contract (from the plan):
0 - success
1 - operator or validation error (bad args, missing source, wrong key mode)
2 - remote or network error (upload / HTTP failure)
3 - signing or integrity error (signature verify fail, hash mismatch)
4 - ERP sync error (reserved, not used in Phase 1)
"""
from __future__ import annotations
class ReleaseError(Exception):
"""Base for all CLI errors."""
exit_code: int = 1
error_code: str = "error"
def __init__(self, message: str, *, error_code: str | None = None):
super().__init__(message)
if error_code is not None:
self.error_code = error_code
@property
def message(self) -> str:
return str(self)
class ValidationError(ReleaseError):
exit_code = 1
error_code = "validation_error"
class SourceNotFoundError(ValidationError):
error_code = "source_not_found"
class KeyPermissionError(ValidationError):
error_code = "key_permission"
class SubprocessError(ReleaseError):
exit_code = 1
error_code = "subprocess_failed"
class RemoteError(ReleaseError):
exit_code = 2
error_code = "remote_error"
class NetworkError(RemoteError):
error_code = "network_error"
class UploadError(RemoteError):
error_code = "upload_failed"
class IntegrityError(ReleaseError):
exit_code = 3
error_code = "integrity_error"
class SignatureError(IntegrityError):
error_code = "signature_invalid"
class HashMismatchError(IntegrityError):
error_code = "hash_mismatch"
class ErpError(ReleaseError): # reserved for Phase 2
exit_code = 4
error_code = "erp_error"

View File

@@ -0,0 +1,18 @@
"""Streaming sha256 helper."""
from __future__ import annotations
import hashlib
from pathlib import Path
def sha256_file(path: Path, *, chunk_size: int = 1 << 20) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
h.update(chunk)
return h.hexdigest()
def sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()

31
src/metin_release/log.py Normal file
View File

@@ -0,0 +1,31 @@
"""Structured stderr logger. stdout is reserved for result JSON."""
from __future__ import annotations
import logging
import sys
_LOGGER_NAME = "metin_release"
def configure_logging(*, verbose: bool = False, quiet: bool = False) -> logging.Logger:
logger = logging.getLogger(_LOGGER_NAME)
logger.handlers.clear()
if quiet:
level = logging.CRITICAL + 1
elif verbose:
level = logging.DEBUG
else:
level = logging.INFO
handler = logging.StreamHandler(stream=sys.stderr)
handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.addHandler(handler)
logger.setLevel(level)
logger.propagate = False
return logger
def get_logger() -> logging.Logger:
return logging.getLogger(_LOGGER_NAME)

View File

@@ -0,0 +1,51 @@
"""Resolve the m2pack binary for the m2pack wrapper subcommands.
Resolution order:
1. ``M2PACK_BINARY`` environment variable, if set, non-empty, and points at
an existing file.
2. :func:`shutil.which` on ``m2pack``.
If neither works we raise :class:`~metin_release.errors.ValidationError` so
the CLI exits 1 with a clear, actionable message. Import of this module
must never trigger filesystem access — all discovery is runtime.
"""
from __future__ import annotations
import os
import shutil
from pathlib import Path
from .errors import ValidationError
ENV_VAR = "M2PACK_BINARY"
_NOT_FOUND_HINT = (
"m2pack binary not found. Set the M2PACK_BINARY environment variable to "
"an absolute path, or make `m2pack` available on PATH. Build it from "
"https://gitea.jakubkadlec.dev/metin-server/m2pack-secure (see its "
"CMakeLists.txt)."
)
def resolve_m2pack_binary(env: dict[str, str] | None = None) -> Path:
"""Return the resolved path to the m2pack binary or raise ValidationError."""
environ = env if env is not None else os.environ
override = (environ.get(ENV_VAR) or "").strip()
if override:
candidate = Path(override).expanduser()
if candidate.is_file():
return candidate.resolve()
raise ValidationError(
f"{ENV_VAR}={override!r} does not point at an existing file. "
f"{_NOT_FOUND_HINT}",
error_code="m2pack_not_found",
)
which = shutil.which("m2pack")
if which:
return Path(which).resolve()
raise ValidationError(_NOT_FOUND_HINT, error_code="m2pack_not_found")

View File

@@ -0,0 +1,53 @@
"""Result envelope and JSON/human output writer."""
from __future__ import annotations
import json
import sys
from dataclasses import dataclass, field
from typing import Any
@dataclass
class Result:
"""Structured result for a CLI command."""
command: str
ok: bool = True
status: str = "ok"
version: str | None = None
data: dict[str, Any] = field(default_factory=dict)
error_code: str | None = None
error_message: str | None = None
def to_envelope(self) -> dict[str, Any]:
env: dict[str, Any] = {
"ok": self.ok,
"command": self.command,
}
if self.version is not None:
env["version"] = self.version
env["status"] = self.status
if self.ok:
for k, v in self.data.items():
env[k] = v
else:
env["error"] = {
"code": self.error_code or "error",
"message": self.error_message or "",
}
return env
def write_result(result: Result, *, json_mode: bool, human_summary: str | None = None) -> None:
"""Emit the result.
When json_mode is True: only JSON on stdout.
When json_mode is False: human summary on stderr, JSON on stdout.
"""
envelope = result.to_envelope()
if not json_mode and human_summary:
print(human_summary, file=sys.stderr)
json.dump(envelope, sys.stdout, indent=2, sort_keys=False)
sys.stdout.write("\n")
sys.stdout.flush()

View File

@@ -0,0 +1,5 @@
from __future__ import annotations
from . import rsync
__all__ = ["rsync"]

View File

@@ -0,0 +1,100 @@
"""rsync-based remote storage backend."""
from __future__ import annotations
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from ..errors import UploadError, ValidationError
from ..log import get_logger
COMMON_FLAGS = [
"-av",
"--delay-updates",
"--checksum",
"--omit-dir-times",
"--no-perms",
]
@dataclass
class RsyncResult:
cmd: list[str]
returncode: int
stdout: str
stderr: str
bytes_transferred: int | None = None
def _ensure_rsync() -> str:
path = shutil.which("rsync")
if not path:
raise ValidationError("rsync binary not found in PATH")
return path
def _normalise_source_dir(source_dir: Path) -> str:
s = str(source_dir.resolve())
if not s.endswith("/"):
s += "/"
return s
def push_blobs(
release_dir: Path,
target: str,
*,
dry_run: bool = False,
) -> RsyncResult:
"""Upload everything in release_dir except manifest.json{,.sig}."""
log = get_logger()
if not release_dir.is_dir():
raise ValidationError(f"release dir not found: {release_dir}")
rsync = _ensure_rsync()
cmd = [rsync, *COMMON_FLAGS]
if dry_run:
cmd.append("--dry-run")
cmd += [
"--exclude", "manifest.json",
"--exclude", "manifest.json.sig",
_normalise_source_dir(release_dir),
target,
]
log.debug("rsync blobs: %s", cmd)
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode != 0:
raise UploadError(
f"rsync blobs failed ({proc.returncode}): {proc.stderr.strip() or proc.stdout.strip()}"
)
return RsyncResult(cmd=cmd, returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)
def push_manifest(
release_dir: Path,
target: str,
*,
dry_run: bool = False,
) -> RsyncResult:
"""Upload only manifest.json and manifest.json.sig."""
log = get_logger()
manifest = release_dir / "manifest.json"
sig = release_dir / "manifest.json.sig"
if not manifest.is_file() or not sig.is_file():
raise ValidationError(
f"manifest or signature missing in {release_dir}: need manifest.json + manifest.json.sig"
)
rsync = _ensure_rsync()
cmd = [rsync, "-av", "--checksum", "--omit-dir-times", "--no-perms"]
if dry_run:
cmd.append("--dry-run")
cmd += [str(manifest), str(sig), target]
log.debug("rsync manifest: %s", cmd)
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode != 0:
raise UploadError(
f"rsync manifest failed ({proc.returncode}): {proc.stderr.strip() or proc.stdout.strip()}"
)
return RsyncResult(cmd=cmd, returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)

View File

@@ -0,0 +1,43 @@
"""Workspace path helpers: source / out / staging resolution + the Context passed to commands."""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
DEFAULT_MAKE_MANIFEST = Path(
"/home/jann/metin/repos/m2dev-client/scripts/make-manifest.py"
)
DEFAULT_SIGN_MANIFEST = Path(
"/home/jann/metin/repos/m2dev-client/scripts/sign-manifest.py"
)
@dataclass
class Context:
"""Per-invocation command context. No module-level mutable state."""
json_mode: bool = False
verbose: bool = False
quiet: bool = False
make_manifest_script: Path = field(
default_factory=lambda: Path(
os.environ.get("METIN_RELEASE_MAKE_MANIFEST", str(DEFAULT_MAKE_MANIFEST))
)
)
sign_manifest_script: Path = field(
default_factory=lambda: Path(
os.environ.get("METIN_RELEASE_SIGN_MANIFEST", str(DEFAULT_SIGN_MANIFEST))
)
)
def resolve_source(source: str | os.PathLike[str]) -> Path:
return Path(source).expanduser().resolve()
def ensure_dir(path: Path) -> Path:
path.mkdir(parents=True, exist_ok=True)
return path

View File

@@ -0,0 +1,9 @@
"""Thin MCP server wrapping the metin-release CLI.
Exposes each Phase 1 ``release …`` subcommand as an MCP tool. The server
does no business logic of its own: it maps tool input dicts to CLI flags,
spawns ``metin-release <subcommand> --json …`` and returns the parsed JSON
envelope as the tool result.
"""
__version__ = "0.1.0"

View File

@@ -0,0 +1,5 @@
"""Module entry: ``python -m metin_release_mcp``."""
from .server import main
raise SystemExit(main())

View File

@@ -0,0 +1,95 @@
"""Spawns the metin-release CLI and parses its JSON envelope.
This module is the *only* place that knows how to talk to the real
CLI. The rest of the wrapper treats its output as opaque JSON.
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
from dataclasses import dataclass
from typing import Any
from .errors import CliNotFoundError, CliUnparseableOutputError
@dataclass
class CliResult:
"""Parsed outcome of one CLI invocation.
``envelope`` is the parsed JSON dict emitted on stdout. ``stderr``
is the raw stderr text, surfaced for diagnostics. ``returncode`` is
the CLI exit code.
"""
envelope: dict[str, Any]
stderr: str
returncode: int
@property
def ok(self) -> bool:
return self.returncode == 0 and bool(self.envelope.get("ok", False))
def resolve_binary(env: dict[str, str] | None = None) -> str:
"""Locate the ``metin-release`` executable.
Preference order:
1. ``METIN_RELEASE_BINARY`` env var, if set and non-empty
2. :func:`shutil.which` on ``metin-release``
"""
environ = env if env is not None else os.environ
override = environ.get("METIN_RELEASE_BINARY", "").strip()
if override:
return override
found = shutil.which("metin-release")
if not found:
raise CliNotFoundError(
"metin-release binary not found; set METIN_RELEASE_BINARY or install the CLI."
)
return found
def run_cli(
argv_tail: list[str],
*,
binary: str | None = None,
runner: Any = None,
) -> CliResult:
"""Spawn the CLI with ``argv_tail`` and parse its JSON stdout.
``runner`` is an injection seam for tests — if provided, it's called
instead of :func:`subprocess.run` with the same arguments, and must
return an object with ``stdout``, ``stderr``, ``returncode``.
"""
bin_path = binary or resolve_binary()
cmd = [bin_path, *argv_tail]
spawn = runner or subprocess.run
try:
proc = spawn(cmd, capture_output=True, text=True, check=False)
except FileNotFoundError as exc:
raise CliNotFoundError(f"cannot spawn metin-release: {exc}") from exc
stdout = (getattr(proc, "stdout", "") or "").strip()
stderr = getattr(proc, "stderr", "") or ""
rc = int(getattr(proc, "returncode", 1))
if not stdout:
raise CliUnparseableOutputError(
f"metin-release produced no stdout (rc={rc}); stderr={stderr.strip()!r}"
)
try:
envelope = json.loads(stdout)
except json.JSONDecodeError as exc:
raise CliUnparseableOutputError(
f"metin-release stdout was not JSON: {exc}; first 200 chars={stdout[:200]!r}"
) from exc
if not isinstance(envelope, dict):
raise CliUnparseableOutputError(
f"metin-release JSON was not an object: {type(envelope).__name__}"
)
return CliResult(envelope=envelope, stderr=stderr, returncode=rc)

View File

@@ -0,0 +1,45 @@
"""Error shapes surfaced by the MCP wrapper.
The wrapper never invents its own error codes. It either propagates the
``{"ok": false, "error": {...}}`` envelope the CLI produced, or wraps a
wrapper-level failure (binary missing, unparseable output, unknown tool)
in one of the classes below.
"""
from __future__ import annotations
class McpWrapperError(Exception):
"""Base class for errors raised by the MCP wrapper itself.
These are *wrapper* problems (no CLI binary, junk on stdout, bad tool
name). CLI errors are passed through untouched as JSON envelopes.
"""
code: str = "mcp_wrapper_error"
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
def to_envelope(self, *, stderr: str | None = None) -> dict:
err: dict = {"code": self.code, "message": self.message}
if stderr:
err["stderr"] = stderr
return {"ok": False, "error": err}
class CliNotFoundError(McpWrapperError):
code = "cli_not_found"
class CliUnparseableOutputError(McpWrapperError):
code = "cli_unparseable_output"
class UnknownToolError(McpWrapperError):
code = "unknown_tool"
class InvalidToolInputError(McpWrapperError):
code = "invalid_tool_input"

View File

@@ -0,0 +1,172 @@
"""MCP stdio server entry point.
Registers each :data:`~metin_release_mcp.tool_defs.TOOL_SPECS` entry as
an MCP tool. On invocation the handler:
1. Validates input via the tool's JSON schema (enforced by the SDK) and
by :func:`~metin_release_mcp.tool_defs.build_cli_args`.
2. Shells out to ``metin-release <subcommand> --json …`` via
:func:`~metin_release_mcp.cli_runner.run_cli`.
3. Returns the parsed JSON envelope as the MCP tool result.
The server contains **no release business logic**. If the CLI returns a
non-zero exit or an ``ok=false`` envelope, the wrapper passes that
envelope back to the caller verbatim, tagging it with the CLI stderr as
diagnostic text.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import sys
from typing import Any
from . import __version__
from .errors import (
CliNotFoundError,
CliUnparseableOutputError,
InvalidToolInputError,
McpWrapperError,
UnknownToolError,
)
from .cli_runner import run_cli
from .tool_defs import TOOL_SPECS, TOOLS_BY_NAME, build_cli_args, json_schema
def _build_mcp_server(): # pragma: no cover - imported lazily for tests
from mcp.server import Server
from mcp.types import TextContent, Tool
server = Server("metin-release-mcp", version=__version__)
@server.list_tools()
async def _list_tools() -> list[Tool]:
return [
Tool(
name=spec.name,
description=spec.description,
inputSchema=json_schema(spec),
)
for spec in TOOL_SPECS
]
@server.call_tool()
async def _call_tool(name: str, arguments: dict[str, Any] | None):
envelope, stderr = dispatch(name, arguments or {})
text = json.dumps(envelope, indent=2, sort_keys=False)
content = [TextContent(type="text", text=text)]
if stderr.strip():
content.append(
TextContent(type="text", text=f"metin-release stderr:\n{stderr}")
)
return content, envelope
return server
def dispatch(tool_name: str, payload: dict[str, Any]) -> tuple[dict[str, Any], str]:
"""Dispatch a tool call synchronously.
Returns ``(envelope, stderr)``. On wrapper errors the envelope is
shaped like the CLI's own ``{"ok": false, "error": {...}}`` error
envelope so callers can treat both error sources the same way.
"""
spec = TOOLS_BY_NAME.get(tool_name)
if spec is None:
raise UnknownToolError(
f"unknown tool: {tool_name!r} (known: {sorted(TOOLS_BY_NAME)})"
)
try:
argv_tail = build_cli_args(spec, payload)
except InvalidToolInputError as exc:
return exc.to_envelope(), ""
try:
result = run_cli(argv_tail)
except (CliNotFoundError, CliUnparseableOutputError) as exc:
return exc.to_envelope(stderr=None), ""
return result.envelope, result.stderr
def _print_help() -> None:
lines = [
f"metin-release-mcp {__version__}",
"",
"Thin MCP server wrapping the metin-release CLI over stdio.",
"",
"Usage: metin-release-mcp [--help | --version | --list-tools]",
"",
"With no arguments, runs an MCP server on stdio and registers these tools:",
"",
]
for spec in TOOL_SPECS:
lines.append(f" {spec.name:<26} {spec.description}")
lines.append("")
lines.append("Environment:")
lines.append(
" METIN_RELEASE_BINARY Path to metin-release CLI (else resolved via PATH)"
)
print("\n".join(lines))
def _print_tools() -> None:
out = [
{"name": s.name, "description": s.description, "inputSchema": json_schema(s)}
for s in TOOL_SPECS
]
json.dump(out, sys.stdout, indent=2)
sys.stdout.write("\n")
async def _run_stdio() -> None: # pragma: no cover - exercised only end-to-end
from mcp.server.stdio import stdio_server
server = _build_mcp_server()
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="metin-release-mcp",
description="Thin MCP server wrapping the metin-release CLI.",
add_help=False,
)
parser.add_argument("-h", "--help", action="store_true")
parser.add_argument("--version", action="store_true")
parser.add_argument(
"--list-tools",
action="store_true",
help="Print registered tool schemas as JSON and exit.",
)
args = parser.parse_args(argv)
if args.help:
_print_help()
return 0
if args.version:
print(f"metin-release-mcp {__version__}")
return 0
if args.list_tools:
_print_tools()
return 0
try:
asyncio.run(_run_stdio())
except McpWrapperError as exc:
print(json.dumps(exc.to_envelope()), file=sys.stderr)
return 1
except KeyboardInterrupt: # pragma: no cover
return 130
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())

View File

@@ -0,0 +1,281 @@
"""Declarative tool specs for the Phase 3 MCP wrapper.
Each tool corresponds 1:1 to a ``metin-release release …`` subcommand. A
single :class:`ToolSpec` list drives three things:
1. The MCP ``list_tools`` response (via :func:`json_schema`)
2. Input validation for ``call_tool``
3. The CLI flag list emitted by :mod:`cli_runner`
Keeping all three in one place is what lets us guarantee the schema
matches the real argparse signature without duplicating it.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal
FieldType = Literal["string", "integer", "number", "boolean", "path", "array"]
@dataclass(frozen=True)
class FieldSpec:
"""One tool input field.
``name`` is the MCP-facing key (snake_case). ``flag`` is the argparse
flag name; if omitted it's derived from ``name`` by replacing ``_``
with ``-`` and prefixing ``--``. ``kind`` maps to JSON Schema types:
``path`` is a string that the CLI will read as a filesystem path,
``boolean`` is a store_true flag (passed with no value when truthy
and omitted when falsy).
"""
name: str
kind: FieldType
required: bool = False
description: str = ""
flag: str | None = None
default: Any = None
@property
def cli_flag(self) -> str:
return self.flag or f"--{self.name.replace('_', '-')}"
@property
def json_type(self) -> str:
if self.kind == "path":
return "string"
if self.kind == "array":
return "array"
return self.kind
@dataclass(frozen=True)
class ToolSpec:
name: str
subcommand: tuple[str, ...]
description: str
fields: tuple[FieldSpec, ...] = field(default_factory=tuple)
# ---------------------------------------------------------------------------
# Tool catalogue — must stay 1:1 with metin_release.commands.*.add_parser
# ---------------------------------------------------------------------------
_RELEASE = ("release",)
_M2PACK = ("m2pack",)
TOOL_SPECS: tuple[ToolSpec, ...] = (
ToolSpec(
name="release_inspect",
subcommand=_RELEASE + ("inspect",),
description="Scan a client source root and report file/byte counts plus launcher/main-exe presence.",
fields=(
FieldSpec("source", "path", required=True, description="Client root directory."),
),
),
ToolSpec(
name="release_build_manifest",
subcommand=_RELEASE + ("build-manifest",),
description="Build a signed-ready manifest.json for a source tree.",
fields=(
FieldSpec("source", "path", required=True, description="Client source root."),
FieldSpec("version", "string", required=True, description="Release version, e.g. 2026.04.14-1."),
FieldSpec("out", "path", required=True, description="Output manifest.json path."),
FieldSpec("previous", "string", description="Previous release version, if any."),
FieldSpec("notes", "path", description="Path to a release-notes file."),
FieldSpec("launcher", "string", description="Launcher filename (default Metin2Launcher.exe)."),
FieldSpec("created_at", "string", description="Override created_at for reproducible test runs."),
),
),
ToolSpec(
name="release_sign",
subcommand=_RELEASE + ("sign",),
description="Sign a manifest.json with an Ed25519 private key (mode-600 enforced).",
fields=(
FieldSpec("manifest", "path", required=True, description="Path to manifest.json."),
FieldSpec("key", "path", required=True, description="Absolute path to raw 32-byte private key."),
FieldSpec("out", "path", description="Signature output path (default: <manifest>.sig)."),
),
),
ToolSpec(
name="release_diff_remote",
subcommand=_RELEASE + ("diff-remote",),
description="HEAD every manifest blob hash against a base URL and report the missing set.",
fields=(
FieldSpec("manifest", "path", required=True, description="Path to manifest.json."),
FieldSpec("base_url", "string", required=True, description="Remote base URL."),
FieldSpec("timeout", "number", description="Per-request timeout in seconds (default 10)."),
),
),
ToolSpec(
name="release_upload_blobs",
subcommand=_RELEASE + ("upload-blobs",),
description="Rsync a release directory (excluding manifest) to the target.",
fields=(
FieldSpec("release_dir", "path", required=True, description="Local release output directory."),
FieldSpec("rsync_target", "string", required=True, description="rsync destination."),
FieldSpec("dry_run", "boolean", description="Run rsync --dry-run."),
FieldSpec("yes", "boolean", description="Skip interactive confirmation."),
),
),
ToolSpec(
name="release_promote",
subcommand=_RELEASE + ("promote",),
description="Promote a staged release by pushing manifest.json + manifest.json.sig.",
fields=(
FieldSpec("release_dir", "path", required=True, description="Local release directory."),
FieldSpec("rsync_target", "string", required=True, description="rsync destination top-level."),
FieldSpec("yes", "boolean", description="Skip interactive confirmation."),
FieldSpec("dry_run", "boolean", description="Run rsync --dry-run."),
),
),
ToolSpec(
name="release_verify_public",
subcommand=_RELEASE + ("verify-public",),
description="Fetch manifest.json + signature from a base URL and Ed25519-verify.",
fields=(
FieldSpec("base_url", "string", required=True, description="Remote base URL."),
FieldSpec("public_key", "string", required=True, description="Ed25519 public key: hex or path."),
FieldSpec("sample_blobs", "integer", description="GET and hash-check N random blobs."),
FieldSpec("timeout", "number", description="Per-request timeout in seconds (default 15)."),
),
),
ToolSpec(
name="release_publish",
subcommand=_RELEASE + ("publish",),
description="End-to-end release: build-manifest -> sign -> upload-blobs -> promote -> verify-public.",
fields=(
FieldSpec("source", "path", required=True, description="Client source root."),
FieldSpec("version", "string", required=True, description="Release version."),
FieldSpec("out", "path", required=True, description="Release output directory."),
FieldSpec("key", "path", required=True, description="Signing key path (mode 600)."),
FieldSpec("rsync_target", "string", required=True, description="rsync target for blobs + manifest."),
FieldSpec("base_url", "string", required=True, description="Public base URL for verification."),
FieldSpec("public_key", "string", required=True, description="Public key (hex or file)."),
FieldSpec("previous", "string", description="Previous release version."),
FieldSpec("notes", "path", description="Release notes file."),
FieldSpec("launcher", "string", description="Launcher filename."),
FieldSpec("created_at", "string", description="Override manifest created_at."),
FieldSpec("sample_blobs", "integer", description="verify-public sample count."),
FieldSpec("yes", "boolean", description="Skip interactive confirmation."),
FieldSpec("force", "boolean", description="Allow non-empty output directory."),
FieldSpec("dry_run_upload", "boolean", description="rsync --dry-run for upload + promote."),
),
),
ToolSpec(
name="m2pack_build",
subcommand=_M2PACK + ("build",),
description="Build a signed .m2p archive from a client asset directory via m2pack-secure.",
fields=(
FieldSpec("input", "path", required=True, description="Client asset source directory."),
FieldSpec("output", "path", required=True, description="Output .m2p archive path."),
FieldSpec("key", "path", required=True, description="Master content key file."),
FieldSpec(
"sign_secret_key",
"path",
required=True,
description="Ed25519 signing secret key file.",
),
FieldSpec("key_id", "integer", description="Content key id (default 1)."),
),
),
ToolSpec(
name="m2pack_verify",
subcommand=_M2PACK + ("verify",),
description="Verify an .m2p archive's manifest signature and optionally full decrypt.",
fields=(
FieldSpec("archive", "path", required=True, description="Path to .m2p archive."),
FieldSpec("public_key", "path", description="Ed25519 public key file."),
FieldSpec("key", "path", description="Master content key file for full-decrypt verify."),
),
),
ToolSpec(
name="m2pack_diff",
subcommand=_M2PACK + ("diff",),
description="Diff two directories and/or .m2p archives (left vs right).",
fields=(
FieldSpec("left", "path", required=True, description="Left side: directory or .m2p archive."),
FieldSpec("right", "path", required=True, description="Right side: directory or .m2p archive."),
),
),
ToolSpec(
name="m2pack_export_runtime_key",
subcommand=_M2PACK + ("export-runtime-key",),
description="Export a launcher runtime-key payload (json or blob form) from master+public keys.",
fields=(
FieldSpec("key", "path", required=True, description="Master content key file."),
FieldSpec("public_key", "path", required=True, description="Ed25519 public key file."),
FieldSpec("output", "path", required=True, description="Output payload path."),
FieldSpec("key_id", "integer", description="Content key id (default 1)."),
FieldSpec("format", "string", description="Payload format: json or blob (default json)."),
),
),
)
TOOLS_BY_NAME: dict[str, ToolSpec] = {t.name: t for t in TOOL_SPECS}
def json_schema(spec: ToolSpec) -> dict[str, Any]:
"""Build a JSON Schema for one tool's input object."""
props: dict[str, Any] = {}
required: list[str] = []
for f in spec.fields:
prop: dict[str, Any] = {"type": f.json_type}
if f.description:
prop["description"] = f.description
props[f.name] = prop
if f.required:
required.append(f.name)
schema: dict[str, Any] = {
"type": "object",
"properties": props,
"additionalProperties": False,
}
if required:
schema["required"] = required
return schema
def build_cli_args(spec: ToolSpec, payload: dict[str, Any]) -> list[str]:
"""Translate a tool input dict into a ``metin-release`` argv tail.
Output is ``[*subcommand, "--json", *flags]``. Unknown keys raise
:class:`~metin_release_mcp.errors.InvalidToolInputError`. Required
fields that are missing do the same.
"""
from .errors import InvalidToolInputError
known = {f.name for f in spec.fields}
unknown = set(payload) - known
if unknown:
raise InvalidToolInputError(
f"tool {spec.name} got unknown fields: {sorted(unknown)}"
)
argv: list[str] = [*spec.subcommand, "--json"]
for f in spec.fields:
if f.name not in payload or payload[f.name] is None:
if f.required:
raise InvalidToolInputError(
f"tool {spec.name} missing required field: {f.name}"
)
continue
value = payload[f.name]
if f.kind == "boolean":
if bool(value):
argv.append(f.cli_flag)
continue
if f.kind == "array":
if not isinstance(value, list):
raise InvalidToolInputError(
f"field {f.name} must be an array"
)
for item in value:
argv.extend([f.cli_flag, str(item)])
continue
argv.extend([f.cli_flag, str(value)])
return argv

66
tests/conftest.py Normal file
View File

@@ -0,0 +1,66 @@
from __future__ import annotations
import os
import shutil
import sys
from pathlib import Path
import pytest
# Ensure editable install's src path is on sys.path when running from checkout.
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
@pytest.fixture
def tiny_client(tmp_path: Path) -> Path:
"""Copy the checked-in tiny_client fixture into a tmp dir and return it."""
src = Path(__file__).parent / "fixtures" / "tiny_client"
dst = tmp_path / "client"
shutil.copytree(src, dst)
return dst
@pytest.fixture
def make_manifest_script() -> Path:
return Path("/home/jann/metin/repos/m2dev-client/scripts/make-manifest.py")
@pytest.fixture
def sign_manifest_script() -> Path:
return Path("/home/jann/metin/repos/m2dev-client/scripts/sign-manifest.py")
@pytest.fixture(autouse=True)
def reset_env(monkeypatch):
# Don't let ambient env vars bleed into tests.
for key in ("METIN_RELEASE_MAKE_MANIFEST", "METIN_RELEASE_SIGN_MANIFEST"):
monkeypatch.delenv(key, raising=False)
yield
@pytest.fixture
def test_keypair(tmp_path: Path) -> tuple[Path, str]:
"""Generate an ephemeral Ed25519 keypair, write the private key mode 600.
Returns (private_key_path, public_key_hex).
"""
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
priv = Ed25519PrivateKey.generate()
raw = priv.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
key_path = tmp_path / "key"
key_path.write_bytes(raw)
os.chmod(key_path, 0o600)
pub_hex = priv.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
).hex()
return key_path, pub_hex

29
tests/fixtures/http_server.py vendored Normal file
View File

@@ -0,0 +1,29 @@
"""Minimal threaded HTTP server for diff-remote / verify-public tests."""
from __future__ import annotations
import contextlib
import threading
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Iterator
class _Handler(SimpleHTTPRequestHandler):
def log_message(self, format, *args): # noqa: A002
pass # silence
@contextlib.contextmanager
def serve_dir(root: Path) -> Iterator[str]:
"""Serve `root` on 127.0.0.1:<free port>, yield the base URL."""
handler = lambda *a, **kw: _Handler(*a, directory=str(root), **kw) # noqa: E731
server = ThreadingHTTPServer(("127.0.0.1", 0), handler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
try:
yield f"http://127.0.0.1:{server.server_port}"
finally:
server.shutdown()
server.server_close()
thread.join(timeout=2)

1
tests/fixtures/tiny_client/Metin2.exe vendored Normal file
View File

@@ -0,0 +1 @@
fake main executable content

View File

@@ -0,0 +1 @@
fake launcher content

1
tests/fixtures/tiny_client/readme.txt vendored Normal file
View File

@@ -0,0 +1 @@
tiny client fixture for tests

View File

@@ -0,0 +1 @@
asset payload bytes

View File

@@ -0,0 +1 @@
second asset payload

0
tests/mcp/__init__.py Normal file
View File

32
tests/mcp/conftest.py Normal file
View File

@@ -0,0 +1,32 @@
"""Fixtures for the MCP wrapper test suite."""
from __future__ import annotations
import json
from dataclasses import dataclass
@dataclass
class FakeProc:
stdout: str
stderr: str = ""
returncode: int = 0
def make_runner(envelope: dict | None = None, *, stdout: str | None = None, stderr: str = "", returncode: int = 0):
"""Return a callable compatible with :func:`subprocess.run`.
Records the most recent ``cmd`` on ``runner.calls``.
"""
calls: list[list[str]] = []
def runner(cmd, capture_output=True, text=True, check=False): # noqa: ARG001
calls.append(list(cmd))
if stdout is not None:
s = stdout
else:
s = json.dumps(envelope if envelope is not None else {"ok": True, "command": "x", "status": "ok"})
return FakeProc(stdout=s, stderr=stderr, returncode=returncode)
runner.calls = calls # type: ignore[attr-defined]
return runner

View File

@@ -0,0 +1,86 @@
"""Tests for :mod:`metin_release_mcp.cli_runner`."""
from __future__ import annotations
import json
import pytest
from metin_release_mcp import cli_runner
from metin_release_mcp.errors import CliNotFoundError, CliUnparseableOutputError
from .conftest import make_runner
def test_run_cli_success_parses_envelope():
runner = make_runner({"ok": True, "command": "release inspect", "status": "inspected", "stats": {"file_count": 3}})
result = cli_runner.run_cli(
["release", "inspect", "--json", "--source", "/tmp/x"],
binary="/fake/metin-release",
runner=runner,
)
assert result.ok is True
assert result.envelope["status"] == "inspected"
assert result.envelope["stats"]["file_count"] == 3
assert result.returncode == 0
assert runner.calls[0][0] == "/fake/metin-release"
assert "--json" in runner.calls[0]
def test_run_cli_error_envelope_passed_through():
err = {
"ok": False,
"command": "release sign",
"status": "failed",
"error": {"code": "key_permission", "message": "key must be mode 600"},
}
runner = make_runner(err, stderr="error: key_permission\n", returncode=3)
result = cli_runner.run_cli(["release", "sign"], binary="/fake/mr", runner=runner)
assert result.ok is False
assert result.returncode == 3
assert result.envelope["error"]["code"] == "key_permission"
assert "key_permission" in result.stderr
def test_run_cli_unparseable_output_raises():
runner = make_runner(stdout="not-json-at-all", returncode=0)
with pytest.raises(CliUnparseableOutputError):
cli_runner.run_cli(["release", "inspect"], binary="/fake/mr", runner=runner)
def test_run_cli_empty_stdout_raises():
runner = make_runner(stdout=" \n", stderr="boom", returncode=2)
with pytest.raises(CliUnparseableOutputError):
cli_runner.run_cli(["release", "inspect"], binary="/fake/mr", runner=runner)
def test_run_cli_non_object_json_raises():
runner = make_runner(stdout=json.dumps([1, 2, 3]))
with pytest.raises(CliUnparseableOutputError):
cli_runner.run_cli(["release", "inspect"], binary="/fake/mr", runner=runner)
def test_run_cli_file_not_found_raises():
def boom(*args, **kwargs):
raise FileNotFoundError("no such file")
with pytest.raises(CliNotFoundError):
cli_runner.run_cli(["release", "inspect"], binary="/does/not/exist", runner=boom)
def test_resolve_binary_env_override(monkeypatch):
monkeypatch.setenv("METIN_RELEASE_BINARY", "/custom/path/metin-release")
assert cli_runner.resolve_binary() == "/custom/path/metin-release"
def test_resolve_binary_which_fallback(monkeypatch):
monkeypatch.delenv("METIN_RELEASE_BINARY", raising=False)
monkeypatch.setattr(cli_runner.shutil, "which", lambda name: "/usr/bin/metin-release")
assert cli_runner.resolve_binary() == "/usr/bin/metin-release"
def test_resolve_binary_missing_raises(monkeypatch):
monkeypatch.delenv("METIN_RELEASE_BINARY", raising=False)
monkeypatch.setattr(cli_runner.shutil, "which", lambda name: None)
with pytest.raises(CliNotFoundError):
cli_runner.resolve_binary()

View File

@@ -0,0 +1,285 @@
"""Dispatch tests — input dict to CLI argv translation."""
from __future__ import annotations
import json
import pytest
from metin_release_mcp import cli_runner, server
from metin_release_mcp.tool_defs import TOOLS_BY_NAME, build_cli_args
from .conftest import make_runner
def test_inspect_maps_source_to_flag():
spec = TOOLS_BY_NAME["release_inspect"]
argv = build_cli_args(spec, {"source": "/tmp/client"})
assert argv == ["release", "inspect", "--json", "--source", "/tmp/client"]
def test_build_manifest_full_flags():
spec = TOOLS_BY_NAME["release_build_manifest"]
argv = build_cli_args(
spec,
{
"source": "/src",
"version": "2026.04.14-1",
"out": "/out/manifest.json",
"previous": "2026.04.13-1",
"notes": "/notes.md",
"launcher": "Metin2Launcher.exe",
"created_at": "2026-04-14T00:00:00Z",
},
)
# Required args present in order declared
assert argv[0:3] == ["release", "build-manifest", "--json"]
assert "--source" in argv and argv[argv.index("--source") + 1] == "/src"
assert "--version" in argv and argv[argv.index("--version") + 1] == "2026.04.14-1"
assert "--out" in argv and argv[argv.index("--out") + 1] == "/out/manifest.json"
assert "--previous" in argv and argv[argv.index("--previous") + 1] == "2026.04.13-1"
assert "--created-at" in argv
assert argv[argv.index("--created-at") + 1] == "2026-04-14T00:00:00Z"
def test_build_manifest_omits_missing_optionals():
spec = TOOLS_BY_NAME["release_build_manifest"]
argv = build_cli_args(
spec,
{"source": "/src", "version": "v1", "out": "/m.json"},
)
assert "--previous" not in argv
assert "--notes" not in argv
assert "--launcher" not in argv
assert "--created-at" not in argv
def test_none_valued_optional_is_omitted():
spec = TOOLS_BY_NAME["release_build_manifest"]
argv = build_cli_args(
spec,
{"source": "/s", "version": "v", "out": "/m", "previous": None, "notes": None},
)
assert "--previous" not in argv
assert "--notes" not in argv
def test_boolean_flag_true_adds_flag_without_value():
spec = TOOLS_BY_NAME["release_upload_blobs"]
argv = build_cli_args(
spec,
{
"release_dir": "/rel",
"rsync_target": "user@host:/srv/updates",
"dry_run": True,
"yes": True,
},
)
assert "--dry-run" in argv
assert "--yes" in argv
# No value should follow --dry-run
i = argv.index("--dry-run")
# next element either end of list or another flag
assert i == len(argv) - 1 or argv[i + 1].startswith("--")
def test_boolean_flag_false_omits_flag():
spec = TOOLS_BY_NAME["release_upload_blobs"]
argv = build_cli_args(
spec,
{"release_dir": "/rel", "rsync_target": "tgt", "dry_run": False, "yes": False},
)
assert "--dry-run" not in argv
assert "--yes" not in argv
def test_path_with_spaces_passes_through_as_single_argv_element():
spec = TOOLS_BY_NAME["release_inspect"]
argv = build_cli_args(spec, {"source": "/tmp/has spaces/client root"})
# argv is a list — no shell involved, so spaces stay in one element
assert "/tmp/has spaces/client root" in argv
assert argv.count("--source") == 1
def test_diff_remote_numeric_timeout_serialised_as_string():
spec = TOOLS_BY_NAME["release_diff_remote"]
argv = build_cli_args(
spec,
{"manifest": "/m.json", "base_url": "https://x/", "timeout": 7.5},
)
assert argv[argv.index("--timeout") + 1] == "7.5"
def test_publish_mixes_booleans_and_strings():
spec = TOOLS_BY_NAME["release_publish"]
argv = build_cli_args(
spec,
{
"source": "/src",
"version": "v1",
"out": "/out",
"key": "/k",
"rsync_target": "t",
"base_url": "https://x/",
"public_key": "deadbeef",
"force": True,
"dry_run_upload": True,
"yes": False,
},
)
assert "--force" in argv
assert "--dry-run-upload" in argv
assert "--yes" not in argv
# ---------------------------------------------------------------------------
# server.dispatch integration against a stubbed CLI runner
# ---------------------------------------------------------------------------
@pytest.fixture
def stub_runner(monkeypatch):
def install(envelope=None, *, stdout=None, stderr="", returncode=0):
runner = make_runner(envelope, stdout=stdout, stderr=stderr, returncode=returncode)
monkeypatch.setattr(
server,
"run_cli",
lambda argv_tail: cli_runner.run_cli(
argv_tail, binary="/fake/metin-release", runner=runner
),
)
return runner
return install
def test_dispatch_success_returns_envelope(stub_runner):
runner = stub_runner(
{"ok": True, "command": "release inspect", "status": "inspected", "stats": {"file_count": 42}}
)
envelope, stderr = server.dispatch("release_inspect", {"source": "/tmp/x"})
assert envelope["ok"] is True
assert envelope["stats"]["file_count"] == 42
# the runner saw --json
assert "--json" in runner.calls[0]
assert "--source" in runner.calls[0]
assert "/tmp/x" in runner.calls[0]
def test_dispatch_cli_error_envelope_passes_through(stub_runner):
err = {
"ok": False,
"command": "release sign",
"status": "failed",
"error": {"code": "key_permission", "message": "mode 644"},
}
stub_runner(err, stderr="error\n", returncode=3)
envelope, stderr = server.dispatch(
"release_sign", {"manifest": "/m.json", "key": "/k"}
)
assert envelope == err
assert "error" in stderr
def test_dispatch_cli_binary_missing_returns_wrapper_error(monkeypatch):
def boom(argv_tail):
from metin_release_mcp.errors import CliNotFoundError
raise CliNotFoundError("metin-release binary not found")
monkeypatch.setattr(server, "run_cli", boom)
envelope, stderr = server.dispatch("release_inspect", {"source": "/x"})
assert envelope["ok"] is False
assert envelope["error"]["code"] == "cli_not_found"
def test_dispatch_invalid_input_returns_wrapper_error():
envelope, _ = server.dispatch("release_inspect", {})
assert envelope["ok"] is False
assert envelope["error"]["code"] == "invalid_tool_input"
def test_m2pack_build_translates_all_flags():
spec = TOOLS_BY_NAME["m2pack_build"]
argv = build_cli_args(
spec,
{
"input": "/src",
"output": "/out/a.m2p",
"key": "/ck",
"sign_secret_key": "/sk",
"key_id": 2,
},
)
assert argv[0:3] == ["m2pack", "build", "--json"]
assert "--input" in argv and argv[argv.index("--input") + 1] == "/src"
assert "--output" in argv and argv[argv.index("--output") + 1] == "/out/a.m2p"
assert "--sign-secret-key" in argv
assert argv[argv.index("--sign-secret-key") + 1] == "/sk"
assert "--key-id" in argv and argv[argv.index("--key-id") + 1] == "2"
def test_m2pack_verify_omits_optional_keys():
spec = TOOLS_BY_NAME["m2pack_verify"]
argv = build_cli_args(spec, {"archive": "/a.m2p"})
assert argv == ["m2pack", "verify", "--json", "--archive", "/a.m2p"]
def test_m2pack_diff_requires_both_sides():
from metin_release_mcp.errors import InvalidToolInputError
spec = TOOLS_BY_NAME["m2pack_diff"]
with pytest.raises(InvalidToolInputError):
build_cli_args(spec, {"left": "/L"})
argv = build_cli_args(spec, {"left": "/L", "right": "/R"})
assert argv == ["m2pack", "diff", "--json", "--left", "/L", "--right", "/R"]
def test_m2pack_export_runtime_key_with_format():
spec = TOOLS_BY_NAME["m2pack_export_runtime_key"]
argv = build_cli_args(
spec,
{
"key": "/ck",
"public_key": "/pk",
"output": "/out",
"format": "blob",
"key_id": 4,
},
)
assert argv[0:3] == ["m2pack", "export-runtime-key", "--json"]
assert "--format" in argv and argv[argv.index("--format") + 1] == "blob"
assert "--key-id" in argv and argv[argv.index("--key-id") + 1] == "4"
def test_dispatch_m2pack_build_through_fake_runner(stub_runner):
runner = stub_runner(
{"ok": True, "command": "m2pack build", "status": "built", "artifacts": {"archive_path": "/x.m2p"}}
)
envelope, _ = server.dispatch(
"m2pack_build",
{
"input": "/src",
"output": "/x.m2p",
"key": "/ck",
"sign_secret_key": "/sk",
},
)
assert envelope["ok"] is True
call = runner.calls[0]
assert "m2pack" in call and "build" in call
assert "--json" in call
assert "--input" in call and "/src" in call
assert "--sign-secret-key" in call and "/sk" in call
def test_dispatch_unparseable_output_returns_wrapper_error(monkeypatch):
def boom(argv_tail):
from metin_release_mcp.errors import CliUnparseableOutputError
raise CliUnparseableOutputError("not json")
monkeypatch.setattr(server, "run_cli", boom)
envelope, _ = server.dispatch("release_inspect", {"source": "/x"})
assert envelope["ok"] is False
assert envelope["error"]["code"] == "cli_unparseable_output"

View File

@@ -0,0 +1,152 @@
"""Schema tests — each MCP tool must mirror its CLI subcommand exactly."""
from __future__ import annotations
import argparse
import pytest
from metin_release.commands import (
build_manifest,
diff_remote,
inspect,
m2pack_build,
m2pack_diff,
m2pack_export_runtime_key,
m2pack_verify,
promote,
publish,
sign,
upload_blobs,
verify_public,
)
from metin_release_mcp.errors import UnknownToolError
from metin_release_mcp.server import dispatch
from metin_release_mcp.tool_defs import (
TOOL_SPECS,
TOOLS_BY_NAME,
build_cli_args,
json_schema,
)
EXPECTED_TOOL_NAMES = {
"release_inspect",
"release_build_manifest",
"release_sign",
"release_diff_remote",
"release_upload_blobs",
"release_promote",
"release_verify_public",
"release_publish",
"m2pack_build",
"m2pack_verify",
"m2pack_diff",
"m2pack_export_runtime_key",
}
def test_tool_catalogue_is_exactly_the_phase_one_plus_four_set():
assert {t.name for t in TOOL_SPECS} == EXPECTED_TOOL_NAMES
assert len(TOOL_SPECS) == 12
def test_every_tool_has_known_group_and_description():
for spec in TOOL_SPECS:
assert spec.subcommand[0] in {"release", "m2pack"}
assert spec.description.strip()
schema = json_schema(spec)
assert schema["type"] == "object"
assert schema["additionalProperties"] is False
def _argparse_flags_for(add_parser_fn) -> dict[str, dict]:
"""Introspect a command's argparse signature.
Returns a mapping from flag name (e.g. ``--source``) to a dict with
``required`` and ``kind`` ("boolean" for store_true, else "value").
"""
parser = argparse.ArgumentParser()
sub = parser.add_subparsers()
sp = add_parser_fn(sub)
out: dict[str, dict] = {}
for action in sp._actions:
if not action.option_strings:
continue
if action.dest in ("help",):
continue
flag = action.option_strings[0]
is_bool = isinstance(action, argparse._StoreTrueAction)
out[flag] = {"required": action.required, "kind": "boolean" if is_bool else "value"}
return out
_COMMAND_MODULES = {
"release_inspect": inspect,
"release_build_manifest": build_manifest,
"release_sign": sign,
"release_diff_remote": diff_remote,
"release_upload_blobs": upload_blobs,
"release_promote": promote,
"release_verify_public": verify_public,
"release_publish": publish,
"m2pack_build": m2pack_build,
"m2pack_verify": m2pack_verify,
"m2pack_diff": m2pack_diff,
"m2pack_export_runtime_key": m2pack_export_runtime_key,
}
@pytest.mark.parametrize("tool_name", sorted(EXPECTED_TOOL_NAMES))
def test_schema_mirrors_argparse(tool_name):
spec = TOOLS_BY_NAME[tool_name]
mod = _COMMAND_MODULES[tool_name]
argparse_flags = _argparse_flags_for(mod.add_parser)
spec_flags = {f.cli_flag: f for f in spec.fields}
assert set(spec_flags) == set(argparse_flags), (
f"{tool_name}: spec flags {set(spec_flags)} != argparse flags {set(argparse_flags)}"
)
for flag, info in argparse_flags.items():
field = spec_flags[flag]
assert field.required == info["required"], f"{tool_name} {flag} required mismatch"
if info["kind"] == "boolean":
assert field.kind == "boolean", f"{tool_name} {flag} expected boolean"
else:
assert field.kind != "boolean", f"{tool_name} {flag} should not be boolean"
@pytest.mark.parametrize("tool_name", sorted(EXPECTED_TOOL_NAMES))
def test_required_fields_in_json_schema(tool_name):
spec = TOOLS_BY_NAME[tool_name]
schema = json_schema(spec)
required_in_spec = {f.name for f in spec.fields if f.required}
required_in_schema = set(schema.get("required", []))
assert required_in_schema == required_in_spec
def test_unknown_tool_rejected_by_dispatch():
with pytest.raises(UnknownToolError):
dispatch("nope_not_a_tool", {})
def test_unknown_tool_name_not_in_catalogue():
assert "release_rollback" not in TOOLS_BY_NAME
assert "erp_reserve" not in TOOLS_BY_NAME
assert "launcher_publish" not in TOOLS_BY_NAME
def test_build_cli_args_rejects_missing_required():
from metin_release_mcp.errors import InvalidToolInputError
spec = TOOLS_BY_NAME["release_inspect"]
with pytest.raises(InvalidToolInputError):
build_cli_args(spec, {})
def test_build_cli_args_rejects_unknown_fields():
from metin_release_mcp.errors import InvalidToolInputError
spec = TOOLS_BY_NAME["release_inspect"]
with pytest.raises(InvalidToolInputError):
build_cli_args(spec, {"source": "/x", "nonsense": 1})

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
import pytest
from metin_release.commands import build_manifest
from metin_release.errors import ValidationError
from metin_release.workspace import Context
def _ctx(script: Path) -> Context:
return Context(make_manifest_script=script)
@pytest.mark.skipif(
not Path("/home/jann/metin/repos/m2dev-client/scripts/make-manifest.py").is_file(),
reason="real make-manifest.py not available",
)
def test_build_manifest_real_script(tiny_client: Path, tmp_path: Path, make_manifest_script: Path):
out = tmp_path / "manifest.json"
ctx = _ctx(make_manifest_script)
args = argparse.Namespace(
source=tiny_client,
version="2026.04.14-1",
out=out,
previous=None,
notes=None,
launcher="Metin2Launcher.exe",
created_at="2026-04-14T00:00:00Z",
)
result = build_manifest.run(ctx, args)
assert out.is_file()
manifest = json.loads(out.read_text())
assert manifest["version"] == "2026.04.14-1"
assert manifest["launcher"]["path"] == "Metin2Launcher.exe"
assert len(manifest["files"]) >= 3 # Metin2.exe, readme.txt, subdir/*
stats = result.data["stats"]
assert stats["file_count"] == len(manifest["files"]) + 1
assert stats["blob_count"] >= 1
assert stats["total_bytes"] > 0
def test_build_manifest_missing_script(tiny_client: Path, tmp_path: Path):
ctx = _ctx(tmp_path / "nope.py")
args = argparse.Namespace(
source=tiny_client,
version="v1",
out=tmp_path / "manifest.json",
previous=None,
notes=None,
launcher="Metin2Launcher.exe",
created_at=None,
)
with pytest.raises(ValidationError):
build_manifest.run(ctx, args)
def test_build_manifest_missing_source(tmp_path: Path, make_manifest_script: Path):
ctx = _ctx(make_manifest_script)
args = argparse.Namespace(
source=tmp_path / "nope",
version="v1",
out=tmp_path / "manifest.json",
previous=None,
notes=None,
launcher="Metin2Launcher.exe",
created_at=None,
)
with pytest.raises(ValidationError):
build_manifest.run(ctx, args)

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
import json
import subprocess
import sys
def _run(*args: str) -> subprocess.CompletedProcess:
return subprocess.run(
[sys.executable, "-m", "metin_release", *args],
capture_output=True,
text=True,
)
def test_version_flag():
r = _run("--version")
assert r.returncode == 0
assert "metin-release" in r.stdout
def test_help_top_level():
r = _run("--help")
assert r.returncode == 0
assert "release" in r.stdout
def test_help_release_inspect():
r = _run("release", "inspect", "--help")
assert r.returncode == 0
assert "--source" in r.stdout
def test_unknown_subcommand_exits_nonzero():
r = _run("release", "does-not-exist")
assert r.returncode != 0
def test_inspect_json_output_is_pure_json(tmp_path):
(tmp_path / "Metin2.exe").write_text("x")
(tmp_path / "Metin2Launcher.exe").write_text("y")
r = _run("--json", "release", "inspect", "--source", str(tmp_path))
assert r.returncode == 0
envelope = json.loads(r.stdout) # must parse cleanly
assert envelope["ok"] is True
assert envelope["command"] == "release inspect"
assert envelope["stats"]["file_count"] == 2
def test_inspect_human_mode_emits_json_on_stdout_and_summary_on_stderr(tmp_path):
(tmp_path / "Metin2.exe").write_text("x")
r = _run("release", "inspect", "--source", str(tmp_path))
assert r.returncode == 0
envelope = json.loads(r.stdout)
assert envelope["ok"] is True
assert "release inspect" in r.stderr

71
tests/test_diff_remote.py Normal file
View File

@@ -0,0 +1,71 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
import pytest
# Make fixtures importable.
sys.path.insert(0, str(Path(__file__).parent / "fixtures"))
from http_server import serve_dir # noqa: E402
from metin_release.commands import diff_remote # noqa: E402
from metin_release.workspace import Context # noqa: E402
def _write_manifest(path: Path, hashes: list[str]) -> None:
manifest = {
"version": "v1",
"created_at": "2026-04-14T00:00:00Z",
"launcher": {"path": "Metin2Launcher.exe", "sha256": hashes[0], "size": 1},
"files": [
{"path": f"file{i}", "sha256": h, "size": 1}
for i, h in enumerate(hashes[1:])
],
}
path.write_text(json.dumps(manifest, indent=2))
def test_diff_remote_reports_missing(tmp_path: Path):
hashes = [
"a" * 64,
"b" * 64,
"c" * 64,
]
manifest = tmp_path / "manifest.json"
_write_manifest(manifest, hashes)
# Remote serves only two of the three blobs.
remote = tmp_path / "remote"
remote.mkdir()
for h in hashes[:2]:
d = remote / "files" / h[:2]
d.mkdir(parents=True)
(d / h).write_bytes(b"x")
with serve_dir(remote) as base_url:
args = argparse.Namespace(manifest=manifest, base_url=base_url, timeout=5.0)
result = diff_remote.run(Context(), args)
stats = result.data["stats"]
assert stats["manifest_blob_count"] == 3
assert stats["missing_blob_count"] == 1
assert stats["missing_blobs"] == [hashes[2]]
def test_diff_remote_all_present(tmp_path: Path):
hashes = ["d" * 64]
manifest = tmp_path / "manifest.json"
_write_manifest(manifest, hashes)
remote = tmp_path / "remote"
(remote / "files" / "dd").mkdir(parents=True)
(remote / "files" / "dd" / hashes[0]).write_bytes(b"x")
with serve_dir(remote) as base_url:
args = argparse.Namespace(manifest=manifest, base_url=base_url, timeout=5.0)
result = diff_remote.run(Context(), args)
assert result.data["stats"]["missing_blob_count"] == 0

42
tests/test_inspect.py Normal file
View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import argparse
from pathlib import Path
import pytest
from metin_release.commands import inspect
from metin_release.errors import SourceNotFoundError
from metin_release.workspace import Context
def test_inspect_tiny_client(tiny_client: Path):
ctx = Context()
args = argparse.Namespace(source=tiny_client)
result = inspect.run(ctx, args)
stats = result.data["stats"]
assert stats["file_count"] == 5
assert stats["launcher_present"] is True
assert stats["main_exe_present"] is True
assert stats["total_bytes"] > 0
assert stats["source_path"] == str(tiny_client)
def test_inspect_missing_source(tmp_path):
ctx = Context()
args = argparse.Namespace(source=tmp_path / "nope")
with pytest.raises(SourceNotFoundError):
inspect.run(ctx, args)
def test_inspect_skips_excluded(tmp_path):
(tmp_path / "Metin2.exe").write_text("x")
(tmp_path / "Metin2Launcher.exe").write_text("y")
log_dir = tmp_path / "log"
log_dir.mkdir()
(log_dir / "should_skip.txt").write_text("nope")
(tmp_path / "foo.pdb").write_text("nope")
ctx = Context()
args = argparse.Namespace(source=tmp_path)
result = inspect.run(ctx, args)
assert result.data["stats"]["file_count"] == 2

View File

@@ -0,0 +1,68 @@
"""Tests for the m2pack binary discovery helper."""
from __future__ import annotations
import os
import stat
from pathlib import Path
import pytest
from metin_release.errors import ValidationError
from metin_release.m2pack_binary import ENV_VAR, resolve_m2pack_binary
def _make_stub(tmp_path: Path, name: str = "m2pack") -> Path:
stub = tmp_path / name
stub.write_text("#!/bin/sh\necho '{}'\n")
stub.chmod(stub.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
return stub
def test_env_var_override_wins(tmp_path: Path, monkeypatch):
stub = _make_stub(tmp_path)
# Scrub PATH so shutil.which can't resolve m2pack
monkeypatch.setenv("PATH", "/nonexistent")
monkeypatch.setenv(ENV_VAR, str(stub))
resolved = resolve_m2pack_binary()
assert resolved == stub.resolve()
def test_env_var_empty_falls_through_to_path(tmp_path: Path, monkeypatch):
stub = _make_stub(tmp_path)
monkeypatch.setenv(ENV_VAR, " ") # blank-ish
monkeypatch.setenv("PATH", str(tmp_path))
resolved = resolve_m2pack_binary()
assert resolved == stub.resolve()
def test_env_var_pointing_nowhere_raises(tmp_path: Path, monkeypatch):
monkeypatch.setenv(ENV_VAR, str(tmp_path / "does-not-exist"))
monkeypatch.setenv("PATH", "/nonexistent")
with pytest.raises(ValidationError) as exc:
resolve_m2pack_binary()
assert exc.value.error_code == "m2pack_not_found"
def test_path_fallback_used_when_env_unset(tmp_path: Path, monkeypatch):
stub = _make_stub(tmp_path)
monkeypatch.delenv(ENV_VAR, raising=False)
monkeypatch.setenv("PATH", str(tmp_path))
resolved = resolve_m2pack_binary()
assert resolved == stub.resolve()
def test_missing_binary_raises_validation_error(monkeypatch):
monkeypatch.delenv(ENV_VAR, raising=False)
monkeypatch.setenv("PATH", "/nonexistent")
with pytest.raises(ValidationError) as exc:
resolve_m2pack_binary()
assert exc.value.error_code == "m2pack_not_found"
assert "M2PACK_BINARY" in str(exc.value)
def test_custom_env_mapping_parameter(tmp_path: Path):
stub = _make_stub(tmp_path)
fake_env = {ENV_VAR: str(stub)}
resolved = resolve_m2pack_binary(env=fake_env)
assert resolved == stub.resolve()

View File

@@ -0,0 +1,307 @@
"""Tests for the m2pack wrapper subcommands.
All tests use a stub binary — a small Python script that echoes a canned
JSON envelope based on the subcommand name — pointed at via the
``M2PACK_BINARY`` env var. The real m2pack-secure binary is never invoked.
"""
from __future__ import annotations
import json
import stat
import sys
from pathlib import Path
import pytest
from metin_release.cli import main as cli_main
STUB_TEMPLATE = r"""#!{python}
import json
import sys
argv = sys.argv[1:]
sub = argv[0] if argv else "unknown"
MODE = {mode!r}
if MODE == "fail":
sys.stderr.write("boom\n")
sys.exit(2)
if MODE == "nonjson":
sys.stdout.write("not json at all\n")
sys.exit(0)
if MODE == "notobject":
sys.stdout.write(json.dumps([1, 2, 3]) + "\n")
sys.exit(0)
# mode == "ok": echo a canned envelope per subcommand
if sub == "build":
env = {{"ok": True, "command": "build", "stats": {{"files": 3, "bytes": 12345}}}}
elif sub == "verify":
env = {{"ok": True, "command": "verify", "signature": "valid"}}
elif sub == "diff":
env = {{"ok": True, "command": "diff", "added": ["a"], "removed": [], "changed": ["b", "c"], "unchanged": 7}}
elif sub == "export-runtime-key":
env = {{"ok": True, "command": "export-runtime-key", "key_id": 1, "format": "json"}}
else:
env = {{"ok": True, "command": sub}}
# Record argv so the test can assert translation
import os
log = os.environ.get("M2PACK_STUB_LOG")
if log:
with open(log, "a") as fh:
fh.write(json.dumps(argv) + "\n")
sys.stdout.write(json.dumps(env) + "\n")
sys.exit(0)
"""
def _install_stub(tmp_path: Path, monkeypatch, mode: str = "ok") -> tuple[Path, Path]:
stub = tmp_path / "m2pack_stub.py"
stub.write_text(STUB_TEMPLATE.format(python=sys.executable, mode=mode))
stub.chmod(stub.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
log = tmp_path / "stub.log"
monkeypatch.setenv("M2PACK_BINARY", str(stub))
monkeypatch.setenv("M2PACK_STUB_LOG", str(log))
return stub, log
def _run_cli(argv: list[str], capsys) -> dict:
rc = cli_main(argv)
out = capsys.readouterr().out
envelope = json.loads(out)
return {"rc": rc, "env": envelope}
def _read_stub_call(log: Path) -> list[str]:
lines = [ln for ln in log.read_text().splitlines() if ln.strip()]
assert lines, "stub was never invoked"
return json.loads(lines[-1])
# ---------------------------------------------------------------------------
# Success paths
# ---------------------------------------------------------------------------
def test_build_success(tmp_path, monkeypatch, capsys):
_, log = _install_stub(tmp_path, monkeypatch)
(tmp_path / "in").mkdir()
key = tmp_path / "k.hex"
key.write_text("ff")
sk = tmp_path / "sk.hex"
sk.write_text("aa")
out = tmp_path / "out.m2p"
r = _run_cli(
[
"--json",
"m2pack",
"build",
"--input", str(tmp_path / "in"),
"--output", str(out),
"--key", str(key),
"--sign-secret-key", str(sk),
"--key-id", "7",
],
capsys,
)
assert r["rc"] == 0
assert r["env"]["ok"] is True
assert r["env"]["command"] == "m2pack build"
assert r["env"]["status"] == "built"
assert r["env"]["artifacts"]["archive_path"] == str(out)
assert r["env"]["m2pack"]["stats"]["files"] == 3
call = _read_stub_call(log)
assert call[0] == "build"
assert "--input" in call and str(tmp_path / "in") in call
assert "--output" in call and str(out) in call
assert "--key" in call and str(key) in call
assert "--sign-secret-key" in call and str(sk) in call
assert "--key-id" in call and "7" in call
assert "--json" in call
def test_build_omits_optional_key_id(tmp_path, monkeypatch, capsys):
_, log = _install_stub(tmp_path, monkeypatch)
(tmp_path / "in").mkdir()
(tmp_path / "k").write_text("a")
(tmp_path / "sk").write_text("b")
_run_cli(
[
"--json", "m2pack", "build",
"--input", str(tmp_path / "in"),
"--output", str(tmp_path / "o.m2p"),
"--key", str(tmp_path / "k"),
"--sign-secret-key", str(tmp_path / "sk"),
],
capsys,
)
call = _read_stub_call(log)
assert "--key-id" not in call
def test_verify_success_without_keys(tmp_path, monkeypatch, capsys):
_, log = _install_stub(tmp_path, monkeypatch)
archive = tmp_path / "a.m2p"
archive.write_bytes(b"x")
r = _run_cli(
["--json", "m2pack", "verify", "--archive", str(archive)], capsys
)
assert r["rc"] == 0
assert r["env"]["status"] == "verified"
assert r["env"]["m2pack"]["signature"] == "valid"
call = _read_stub_call(log)
assert call[0] == "verify"
assert "--public-key" not in call
assert "--key" not in call
def test_verify_with_public_and_content_keys(tmp_path, monkeypatch, capsys):
_, log = _install_stub(tmp_path, monkeypatch)
archive = tmp_path / "a.m2p"
archive.write_bytes(b"x")
pk = tmp_path / "pub"
pk.write_text("ff")
ck = tmp_path / "ck"
ck.write_text("aa")
_run_cli(
[
"--json", "m2pack", "verify",
"--archive", str(archive),
"--public-key", str(pk),
"--key", str(ck),
],
capsys,
)
call = _read_stub_call(log)
assert "--public-key" in call and str(pk) in call
assert "--key" in call and str(ck) in call
def test_diff_success_promotes_counts(tmp_path, monkeypatch, capsys):
_, log = _install_stub(tmp_path, monkeypatch)
left = tmp_path / "L"
left.mkdir()
right = tmp_path / "R.m2p"
right.write_bytes(b"x")
r = _run_cli(
[
"--json", "m2pack", "diff",
"--left", str(left),
"--right", str(right),
],
capsys,
)
assert r["rc"] == 0
assert r["env"]["status"] == "diffed"
stats = r["env"]["stats"]
assert stats["added_count"] == 1
assert stats["removed_count"] == 0
assert stats["changed_count"] == 2
assert stats["unchanged_count"] == 7
call = _read_stub_call(log)
assert call[0] == "diff"
def test_export_runtime_key_success(tmp_path, monkeypatch, capsys):
_, log = _install_stub(tmp_path, monkeypatch)
key = tmp_path / "ck"
key.write_text("aa")
pk = tmp_path / "pk"
pk.write_text("ff")
out = tmp_path / "runtime.json"
r = _run_cli(
[
"--json", "m2pack", "export-runtime-key",
"--key", str(key),
"--public-key", str(pk),
"--output", str(out),
"--key-id", "3",
"--format", "blob",
],
capsys,
)
assert r["rc"] == 0
assert r["env"]["status"] == "exported"
assert r["env"]["artifacts"]["runtime_key_path"] == str(out)
call = _read_stub_call(log)
assert call[0] == "export-runtime-key"
assert "--format" in call and "blob" in call
assert "--key-id" in call and "3" in call
def test_export_runtime_key_rejects_bad_format(tmp_path, monkeypatch, capsys):
_install_stub(tmp_path, monkeypatch)
with pytest.raises(SystemExit):
cli_main(
[
"--json", "m2pack", "export-runtime-key",
"--key", str(tmp_path / "k"),
"--public-key", str(tmp_path / "p"),
"--output", str(tmp_path / "o"),
"--format", "yaml",
]
)
# ---------------------------------------------------------------------------
# Error paths
# ---------------------------------------------------------------------------
def test_nonzero_exit_maps_to_subprocess_failed(tmp_path, monkeypatch, capsys):
_install_stub(tmp_path, monkeypatch, mode="fail")
(tmp_path / "in").mkdir()
(tmp_path / "k").write_text("a")
(tmp_path / "sk").write_text("b")
rc = cli_main(
[
"--json", "m2pack", "build",
"--input", str(tmp_path / "in"),
"--output", str(tmp_path / "o.m2p"),
"--key", str(tmp_path / "k"),
"--sign-secret-key", str(tmp_path / "sk"),
],
)
assert rc == 1
env = json.loads(capsys.readouterr().out)
assert env["ok"] is False
assert env["error"]["code"] == "m2pack_failed"
def test_nonjson_output_maps_to_invalid_json(tmp_path, monkeypatch, capsys):
_install_stub(tmp_path, monkeypatch, mode="nonjson")
archive = tmp_path / "a.m2p"
archive.write_bytes(b"x")
rc = cli_main(["--json", "m2pack", "verify", "--archive", str(archive)])
assert rc == 1
env = json.loads(capsys.readouterr().out)
assert env["ok"] is False
assert env["error"]["code"] == "m2pack_invalid_json"
def test_json_array_output_maps_to_invalid_json(tmp_path, monkeypatch, capsys):
_install_stub(tmp_path, monkeypatch, mode="notobject")
archive = tmp_path / "a.m2p"
archive.write_bytes(b"x")
rc = cli_main(["--json", "m2pack", "verify", "--archive", str(archive)])
assert rc == 1
env = json.loads(capsys.readouterr().out)
assert env["error"]["code"] == "m2pack_invalid_json"
def test_missing_binary_raises_validation_error(tmp_path, monkeypatch, capsys):
monkeypatch.delenv("M2PACK_BINARY", raising=False)
monkeypatch.setenv("PATH", "/nonexistent")
archive = tmp_path / "a.m2p"
archive.write_bytes(b"x")
rc = cli_main(["--json", "m2pack", "verify", "--archive", str(archive)])
assert rc == 1
env = json.loads(capsys.readouterr().out)
assert env["ok"] is False
assert env["error"]["code"] == "m2pack_not_found"

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent / "fixtures"))
from http_server import serve_dir # noqa: E402
from metin_release.commands import publish # noqa: E402
from metin_release.workspace import Context # noqa: E402
MAKE_MANIFEST = Path("/home/jann/metin/repos/m2dev-client/scripts/make-manifest.py")
SIGN_MANIFEST = Path("/home/jann/metin/repos/m2dev-client/scripts/sign-manifest.py")
@pytest.mark.skipif(
not (MAKE_MANIFEST.is_file() and SIGN_MANIFEST.is_file()),
reason="reference scripts not available",
)
def test_publish_end_to_end_local(tiny_client: Path, tmp_path: Path, test_keypair):
key_path, pub_hex = test_keypair
out_dir = tmp_path / "release"
rsync_target_dir = tmp_path / "remote"
rsync_target_dir.mkdir()
rsync_target = str(rsync_target_dir) + "/"
ctx = Context(make_manifest_script=MAKE_MANIFEST, sign_manifest_script=SIGN_MANIFEST)
# We need verify-public to hit the rsync target as an HTTP server.
# Start HTTP server pointing at rsync target BEFORE publish runs verify-public.
with serve_dir(rsync_target_dir) as base_url:
args = argparse.Namespace(
source=tiny_client,
version="2026.04.14-1",
out=out_dir,
key=key_path,
rsync_target=rsync_target,
base_url=base_url,
public_key=pub_hex,
previous=None,
notes=None,
launcher="Metin2Launcher.exe",
created_at="2026-04-14T00:00:00Z",
sample_blobs=2,
yes=True,
force=False,
dry_run_upload=False,
)
result = publish.run(ctx, args)
assert result.ok is True
assert result.status == "published"
stages = result.data["stages"]
names = [s["name"] for s in stages]
assert names == [
"build-manifest",
"sign",
"stage-blobs",
"upload-blobs",
"promote",
"verify-public",
]
for s in stages:
assert "error" not in s
# Verify that the rsync target now contains manifest + blobs.
assert (rsync_target_dir / "manifest.json").is_file()
assert (rsync_target_dir / "manifest.json.sig").is_file()
assert (rsync_target_dir / "files").is_dir()
blobs = list((rsync_target_dir / "files").rglob("*"))
blob_files = [b for b in blobs if b.is_file()]
assert len(blob_files) >= 3
# Verify sample-blobs count reflected
verify_stats = result.data["stats"]["verify"]
assert verify_stats["signature_valid"] is True
assert verify_stats["sampled_blob_count"] == 2
assert verify_stats["sampled_blob_failures"] == []

73
tests/test_sign.py Normal file
View File

@@ -0,0 +1,73 @@
from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
import pytest
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from metin_release.commands import sign
from metin_release.errors import IntegrityError, KeyPermissionError, ValidationError
from metin_release.workspace import Context
def _ctx(script: Path) -> Context:
return Context(sign_manifest_script=script)
def _write_toy_manifest(path: Path) -> None:
path.write_bytes(json.dumps({"version": "test", "files": []}, indent=2).encode() + b"\n")
@pytest.mark.skipif(
not Path("/home/jann/metin/repos/m2dev-client/scripts/sign-manifest.py").is_file(),
reason="real sign-manifest.py not available",
)
def test_sign_produces_valid_signature(tmp_path: Path, test_keypair, sign_manifest_script: Path):
key_path, pub_hex = test_keypair
manifest_path = tmp_path / "manifest.json"
_write_toy_manifest(manifest_path)
ctx = _ctx(sign_manifest_script)
args = argparse.Namespace(manifest=manifest_path, key=key_path, out=None)
result = sign.run(ctx, args)
sig_path = Path(result.data["artifacts"]["signature_path"])
assert sig_path.is_file()
sig = sig_path.read_bytes()
assert len(sig) == 64
# Verify it actually checks out against the public key.
pub = Ed25519PublicKey.from_public_bytes(bytes.fromhex(pub_hex))
pub.verify(sig, manifest_path.read_bytes())
# Now corrupt the manifest; signature no longer verifies.
manifest_path.write_bytes(b"tampered")
with pytest.raises(InvalidSignature):
pub.verify(sig, manifest_path.read_bytes())
def test_sign_rejects_loose_key_permissions(tmp_path: Path, sign_manifest_script: Path):
key = tmp_path / "key"
key.write_bytes(b"\x00" * 32)
os.chmod(key, 0o644)
manifest = tmp_path / "manifest.json"
_write_toy_manifest(manifest)
ctx = _ctx(sign_manifest_script)
args = argparse.Namespace(manifest=manifest, key=key, out=None)
with pytest.raises(KeyPermissionError):
sign.run(ctx, args)
def test_sign_relative_key_path_rejected(tmp_path: Path, sign_manifest_script: Path):
manifest = tmp_path / "manifest.json"
_write_toy_manifest(manifest)
ctx = _ctx(sign_manifest_script)
args = argparse.Namespace(manifest=manifest, key=Path("relative/key"), out=None)
with pytest.raises(ValidationError):
sign.run(ctx, args)