Files
RetroDebian/builder/py/orchestrate.py
2026-04-03 00:58:34 +02:00

344 lines
16 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
import subprocess
import sys
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
# Directory that contains this script.
THIS_DIR = Path(__file__).resolve().parent
# Put the script directory at the front of sys.path (once) — presumably so the
# ``retrobuilder`` package imported below resolves regardless of the caller's
# working directory; confirm against the repository layout.
if str(THIS_DIR) not in sys.path:
    sys.path.insert(0, str(THIS_DIR))
from retrobuilder.context import BuildContext
from retrobuilder.envfile import write_env_file
from retrobuilder.loader import (
load_base_chain,
load_module_entry,
load_module_spec,
load_profile,
resolve_module_config,
resolve_module_docker_override,
)
from retrobuilder.operations import (
apply_profile_common_configuration,
clear_directory_contents,
ensure_live_structure,
inject_module_resources,
profile_finalize,
profile_pre_build,
save_feature_metadata,
save_profile_metadata,
)
from retrobuilder.paths import base_artifacts_dir, base_dir, feature_artifacts_dir, feature_dir, profile_artifacts_dir, project_root_from
# Names of the three container runtimes the orchestrator manages.  They are
# used as keys of the runtime pool and to derive the per-runtime CLI options.
ORCHESTRATOR_RUNTIME = "orchestrator"
PACKAGE_BUILDER_RUNTIME = "package-builder"
LIVE_HELPER_RUNTIME = "live-helper"
# Default value of each runtime's dockerfile CLI option.  The paths are passed
# to ``docker build -f`` with the project root as working directory.
DEFAULT_RUNTIME_DOCKERFILES = {
    ORCHESTRATOR_RUNTIME: "builder/docker/orchestrator/Dockerfile",
    PACKAGE_BUILDER_RUNTIME: "builder/docker/package-builder/Dockerfile",
    LIVE_HELPER_RUNTIME: "builder/docker/live-helper/Dockerfile",
}
def sh(cmd: list[str], *, cwd: Path | None = None, capture: bool = False) -> str:
    """Run *cmd* to completion and optionally hand back its standard output.

    Args:
        cmd: Argument vector executed without a shell.
        cwd: Working directory for the child process; ``None`` inherits ours.
        capture: When true, stdout is collected and returned stripped of
            surrounding whitespace; otherwise it goes to the parent's stdout.

    Returns:
        The stripped stdout when ``capture`` is true, else an empty string.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    stdout_target = subprocess.PIPE if capture else None
    completed = subprocess.run(cmd, cwd=cwd, text=True, check=True, stdout=stdout_target)
    if not capture:
        return ""
    return completed.stdout.strip()
def root_dir() -> Path:
    """Resolve the project root by passing the current working directory to ``project_root_from``."""
    cwd = Path.cwd()
    return project_root_from(cwd)
def list_names(path: Path) -> list[str]:
    """List the entry names found directly under *path*, sorted.

    A directory literally named ``profiles`` yields the stems of its ``.py``
    files; any other directory yields the names of its sub-directories.
    A path that does not exist yields an empty list.
    """
    if not path.exists():
        return []
    entries = list(path.iterdir())
    if path.name == "profiles":
        names = [entry.stem for entry in entries if entry.is_file() and entry.suffix == ".py"]
    else:
        names = [entry.name for entry in entries if entry.is_dir()]
    names.sort()
    return names
@dataclass(frozen=True)
class RuntimeConfig:
    """Immutable description of how one Docker runtime is obtained and run."""

    # Image reference; when ``dockerfile`` is set it is the tag given to the build.
    image: str = ""
    # Dockerfile to build; empty means ``image`` is used without building.
    dockerfile: str = ""
    # Build context passed to ``docker build``.
    context: str = "."
    # Start the container with ``--privileged``.
    privileged: bool = False
class DockerRuntime:
    """One detached Docker container that commands are ``docker exec``'d into.

    The project root is bind-mounted read-write at the same path inside the
    container and used as its working directory.
    """

    def __init__(self, root: Path, name: str, image: str, privileged: bool = False) -> None:
        """Record the settings and pick a unique container name; nothing is started yet."""
        unique_suffix = uuid.uuid4().hex[:8]
        self.root = root
        self.name = name
        self.image = image
        self.container = f"retrodebian-{name}-{unique_suffix}"
        self.started = False
        self.privileged = privileged

    def start(self) -> None:
        """Launch the container running ``sleep infinity``; a no-op if already started."""
        if self.started:
            return
        # Run as the invoking user where the platform exposes uid/gid, else as root.
        if hasattr(os, "getuid"):
            user = f"{os.getuid()}:{os.getgid()}"
        else:
            user = "0:0"
        mount = f"{self.root}:{self.root}:rw"
        cmd = ["docker", "run", "-d", "--name", self.container]
        if self.privileged:
            cmd.append("--privileged")
        cmd += ["--user", user, "-w", str(self.root), "-v", mount, self.image, "sleep", "infinity"]
        sh(cmd)
        self.started = True

    def exec(self, argv: list[str], env: dict[str, str] | None = None) -> None:
        """Run *argv* inside the container (starting it first if needed).

        Each ``env`` item is forwarded as a ``-e KEY=VALUE`` option.
        """
        self.start()
        env_flags: list[str] = []
        for key, value in (env or {}).items():
            env_flags += ["-e", f"{key}={value}"]
        sh(["docker", "exec", *env_flags, self.container, *argv])

    def stop(self) -> None:
        """Stop the container if it was started; the started flag is cleared even on failure."""
        if not self.started:
            return
        try:
            sh(["docker", "stop", self.container])
        finally:
            self.started = False

    def rm(self) -> None:
        """Force-remove the container."""
        sh(["docker", "rm", "-f", self.container])
class RuntimePool:
    """Own the default container runtimes and create per-module override runtimes.

    Attributes are plain: ``root`` (project root), ``keep`` (leave containers
    in place on close) and ``defaults`` (runtime name -> ``DockerRuntime``).
    """

    def __init__(self, root: Path, configs: dict[str, RuntimeConfig], keep: bool = False) -> None:
        """Resolve (and, where a dockerfile is configured, build) one image per config.

        Containers are only created later, by ``start``.
        """
        self.root = root
        self.keep = keep
        self.defaults = {
            name: DockerRuntime(root, name, self._image(name, cfg), cfg.privileged)
            for name, cfg in configs.items()
        }

    def _image(self, name: str, cfg: RuntimeConfig) -> str:
        """Return the image to run for *cfg*, building it first when a dockerfile is set.

        Raises:
            ValueError: If the config names neither a dockerfile nor an image.
        """
        if cfg.dockerfile:
            image = cfg.image or f"retrodebian/{name}:local"
            sh(["docker", "build", "-t", image, "-f", cfg.dockerfile, cfg.context or "."], cwd=self.root)
            return image
        if cfg.image:
            return cfg.image
        raise ValueError(f"Missing image for runtime {name}")

    def start(self) -> None:
        """Start every default runtime."""
        for runtime in self.defaults.values():
            runtime.start()

    @staticmethod
    def _best_effort(action, verb: str, runtime: DockerRuntime) -> None:
        """Run a cleanup *action*; report a failure on stderr instead of raising."""
        try:
            action()
        except Exception as exc:
            # Cleanup must never mask the build result, but a silent failure
            # hides leaked containers, so at least tell the operator.
            print(f"warning: failed to {verb} container {runtime.container}: {exc}", file=sys.stderr)

    def close(self) -> None:
        """Stop all default runtimes and, unless ``keep`` is set, remove them.

        Every runtime is attempted even if an earlier one fails; failures are
        reported as warnings on stderr and never raised.
        """
        runtimes = list(self.defaults.values())
        for runtime in runtimes:
            self._best_effort(runtime.stop, "stop", runtime)
        if not self.keep:
            for runtime in runtimes:
                self._best_effort(runtime.rm, "remove", runtime)

    def resolve(self, runtime_name: str, override: RuntimeConfig | None = None) -> tuple[DockerRuntime, bool]:
        """Pick the runtime to use for one step.

        Without an effective override (no image and no dockerfile) the shared
        default runtime is returned with ``False``.  Otherwise a fresh runtime
        is built/started from the override and returned with ``True``; the
        caller then owns it and is responsible for stopping/removing it.

        Raises:
            KeyError: If no override applies and *runtime_name* is unknown.
        """
        if not override or not (override.image or override.dockerfile):
            return self.defaults[runtime_name], False
        # NOTE(review): a dockerfile override without an explicit image gets a
        # fresh random tag on every call, and nothing here removes those images.
        image = self._image(f"{runtime_name}-override-{uuid.uuid4().hex[:6]}", override)
        # An override swaps the image, not the role: keep the privilege level of
        # the default runtime it replaces, otherwise an overridden privileged
        # runtime would silently run unprivileged.
        default = self.defaults.get(runtime_name)
        privileged = override.privileged or bool(default and default.privileged)
        runtime = DockerRuntime(self.root, f"{runtime_name}-override", image, privileged)
        runtime.start()
        return runtime, True
class Orchestrator:
    """Drive a build: validate definitions, build modules, then assemble each profile.

    Work is executed inside the container runtimes supplied by ``runtimes``;
    a module may substitute its own runtime for a phase via a docker override.
    """

    def __init__(self, root: Path, runtimes: RuntimePool) -> None:
        self.root = root
        self.runtimes = runtimes

    # ------------------------------------------------------------------ helpers

    def _module_dir(self, kind: str, name: str) -> Path:
        """Return the source directory of a base (``kind == "base"``) or feature module."""
        locate = base_dir if kind == "base" else feature_dir
        return locate(self.root, name)

    def _release(self, runtime: DockerRuntime, owned: bool) -> None:
        """Dispose of a runtime obtained from ``RuntimePool.resolve``.

        Shared default runtimes (``owned`` false) are left alone.  An owned
        override runtime is stopped and, unless the pool keeps containers,
        removed — the removal is attempted even when stopping fails so the
        container is not leaked.
        """
        if not owned:
            return
        try:
            runtime.stop()
        finally:
            if not self.runtimes.keep:
                runtime.rm()

    # --------------------------------------------------------------- validation

    def validate(self) -> None:
        """Load every profile, its base chain and its feature specs, then print ``Validation OK``.

        Any exception raised by the loaders propagates to the caller.
        """
        for profile_name in list_names(self.root / "profiles"):
            profile = load_profile(self.root, profile_name)
            load_base_chain(self.root, profile.base)
            for feature_name in profile.features:
                load_module_spec(self.root, "feature", feature_name)
        print("Validation OK")

    # ------------------------------------------------------------------ context

    def context(self, phase: str, kind: str, *, profile_name: str = "", base_name: str = "", feature_name: str = "") -> BuildContext:
        """Assemble the ``BuildContext`` for one phase of one module or profile.

        When a profile is named, its base (unless *base_name* is given) and its
        feature list are taken from the loaded profile.
        """
        profile = load_profile(self.root, profile_name) if profile_name else None
        base_name = base_name or (profile.base if profile else "")
        feature_names = tuple(profile.features) if profile else ()
        # Profile config first, then the fixed keys, so the fixed keys win on clashes.
        profile_values = {
            **(profile.config if profile else {}),
            "base": base_name,
            "features": feature_names,
            "edition": profile.edition if profile else "",
            "description": profile.description if profile else "",
            "splash": profile.splash if profile else "",
        }
        return BuildContext(
            project_root=self.root,
            live_dir=self.root / "live",
            artifacts_root=self.root / "artifacts",
            phase=phase,
            kind=kind,
            # NOTE(review): for kind="profile" this resolves to the profile's
            # base name (base_name was filled in above), not the profile name —
            # confirm that is what consumers of ``name`` expect.
            name=feature_name or base_name or profile_name,
            profile_name=profile_name,
            base_name=base_name,
            feature_name=feature_name,
            profile=profile_values,
            base=resolve_module_config(self.root, "base", base_name) if base_name else {},
            features={name: resolve_module_config(self.root, "feature", name) for name in feature_names},
        )

    def env_file(self, ctx: BuildContext, path: Path) -> Path:
        """Write the context's environment values to *path* and return *path*."""
        values = ctx.to_env()
        write_env_file(path, values)
        return path

    def override(self, kind: str, name: str, phase: str) -> RuntimeConfig | None:
        """Return the module's docker override for *phase* as a ``RuntimeConfig``, or ``None``."""
        stage = resolve_module_docker_override(self.root, kind, name, phase)
        if not stage:
            return None
        return RuntimeConfig(stage.image or "", stage.dockerfile or "", stage.docker_context or ".")

    # ------------------------------------------------------------- step runners

    def run_python(self, kind: str, name: str, phase: str, *, profile_name: str = "", base_name: str = "", feature_name: str = "") -> None:
        """Run the module's ``entry.py`` for *phase* in the orchestrator runtime.

        The build context is passed to the process as environment variables.
        """
        module_dir = self._module_dir(kind, name)
        runtime, owned = self.runtimes.resolve(ORCHESTRATOR_RUNTIME, self.override(kind, name, phase))
        try:
            ctx = self.context(phase, kind, profile_name=profile_name, base_name=base_name, feature_name=feature_name)
            runtime.exec(["python3", str(module_dir / "entry.py"), phase], ctx.to_env())
        finally:
            self._release(runtime, owned)

    def run_generate(self, kind: str, name: str, env_path: Path) -> None:
        """Run ``builder/bash/run_generate.sh`` for the module in the package-builder runtime."""
        module_dir = self._module_dir(kind, name)
        runtime, owned = self.runtimes.resolve(PACKAGE_BUILDER_RUNTIME, self.override(kind, name, "generate"))
        try:
            runtime.exec(["builder/bash/run_generate.sh", str(module_dir), str(env_path)])
        finally:
            self._release(runtime, owned)

    def run_shell(self, kind: str, name: str, phase: str, env_path: Path) -> None:
        """Run ``builder/bash/run_entry.sh`` for the module and *phase* in the live-helper runtime."""
        module_dir = self._module_dir(kind, name)
        runtime, owned = self.runtimes.resolve(LIVE_HELPER_RUNTIME, self.override(kind, name, phase))
        try:
            runtime.exec(["builder/bash/run_entry.sh", str(module_dir), str(env_path), phase])
        finally:
            self._release(runtime, owned)

    # ------------------------------------------------------------ build stages

    def build_common_features(self, profile_hint: str) -> None:
        """Run pre-gen, generate and post-gen for every directory under ``features``.

        *profile_hint* names the profile used to build each feature's context.
        """
        for feature_name in list_names(self.root / "features"):
            ctx = self.context("pre-gen", "feature", profile_name=profile_hint, feature_name=feature_name)
            self.run_python("feature", feature_name, "pre-gen", profile_name=profile_hint, feature_name=feature_name)
            env_path = self.env_file(ctx, feature_artifacts_dir(self.root, feature_name) / "runtime.env")
            self.run_generate("feature", feature_name, env_path)
            self.run_python("feature", feature_name, "post-gen", profile_name=profile_hint, feature_name=feature_name)

    def build_base_chain(self, profile_name: str, base_chain: list[tuple[str, object]]) -> None:
        """Run pre-gen, generate and post-gen for each base in *base_chain*, in order."""
        for base_name, _ in base_chain:
            ctx = self.context("pre-gen", "base", profile_name=profile_name, base_name=base_name)
            self.run_python("base", base_name, "pre-gen", profile_name=profile_name, base_name=base_name)
            env_path = self.env_file(ctx, base_artifacts_dir(self.root, base_name) / "runtime.env")
            self.run_generate("base", base_name, env_path)
            self.run_python("base", base_name, "post-gen", profile_name=profile_name, base_name=base_name)

    def configure_profile(self, profile_name: str, profile, base_chain: list[tuple[str, object]]) -> None:
        """Reset the ``live`` directory and configure it for *profile*.

        Saves the profile metadata, runs ``run_profile_config.sh`` in the
        default live-helper runtime, applies the common profile configuration
        and injects the resources of every base in the chain.
        """
        live_dir = self.root / "live"
        artifacts = profile_artifacts_dir(self.root, profile_name)
        clear_directory_contents(live_dir)
        ensure_live_structure(live_dir)
        save_profile_metadata(
            artifacts / "profile.json",
            profile_name,
            profile,
            profile.base,
            load_module_spec(self.root, "base", profile.base),
            base_chain,
        )
        live_helper = self.runtimes.defaults[LIVE_HELPER_RUNTIME]
        config_env = self.env_file(
            self.context("config", "profile", profile_name=profile_name),
            profile_artifacts_dir(self.root, profile_name) / "profile-config.env",
        )
        live_helper.exec(["builder/bash/run_profile_config.sh", str(config_env)])
        apply_profile_common_configuration(self.root, live_dir, profile_name, profile)
        for base_name, _ in base_chain:
            inject_module_resources(base_dir(self.root, base_name), live_dir, base_name)

    def inject_features(self, profile_name: str, base_name: str, feature_names: Iterable[str]) -> None:
        """Inject each feature into ``live``, bracketed by the base's pre/post-feature phases.

        Per feature: save metadata, run pre-inj (python then shell), inject the
        feature's resources, then run post-inj (shell then python).
        """
        pre = self.context("pre-feature", "base", profile_name=profile_name, base_name=base_name)
        pre_env = self.env_file(pre, base_artifacts_dir(self.root, base_name) / "pre-feature.env")
        self.run_python("base", base_name, "pre-feature", profile_name=profile_name, base_name=base_name)
        self.run_shell("base", base_name, "pre-feature", pre_env)
        for feature_name in feature_names:
            save_feature_metadata(
                profile_artifacts_dir(self.root, profile_name) / "features" / f"{feature_name}.json",
                feature_name,
                load_module_spec(self.root, "feature", feature_name),
            )
            pre = self.context("pre-inj", "feature", profile_name=profile_name, base_name=base_name, feature_name=feature_name)
            pre_env = self.env_file(pre, feature_artifacts_dir(self.root, feature_name) / "pre-inj.env")
            self.run_python("feature", feature_name, "pre-inj", profile_name=profile_name, base_name=base_name, feature_name=feature_name)
            self.run_shell("feature", feature_name, "pre-inj", pre_env)
            inject_module_resources(feature_dir(self.root, feature_name), self.root / "live", feature_name)
            post = self.context("post-inj", "feature", profile_name=profile_name, base_name=base_name, feature_name=feature_name)
            post_env = self.env_file(post, feature_artifacts_dir(self.root, feature_name) / "post-inj.env")
            self.run_shell("feature", feature_name, "post-inj", post_env)
            self.run_python("feature", feature_name, "post-inj", profile_name=profile_name, base_name=base_name, feature_name=feature_name)
        post = self.context("post-feature", "base", profile_name=profile_name, base_name=base_name)
        post_env = self.env_file(post, base_artifacts_dir(self.root, base_name) / "post-feature.env")
        self.run_python("base", base_name, "post-feature", profile_name=profile_name, base_name=base_name)
        self.run_shell("base", base_name, "post-feature", post_env)

    def finalize_profile(self, profile_name: str) -> None:
        """Run the pre-build hook, ``run_profile_build.sh`` in the live-helper, then the finalize hook."""
        profile_pre_build(self.root / "live", profile_name)
        build_env = self.env_file(
            self.context("build", "profile", profile_name=profile_name),
            profile_artifacts_dir(self.root, profile_name) / "profile-build.env",
        )
        self.runtimes.defaults[LIVE_HELPER_RUNTIME].exec(["builder/bash/run_profile_build.sh", str(build_env)])
        profile_finalize(self.root, profile_name)

    def build_profile(self, profile_name: str) -> None:
        """Build one profile end to end: base chain, configuration, features, finalization."""
        profile = load_profile(self.root, profile_name)
        base_chain = load_base_chain(self.root, profile.base)
        self.build_base_chain(profile_name, base_chain)
        self.configure_profile(profile_name, profile, base_chain)
        self.inject_features(profile_name, profile.base, profile.features)
        self.finalize_profile(profile_name)

    def run(self, all_profiles: bool, profile_name: str = "") -> None:
        """Validate, build the common features, then build the requested profile(s).

        The runtime pool is always closed afterwards, including when starting
        the runtimes themselves fails part-way.
        """
        try:
            # Started inside the try so that containers which did come up are
            # still cleaned up when a later runtime fails to start.
            self.runtimes.start()
            self.validate()
            # NOTE(review): with --all-profiles no profile name is given and the
            # hint falls back to the literal "demo"; confirm that profile exists.
            self.build_common_features(profile_name or "demo")
            names = list_names(self.root / "profiles") if all_profiles else [profile_name]
            for name in names:
                self.build_profile(name)
        finally:
            self.runtimes.close()
def parser() -> argparse.ArgumentParser:
    """Build the command-line parser.

    Sub-commands:
        validate: takes no options.
        run: requires exactly one of ``--profile NAME`` / ``--all-profiles``
            and accepts ``image``, ``dockerfile`` and ``context`` options for
            each runtime.

    Each runtime option is available under its fully hyphenated spelling
    (e.g. ``--package-builder-image``) and, for backward compatibility, under
    the historical mixed spelling (``--package_builder-image``).  Both store
    into the same attribute, e.g. ``package_builder_image``.
    """
    p = argparse.ArgumentParser(description="RetroDebian orchestrator")
    sub = p.add_subparsers(dest="command", required=True)
    sub.add_parser("validate")
    runp = sub.add_parser("run")
    grp = runp.add_mutually_exclusive_group(required=True)
    grp.add_argument("--profile")
    grp.add_argument("--all-profiles", action="store_true")
    for runtime in (ORCHESTRATOR_RUNTIME, PACKAGE_BUILDER_RUNTIME, LIVE_HELPER_RUNTIME):
        opt = runtime.replace("-", "_")
        option_defaults = {
            "image": "",
            "dockerfile": DEFAULT_RUNTIME_DOCKERFILES[runtime],
            "context": ".",
        }
        for suffix, default in option_defaults.items():
            # dict.fromkeys de-duplicates while keeping order: for a runtime
            # name without hyphens both spellings are the same flag.
            flags = dict.fromkeys((f"--{runtime}-{suffix}", f"--{opt}-{suffix}"))
            runp.add_argument(*flags, dest=f"{opt}_{suffix}", default=default)
    return p
def main() -> int:
    """Parse the command line and run the requested sub-command.

    ``validate`` only loads the project definitions, using placeholder
    ``noop`` images so that nothing is built.  ``run`` creates the runtime
    pool from the CLI options (the live-helper runtime is privileged) and
    builds the selected profile(s).

    Returns:
        ``0`` on success; failures propagate as exceptions.
    """
    args = parser().parse_args()
    root = root_dir()

    if args.command == "validate":
        runtime_names = (ORCHESTRATOR_RUNTIME, PACKAGE_BUILDER_RUNTIME, LIVE_HELPER_RUNTIME)
        placeholder_configs = {name: RuntimeConfig(image="noop") for name in runtime_names}
        pool = RuntimePool(root, placeholder_configs, keep=True)
        Orchestrator(root, pool).validate()
        return 0

    configs = {
        ORCHESTRATOR_RUNTIME: RuntimeConfig(
            args.orchestrator_image,
            args.orchestrator_dockerfile,
            args.orchestrator_context,
        ),
        PACKAGE_BUILDER_RUNTIME: RuntimeConfig(
            args.package_builder_image,
            args.package_builder_dockerfile,
            args.package_builder_context,
        ),
        LIVE_HELPER_RUNTIME: RuntimeConfig(
            args.live_helper_image,
            args.live_helper_dockerfile,
            args.live_helper_context,
            True,
        ),
    }
    runtimes = RuntimePool(root, configs)
    orchestrator = Orchestrator(root, runtimes)
    orchestrator.run(args.all_profiles, args.profile or "")
    return 0
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())