139 lines
4.2 KiB
Python
139 lines
4.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent
|
|
INVENTORY_PATH = REPO_ROOT / "deploy" / "channel-inventory.json"
|
|
|
|
STACK_UNIT = "metin-server.service"
|
|
DB_UNIT = "metin-db.service"
|
|
DB_READY_UNIT = "metin-db-ready.service"
|
|
AUTH_UNIT = "metin-auth.service"
|
|
|
|
|
|
@lru_cache(maxsize=1)
|
|
def load_inventory() -> dict[str, Any]:
|
|
return json.loads(INVENTORY_PATH.read_text(encoding="utf-8"))
|
|
|
|
|
|
def get_db() -> dict[str, Any]:
|
|
return dict(load_inventory()["db"])
|
|
|
|
|
|
def get_auth() -> dict[str, Any]:
|
|
return dict(load_inventory()["auth"])
|
|
|
|
|
|
def iter_channels() -> list[dict[str, Any]]:
|
|
channels = load_inventory()["channels"]
|
|
return sorted((dict(channel) for channel in channels), key=lambda channel: int(channel["id"]))
|
|
|
|
|
|
def get_channel(channel_id: int) -> dict[str, Any]:
|
|
for channel in iter_channels():
|
|
if int(channel["id"]) == int(channel_id):
|
|
return channel
|
|
raise KeyError(f"Unknown channel id: {channel_id}")
|
|
|
|
|
|
def get_core(channel_id: int, core_id: int) -> dict[str, Any]:
|
|
channel = get_channel(channel_id)
|
|
for core in channel["cores"]:
|
|
if int(core["id"]) == int(core_id):
|
|
return dict(core)
|
|
raise KeyError(f"Unknown core {core_id} for channel {channel_id}")
|
|
|
|
|
|
def get_channel_ids() -> list[int]:
|
|
return [int(channel["id"]) for channel in iter_channels()]
|
|
|
|
|
|
def get_public_channel_ids(
|
|
selected_channel_ids: Iterable[int] | None = None,
|
|
*,
|
|
client_visible_only: bool = False,
|
|
) -> list[int]:
|
|
selected = None if selected_channel_ids is None else {int(channel_id) for channel_id in selected_channel_ids}
|
|
result: list[int] = []
|
|
|
|
for channel in iter_channels():
|
|
channel_id = int(channel["id"])
|
|
if selected is not None and channel_id not in selected:
|
|
continue
|
|
if not channel.get("public"):
|
|
continue
|
|
if client_visible_only and not channel.get("client_visible"):
|
|
continue
|
|
result.append(channel_id)
|
|
|
|
return result
|
|
|
|
|
|
def has_public_channel(
|
|
selected_channel_ids: Iterable[int] | None = None,
|
|
*,
|
|
client_visible_only: bool = False,
|
|
) -> bool:
|
|
return bool(get_public_channel_ids(selected_channel_ids, client_visible_only=client_visible_only))
|
|
|
|
|
|
def get_channel_map() -> dict[int, dict[int, str]]:
|
|
result: dict[int, dict[int, str]] = {}
|
|
for channel in iter_channels():
|
|
result[int(channel["id"])] = {
|
|
int(core["id"]): str(core["map_allow"])
|
|
for core in channel["cores"]
|
|
}
|
|
return result
|
|
|
|
|
|
def instance_name(channel_id: int, core_id: int) -> str:
|
|
return f"channel{int(channel_id)}_core{int(core_id)}"
|
|
|
|
|
|
def game_unit(instance: str) -> str:
|
|
return f"metin-game@{instance}.service"
|
|
|
|
|
|
def get_instances(selected_channel_ids: Iterable[int] | None = None) -> list[str]:
|
|
selected = None if selected_channel_ids is None else {int(channel_id) for channel_id in selected_channel_ids}
|
|
instances: list[str] = []
|
|
for channel in iter_channels():
|
|
channel_id = int(channel["id"])
|
|
if selected is not None and channel_id not in selected:
|
|
continue
|
|
for core in sorted(channel["cores"], key=lambda core: int(core["id"])):
|
|
instances.append(instance_name(channel_id, int(core["id"])))
|
|
return instances
|
|
|
|
|
|
def get_game_units(selected_channel_ids: Iterable[int] | None = None) -> list[str]:
|
|
return [game_unit(instance) for instance in get_instances(selected_channel_ids)]
|
|
|
|
|
|
def resolve_selected_channels(
|
|
channel_limit: int | None = None,
|
|
explicit_channels: Iterable[int] | None = None,
|
|
) -> list[int]:
|
|
available = set(get_channel_ids())
|
|
|
|
if explicit_channels:
|
|
selected = {int(channel_id) for channel_id in explicit_channels}
|
|
elif channel_limit is None or channel_limit == 0:
|
|
selected = set(available)
|
|
else:
|
|
selected = {channel_id for channel_id in available if channel_id <= channel_limit}
|
|
|
|
for channel in iter_channels():
|
|
if channel.get("always_include"):
|
|
selected.add(int(channel["id"]))
|
|
|
|
unknown = sorted(selected - available)
|
|
if unknown:
|
|
raise ValueError(f"Unknown channels requested: {unknown}")
|
|
|
|
return sorted(selected)
|