Source code for unilab.base.backend.motrix.playback
"""Motrix-owned playback execution helpers."""
from __future__ import annotations
import time
from os import PathLike
from typing import Any, Callable, TypeVar
import numpy as np
from unilab.base.backend.playback_common import env_cfg_value
ObsT = TypeVar("ObsT")
[docs]
def _resolve_render_spacing(env: Any, render_spacing: float | None) -> float:
    """Return the explicit spacing, else the env's ``render_spacing`` (default 1.0)."""
    if render_spacing is not None:
        return float(render_spacing)
    return float(env_cfg_value(env, "render_spacing", 1.0))


def _resolve_offset_mode(render_offset_mode: str | None) -> str:
    """Return the requested offset mode as a string, defaulting to ``"grid"``."""
    return "grid" if render_offset_mode is None else str(render_offset_mode)


def _resolve_video_fps(env: Any) -> int:
    """Derive an integer video frame rate from the env's ``ctrl_dt`` (default 1/60)."""
    ctrl_dt = float(env_cfg_value(env, "ctrl_dt", 1.0 / 60.0))
    # ``not (x > 0)`` also rejects NaN, which would otherwise slip past ``<= 0``.
    if not ctrl_dt > 0.0:
        raise ValueError(f"Motrix video recording requires a positive ctrl_dt, got {ctrl_dt!r}.")
    # Round instead of truncating: e.g. ctrl_dt=0.0166667 gives 59.9999 fps,
    # which int() would silently turn into 59. Clamp so the encoder never sees 0.
    return max(1, round(1.0 / ctrl_dt))


def run_motrix_playback(
    *,
    backend: Any,
    env: Any,
    initialize: Callable[[], ObsT],
    step: Callable[[ObsT], ObsT],
    num_steps: int | None,
    output_video: str | PathLike[str] | None,
    render_spacing: float | None,
    render_offset_mode: str | None,
    headless: bool,
    record_video: bool,
    camera_kwargs: dict[str, Any] | None,
    extra_data_getter: Callable[[], np.ndarray | None] | None = None,
) -> str | None:
    """Run a playback loop against a Motrix backend, optionally recording video.

    Two modes are supported:

    * **Captured** (``headless=True``): the renderer is initialised with
      ``capture=True`` at 1280x720, ``step`` is called exactly ``num_steps``
      times and one frame is captured after every step. When ``record_video``
      is set the frames are written to ``output_video`` via ``mediapy``.
    * **Interactive** (``headless=False``): ``step`` and ``backend.render()``
      are called in a loop paced to at most 60 iterations per second, for
      ``num_steps`` iterations or indefinitely when ``num_steps`` is ``None``.

    Args:
        backend: Object providing ``init_renderer``, ``capture_video_frame``
            and ``render``.
        env: Environment whose config supplies the ``render_spacing`` and
            ``ctrl_dt`` fallbacks through ``env_cfg_value``.
        initialize: Produces the initial observation.
        step: Maps the current observation to the next one.
        num_steps: Number of steps to run; must not be ``None`` in captured mode.
        output_video: Destination path; required when ``record_video`` is set.
        render_spacing: Spacing passed to the renderer; ``None`` falls back to
            the env config value (default 1.0).
        render_offset_mode: Offset mode passed to the renderer; ``None`` means
            ``"grid"``.
        headless: Selects captured mode when true, interactive mode otherwise.
        record_video: Write the captured frames to ``output_video``.
        camera_kwargs: Extra camera options; copied before being handed to the
            renderer. Only used in captured mode.
        extra_data_getter: Unused; accepted and discarded.

    Returns:
        ``str(output_video)`` when a video was written, otherwise ``None``.

    Raises:
        ValueError: If ``record_video`` is requested without ``headless``, if
            captured mode has no finite ``num_steps``, if recording has no
            ``output_video``, or if the configured ``ctrl_dt`` is not positive.
    """
    del extra_data_getter

    if record_video and not headless:
        raise ValueError("Motrix video recording requires headless=true.")

    # ``record_video`` without ``headless`` was rejected above, so ``headless``
    # alone decides whether this is a captured run.
    if headless:
        if num_steps is None:
            raise ValueError("Motrix captured playback requires a finite num_steps value.")
        if record_video and output_video is None:
            raise ValueError("Motrix video recording requires an output_video path.")

        media = None
        fps = 0
        if record_video:
            # Resolve the encoder and frame rate before stepping so a missing
            # dependency or an invalid ctrl_dt fails before the rollout runs
            # instead of after all the work has been done.
            import mediapy as media

            fps = _resolve_video_fps(env)

        backend.init_renderer(
            spacing=_resolve_render_spacing(env, render_spacing),
            offset_mode=_resolve_offset_mode(render_offset_mode),
            headless=headless,
            capture=True,
            width=1280,
            height=720,
            camera_kwargs=dict(camera_kwargs or {}),
        )

        obs = initialize()
        frames: list[np.ndarray] | None = [] if record_video else None
        for _ in range(num_steps):
            obs = step(obs)
            frame = np.asarray(backend.capture_video_frame(), dtype=np.uint8)
            if frames is not None:
                # Copy: asarray may alias a buffer the backend reuses next frame.
                frames.append(frame.copy())

        if frames is None or media is None:
            return None

        media.write_video(str(output_video), frames, fps=fps)
        return str(output_video)

    backend.init_renderer(
        spacing=_resolve_render_spacing(env, render_spacing),
        offset_mode=_resolve_offset_mode(render_offset_mode),
    )

    obs = initialize()
    render_dt = 1.0 / 60.0
    last_render_time = time.perf_counter()
    steps_run = 0
    while num_steps is None or steps_run < num_steps:
        obs = step(obs)
        # Sleep off whatever remains of the 1/60 s budget since the last frame.
        elapsed = time.perf_counter() - last_render_time
        if elapsed < render_dt:
            time.sleep(render_dt - elapsed)
        last_render_time = time.perf_counter()
        backend.render()
        steps_run += 1
    return None