cli: scaffold phase 1 asset release commands

Add metin-release CLI with argparse dispatcher, result envelope and
error hierarchy, and the full Phase 1 release subcommand set:

  release inspect         - scan source root
  release build-manifest  - wraps make-manifest.py
  release sign            - wraps sign-manifest.py, enforces mode 600
  release diff-remote     - HEAD each blob hash against a base URL
  release upload-blobs    - rsync release dir minus manifest
  release promote         - rsync manifest.json + signature
  release verify-public   - GET + Ed25519 verify, optional blob sampling
  release publish         - composite of the above with per-stage timings

Respects --json / --verbose / --quiet. Exit codes follow the plan
(1 validation, 2 remote, 3 integrity, 4 reserved for ERP).
This commit is contained in:
Jan Nedbal
2026-04-14 18:59:50 +02:00
parent fb1e0cda87
commit e70fc300e2
22 changed files with 1344 additions and 1 deletions

6
.gitignore vendored
View File

@@ -101,6 +101,12 @@ ipython_config.py
# commonly ignored for libraries.
#uv.lock
# project-local
/out/
/staging/
/.venv/
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more

View File

@@ -1,3 +1,33 @@
# metin-release-cli
Python CLI for orchestrating Metin2 client releases (Phase 1 of metin-release-cli-plan)
Python CLI that orchestrates Metin2 client releases — builds manifests, signs
them, uploads content-addressed blobs, and promotes the new release atomically.
Phase 1 wraps `make-manifest.py` and `sign-manifest.py` from the `m2dev-client`
repo and adds remote diff, upload, promote, and public verification.
## Install
```
pip install -e .[dev]
```
## Usage
```
metin-release --help
metin-release release inspect --source /path/to/client
metin-release release publish --source ... --version 2026.04.14-1 ...
```
Add `--json` to get a machine-parseable envelope on stdout. Exit codes:
| Code | Meaning |
|------|--------------------------------------|
| 0 | success |
| 1 | operator / validation error |
| 2 | remote or network error |
| 3 | signing or integrity error |
| 4 | reserved (ERP sync, Phase 2+) |
See `docs/cli.md` for the full command reference.

42
pyproject.toml Normal file
View File

@@ -0,0 +1,42 @@
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"
[project]
name = "metin-release-cli"
version = "0.1.0"
description = "Orchestration CLI for Metin2 client releases"
readme = "README.md"
license = { file = "LICENSE" }
authors = [
{ name = "Jan Nedbal", email = "jan.nedbal@apertia.cz" },
]
requires-python = ">=3.11"
dependencies = [
"cryptography>=41",
"requests>=2.31",
]
[project.optional-dependencies]
dev = [
"pytest>=8",
"pytest-mock>=3",
]
[project.scripts]
metin-release = "metin_release.cli:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-q"
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "B", "UP"]
ignore = ["E501"]

View File

@@ -0,0 +1,15 @@
"""metin-release: orchestration CLI for Metin2 client releases."""
from __future__ import annotations
try:
from importlib.metadata import PackageNotFoundError, version as _pkg_version
except ImportError: # pragma: no cover
from importlib_metadata import PackageNotFoundError, version as _pkg_version # type: ignore
try:
__version__ = _pkg_version("metin-release-cli")
except PackageNotFoundError: # pragma: no cover - running from source tree
__version__ = "0.0.0+local"
__all__ = ["__version__"]

View File

@@ -0,0 +1,6 @@
from __future__ import annotations
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())

116
src/metin_release/cli.py Normal file
View File

@@ -0,0 +1,116 @@
"""Argparse dispatcher for metin-release."""
from __future__ import annotations
import argparse
import sys
from typing import Callable
from . import __version__
from .commands import (
build_manifest,
diff_remote,
inspect,
promote,
publish,
sign,
upload_blobs,
verify_public,
)
from .errors import ReleaseError
from .log import configure_logging, get_logger
from .result import Result, write_result
from .workspace import Context
CommandFn = Callable[[Context, argparse.Namespace], Result]
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="metin-release",
description="Orchestration CLI for Metin2 client releases.",
)
parser.add_argument("--version", action="version", version=f"metin-release {__version__}")
parser.add_argument("--json", action="store_true", help="Emit only JSON on stdout.")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose stderr logging.")
parser.add_argument("-q", "--quiet", action="store_true", help="Suppress stderr logging.")
sub = parser.add_subparsers(dest="group", metavar="<group>")
sub.required = True
release = sub.add_parser("release", help="Asset release commands.")
rsub = release.add_subparsers(dest="cmd", metavar="<command>")
rsub.required = True
inspect.add_parser(rsub)
build_manifest.add_parser(rsub)
sign.add_parser(rsub)
diff_remote.add_parser(rsub)
upload_blobs.add_parser(rsub)
promote.add_parser(rsub)
verify_public.add_parser(rsub)
publish.add_parser(rsub)
return parser
_COMMAND_MAP: dict[tuple[str, str], tuple[str, CommandFn]] = {
("release", "inspect"): ("release inspect", inspect.run),
("release", "build-manifest"): ("release build-manifest", build_manifest.run),
("release", "sign"): ("release sign", sign.run),
("release", "diff-remote"): ("release diff-remote", diff_remote.run),
("release", "upload-blobs"): ("release upload-blobs", upload_blobs.run),
("release", "promote"): ("release promote", promote.run),
("release", "verify-public"): ("release verify-public", verify_public.run),
("release", "publish"): ("release publish", publish.run),
}
def main(argv: list[str] | None = None) -> int:
parser = _build_parser()
args = parser.parse_args(argv)
configure_logging(verbose=args.verbose, quiet=args.quiet)
logger = get_logger()
ctx = Context(json_mode=args.json, verbose=args.verbose, quiet=args.quiet)
key = (args.group, args.cmd)
if key not in _COMMAND_MAP:
parser.error(f"unknown command: {args.group} {args.cmd}")
return 1 # unreachable; parser.error exits 2
command_name, fn = _COMMAND_MAP[key]
try:
result = fn(ctx, args)
except ReleaseError as exc:
logger.error("%s: %s", exc.error_code, exc.message)
result = Result(
command=command_name,
ok=False,
status="failed",
version=getattr(args, "version", None) if hasattr(args, "version") else None,
error_code=exc.error_code,
error_message=exc.message,
)
write_result(result, json_mode=ctx.json_mode)
return exc.exit_code
except KeyboardInterrupt:
print("interrupted", file=sys.stderr)
return 130
write_result(
result,
json_mode=ctx.json_mode,
human_summary=_human_summary(result),
)
return 0
def _human_summary(result: Result) -> str:
parts = [f"[{result.command}] {result.status}"]
if result.version:
parts.append(f"version={result.version}")
return " ".join(parts)

View File

@@ -0,0 +1,23 @@
from __future__ import annotations
from . import (
build_manifest,
diff_remote,
inspect,
promote,
publish,
sign,
upload_blobs,
verify_public,
)
__all__ = [
"build_manifest",
"diff_remote",
"inspect",
"promote",
"publish",
"sign",
"upload_blobs",
"verify_public",
]

View File

@@ -0,0 +1,103 @@
"""release build-manifest: wraps make-manifest.py."""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import tempfile
from pathlib import Path
from ..errors import SourceNotFoundError, SubprocessError, ValidationError
from ..log import get_logger
from ..result import Result
from ..workspace import Context, ensure_dir, resolve_source
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("build-manifest", help="Build a signed-ready manifest.json for a source tree.")
p.add_argument("--source", required=True, type=Path, help="Client source root.")
p.add_argument("--version", required=True, help="Release version, e.g. 2026.04.14-1.")
p.add_argument("--out", required=True, type=Path, help="Output manifest.json path.")
p.add_argument("--previous", help="Previous release version, if any.")
p.add_argument("--notes", type=Path, help="Path to a file containing release notes.")
p.add_argument("--launcher", default="Metin2Launcher.exe", help="Launcher filename.")
p.add_argument("--created-at", help="Override created_at (for reproducible test runs).")
return p
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
source = resolve_source(args.source)
if not source.is_dir():
raise SourceNotFoundError(f"source is not a directory: {source}")
script = ctx.make_manifest_script
if not script.is_file():
raise ValidationError(f"make-manifest.py not found at {script} (set METIN_RELEASE_MAKE_MANIFEST)")
out_path = Path(args.out).expanduser().resolve()
ensure_dir(out_path.parent)
cmd = [
sys.executable,
str(script),
"--source", str(source),
"--version", args.version,
"--out", str(out_path),
"--launcher", args.launcher,
]
if args.previous:
cmd += ["--previous", args.previous]
if args.created_at:
cmd += ["--created-at", args.created_at]
notes_text = None
if args.notes:
notes_path = Path(args.notes).expanduser().resolve()
if not notes_path.is_file():
raise ValidationError(f"notes file not found: {notes_path}")
notes_text = notes_path.read_text(encoding="utf-8")
cmd += ["--notes", notes_text]
log.debug("spawn %s", cmd)
try:
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
except FileNotFoundError as exc:
raise SubprocessError(f"cannot spawn make-manifest.py: {exc}") from exc
if proc.returncode != 0:
raise SubprocessError(
f"make-manifest.py exited {proc.returncode}: {proc.stderr.strip() or proc.stdout.strip()}"
)
if proc.stderr.strip():
log.info("make-manifest: %s", proc.stderr.strip())
if not out_path.is_file():
raise SubprocessError(f"manifest was not written to {out_path}")
manifest = json.loads(out_path.read_text(encoding="utf-8"))
files = manifest.get("files", [])
launcher = manifest.get("launcher", {})
total_bytes = launcher.get("size", 0) + sum(int(f.get("size", 0)) for f in files)
unique_blobs = {launcher.get("sha256")} | {f.get("sha256") for f in files}
unique_blobs.discard(None)
return Result(
command="release build-manifest",
version=manifest.get("version"),
status="manifest_built",
data={
"artifacts": {
"manifest_path": str(out_path),
},
"stats": {
"file_count": len(files) + 1,
"blob_count": len(unique_blobs),
"total_bytes": total_bytes,
"version": manifest.get("version"),
"created_at": manifest.get("created_at"),
},
},
)

View File

@@ -0,0 +1,83 @@
"""release diff-remote: HEAD every blob hash against a base URL."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import requests
from ..errors import NetworkError, ValidationError
from ..log import get_logger
from ..result import Result
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("diff-remote", help="Check which manifest blobs are missing on the remote.")
p.add_argument("--manifest", required=True, type=Path, help="Path to manifest.json.")
p.add_argument("--base-url", required=True, help="Remote base URL, e.g. https://updates.example/")
p.add_argument("--timeout", type=float, default=10.0, help="Per-request timeout in seconds.")
return p
def _blob_hashes(manifest: dict) -> list[str]:
hashes: set[str] = set()
launcher = manifest.get("launcher") or {}
if launcher.get("sha256"):
hashes.add(launcher["sha256"])
for f in manifest.get("files", []):
if f.get("sha256"):
hashes.add(f["sha256"])
return sorted(hashes)
def _check_blob(session: requests.Session, base_url: str, h: str, timeout: float) -> bool:
url = f"{base_url.rstrip('/')}/files/{h[:2]}/{h}"
try:
r = session.head(url, timeout=timeout, allow_redirects=True)
except requests.RequestException as exc:
raise NetworkError(f"HEAD {url} failed: {exc}") from exc
if r.status_code == 200:
return True
if r.status_code == 404:
return False
raise NetworkError(f"HEAD {url} returned unexpected status {r.status_code}")
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
manifest_path = Path(args.manifest).expanduser().resolve()
if not manifest_path.is_file():
raise ValidationError(f"manifest not found: {manifest_path}")
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
hashes = _blob_hashes(manifest)
missing: list[str] = []
with requests.Session() as session:
for h in hashes:
if not _check_blob(session, args.base_url, h, args.timeout):
missing.append(h)
log.info(
"diff-remote: %d blobs total, %d missing on %s",
len(hashes),
len(missing),
args.base_url,
)
return Result(
command="release diff-remote",
version=manifest.get("version"),
status="diffed",
data={
"remote": {
"base_url": args.base_url,
},
"stats": {
"manifest_blob_count": len(hashes),
"missing_blob_count": len(missing),
"missing_blobs": missing,
},
},
)

View File

@@ -0,0 +1,78 @@
"""release inspect: scan a source root, count files, detect launcher + main exe."""
from __future__ import annotations
import argparse
from pathlib import Path
from ..errors import SourceNotFoundError
from ..log import get_logger
from ..result import Result
from ..workspace import Context, resolve_source
EXCLUDE_DIRS = {".git", ".vs", ".updates", "log", "__pycache__"}
EXCLUDE_FILES = {".gitignore", ".gitattributes", "desktop.ini", "Thumbs.db", ".DS_Store"}
EXCLUDE_SUFFIXES = {".pdb", ".ilk", ".old", ".log", ".dxvk-cache", ".swp", ".tmp"}
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("inspect", help="Scan a client source root and report stats.")
p.add_argument("--source", required=True, type=Path, help="Client root directory.")
return p
def _should_skip(rel: Path) -> bool:
for part in rel.parts:
if part in EXCLUDE_DIRS:
return True
if rel.name in EXCLUDE_FILES:
return True
if rel.suffix in EXCLUDE_SUFFIXES:
return True
return False
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
source = resolve_source(args.source)
if not source.is_dir():
raise SourceNotFoundError(f"source is not a directory: {source}")
file_count = 0
total_bytes = 0
launcher_present = False
main_exe_present = False
for path in source.rglob("*"):
if not path.is_file():
continue
rel = path.relative_to(source)
if _should_skip(rel):
continue
file_count += 1
try:
total_bytes += path.stat().st_size
except OSError:
log.warning("failed to stat %s", rel)
name = rel.as_posix()
if name == "Metin2Launcher.exe":
launcher_present = True
elif name == "Metin2.exe":
main_exe_present = True
log.info("inspected %s: %d files, %d bytes", source, file_count, total_bytes)
return Result(
command="release inspect",
status="inspected",
data={
"stats": {
"source_path": str(source),
"file_count": file_count,
"total_bytes": total_bytes,
"launcher_present": launcher_present,
"main_exe_present": main_exe_present,
}
},
)

View File

@@ -0,0 +1,52 @@
"""release promote: upload manifest.json + manifest.json.sig (small, fast)."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from ..errors import ValidationError
from ..log import get_logger
from ..result import Result
from ..storage import rsync as rsync_backend
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("promote", help="Promote a staged release by pushing manifest + signature.")
p.add_argument("--release-dir", required=True, type=Path, help="Local release directory containing manifest.json(.sig).")
p.add_argument("--rsync-target", required=True, help="rsync destination top-level.")
p.add_argument("--yes", action="store_true", help="Skip interactive confirmation.")
p.add_argument("--dry-run", action="store_true", help="Run rsync --dry-run.")
return p
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
release_dir = Path(args.release_dir).expanduser().resolve()
manifest_path = release_dir / "manifest.json"
if not manifest_path.is_file():
raise ValidationError(f"manifest.json missing in {release_dir}")
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
version = manifest.get("version")
rr = rsync_backend.push_manifest(release_dir, args.rsync_target, dry_run=args.dry_run)
log.info("promote: version=%s rsync exit=%d", version, rr.returncode)
return Result(
command="release promote",
version=version,
status="promoted" if not args.dry_run else "dry_run",
data={
"remote": {
"rsync_target": args.rsync_target,
"dry_run": args.dry_run,
},
"stats": {
"release_dir": str(release_dir),
"version": version,
},
},
)

View File

@@ -0,0 +1,186 @@
"""release publish: composite of build-manifest -> sign -> upload-blobs -> promote -> verify-public."""
from __future__ import annotations
import argparse
import json
import shutil
import subprocess
import sys
import time
from pathlib import Path
from typing import Callable
from ..errors import ReleaseError, ValidationError
from ..log import get_logger
from ..result import Result
from ..workspace import Context, ensure_dir, resolve_source
from . import build_manifest, promote, sign, upload_blobs, verify_public
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("publish", help="End-to-end: build, sign, upload, promote, verify.")
p.add_argument("--source", required=True, type=Path, help="Client source root.")
p.add_argument("--version", required=True, help="Release version.")
p.add_argument("--out", required=True, type=Path, help="Release output directory.")
p.add_argument("--key", required=True, type=Path, help="Signing key path (mode 600).")
p.add_argument("--rsync-target", required=True, help="rsync target for blobs + manifest.")
p.add_argument("--base-url", required=True, help="Public base URL for verification.")
p.add_argument("--public-key", required=True, help="Public key (hex or file).")
p.add_argument("--previous", help="Previous release version.")
p.add_argument("--notes", type=Path, help="Release notes file.")
p.add_argument("--launcher", default="Metin2Launcher.exe")
p.add_argument("--created-at", help="Override manifest created_at.")
p.add_argument("--sample-blobs", type=int, default=0)
p.add_argument("--yes", action="store_true")
p.add_argument("--force", action="store_true", help="Allow non-empty --out directory.")
p.add_argument("--dry-run-upload", action="store_true", help="rsync --dry-run for upload and promote.")
return p
def _build_blob_tree(source: Path, manifest: dict, out_dir: Path) -> dict:
files_dir = ensure_dir(out_dir / "files")
entries = []
launcher = manifest.get("launcher")
if launcher:
entries.append(launcher)
entries.extend(manifest.get("files", []))
seen: set[str] = set()
bytes_written = 0
for entry in entries:
h = entry["sha256"]
rel = entry["path"]
src = source / rel
if not src.is_file():
raise ValidationError(f"file in manifest missing from source: {rel}")
if h in seen:
continue
seen.add(h)
dst_dir = files_dir / h[:2]
dst_dir.mkdir(parents=True, exist_ok=True)
dst = dst_dir / h
if dst.exists():
continue
try:
dst.hardlink_to(src)
except OSError:
shutil.copy2(src, dst)
bytes_written += dst.stat().st_size
return {"unique_blobs": len(seen), "bytes_written": bytes_written}
def _run_stage(name: str, fn: Callable[[], Result], stages: list[dict]) -> Result:
start = time.monotonic()
try:
result = fn()
except ReleaseError as exc:
stages.append(
{
"name": name,
"status": "failed",
"duration_ms": int((time.monotonic() - start) * 1000),
"error": {"code": exc.error_code, "message": exc.message},
}
)
raise
stages.append(
{
"name": name,
"status": result.status,
"duration_ms": int((time.monotonic() - start) * 1000),
}
)
return result
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
source = resolve_source(args.source)
out_dir = Path(args.out).expanduser().resolve()
ensure_dir(out_dir)
if any(out_dir.iterdir()) and not args.force:
raise ValidationError(f"output directory {out_dir} is non-empty (use --force to overwrite)")
stages: list[dict] = []
manifest_path = out_dir / "manifest.json"
# Stage 1: build manifest
bm_args = argparse.Namespace(
source=source,
version=args.version,
out=manifest_path,
previous=args.previous,
notes=args.notes,
launcher=args.launcher,
created_at=args.created_at,
)
bm_result = _run_stage("build-manifest", lambda: build_manifest.run(ctx, bm_args), stages)
# Stage 2: sign
sig_path = out_dir / "manifest.json.sig"
sn_args = argparse.Namespace(manifest=manifest_path, key=args.key, out=sig_path)
_run_stage("sign", lambda: sign.run(ctx, sn_args), stages)
# Stage 2.5: build blob tree (inline step, not its own public subcommand)
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
def _blob_stage() -> Result:
stats = _build_blob_tree(source, manifest, out_dir)
return Result(command="release publish", status="blobs_staged", data={"stats": stats})
_run_stage("stage-blobs", _blob_stage, stages)
# Archive historical manifest
manifests_dir = ensure_dir(out_dir / "manifests")
(manifests_dir / f"{args.version}.json").write_bytes(manifest_path.read_bytes())
(manifests_dir / f"{args.version}.json.sig").write_bytes(sig_path.read_bytes())
# Stage 3: upload blobs
ub_args = argparse.Namespace(
release_dir=out_dir,
rsync_target=args.rsync_target,
dry_run=args.dry_run_upload,
yes=args.yes,
)
_run_stage("upload-blobs", lambda: upload_blobs.run(ctx, ub_args), stages)
# Stage 4: promote
pr_args = argparse.Namespace(
release_dir=out_dir,
rsync_target=args.rsync_target,
yes=args.yes,
dry_run=args.dry_run_upload,
)
_run_stage("promote", lambda: promote.run(ctx, pr_args), stages)
# Stage 5: verify-public
vp_args = argparse.Namespace(
base_url=args.base_url,
public_key=args.public_key,
sample_blobs=args.sample_blobs,
timeout=15.0,
)
vp_result = _run_stage("verify-public", lambda: verify_public.run(ctx, vp_args), stages)
log.info("publish: version=%s stages=%d", args.version, len(stages))
return Result(
command="release publish",
version=args.version,
status="published",
data={
"artifacts": {
"release_dir": str(out_dir),
"manifest_path": str(manifest_path),
"signature_path": str(sig_path),
},
"stats": {
**bm_result.data.get("stats", {}),
"verify": vp_result.data.get("stats", {}),
},
"stages": stages,
},
)

View File

@@ -0,0 +1,104 @@
"""release sign: wraps sign-manifest.py with mode-600 enforcement."""
from __future__ import annotations
import argparse
import os
import stat
import subprocess
import sys
from pathlib import Path
from ..errors import (
IntegrityError,
KeyPermissionError,
SubprocessError,
ValidationError,
)
from ..log import get_logger
from ..result import Result
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("sign", help="Sign a manifest.json with an Ed25519 key.")
p.add_argument("--manifest", required=True, type=Path, help="Path to manifest.json.")
p.add_argument("--key", required=True, type=Path, help="Absolute path to raw 32-byte private key.")
p.add_argument("--out", type=Path, help="Signature output path (default: <manifest>.sig).")
return p
def _enforce_key_mode(key_path: Path) -> None:
if not key_path.is_absolute():
raise ValidationError(f"--key must be an absolute path, got {key_path}")
if not key_path.is_file():
raise ValidationError(f"signing key not found: {key_path}")
mode = stat.S_IMODE(key_path.stat().st_mode)
if mode != 0o600:
raise KeyPermissionError(
f"signing key {key_path} must be mode 600, got {oct(mode)}"
)
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
manifest_path = Path(args.manifest).expanduser().resolve()
if not manifest_path.is_file():
raise ValidationError(f"manifest not found: {manifest_path}")
key_path = Path(args.key).expanduser()
_enforce_key_mode(key_path)
script = ctx.sign_manifest_script
if not script.is_file():
raise ValidationError(
f"sign-manifest.py not found at {script} (set METIN_RELEASE_SIGN_MANIFEST)"
)
out_path = (
Path(args.out).expanduser().resolve()
if args.out
else manifest_path.with_suffix(manifest_path.suffix + ".sig")
)
cmd = [
sys.executable,
str(script),
"--manifest", str(manifest_path),
"--key", str(key_path),
"--out", str(out_path),
]
log.debug("spawn %s", cmd)
try:
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
except FileNotFoundError as exc:
raise SubprocessError(f"cannot spawn sign-manifest.py: {exc}") from exc
if proc.returncode != 0:
raise SubprocessError(
f"sign-manifest.py exited {proc.returncode}: {proc.stderr.strip() or proc.stdout.strip()}"
)
if proc.stderr.strip():
log.info("sign-manifest: %s", proc.stderr.strip())
if not out_path.is_file():
raise IntegrityError(f"signature was not written to {out_path}")
sig_bytes = out_path.read_bytes()
if len(sig_bytes) != 64:
raise IntegrityError(
f"signature at {out_path} is {len(sig_bytes)} bytes, expected 64"
)
return Result(
command="release sign",
status="signed",
data={
"artifacts": {
"manifest_path": str(manifest_path),
"signature_path": str(out_path),
},
"stats": {
"signature_bytes": len(sig_bytes),
},
},
)

View File

@@ -0,0 +1,46 @@
"""release upload-blobs: rsync the blob tree (excluding manifest) to target."""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from ..log import get_logger
from ..result import Result
from ..storage import rsync as rsync_backend
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("upload-blobs", help="Upload release directory (except manifest) via rsync.")
p.add_argument("--release-dir", required=True, type=Path, help="Local release output directory.")
p.add_argument("--rsync-target", required=True, help="rsync destination (path or user@host:/path).")
p.add_argument("--dry-run", action="store_true", help="Run rsync --dry-run.")
p.add_argument("--yes", action="store_true", help="Skip interactive confirmation.")
return p
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
release_dir = Path(args.release_dir).expanduser().resolve()
if not args.yes and not args.dry_run and sys.stdin.isatty():
log.warning("about to rsync %s -> %s (use --yes to skip prompt)", release_dir, args.rsync_target)
rr = rsync_backend.push_blobs(release_dir, args.rsync_target, dry_run=args.dry_run)
log.info("upload-blobs: rsync exit=%d", rr.returncode)
return Result(
command="release upload-blobs",
status="uploaded" if not args.dry_run else "dry_run",
data={
"remote": {
"rsync_target": args.rsync_target,
"dry_run": args.dry_run,
},
"stats": {
"release_dir": str(release_dir),
},
},
)

View File

@@ -0,0 +1,126 @@
"""release verify-public: download manifest + sig and verify Ed25519 signature."""
from __future__ import annotations
import argparse
import json
import random
from pathlib import Path
import requests
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from ..errors import NetworkError, SignatureError, ValidationError
from ..hashing import sha256_bytes
from ..log import get_logger
from ..result import Result
from ..workspace import Context
def add_parser(sub: argparse._SubParsersAction) -> argparse.ArgumentParser:
p = sub.add_parser("verify-public", help="GET manifest.json from base URL and verify signature.")
p.add_argument("--base-url", required=True, help="Remote base URL.")
p.add_argument(
"--public-key",
required=True,
help="Ed25519 public key: hex string or path to a file containing hex.",
)
p.add_argument("--sample-blobs", type=int, default=0, help="GET and hash-check N random blobs.")
p.add_argument("--timeout", type=float, default=15.0)
return p
def _load_public_key(value: str) -> Ed25519PublicKey:
candidate = Path(value).expanduser()
hex_text: str
if candidate.is_file():
hex_text = candidate.read_text(encoding="utf-8").strip()
else:
hex_text = value.strip()
try:
raw = bytes.fromhex(hex_text)
except ValueError as exc:
raise ValidationError(f"public key is not valid hex: {exc}") from exc
if len(raw) != 32:
raise ValidationError(f"public key must be 32 bytes, got {len(raw)}")
return Ed25519PublicKey.from_public_bytes(raw)
def _get(session: requests.Session, url: str, timeout: float) -> bytes:
try:
r = session.get(url, timeout=timeout)
except requests.RequestException as exc:
raise NetworkError(f"GET {url} failed: {exc}") from exc
if r.status_code != 200:
raise NetworkError(f"GET {url} returned {r.status_code}")
return r.content
def run(ctx: Context, args: argparse.Namespace) -> Result:
log = get_logger()
public_key = _load_public_key(args.public_key)
base = args.base_url.rstrip("/")
with requests.Session() as session:
manifest_bytes = _get(session, f"{base}/manifest.json", args.timeout)
sig_bytes = _get(session, f"{base}/manifest.json.sig", args.timeout)
if len(sig_bytes) != 64:
raise SignatureError(f"remote signature is {len(sig_bytes)} bytes, expected 64")
try:
public_key.verify(sig_bytes, manifest_bytes)
except InvalidSignature as exc:
raise SignatureError("manifest signature verification failed") from exc
manifest = json.loads(manifest_bytes.decode("utf-8"))
version = manifest.get("version")
created_at = manifest.get("created_at")
sampled_failures: list[str] = []
sampled_count = 0
if args.sample_blobs > 0:
files = list(manifest.get("files", []))
launcher = manifest.get("launcher")
if launcher:
files.append(launcher)
pool = [f for f in files if f.get("sha256")]
k = min(args.sample_blobs, len(pool))
sample = random.sample(pool, k) if k else []
for entry in sample:
h = entry["sha256"]
url = f"{base}/files/{h[:2]}/{h}"
try:
blob = _get(session, url, args.timeout)
except NetworkError as exc:
log.warning("sample blob fetch failed: %s", exc)
sampled_failures.append(h)
sampled_count += 1
continue
actual = sha256_bytes(blob)
sampled_count += 1
if actual != h:
sampled_failures.append(h)
log.info(
"verify-public: version=%s signature_valid=true sampled=%d failures=%d",
version,
sampled_count,
len(sampled_failures),
)
return Result(
command="release verify-public",
version=version,
status="verified",
data={
"remote": {"base_url": args.base_url},
"stats": {
"manifest_version": version,
"manifest_created_at": created_at,
"signature_valid": True,
"sampled_blob_count": sampled_count,
"sampled_blob_failures": sampled_failures,
},
},
)

View File

@@ -0,0 +1,77 @@
"""Error hierarchy. Each exception carries an exit_code and an error_code.
Exit code contract (from the plan):
0 - success
1 - operator or validation error (bad args, missing source, wrong key mode)
2 - remote or network error (upload / HTTP failure)
3 - signing or integrity error (signature verify fail, hash mismatch)
4 - ERP sync error (reserved, not used in Phase 1)
"""
from __future__ import annotations
class ReleaseError(Exception):
"""Base for all CLI errors."""
exit_code: int = 1
error_code: str = "error"
def __init__(self, message: str, *, error_code: str | None = None):
super().__init__(message)
if error_code is not None:
self.error_code = error_code
@property
def message(self) -> str:
return str(self)
class ValidationError(ReleaseError):
exit_code = 1
error_code = "validation_error"
class SourceNotFoundError(ValidationError):
error_code = "source_not_found"
class KeyPermissionError(ValidationError):
error_code = "key_permission"
class SubprocessError(ReleaseError):
exit_code = 1
error_code = "subprocess_failed"
class RemoteError(ReleaseError):
exit_code = 2
error_code = "remote_error"
class NetworkError(RemoteError):
error_code = "network_error"
class UploadError(RemoteError):
error_code = "upload_failed"
class IntegrityError(ReleaseError):
exit_code = 3
error_code = "integrity_error"
class SignatureError(IntegrityError):
error_code = "signature_invalid"
class HashMismatchError(IntegrityError):
error_code = "hash_mismatch"
class ErpError(ReleaseError): # reserved for Phase 2
exit_code = 4
error_code = "erp_error"

View File

@@ -0,0 +1,18 @@
"""Streaming sha256 helper."""
from __future__ import annotations
import hashlib
from pathlib import Path
def sha256_file(path: Path, *, chunk_size: int = 1 << 20) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
h.update(chunk)
return h.hexdigest()
def sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()

31
src/metin_release/log.py Normal file
View File

@@ -0,0 +1,31 @@
"""Structured stderr logger. stdout is reserved for result JSON."""
from __future__ import annotations
import logging
import sys
_LOGGER_NAME = "metin_release"
def configure_logging(*, verbose: bool = False, quiet: bool = False) -> logging.Logger:
logger = logging.getLogger(_LOGGER_NAME)
logger.handlers.clear()
if quiet:
level = logging.CRITICAL + 1
elif verbose:
level = logging.DEBUG
else:
level = logging.INFO
handler = logging.StreamHandler(stream=sys.stderr)
handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.addHandler(handler)
logger.setLevel(level)
logger.propagate = False
return logger
def get_logger() -> logging.Logger:
return logging.getLogger(_LOGGER_NAME)

View File

@@ -0,0 +1,53 @@
"""Result envelope and JSON/human output writer."""
from __future__ import annotations
import json
import sys
from dataclasses import dataclass, field
from typing import Any
@dataclass
class Result:
"""Structured result for a CLI command."""
command: str
ok: bool = True
status: str = "ok"
version: str | None = None
data: dict[str, Any] = field(default_factory=dict)
error_code: str | None = None
error_message: str | None = None
def to_envelope(self) -> dict[str, Any]:
env: dict[str, Any] = {
"ok": self.ok,
"command": self.command,
}
if self.version is not None:
env["version"] = self.version
env["status"] = self.status
if self.ok:
for k, v in self.data.items():
env[k] = v
else:
env["error"] = {
"code": self.error_code or "error",
"message": self.error_message or "",
}
return env
def write_result(result: Result, *, json_mode: bool, human_summary: str | None = None) -> None:
"""Emit the result.
When json_mode is True: only JSON on stdout.
When json_mode is False: human summary on stderr, JSON on stdout.
"""
envelope = result.to_envelope()
if not json_mode and human_summary:
print(human_summary, file=sys.stderr)
json.dump(envelope, sys.stdout, indent=2, sort_keys=False)
sys.stdout.write("\n")
sys.stdout.flush()

View File

@@ -0,0 +1,5 @@
from __future__ import annotations
from . import rsync
__all__ = ["rsync"]

View File

@@ -0,0 +1,100 @@
"""rsync-based remote storage backend."""
from __future__ import annotations
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from ..errors import UploadError, ValidationError
from ..log import get_logger
COMMON_FLAGS = [
"-av",
"--delay-updates",
"--checksum",
"--omit-dir-times",
"--no-perms",
]
@dataclass
class RsyncResult:
cmd: list[str]
returncode: int
stdout: str
stderr: str
bytes_transferred: int | None = None
def _ensure_rsync() -> str:
path = shutil.which("rsync")
if not path:
raise ValidationError("rsync binary not found in PATH")
return path
def _normalise_source_dir(source_dir: Path) -> str:
s = str(source_dir.resolve())
if not s.endswith("/"):
s += "/"
return s
def push_blobs(
release_dir: Path,
target: str,
*,
dry_run: bool = False,
) -> RsyncResult:
"""Upload everything in release_dir except manifest.json{,.sig}."""
log = get_logger()
if not release_dir.is_dir():
raise ValidationError(f"release dir not found: {release_dir}")
rsync = _ensure_rsync()
cmd = [rsync, *COMMON_FLAGS]
if dry_run:
cmd.append("--dry-run")
cmd += [
"--exclude", "manifest.json",
"--exclude", "manifest.json.sig",
_normalise_source_dir(release_dir),
target,
]
log.debug("rsync blobs: %s", cmd)
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode != 0:
raise UploadError(
f"rsync blobs failed ({proc.returncode}): {proc.stderr.strip() or proc.stdout.strip()}"
)
return RsyncResult(cmd=cmd, returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)
def push_manifest(
release_dir: Path,
target: str,
*,
dry_run: bool = False,
) -> RsyncResult:
"""Upload only manifest.json and manifest.json.sig."""
log = get_logger()
manifest = release_dir / "manifest.json"
sig = release_dir / "manifest.json.sig"
if not manifest.is_file() or not sig.is_file():
raise ValidationError(
f"manifest or signature missing in {release_dir}: need manifest.json + manifest.json.sig"
)
rsync = _ensure_rsync()
cmd = [rsync, "-av", "--checksum", "--omit-dir-times", "--no-perms"]
if dry_run:
cmd.append("--dry-run")
cmd += [str(manifest), str(sig), target]
log.debug("rsync manifest: %s", cmd)
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode != 0:
raise UploadError(
f"rsync manifest failed ({proc.returncode}): {proc.stderr.strip() or proc.stdout.strip()}"
)
return RsyncResult(cmd=cmd, returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)

View File

@@ -0,0 +1,43 @@
"""Workspace path helpers: source / out / staging resolution + the Context passed to commands."""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
DEFAULT_MAKE_MANIFEST = Path(
"/home/jann/metin/repos/m2dev-client/scripts/make-manifest.py"
)
DEFAULT_SIGN_MANIFEST = Path(
"/home/jann/metin/repos/m2dev-client/scripts/sign-manifest.py"
)
@dataclass
class Context:
"""Per-invocation command context. No module-level mutable state."""
json_mode: bool = False
verbose: bool = False
quiet: bool = False
make_manifest_script: Path = field(
default_factory=lambda: Path(
os.environ.get("METIN_RELEASE_MAKE_MANIFEST", str(DEFAULT_MAKE_MANIFEST))
)
)
sign_manifest_script: Path = field(
default_factory=lambda: Path(
os.environ.get("METIN_RELEASE_SIGN_MANIFEST", str(DEFAULT_SIGN_MANIFEST))
)
)
def resolve_source(source: str | os.PathLike[str]) -> Path:
return Path(source).expanduser().resolve()
def ensure_dir(path: Path) -> Path:
path.mkdir(parents=True, exist_ok=True)
return path