"""Demo entrypoint: fetch checkpoint from HF, then launch interactive playback."""
from __future__ import annotations
import importlib.util
import os
import platform
import shutil
import subprocess
import sys
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from unilab.assets import ASSETS_ROOT_PATH
from unilab.assets.hub import resolve_checkpoint_file
_OFFICIAL_MUJOCO_PACKAGE = "mujoco==3.8.0"
[docs]
@dataclass(frozen=True)
class DemoSpec:
algo: str
task: str
sim: str
entry: str # "eval", "play_interactive", or "teaser"
DEMO_REGISTRY: dict[str, DemoSpec] = {
"dance": DemoSpec(algo="ppo", task="g1_motion_tracking", sim="motrix", entry="eval"),
"wallflip": DemoSpec(algo="ppo", task="g1_wall_flip_tracking", sim="motrix", entry="eval"),
"boxtracking": DemoSpec(algo="ppo", task="g1_box_tracking", sim="motrix", entry="eval"),
"locomani": DemoSpec(
algo="ppo", task="go2_arm_manip_loco", sim="mujoco", entry="play_interactive"
),
"inhandgrasp": DemoSpec(
algo="hora_distill",
task="sharpa_inhand",
sim="mujoco_nodr",
entry="play_interactive",
),
"sharpa_appo_student": DemoSpec(
algo="hora_distill",
task="sharpa_inhand",
sim="mujoco_nodr",
entry="play_interactive",
),
"teaser": DemoSpec(algo="", task="", sim="", entry="teaser"),
}
_LOCAL_ONLY_CHECKPOINT_DEMOS = {"sharpa_appo_student"}
def _repo_root() -> Path:
return Path(__file__).resolve().parents[2]
[docs]
def get_demo_spec(demo_name: str) -> DemoSpec:
try:
return DEMO_REGISTRY[demo_name]
except KeyError as exc:
available = ", ".join(sorted(DEMO_REGISTRY))
raise SystemExit(f"Unknown demo {demo_name!r}. Available demos: {available}") from exc
def _checkpoint_relative_path(demo_name: str) -> str:
return f"checkpoints/{demo_name}/model_0.pt"
def _local_checkpoint_path(demo_name: str) -> Path:
return ASSETS_ROOT_PATH / _checkpoint_relative_path(demo_name)
def _refresh_local_checkpoint(demo_name: str) -> None:
local = _local_checkpoint_path(demo_name)
if local.exists():
local.unlink()
def _resolve_demo_checkpoint(demo_name: str) -> str | None:
local = _local_checkpoint_path(demo_name)
if demo_name not in _LOCAL_ONLY_CHECKPOINT_DEMOS:
resolved = resolve_checkpoint_file(_checkpoint_relative_path(demo_name))
assert isinstance(resolved, str)
return resolved
if local.exists():
return str(local)
print(
f"Local checkpoint not found for demo {demo_name!r}: {local}\n"
"Checkpoint not found; no download source is used for this demo.\n"
"Place the stage-2 PyTorch checkpoint at this path and run the demo again."
)
return None
def _build_play_interactive_command(
*,
spec: DemoSpec,
checkpoint_path: str,
extra_overrides: Sequence[str],
root: Path | None = None,
) -> list[str]:
selected_root = root or _repo_root()
script = selected_root / "scripts" / "play_interactive.py"
if not script.is_file():
raise SystemExit(f"Entrypoint script not found: {script}")
if spec.algo in {"sac", "flashsac"}:
owner_yaml = (
selected_root
/ "conf"
/ "offpolicy"
/ "task"
/ spec.algo
/ spec.task
/ f"{spec.sim}.yaml"
)
elif spec.algo == "hora_distill":
owner_yaml = (
selected_root / "conf" / "hora_distill" / "task" / spec.task / f"{spec.sim}.yaml"
)
else:
owner_yaml = selected_root / "conf" / spec.algo / "task" / spec.task / f"{spec.sim}.yaml"
if not owner_yaml.is_file():
raise SystemExit(
f"No owner config exists for algo={spec.algo}, task={spec.task}, sim={spec.sim}: "
f"{owner_yaml}"
)
command = [
*_play_interactive_command_prefix(selected_root),
str(script),
"--algo",
spec.algo,
"--task",
spec.task,
"--sim",
spec.sim,
]
command.extend(
[
f"algo.load_run={checkpoint_path}",
*extra_overrides,
]
)
return command
def _play_interactive_command_prefix(root: Path) -> list[str]:
if platform.system() != "Darwin":
return [sys.executable]
_ensure_mujoco_uni_mjpython_app(root)
return [_current_env_mjpython()]
def _official_mujoco_env(root: Path) -> Path:
return root / ".tmp" / "mjpython-demo"
def _official_mujoco_app_mjpython(env_root: Path) -> Path:
site_packages = (
env_root
/ "lib"
/ f"python{sys.version_info.major}.{sys.version_info.minor}"
/ "site-packages"
)
return site_packages / "mujoco" / "MuJoCo_(mjpython).app" / "Contents" / "MacOS" / "mjpython"
def _official_mujoco_app(env_root: Path) -> Path:
return _official_mujoco_app_mjpython(env_root).parents[2]
def _mujoco_package_dir() -> Path:
spec = importlib.util.find_spec("mujoco")
if spec is None or spec.origin is None:
raise SystemExit("macOS MuJoCo demos require the project mujoco-uni package.")
return Path(spec.origin).resolve().parent
def _mujoco_uni_mjpython_app() -> Path:
return _mujoco_package_dir() / "MuJoCo_(mjpython).app"
def _ensure_official_mujoco_env(root: Path) -> Path:
env_root = _official_mujoco_env(root)
env_mjpython = env_root / "bin" / "mjpython"
if env_mjpython.is_file() and _official_mujoco_app_mjpython(env_root).is_file():
return env_root
uv = shutil.which("uv")
if uv is None:
raise SystemExit(
"macOS MuJoCo demos require `uv` to create an isolated official MuJoCo "
"mjpython environment."
)
env_root.parent.mkdir(parents=True, exist_ok=True)
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
subprocess.run([uv, "venv", str(env_root), "--python", python_version], check=True)
subprocess.run(
[
uv,
"pip",
"install",
"--python",
str(env_root / "bin" / "python"),
_OFFICIAL_MUJOCO_PACKAGE,
],
check=True,
)
if not env_mjpython.is_file() or not _official_mujoco_app_mjpython(env_root).is_file():
raise SystemExit(
"Failed to create the isolated official MuJoCo mjpython environment for "
"macOS demo playback."
)
return env_root
def _ensure_mujoco_uni_mjpython_app(root: Path) -> None:
dest = _mujoco_uni_mjpython_app()
if (dest / "Contents" / "MacOS" / "mjpython").is_file():
return
official_env = _ensure_official_mujoco_env(root)
shutil.copytree(_official_mujoco_app(official_env), dest, dirs_exist_ok=True)
if not (dest / "Contents" / "MacOS" / "mjpython").is_file():
raise SystemExit(f"Failed to materialize MuJoCo mjpython app at {dest}")
def _current_env_mjpython() -> str:
if Path(sys.executable).name == "mjpython":
return sys.executable
venv_mjpython = Path(sys.executable).with_name("mjpython")
if venv_mjpython.is_file():
return str(venv_mjpython)
mjpython = shutil.which("mjpython")
if mjpython is not None:
return mjpython
raise SystemExit("macOS MuJoCo demos require `mjpython` in the active environment.")
[docs]
def build_demo_command(
*,
demo_name: str,
checkpoint_path: str,
device: str | None = None,
root: Path | None = None,
) -> list[str]:
"""Assemble the subprocess command for a given demo + resolved checkpoint."""
from unilab.cli import build_command
spec = get_demo_spec(demo_name)
extra: list[str] = []
if device is not None:
extra.append(f"training.device={device}")
if spec.entry == "eval":
return build_command(
mode="eval",
algo=spec.algo,
task=spec.task,
sim=spec.sim,
overrides=[f"algo.load_run={checkpoint_path}", *extra],
root=root,
)
if spec.entry == "play_interactive":
return _build_play_interactive_command(
spec=spec, checkpoint_path=checkpoint_path, extra_overrides=extra, root=root
)
if spec.entry == "teaser":
raise SystemExit(
f"Demo {demo_name!r} is a renderer-only entry and has no subprocess command."
)
raise SystemExit(f"Unknown demo entry kind: {spec.entry!r}")
def _mxpython_executable() -> str:
from unilab.cli import _mxpython_executable as resolve_mxpython_executable
return resolve_mxpython_executable()
def _run_teaser_demo() -> int:
if platform.system() == "Darwin" and Path(sys.executable).name != "mxpython":
command = [
_mxpython_executable(),
str(_repo_root() / "src" / "unilab" / "tools" / "render_teaser.py"),
]
env = os.environ.copy()
env.setdefault("UV_PROJECT_ENVIRONMENT", str(_repo_root() / ".venv"))
return subprocess.run(command, check=False, env=env).returncode
from unilab.tools.render_teaser import main as render_teaser_main
render_teaser_main()
return 0
[docs]
def run_demo(*, demo_name: str, refresh: bool = False, device: str | None = None) -> int:
spec = get_demo_spec(demo_name)
if spec.entry == "teaser":
return _run_teaser_demo()
if refresh and demo_name not in _LOCAL_ONLY_CHECKPOINT_DEMOS:
_refresh_local_checkpoint(demo_name)
elif refresh:
print(f"Refresh ignored for local-only demo {demo_name!r}; no download source is used.")
checkpoint_path = _resolve_demo_checkpoint(demo_name)
if checkpoint_path is None:
return 1
command = build_demo_command(
demo_name=demo_name, checkpoint_path=checkpoint_path, device=device
)
env = os.environ.copy()
env.setdefault("UV_PROJECT_ENVIRONMENT", str(_repo_root() / ".venv"))
returncode = subprocess.run(command, check=False, env=env).returncode
if returncode == 0:
print(f"Demo finished: {demo_name} (algo={spec.algo}, task={spec.task}, sim={spec.sim})")
return returncode