"""G1 Motion Tracking Environment - Motion imitation task."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal, cast
import numpy as np
from unilab.assets import ASSETS_ROOT_PATH
from unilab.base import registry
from unilab.base.backend import create_backend
from unilab.base.np_env import NpEnvState
from unilab.base.scene import SceneCfg
from unilab.dr import (
DomainRandomizationCapabilities,
DomainRandomizationProvider,
IntervalRandomizationPlan,
ResetPlan,
)
from unilab.dr.dr_utils import (
build_common_reset_randomization,
build_interval_push_plan,
validate_common_reset_randomization,
validate_interval_push_support,
zero_actions,
)
from unilab.dr.types import RESET_TERM_GEOM_FRICTION, ResetRandomizationPayload
from unilab.dtype_config import get_global_dtype
from unilab.envs.common.math import np_sample_uniform
from unilab.envs.common.rotation import (
np_quat_apply,
np_quat_from_euler_xyz,
np_quat_inv,
np_quat_mul,
)
from unilab.envs.locomotion.g1.base import G1BaseCfg, G1BaseEnv
from .motion_loader import MotionData, MotionLoader, MotionSampler
[docs]
@dataclass
class RewardConfig:
"""Reward configuration for motion tracking."""
scales: dict[str, float] = field(
default_factory=lambda: {
"motion_global_root_pos": 0.5,
"motion_global_root_ori": 0.5,
"motion_body_pos": 1.0,
"motion_body_ori": 1.0,
"motion_body_lin_vel": 1.0,
"motion_body_ang_vel": 1.0,
"motion_ee_body_pos_z": 0.0,
"motion_joint_pos": 0.0,
"motion_joint_vel": 0.0,
"action_rate_l2": -0.1,
"joint_limit": -10.0,
}
)
# Standard deviations for exponential rewards
std_root_pos: float = 0.3
std_root_ori: float = 0.4
std_body_pos: float = 0.3
std_body_ori: float = 0.4
std_body_lin_vel: float = 1.0
std_body_ang_vel: float = 3.14
std_joint_pos: float = 0.2
std_joint_vel: float = 1.0
[docs]
@dataclass
class PoseRandomization:
"""Pose randomization ranges for reset."""
x: tuple[float, float] = (-0.05, 0.05)
y: tuple[float, float] = (-0.05, 0.05)
z: tuple[float, float] = (-0.01, 0.01)
roll: tuple[float, float] = (-0.1, 0.1)
pitch: tuple[float, float] = (-0.1, 0.1)
yaw: tuple[float, float] = (-0.2, 0.2)
[docs]
@dataclass
class VelocityRandomization:
"""Velocity randomization ranges for reset."""
x: tuple[float, float] = (-0.5, 0.5)
y: tuple[float, float] = (-0.5, 0.5)
z: tuple[float, float] = (-0.2, 0.2)
roll: tuple[float, float] = (-0.52, 0.52)
pitch: tuple[float, float] = (-0.52, 0.52)
yaw: tuple[float, float] = (-0.78, 0.78)
[docs]
@dataclass
class Domain_Rand:
"""Domain randomization config required by motrix backend hooks."""
randomize_base_mass: bool = False
added_mass_range: list[float] = field(default_factory=lambda: [-1.5, 1.5])
random_com: bool = False
com_offset_x: list[float] = field(default_factory=lambda: [-0.05, 0.05])
com_offset_y: list[float] = field(default_factory=lambda: [-0.05, 0.05])
com_offset_z: list[float] = field(default_factory=lambda: [-0.05, 0.05])
randomize_gravity: bool = False
gravity_range: list[list[float]] = field(
default_factory=lambda: [[0.0, 0.0, -9.81], [0.0, 0.0, -9.81]]
)
push_robots: bool = False
push_interval: int = 750
max_force: list[float] = field(default_factory=lambda: [1.0, 1.0, 0.5])
push_body_name: str | None = None
randomize_kp: bool = False
kp_multiplier_range: list[float] = field(default_factory=lambda: [0.9, 1.1])
randomize_kd: bool = False
kd_multiplier_range: list[float] = field(default_factory=lambda: [0.9, 1.1])
randomize_geom_friction: bool = False
friction_range: list[float] = field(default_factory=lambda: [0.3, 1.2])
friction_geom_pattern: str = r"^(left|right)_foot[1-7]_collision$"
randomize_joint_default_pos: bool = False
joint_default_pos_range: list[float] = field(default_factory=lambda: [-0.01, 0.01])
[docs]
@dataclass
class G1MotionTrackingCfg(G1BaseCfg):
"""Configuration for G1 motion tracking environment."""
scene: SceneCfg = field(
default_factory=lambda: SceneCfg(
model_file=str(ASSETS_ROOT_PATH / "robots" / "g1" / "scene_flat.xml")
)
)
# Kept at the historical single-clip default for backward compatibility.
motion_file: str | list[str] = str(
ASSETS_ROOT_PATH / "motions" / "g1" / "dance1_subject2_part.npz"
)
# motion_file: str | list[str] = str(ASSETS_ROOT_PATH / "motions" / "g1" / "gangnam_style.npz")
# motion_file: str | list[str] = str(ASSETS_ROOT_PATH / "motions" / "g1" / "fight1_subject5_from_csv.npz") #LAFAN
# motion_file: str | list[str] = str(ASSETS_ROOT_PATH / "motions" / "g1" / "dance_basic_slide_180_R_loop_001__A322_M.npz") #LAFAN
# motion_file: str | list[str] = str(ASSETS_ROOT_PATH / "motions" / "g1" / "playing_violin_R_003__A327_from_csv.npz") #Seed
anchor_body_name: str = "torso_link"
body_names: tuple[str, ...] = (
"pelvis",
"left_hip_roll_link",
"left_knee_link",
"left_ankle_roll_link",
"right_hip_roll_link",
"right_knee_link",
"right_ankle_roll_link",
"torso_link",
"left_shoulder_roll_link",
"left_elbow_link",
"left_wrist_yaw_link",
"right_shoulder_roll_link",
"right_elbow_link",
"right_wrist_yaw_link",
)
sampling_mode: Literal["start", "clip_start", "uniform", "adaptive", "mixed"] = "adaptive"
sampling_start_ratio: float = 0.0
truncate_on_clip_end: bool = False
max_episode_seconds: float = 10.0
reward_config: RewardConfig = field(default_factory=RewardConfig)
pose_randomization: PoseRandomization = field(default_factory=PoseRandomization)
velocity_randomization: VelocityRandomization = field(default_factory=VelocityRandomization)
domain_rand: Domain_Rand = field(default_factory=Domain_Rand)
joint_position_range: tuple[float, float] = (-0.1, 0.1)
# Termination thresholds
anchor_pos_z_threshold: float = 0.25
anchor_ori_threshold: float = 0.8
ee_body_pos_z_threshold: float = 0.25
ee_body_names: tuple[str, ...] = (
"left_ankle_roll_link",
"right_ankle_roll_link",
"left_wrist_yaw_link",
"right_wrist_yaw_link",
)
undesired_contact_z_threshold: float = 0.05
terminate_on_undesired_contacts: bool = False
[docs]
@registry.envcfg("G1MotionTracking")
@dataclass
class G1MotionTrackingEnvCfg(G1MotionTrackingCfg):
"""Registered configuration for G1 motion tracking."""
pass
[docs]
@registry.envcfg("G1MotionTrackingDeploy")
@dataclass
class G1MotionTrackingDeployEnvCfg(G1MotionTrackingCfg):
"""Registered deploy configuration for G1 motion tracking."""
pass
def _build_motion_reference_state(
env: Any, env_ids: np.ndarray, motion_data: MotionData
) -> tuple[np.ndarray, np.ndarray]:
dtype = get_global_dtype()
num_reset = len(env_ids)
root_pos = motion_data.body_pos_w[:, 0].copy()
root_ori = motion_data.body_quat_w[:, 0].copy()
root_lin_vel = motion_data.body_lin_vel_w[:, 0].copy()
root_ang_vel = motion_data.body_ang_vel_w[:, 0].copy()
joint_pos = motion_data.joint_pos.copy()
joint_vel = motion_data.joint_vel.copy()
pose_rand = env.cfg.pose_randomization
pose_ranges = [
(pose_rand.x[0], pose_rand.x[1]),
(pose_rand.y[0], pose_rand.y[1]),
(pose_rand.z[0], pose_rand.z[1]),
(pose_rand.roll[0], pose_rand.roll[1]),
(pose_rand.pitch[0], pose_rand.pitch[1]),
(pose_rand.yaw[0], pose_rand.yaw[1]),
]
pose_samples = np.array(
[[np.random.uniform(low, high) for low, high in pose_ranges] for _ in range(num_reset)],
dtype=dtype,
)
root_pos += pose_samples[:, 0:3]
root_ori = np_quat_mul(
np_quat_from_euler_xyz(pose_samples[:, 3], pose_samples[:, 4], pose_samples[:, 5]),
root_ori,
)
vel_rand = env.cfg.velocity_randomization
vel_ranges = [
(vel_rand.x[0], vel_rand.x[1]),
(vel_rand.y[0], vel_rand.y[1]),
(vel_rand.z[0], vel_rand.z[1]),
(vel_rand.roll[0], vel_rand.roll[1]),
(vel_rand.pitch[0], vel_rand.pitch[1]),
(vel_rand.yaw[0], vel_rand.yaw[1]),
]
vel_samples = np.array(
[[np.random.uniform(low, high) for low, high in vel_ranges] for _ in range(num_reset)],
dtype=dtype,
)
root_lin_vel += vel_samples[:, :3]
root_ang_vel += vel_samples[:, 3:]
joint_pos += np_sample_uniform(
env.cfg.joint_position_range[0],
env.cfg.joint_position_range[1],
joint_pos.shape,
dtype=np.float32,
)
joint_range = env._get_joint_range()
if joint_range is not None:
joint_pos = np.clip(joint_pos, joint_range[:, 0], joint_range[:, 1])
qpos = np.tile(env._init_qpos, (num_reset, 1))
qvel = np.tile(env._init_qvel, (num_reset, 1))
qpos[:, 0:3] = root_pos
qpos[:, 3:7] = root_ori
qpos[:, 7:] = joint_pos
qvel[:, 0:3] = root_lin_vel
qvel[:, 3:6] = np_quat_apply(np_quat_inv(root_ori), root_ang_vel)
qvel[:, 6:] = joint_vel
return qpos, qvel
def _gravity_z_in_body_from_quat_w(quat_w: np.ndarray) -> np.ndarray:
"""Z component of world gravity ``[0, 0, -1]`` expressed in body frame."""
return 2.0 * (quat_w[:, 1] * quat_w[:, 1] + quat_w[:, 2] * quat_w[:, 2]) - 1.0
def _write_motion_anchor_transform(
robot_anchor_pos_w: np.ndarray,
robot_anchor_quat_w: np.ndarray,
anchor_pos_w: np.ndarray,
anchor_quat_w: np.ndarray,
out_pos: np.ndarray,
out_ori6: np.ndarray,
) -> None:
aw = robot_anchor_quat_w[:, 0]
ax = robot_anchor_quat_w[:, 1]
ay = robot_anchor_quat_w[:, 2]
az = robot_anchor_quat_w[:, 3]
vx = anchor_pos_w[:, 0] - robot_anchor_pos_w[:, 0]
vy = anchor_pos_w[:, 1] - robot_anchor_pos_w[:, 1]
vz = anchor_pos_w[:, 2] - robot_anchor_pos_w[:, 2]
qx = -ax
qy = -ay
qz = -az
tx = 2 * (qy * vz - qz * vy)
ty = 2 * (qz * vx - qx * vz)
tz = 2 * (qx * vy - qy * vx)
out_pos[:, 0] = vx + aw * tx + qy * tz - qz * ty
out_pos[:, 1] = vy + aw * ty + qz * tx - qx * tz
out_pos[:, 2] = vz + aw * tz + qx * ty - qy * tx
bw = anchor_quat_w[:, 0]
bx = anchor_quat_w[:, 1]
by = anchor_quat_w[:, 2]
bz = anchor_quat_w[:, 3]
rw = aw * bw + ax * bx + ay * by + az * bz
rx = aw * bx - ax * bw - ay * bz + az * by
ry = aw * by + ax * bz - ay * bw - az * bx
rz = aw * bz - ax * by + ay * bx - az * bw
xx = rx * rx
yy = ry * ry
zz = rz * rz
xy = rx * ry
xz = rx * rz
yz = ry * rz
wx = rw * rx
wy = rw * ry
wz = rw * rz
out_ori6[:, 0] = 1 - 2 * (yy + zz)
out_ori6[:, 1] = 2 * (xy - wz)
out_ori6[:, 2] = 2 * (xy + wz)
out_ori6[:, 3] = 1 - 2 * (xx + zz)
out_ori6[:, 4] = 2 * (xz - wy)
out_ori6[:, 5] = 2 * (yz + wx)
[docs]
class G1MotionTrackingDomainRandomizationProvider(DomainRandomizationProvider):
[docs]
def __init__(
self,
*,
base_kp: np.ndarray | None = None,
base_kd: np.ndarray | None = None,
base_geom_friction: np.ndarray | None = None,
foot_geom_ids: np.ndarray | None = None,
) -> None:
self._base_kp = base_kp
self._base_kd = base_kd
self._base_geom_friction = base_geom_friction
self._foot_geom_ids = foot_geom_ids
[docs]
def validate(self, env: Any, capabilities: DomainRandomizationCapabilities) -> None:
validate_common_reset_randomization(
env, capabilities, base_kp=self._base_kp, base_kd=self._base_kd
)
validate_interval_push_support(env, capabilities)
if getattr(env.cfg.domain_rand, "randomize_geom_friction", False):
if not capabilities.supports_reset_term(RESET_TERM_GEOM_FRICTION):
raise NotImplementedError(
f"{env._backend.backend_type} backend does not support "
"geom-friction reset randomization"
)
if (
self._base_geom_friction is None
or self._foot_geom_ids is None
or self._foot_geom_ids.size == 0
):
raise ValueError("randomize_geom_friction=True but provider has no foot geom IDs")
[docs]
def build_interval_randomization_plan(
self, env: Any, step_counter: int
) -> IntervalRandomizationPlan | None:
return build_interval_push_plan(env, step_counter)
[docs]
def build_reset_plan(self, env: Any, env_ids: np.ndarray) -> ResetPlan:
num_reset = len(env_ids)
motion_frames = env.motion_sampler.sample_frames(env_ids)
motion_data = env.motion_loader.get_motion_at_frame(motion_frames)
qpos, qvel = _build_motion_reference_state(env, env_ids, motion_data)
info_updates = {
"current_actions": zero_actions(num_reset, env._num_action),
"last_actions": zero_actions(num_reset, env._num_action),
}
randomization = build_common_reset_randomization(
env, num_reset, base_kp=self._base_kp, base_kd=self._base_kd
)
dr_cfg = env.cfg.domain_rand
if getattr(dr_cfg, "randomize_geom_friction", False):
assert self._base_geom_friction is not None
assert self._foot_geom_ids is not None
payload = randomization or ResetRandomizationPayload()
low, high = dr_cfg.friction_range
scale = np.random.uniform(low, high, size=(num_reset, 1)).astype(np.float64)
geom_friction = np.broadcast_to(
self._base_geom_friction,
(num_reset, *self._base_geom_friction.shape),
).copy()
geom_friction[:, self._foot_geom_ids, 0] = scale * np.ones(
(1, self._foot_geom_ids.size)
)
payload.geom_friction = geom_friction
randomization = payload
if getattr(dr_cfg, "randomize_joint_default_pos", False):
low, high = dr_cfg.joint_default_pos_range
info_updates["default_dof_pos_bias"] = np.random.uniform(
low, high, size=(num_reset, env._num_action)
).astype(get_global_dtype())
return ResetPlan(
env_ids=env_ids,
qpos=qpos,
qvel=qvel,
info_updates=info_updates,
randomization=randomization,
)
[docs]
def build_reset_observation(
self, env: Any, env_ids: np.ndarray, info_updates: dict[str, Any]
) -> dict[str, np.ndarray]:
motion_data = env.motion_loader.get_motion_at_frame(
env.motion_sampler.current_frames[env_ids]
)
linvel = env.get_local_linvel()[env_ids]
gyro = env.get_gyro()[env_ids]
dof_pos = env.get_dof_pos()[env_ids]
dof_vel = env.get_dof_vel()[env_ids]
all_pos_w, all_quat_w = env._get_body_pose_w()
obs_info = dict(info_updates)
default_dof_pos_bias = info_updates.get("default_dof_pos_bias")
if isinstance(default_dof_pos_bias, np.ndarray):
obs_info["default_dof_pos_bias"] = default_dof_pos_bias
obs_info["env_ids"] = env_ids
return cast(
dict[str, np.ndarray],
env._compute_obs(
obs_info,
motion_data,
linvel,
gyro,
dof_pos,
dof_vel,
all_pos_w[env_ids],
all_quat_w[env_ids],
),
)
[docs]
@registry.env("G1MotionTracking", sim_backend="mujoco")
@registry.env("G1MotionTracking", sim_backend="motrix")
class G1MotionTrackingEnv(G1BaseEnv):
"""G1 Motion Tracking Environment."""
_cfg: G1MotionTrackingCfg
[docs]
def __init__(self, cfg: G1MotionTrackingCfg, num_envs=1, backend_type="mujoco"):
if not cfg.motion_file:
raise ValueError("motion_file must be specified in config")
backend = create_backend(
backend_type,
cfg.scene,
num_envs,
cfg.sim_dt,
base_name=cfg.asset.base_name,
push_body_name=cfg.domain_rand.push_body_name,
add_body_sensors=True,
motrix_max_iterations=cfg.motrix_max_iterations,
post_step_forward_sensor=cfg.post_step_forward_sensor,
)
super().__init__(cfg, backend, num_envs)
# Resolve body IDs for backend querying and motion-file indexing.
self.body_ids = self._backend.get_body_ids(cfg.body_names)
motion_body_ids = self._backend.get_motion_body_ids(cfg.body_names)
self.anchor_body_idx = cfg.body_names.index(cfg.anchor_body_name)
# Get end-effector body indices for termination
self.ee_body_indices = np.array(
[cfg.body_names.index(name) for name in cfg.ee_body_names], dtype=np.int32
)
self._has_ee_body_indices = bool(self.ee_body_indices.size)
# Get non-EE body indices for undesired contact penalty
ee_set = set(cfg.ee_body_names)
self.undesired_contact_body_indices = np.array(
[i for i, name in enumerate(cfg.body_names) if name not in ee_set],
dtype=np.int32,
)
self._has_undesired_contact_body_indices = bool(self.undesired_contact_body_indices.size)
# Load motion data
self.motion_loader = MotionLoader(cfg.motion_file, body_indices=motion_body_ids)
self.motion_sampler = MotionSampler(
self.motion_loader,
mode=cfg.sampling_mode,
num_envs=num_envs,
start_ratio=cfg.sampling_start_ratio,
)
needs_kp_kd = cfg.domain_rand.randomize_kp or cfg.domain_rand.randomize_kd
needs_friction = getattr(cfg.domain_rand, "randomize_geom_friction", False)
base_kp = base_kd = None
if needs_kp_kd:
base_kp, base_kd = backend.get_actuator_gains()
base_geom_friction = None
foot_geom_ids = None
if needs_friction:
import re as _re
base_geom_friction = backend.get_geom_friction()
geom_names = backend.get_geom_names()
pattern = _re.compile(cfg.domain_rand.friction_geom_pattern)
foot_geom_ids = np.asarray(
[i for i, name in enumerate(geom_names) if name and pattern.match(name)],
dtype=np.int64,
)
if foot_geom_ids.size == 0:
raise ValueError(
"friction_geom_pattern "
f"'{cfg.domain_rand.friction_geom_pattern}' did not match any geom"
)
dr_provider = G1MotionTrackingDomainRandomizationProvider(
base_kp=base_kp,
base_kd=base_kd,
base_geom_friction=base_geom_friction,
foot_geom_ids=foot_geom_ids,
)
self._init_domain_randomization(dr_provider)
dtype = get_global_dtype()
n_body = len(cfg.body_names)
self._n_motion_bodies = n_body
self._actor_obs_width = self._actor_obs_dim(self._num_action)
self._critic_base_obs_width = self._critic_base_obs_dim(self._num_action)
self._critic_obs_width = self._critic_base_obs_width + n_body * 9
self._copy_body_state_w = self._backend.copy_body_state_w
# Buffers for relative body transforms
self.body_pos_relative_w = np.zeros((num_envs, n_body, 3), dtype=dtype)
self.body_quat_relative_w = np.zeros((num_envs, n_body, 4), dtype=dtype)
self.body_quat_relative_w[:, :, 0] = 1.0 # Initialize to identity quaternion
self._motion_data_buffer = (
self.motion_loader.make_motion_data_buffer(num_envs)
if hasattr(self.motion_loader, "make_motion_data_buffer")
else None
)
self._zero_actions = np.zeros((num_envs, self._num_action), dtype=dtype)
self._joint_range = self._backend.get_joint_range()
if self._joint_range is not None:
self._joint_range = np.asarray(self._joint_range, dtype=dtype)
self._joint_lower = self._joint_range[:, 0]
self._joint_upper = self._joint_range[:, 1]
else:
self._joint_lower = None
self._joint_upper = None
self._delta_pos_w = np.empty((num_envs, 3), dtype=dtype)
self._delta_ori_w = np.empty((num_envs, 4), dtype=dtype)
self._motion_anchor_pos_b = np.empty((num_envs, 3), dtype=dtype)
self._motion_anchor_ori_b = np.empty((num_envs, 6), dtype=dtype)
self._motion_command = np.empty((num_envs, self._num_action * 2), dtype=dtype)
self._joint_pos_rel = np.empty((num_envs, self._num_action), dtype=dtype)
self._robot_body_pos_w = np.empty((num_envs, n_body, 3), dtype=dtype)
self._robot_body_quat_w = np.empty((num_envs, n_body, 4), dtype=dtype)
self._robot_body_lin_vel_w = np.empty((num_envs, n_body, 3), dtype=dtype)
self._robot_body_ang_vel_w = np.empty((num_envs, n_body, 3), dtype=dtype)
self._quat_error_w = np.empty((num_envs, n_body), dtype=dtype)
self._quat_error_x = np.empty((num_envs, n_body), dtype=dtype)
self._body_vec_error = np.empty((num_envs, n_body, 3), dtype=dtype)
self._body_vec_tmp = np.empty((num_envs, n_body, 3), dtype=dtype)
self._joint_error = np.empty((num_envs, self._num_action), dtype=dtype)
self._joint_error_upper = np.empty((num_envs, self._num_action), dtype=dtype)
self._env_error = np.empty((num_envs,), dtype=dtype)
self._env_error2 = np.empty((num_envs,), dtype=dtype)
self._reward_term = np.empty((num_envs,), dtype=dtype)
self._weighted_reward = np.empty((num_envs,), dtype=dtype)
self._terminated = np.empty((num_envs,), dtype=bool)
self._env_bool = np.empty((num_envs,), dtype=bool)
self._ee_pos_error_z = np.empty((num_envs, self.ee_body_indices.size), dtype=dtype)
self._ee_terminated = np.empty((num_envs, self.ee_body_indices.size), dtype=bool)
self._undesired_contact_mask = np.empty(
(num_envs, self.undesired_contact_body_indices.size), dtype=bool
)
self._enable_reward_log = True
self._init_reward_functions()
self._active_reward_fns = {
name: reward_fn
for name, reward_fn in self._reward_fns.items()
if self._reward_term_is_active(name)
}
self._clip_end_truncated = np.zeros((num_envs,), dtype=bool)
def _effective_default_angles(self, env_ids: np.ndarray | None = None) -> np.ndarray:
"""Return default_angles with per-episode joint-default-pos bias applied."""
state = getattr(self, "_state", None)
if state is not None:
bias = state.info.get("default_dof_pos_bias")
if bias is not None:
if env_ids is not None:
return self.default_angles + bias[env_ids]
return self.default_angles + bias
return self.default_angles
[docs]
def apply_action(self, actions: np.ndarray, state: NpEnvState) -> np.ndarray:
state.info["last_actions"] = state.info.get("current_actions", np.zeros_like(actions))
state.info["current_actions"] = actions
exec_actions = (
state.info["last_actions"]
if self._cfg.control_config.simulate_action_latency
else actions
)
bias = state.info.get("default_dof_pos_bias")
base = self.default_angles + bias if bias is not None else self.default_angles
ctrl: np.ndarray = exec_actions * self._cfg.control_config.action_scale + base
return ctrl
def _resample_reference_state(self, env_ids: np.ndarray) -> None:
motion_frames = self.motion_sampler.sample_frames(env_ids)
motion_data = self.motion_loader.get_motion_at_frame(motion_frames)
qpos, qvel = _build_motion_reference_state(self, env_ids, motion_data)
self._backend.set_state(env_ids, qpos, qvel)
def _refresh_observation_rows(
self, obs: dict[str, np.ndarray], info: dict, env_ids: np.ndarray
) -> None:
motion_data = self.motion_loader.get_motion_at_frame(
self.motion_sampler.current_frames[env_ids]
)
row_ids = np.asarray(env_ids, dtype=np.intp)
linvel = self._backend.get_sensor_data_rows(self._cfg.sensor.local_linvel, row_ids)
gyro = self._backend.get_sensor_data_rows(self._cfg.sensor.gyro, row_ids)
dof_pos = self.get_dof_pos()[row_ids]
dof_vel = self.get_dof_vel()[row_ids]
robot_body_pos_w, robot_body_quat_w = self._backend.get_body_pose_w_rows(
row_ids, self.body_ids
)
obs_info: dict[str, Any] = {}
current_actions = info.get("current_actions")
if isinstance(current_actions, np.ndarray):
obs_info["current_actions"] = current_actions[env_ids]
obs_info["env_ids"] = env_ids
refreshed_obs = self._compute_obs(
obs_info,
motion_data,
linvel,
gyro,
dof_pos,
dof_vel,
robot_body_pos_w,
robot_body_quat_w,
)
for key, value in refreshed_obs.items():
if value.shape[0] == len(env_ids):
obs[key][env_ids] = value
else:
obs[key][env_ids] = value[env_ids]
def _get_body_pose_w(self) -> tuple[np.ndarray, np.ndarray]:
return self._backend.get_body_pose_w(self.body_ids)
def _get_body_state_w(self) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
copy_body_state_w = self._copy_body_state_w
if copy_body_state_w is not None:
return copy_body_state_w(
self.body_ids,
self._robot_body_pos_w,
self._robot_body_quat_w,
self._robot_body_lin_vel_w,
self._robot_body_ang_vel_w,
)
robot_body_pos_w, robot_body_quat_w = self._get_body_pose_w()
robot_body_lin_vel_w, robot_body_ang_vel_w = self._backend.get_body_vel_w(self.body_ids)
return (
robot_body_pos_w,
robot_body_quat_w,
robot_body_lin_vel_w,
robot_body_ang_vel_w,
)
def _get_joint_range(self) -> np.ndarray | None:
return self._joint_range
def _get_current_motion(self) -> MotionData:
if self._motion_data_buffer is None:
return self.motion_sampler.get_current_motion()
return self.motion_sampler.get_current_motion(self._motion_data_buffer)
@property
def obs_groups_spec(self) -> dict[str, int]:
# Actor: command(2n) + motion_anchor_pos_b(3) + motion_anchor_ori_b(6)
# + linvel(3) + gyro(3) + joint_pos(n) + joint_vel(n) + actions(n)
# Critic mirrors BeyondMimic physical terms without actor observation noise:
# command, motion anchor, robot body pos/ori, linvel, gyro, joints, actions.
n = self._num_action
actor_width = getattr(self, "_actor_obs_width", self._actor_obs_dim(n))
critic_width = getattr(
self,
"_critic_obs_width",
self._critic_base_obs_dim(n) + len(self._cfg.body_names) * 9,
)
return {"obs": actor_width, "critic": critic_width}
def _actor_obs_dim(self, n: int) -> int:
return 3 + 6 + 3 + 3 + n * 5
def _critic_base_obs_dim(self, n: int) -> int:
return 3 + 6 + 3 + 3 + n * 5
def _build_actor_obs(
self,
*,
command: np.ndarray,
motion_anchor_pos_b: np.ndarray,
motion_anchor_ori_b: np.ndarray,
noisy_linvel: np.ndarray,
noisy_gyro: np.ndarray,
noisy_joint_pos_rel: np.ndarray,
noisy_dof_vel: np.ndarray,
last_actions: np.ndarray,
) -> np.ndarray:
num_envs = command.shape[0]
n_action = noisy_joint_pos_rel.shape[1]
actor_obs = np.empty((num_envs, self._actor_obs_dim(n_action)), dtype=get_global_dtype())
offset = 0
actor_obs[:, offset : offset + command.shape[1]] = command
offset += command.shape[1]
actor_obs[:, offset : offset + 3] = motion_anchor_pos_b
offset += 3
actor_obs[:, offset : offset + 6] = motion_anchor_ori_b
offset += 6
actor_obs[:, offset : offset + 3] = noisy_linvel
offset += 3
actor_obs[:, offset : offset + 3] = noisy_gyro
offset += 3
actor_obs[:, offset : offset + n_action] = noisy_joint_pos_rel
offset += n_action
actor_obs[:, offset : offset + n_action] = noisy_dof_vel
offset += n_action
actor_obs[:, offset : offset + n_action] = last_actions
return actor_obs
def _init_reward_functions(self):
self._reward_fns = {
"motion_global_root_pos": self._reward_motion_global_root_pos,
"motion_global_root_ori": self._reward_motion_global_root_ori,
"motion_body_pos": self._reward_motion_body_pos,
"motion_body_ori": self._reward_motion_body_ori,
"motion_body_lin_vel": self._reward_motion_body_lin_vel,
"motion_body_ang_vel": self._reward_motion_body_ang_vel,
"motion_ee_body_pos_z": self._reward_motion_ee_body_pos_z,
"motion_joint_pos": self._reward_motion_joint_pos,
"motion_joint_vel": self._reward_motion_joint_vel,
"action_rate_l2": self._reward_action_rate_l2,
"joint_limit": self._reward_joint_limit,
"undesired_contacts": self._reward_undesired_contacts,
}
def _reward_term_is_active(self, name: str) -> bool:
if name == "joint_limit":
return self._joint_lower is not None and self._joint_upper is not None
if name == "undesired_contacts":
return self._has_undesired_contact_body_indices
if name == "motion_ee_body_pos_z":
return self._has_ee_body_indices
return True
[docs]
def update_state(self, state: NpEnvState) -> NpEnvState:
self._clip_end_truncated.fill(False)
# Get current motion data
motion_data = self._get_current_motion()
# Get robot state
linvel = self.get_local_linvel()
gyro = self.get_gyro()
dof_pos = self.get_dof_pos()
dof_vel = self.get_dof_vel()
# Get body states
(
robot_body_pos_w,
robot_body_quat_w,
robot_body_lin_vel_w,
robot_body_ang_vel_w,
) = self._get_body_state_w()
# Compute relative body transforms (for observations and rewards)
self._update_relative_transforms(motion_data, robot_body_pos_w, robot_body_quat_w)
# Compute terminations
terminated = self._compute_terminations(motion_data, robot_body_pos_w, robot_body_quat_w)
# Update failure statistics for adaptive sampling
self.motion_sampler.update_failure_stats(terminated)
# Compute reward
reward = self._compute_reward(
state.info,
motion_data,
robot_body_pos_w,
robot_body_quat_w,
robot_body_lin_vel_w,
robot_body_ang_vel_w,
dof_pos,
dof_vel,
)
# Compute observations
obs = self._compute_obs(
state.info,
motion_data,
linvel,
gyro,
dof_pos,
dof_vel,
robot_body_pos_w,
robot_body_quat_w,
)
# Advance motion frames
done_env_ids = self.motion_sampler.step()
if len(done_env_ids) > 0:
if self._cfg.truncate_on_clip_end:
self._clip_end_truncated[done_env_ids] = True
else:
# Match BeyondMimic: clip boundaries are command resampling points, not
# episode boundaries; sync the simulated robot to the new reference.
resample_env_ids = done_env_ids[~terminated[done_env_ids]]
if len(resample_env_ids) > 0:
self._resample_reference_state(resample_env_ids)
self._refresh_observation_rows(obs, state.info, resample_env_ids)
return state.replace(obs=obs, reward=reward, terminated=terminated)
def _compute_truncated(self, state: NpEnvState) -> np.ndarray:
truncated = super()._compute_truncated(state)
clip_end_only = getattr(self, "_env_bool", None)
if clip_end_only is None or clip_end_only.shape != (self._num_envs,):
clip_end_only = np.empty((self._num_envs,), dtype=bool)
self._env_bool = clip_end_only
np.logical_not(state.terminated, out=clip_end_only)
np.logical_and(self._clip_end_truncated, clip_end_only, out=clip_end_only)
np.logical_or(truncated, clip_end_only, out=truncated)
return truncated
def _update_relative_transforms(
self, motion_data, robot_body_pos_w: np.ndarray, robot_body_quat_w: np.ndarray
):
"""Update relative body transforms for tracking."""
# Get anchor states
anchor_pos_w = motion_data.body_pos_w[:, self.anchor_body_idx]
anchor_quat_w = motion_data.body_quat_w[:, self.anchor_body_idx]
robot_anchor_pos_w = robot_body_pos_w[:, self.anchor_body_idx]
robot_anchor_quat_w = robot_body_quat_w[:, self.anchor_body_idx]
# Compute delta transform: keep robot's XY position, use motion's Z height
# and apply yaw-only rotation difference.
delta_pos_w = self._delta_pos_w
delta_pos_w[:] = robot_anchor_pos_w
delta_pos_w[:, 2] = anchor_pos_w[:, 2]
# Compute yaw-only rotation difference, equivalent to
# np_yaw_quat(np_quat_mul(robot_anchor_quat_w, np_quat_inv(anchor_quat_w))).
delta_ori_w = self._delta_ori_w
rw, rx, ry, rz = (
robot_anchor_quat_w[:, 0],
robot_anchor_quat_w[:, 1],
robot_anchor_quat_w[:, 2],
robot_anchor_quat_w[:, 3],
)
aw, ax, ay, az = (
anchor_quat_w[:, 0],
anchor_quat_w[:, 1],
anchor_quat_w[:, 2],
anchor_quat_w[:, 3],
)
qw = rw * aw + rx * ax + ry * ay + rz * az
qx = -rw * ax + rx * aw - ry * az + rz * ay
qy = -rw * ay + rx * az + ry * aw - rz * ax
qz = -rw * az - rx * ay + ry * ax + rz * aw
half_yaw = 0.5 * np.arctan2(2 * (qw * qz + qx * qy), 1 - 2 * (qy * qy + qz * qz))
np.cos(half_yaw, out=delta_ori_w[:, 0])
delta_ori_w[:, 1:3] = 0.0
np.sin(half_yaw, out=delta_ori_w[:, 3])
dw1 = delta_ori_w[:, 0]
dz1 = delta_ori_w[:, 3]
dw = dw1[:, None]
dz = dz1[:, None]
mw = motion_data.body_quat_w[..., 0]
mx = motion_data.body_quat_w[..., 1]
my = motion_data.body_quat_w[..., 2]
mz = motion_data.body_quat_w[..., 3]
out_quat = self.body_quat_relative_w
out_quat[..., 0] = dw * mw
out_quat[..., 0] -= dz * mz
out_quat[..., 1] = dw * mx
out_quat[..., 1] -= dz * my
out_quat[..., 2] = dw * my
out_quat[..., 2] += dz * mx
out_quat[..., 3] = dw * mz
out_quat[..., 3] += dz * mw
rel_pos = self._body_vec_error
vx = rel_pos[..., 0]
vy = rel_pos[..., 1]
vz = rel_pos[..., 2]
np.subtract(motion_data.body_pos_w[..., 0], anchor_pos_w[:, None, 0], out=vx)
np.subtract(motion_data.body_pos_w[..., 1], anchor_pos_w[:, None, 1], out=vy)
np.subtract(motion_data.body_pos_w[..., 2], anchor_pos_w[:, None, 2], out=vz)
yaw_cross = self._env_error
yaw_z2 = self._reward_term
np.multiply(dw1, dz1, out=yaw_cross)
yaw_cross *= 2.0
np.square(dz1, out=yaw_z2)
yaw_z2 *= 2.0
yaw_cross_2d = yaw_cross[:, None]
yaw_z2_2d = yaw_z2[:, None]
out_pos = self.body_pos_relative_w
out_pos[..., 0] = vx
out_pos[..., 0] -= yaw_cross_2d * vy
out_pos[..., 0] -= yaw_z2_2d * vx
out_pos[..., 0] += delta_pos_w[:, None, 0]
out_pos[..., 1] = vy
out_pos[..., 1] += yaw_cross_2d * vx
out_pos[..., 1] -= yaw_z2_2d * vy
out_pos[..., 1] += delta_pos_w[:, None, 1]
out_pos[..., 2] = vz
out_pos[..., 2] += delta_pos_w[:, None, 2]
def _compute_terminations(
self,
motion_data,
robot_body_pos_w: np.ndarray,
robot_body_quat_w: np.ndarray,
) -> np.ndarray:
"""Compute termination conditions."""
terminated = self._terminated
terminated.fill(False)
# Anchor position error (Z-axis only)
anchor_pos_w = motion_data.body_pos_w[:, self.anchor_body_idx]
robot_anchor_pos_w = robot_body_pos_w[:, self.anchor_body_idx]
np.subtract(anchor_pos_w[:, 2], robot_anchor_pos_w[:, 2], out=self._env_error)
np.abs(self._env_error, out=self._env_error)
np.greater(self._env_error, self._cfg.anchor_pos_z_threshold, out=self._env_bool)
terminated |= self._env_bool
# Anchor orientation error (gravity direction). The gravity-z difference
# is bounded by 2 for unit quaternions, so huge thresholds disable this
# termination without doing the per-step math.
if self._cfg.anchor_ori_threshold < 2.0:
anchor_quat_w = motion_data.body_quat_w[:, self.anchor_body_idx]
robot_anchor_quat_w = robot_body_quat_w[:, self.anchor_body_idx]
motion_gravity_z_b = _gravity_z_in_body_from_quat_w(anchor_quat_w)
robot_gravity_z_b = _gravity_z_in_body_from_quat_w(robot_anchor_quat_w)
np.subtract(motion_gravity_z_b, robot_gravity_z_b, out=self._env_error)
np.abs(self._env_error, out=self._env_error)
np.greater(self._env_error, self._cfg.anchor_ori_threshold, out=self._env_bool)
terminated |= self._env_bool
# End-effector position error (Z-axis only)
if self._has_ee_body_indices:
np.subtract(
self.body_pos_relative_w[:, self.ee_body_indices, 2],
robot_body_pos_w[:, self.ee_body_indices, 2],
out=self._ee_pos_error_z,
)
np.abs(self._ee_pos_error_z, out=self._ee_pos_error_z)
np.greater(
self._ee_pos_error_z,
self._cfg.ee_body_pos_z_threshold,
out=self._ee_terminated,
)
np.logical_or.reduce(self._ee_terminated, axis=1, out=self._env_bool)
terminated |= self._env_bool
if self._cfg.terminate_on_undesired_contacts and self._has_undesired_contact_body_indices:
body_z = robot_body_pos_w[:, self.undesired_contact_body_indices, 2]
np.less(
body_z,
self._cfg.undesired_contact_z_threshold,
out=self._undesired_contact_mask,
)
np.logical_or.reduce(self._undesired_contact_mask, axis=-1, out=self._env_bool)
terminated |= self._env_bool
return terminated
def _write_body_pos_in_anchor_frame(
self,
anchor_pos: np.ndarray,
anchor_quat: np.ndarray,
body_pos: np.ndarray,
out: np.ndarray,
) -> None:
aw = anchor_quat[:, None, 0]
ax = anchor_quat[:, None, 1]
ay = anchor_quat[:, None, 2]
az = anchor_quat[:, None, 3]
num_envs, n_body = body_pos.shape[:2]
rel_pos = self._body_vec_error[:num_envs, :n_body]
vx = rel_pos[..., 0]
vy = rel_pos[..., 1]
vz = rel_pos[..., 2]
np.subtract(body_pos[..., 0], anchor_pos[:, None, 0], out=vx)
np.subtract(body_pos[..., 1], anchor_pos[:, None, 1], out=vy)
np.subtract(body_pos[..., 2], anchor_pos[:, None, 2], out=vz)
tx = 2 * (az * vy - ay * vz)
ty = 2 * (ax * vz - az * vx)
tz = 2 * (ay * vx - ax * vy)
out[..., 0] = vx + aw * tx + az * ty - ay * tz
out[..., 1] = vy + aw * ty + ax * tz - az * tx
out[..., 2] = vz + aw * tz + ay * tx - ax * ty
def _write_body_ori6_in_anchor_frame(
self,
anchor_quat: np.ndarray,
body_quat: np.ndarray,
out: np.ndarray,
) -> None:
aw = anchor_quat[:, None, 0]
ax = anchor_quat[:, None, 1]
ay = anchor_quat[:, None, 2]
az = anchor_quat[:, None, 3]
bw = body_quat[..., 0]
bx = body_quat[..., 1]
by = body_quat[..., 2]
bz = body_quat[..., 3]
rw = aw * bw + ax * bx + ay * by + az * bz
rx = aw * bx - ax * bw - ay * bz + az * by
ry = aw * by + ax * bz - ay * bw - az * bx
rz = aw * bz - ax * by + ay * bx - az * bw
out[..., 0] = 1 - 2 * (ry * ry + rz * rz)
out[..., 1] = 2 * (rx * ry - rw * rz)
out[..., 2] = 2 * (rx * ry + rw * rz)
out[..., 3] = 1 - 2 * (rx * rx + rz * rz)
out[..., 4] = 2 * (rx * rz - rw * ry)
out[..., 5] = 2 * (ry * rz + rw * rx)
def _compute_obs(
self,
info: dict,
motion_data,
linvel: np.ndarray,
gyro: np.ndarray,
dof_pos: np.ndarray,
dof_vel: np.ndarray,
robot_body_pos_w: np.ndarray,
robot_body_quat_w: np.ndarray,
) -> dict[str, np.ndarray]:
"""Compute observations as dict with actor and critic groups."""
num_envs = linvel.shape[0]
dtype = get_global_dtype()
n_action = dof_pos.shape[1]
n_body = self._n_motion_bodies
# Get anchor states
anchor_pos_w = motion_data.body_pos_w[:, self.anchor_body_idx]
anchor_quat_w = motion_data.body_quat_w[:, self.anchor_body_idx]
robot_anchor_pos_w = robot_body_pos_w[:, self.anchor_body_idx]
robot_anchor_quat_w = robot_body_quat_w[:, self.anchor_body_idx]
# Motion anchor pose in robot frame
if num_envs == self._num_envs:
motion_anchor_pos_b = self._motion_anchor_pos_b
motion_anchor_ori_b = self._motion_anchor_ori_b
joint_pos_rel = self._joint_pos_rel
zero_actions = self._zero_actions
else:
motion_anchor_pos_b = np.empty((num_envs, 3), dtype=dtype)
motion_anchor_ori_b = np.empty((num_envs, 6), dtype=dtype)
joint_pos_rel = np.empty((num_envs, n_action), dtype=dtype)
zero_actions = np.zeros((num_envs, n_action), dtype=dtype)
_write_motion_anchor_transform(
robot_anchor_pos_w,
robot_anchor_quat_w,
anchor_pos_w,
anchor_quat_w,
motion_anchor_pos_b,
motion_anchor_ori_b,
)
# Joint positions and velocities
bias = info.get("default_dof_pos_bias")
effective_default = self.default_angles + bias if bias is not None else self.default_angles
np.subtract(dof_pos, effective_default, out=joint_pos_rel)
last_actions = info.get("current_actions")
if not isinstance(last_actions, np.ndarray):
last_actions = zero_actions
if num_envs == self._num_envs:
command = self._motion_command
else:
command = np.empty((num_envs, n_action * 2), dtype=dtype)
command[:, :n_action] = motion_data.joint_pos
command[:, n_action : n_action * 2] = motion_data.joint_vel
# Per-step observation noise on sensor channels (actor only).
# Critic uses the clean originals — asymmetric actor–critic contract.
noise_cfg = self._cfg.noise_config
noise_enabled = noise_cfg.level > 0.0
if noise_enabled:
linvel_actor = self._obs_noise(linvel, noise_cfg.scale_linvel)
gyro_actor = self._obs_noise(gyro, noise_cfg.scale_gyro)
joint_pos_actor = self._obs_noise(joint_pos_rel, noise_cfg.scale_joint_angle)
dof_vel_actor = self._obs_noise(dof_vel, noise_cfg.scale_joint_vel)
else:
linvel_actor = linvel
gyro_actor = gyro
joint_pos_actor = joint_pos_rel
dof_vel_actor = dof_vel
# Actor observations (noisy proprioception)
actor_obs = self._build_actor_obs(
command=command,
motion_anchor_pos_b=motion_anchor_pos_b,
motion_anchor_ori_b=motion_anchor_ori_b,
noisy_linvel=linvel_actor,
noisy_gyro=gyro_actor,
noisy_joint_pos_rel=joint_pos_actor,
noisy_dof_vel=dof_vel_actor,
last_actions=last_actions,
)
# Critic observations (clean proprioception + privileged body transforms)
critic_obs = np.empty((num_envs, self._critic_obs_width), dtype=dtype)
offset = 0
critic_obs[:, offset : offset + command.shape[1]] = command
offset += command.shape[1]
critic_obs[:, offset : offset + 3] = motion_anchor_pos_b
offset += 3
critic_obs[:, offset : offset + 6] = motion_anchor_ori_b
offset += 6
critic_obs[:, offset : offset + 3] = linvel
offset += 3
critic_obs[:, offset : offset + 3] = gyro
offset += 3
critic_obs[:, offset : offset + n_action] = joint_pos_rel
offset += n_action
critic_obs[:, offset : offset + n_action] = dof_vel
offset += n_action
critic_obs[:, offset : offset + n_action] = last_actions
offset += n_action
robot_body_pos_b = critic_obs[:, offset : offset + n_body * 3].reshape(num_envs, n_body, 3)
self._write_body_pos_in_anchor_frame(
robot_anchor_pos_w, robot_anchor_quat_w, robot_body_pos_w, robot_body_pos_b
)
offset += n_body * 3
robot_body_ori_b = critic_obs[:, offset : offset + n_body * 6].reshape(num_envs, n_body, 6)
self._write_body_ori6_in_anchor_frame(
robot_anchor_quat_w, robot_body_quat_w, robot_body_ori_b
)
return {"obs": actor_obs, "critic": critic_obs}
def _compute_reward(
self,
info: dict,
motion_data,
robot_body_pos_w: np.ndarray,
robot_body_quat_w: np.ndarray,
robot_body_lin_vel_w: np.ndarray,
robot_body_ang_vel_w: np.ndarray,
dof_pos: np.ndarray,
dof_vel: np.ndarray,
) -> np.ndarray:
"""Compute reward."""
reward = self._env_error2
reward.fill(0.0)
cfg = self._cfg.reward_config
step_count = info.get("steps")
should_log = self._enable_reward_log and (
int(step_count[0]) % 4 == 0 if isinstance(step_count, np.ndarray) else True
)
log = {} if should_log else info.get("log", {})
# Store motion and robot states in info for reward functions
info["motion_data"] = motion_data
info["robot_body_pos_w"] = robot_body_pos_w
info["robot_body_quat_w"] = robot_body_quat_w
info["robot_body_lin_vel_w"] = robot_body_lin_vel_w
info["robot_body_ang_vel_w"] = robot_body_ang_vel_w
info["reward_ref_body_pos_w"] = self.body_pos_relative_w
info["reward_ref_body_quat_w"] = self.body_quat_relative_w
info["anchor_body_idx"] = self.anchor_body_idx
info["dof_pos"] = dof_pos
info["dof_vel"] = dof_vel
for name, scale in cfg.scales.items():
if scale == 0:
continue
reward_fn = self._active_reward_fns.get(name)
if reward_fn is None:
if should_log and name in self._reward_fns:
log[f"reward/{name}"] = 0.0
continue
rew = reward_fn(info)
weighted_rew = self._weighted_reward
np.multiply(rew, scale, out=weighted_rew)
reward += weighted_rew
if should_log:
log[f"reward/{name}"] = float(np.sum(weighted_rew) / weighted_rew.size)
info["log"] = log
reward *= self._cfg.ctrl_dt
return reward
def _mean_body_xyz_squared_error(self, reference: np.ndarray, actual: np.ndarray) -> np.ndarray:
vec_error = self._body_vec_error
env_error = self._env_error
tmp_error = self._reward_term
np.subtract(reference[..., 0], actual[..., 0], out=vec_error[..., 0])
np.square(vec_error[..., 0], out=vec_error[..., 0])
np.sum(vec_error[..., 0], axis=1, out=env_error)
np.subtract(reference[..., 1], actual[..., 1], out=vec_error[..., 1])
np.square(vec_error[..., 1], out=vec_error[..., 1])
np.sum(vec_error[..., 1], axis=1, out=tmp_error)
env_error += tmp_error
np.subtract(reference[..., 2], actual[..., 2], out=vec_error[..., 2])
np.square(vec_error[..., 2], out=vec_error[..., 2])
np.sum(vec_error[..., 2], axis=1, out=tmp_error)
env_error += tmp_error
env_error /= reference.shape[1]
return env_error
def _quat_error_magnitude_squared_body(self, q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
rel_w = self._quat_error_w
rel_x = self._quat_error_x
# Motion/backend quaternions are unit quaternions, so the relative
# rotation angle only needs abs(dot(q1, q2)).
np.multiply(q1[..., 0], q2[..., 0], out=rel_w)
np.multiply(q1[..., 1], q2[..., 1], out=rel_x)
rel_w += rel_x
np.multiply(q1[..., 2], q2[..., 2], out=rel_x)
rel_w += rel_x
np.multiply(q1[..., 3], q2[..., 3], out=rel_x)
rel_w += rel_x
np.abs(rel_w, out=rel_w)
np.clip(rel_w, 0.0, 1.0, out=rel_w)
np.arccos(rel_w, out=rel_x)
rel_x *= 2.0
np.square(rel_x, out=rel_x)
return rel_x
def _exp_reward_from_error(self, error: np.ndarray, std: float) -> np.ndarray:
out = self._reward_term
np.divide(error, -(std**2), out=out)
np.exp(out, out=out)
return out
# Reward functions
def _reward_motion_global_root_pos(self, info: dict) -> np.ndarray:
motion_data = info["motion_data"]
robot_body_pos_w = info["robot_body_pos_w"]
anchor_pos_w = motion_data.body_pos_w[:, self.anchor_body_idx]
robot_anchor_pos_w = robot_body_pos_w[:, self.anchor_body_idx]
error = self._env_error
np.subtract(anchor_pos_w[:, 0], robot_anchor_pos_w[:, 0], out=error)
np.square(error, out=error)
np.subtract(anchor_pos_w[:, 1], robot_anchor_pos_w[:, 1], out=self._reward_term)
np.square(self._reward_term, out=self._reward_term)
error += self._reward_term
np.subtract(anchor_pos_w[:, 2], robot_anchor_pos_w[:, 2], out=self._reward_term)
np.square(self._reward_term, out=self._reward_term)
error += self._reward_term
return self._exp_reward_from_error(error, self._cfg.reward_config.std_root_pos)
def _reward_motion_global_root_ori(self, info: dict) -> np.ndarray:
motion_data = info["motion_data"]
robot_body_quat_w = info["robot_body_quat_w"]
anchor_quat_w = motion_data.body_quat_w[:, self.anchor_body_idx]
robot_anchor_quat_w = robot_body_quat_w[:, self.anchor_body_idx]
np.multiply(anchor_quat_w[:, 0], robot_anchor_quat_w[:, 0], out=self._env_error)
np.multiply(anchor_quat_w[:, 1], robot_anchor_quat_w[:, 1], out=self._reward_term)
self._env_error += self._reward_term
np.multiply(anchor_quat_w[:, 2], robot_anchor_quat_w[:, 2], out=self._reward_term)
self._env_error += self._reward_term
np.multiply(anchor_quat_w[:, 3], robot_anchor_quat_w[:, 3], out=self._reward_term)
self._env_error += self._reward_term
np.abs(self._env_error, out=self._env_error)
np.clip(self._env_error, 0.0, 1.0, out=self._env_error)
np.arccos(self._env_error, out=self._env_error)
self._env_error *= 2.0
np.square(self._env_error, out=self._env_error)
return self._exp_reward_from_error(self._env_error, self._cfg.reward_config.std_root_ori)
def _reward_motion_body_pos(self, info: dict) -> np.ndarray:
robot_body_pos_w = info["robot_body_pos_w"]
error = self._mean_body_xyz_squared_error(self.body_pos_relative_w, robot_body_pos_w)
return self._exp_reward_from_error(error, self._cfg.reward_config.std_body_pos)
def _reward_motion_body_ori(self, info: dict) -> np.ndarray:
robot_body_quat_w = info["robot_body_quat_w"]
error = self._quat_error_magnitude_squared_body(
self.body_quat_relative_w, robot_body_quat_w
)
np.sum(error, axis=-1, out=self._env_error)
self._env_error /= error.shape[1]
return self._exp_reward_from_error(self._env_error, self._cfg.reward_config.std_body_ori)
def _reward_motion_body_lin_vel(self, info: dict) -> np.ndarray:
motion_data = info["motion_data"]
robot_body_lin_vel_w = info["robot_body_lin_vel_w"]
error = self._mean_body_xyz_squared_error(motion_data.body_lin_vel_w, robot_body_lin_vel_w)
return self._exp_reward_from_error(error, self._cfg.reward_config.std_body_lin_vel)
def _reward_motion_body_ang_vel(self, info: dict) -> np.ndarray:
motion_data = info["motion_data"]
robot_body_ang_vel_w = info["robot_body_ang_vel_w"]
error = self._mean_body_xyz_squared_error(motion_data.body_ang_vel_w, robot_body_ang_vel_w)
return self._exp_reward_from_error(error, self._cfg.reward_config.std_body_ang_vel)
def _reward_motion_ee_body_pos_z(self, info: dict) -> np.ndarray:
robot_body_pos_w = info["robot_body_pos_w"]
np.subtract(
self.body_pos_relative_w[:, self.ee_body_indices, 2],
robot_body_pos_w[:, self.ee_body_indices, 2],
out=self._ee_pos_error_z,
)
np.square(self._ee_pos_error_z, out=self._ee_pos_error_z)
np.sum(self._ee_pos_error_z, axis=-1, out=self._env_error)
self._env_error /= self._ee_pos_error_z.shape[1]
return self._exp_reward_from_error(self._env_error, self._cfg.reward_config.std_body_pos)
def _reward_motion_joint_pos(self, info: dict) -> np.ndarray:
motion_data = info["motion_data"]
dof_pos = info["dof_pos"]
np.subtract(motion_data.joint_pos, dof_pos, out=self._joint_error)
np.square(self._joint_error, out=self._joint_error)
np.sum(self._joint_error, axis=1, out=self._env_error)
self._env_error /= dof_pos.shape[1]
return self._exp_reward_from_error(self._env_error, self._cfg.reward_config.std_joint_pos)
def _reward_motion_joint_vel(self, info: dict) -> np.ndarray:
motion_data = info["motion_data"]
dof_vel = info["dof_vel"]
np.subtract(motion_data.joint_vel, dof_vel, out=self._joint_error)
np.square(self._joint_error, out=self._joint_error)
np.sum(self._joint_error, axis=1, out=self._env_error)
self._env_error /= dof_vel.shape[1]
return self._exp_reward_from_error(self._env_error, self._cfg.reward_config.std_joint_vel)
def _reward_undesired_contacts(self, info: dict) -> np.ndarray:
robot_body_pos_w = info["robot_body_pos_w"]
body_z = robot_body_pos_w[:, self.undesired_contact_body_indices, 2]
np.less(
body_z,
self._cfg.undesired_contact_z_threshold,
out=self._undesired_contact_mask,
)
np.sum(self._undesired_contact_mask, axis=-1, out=self._env_error)
return self._env_error
def _reward_action_rate_l2(self, info: dict) -> np.ndarray:
np.subtract(info["current_actions"], info["last_actions"], out=self._joint_error)
np.square(self._joint_error, out=self._joint_error)
np.sum(self._joint_error, axis=1, out=self._env_error)
return self._env_error
def _reward_joint_limit(self, info: dict) -> np.ndarray:
dof_pos = info["dof_pos"]
lower = self._joint_lower
upper = self._joint_upper
if lower is None or upper is None:
self._reward_term.fill(0.0)
return self._reward_term
# Compute violation
np.subtract(lower, dof_pos, out=self._joint_error)
np.maximum(self._joint_error, 0, out=self._joint_error)
np.subtract(dof_pos, upper, out=self._joint_error_upper)
np.maximum(self._joint_error_upper, 0, out=self._joint_error_upper)
self._joint_error += self._joint_error_upper
np.square(self._joint_error, out=self._joint_error)
np.sum(self._joint_error, axis=1, out=self._reward_term)
return self._reward_term
[docs]
@registry.env("G1MotionTrackingDeploy", sim_backend="mujoco")
@registry.env("G1MotionTrackingDeploy", sim_backend="motrix")
class G1MotionTrackingDeployEnv(G1MotionTrackingEnv):
"""Deploy-oriented G1 motion tracking env with unitree_rl_lab mimic actor inputs."""
_cfg: G1MotionTrackingDeployEnvCfg
def _actor_obs_dim(self, n: int) -> int:
# unitree_rl_lab mimic deploy actor input:
# motion_command(2n), motion_anchor_ori_b(6), gyro(3), joints, actions.
return 6 + 3 + n * 5
def _build_actor_obs(
self,
*,
command: np.ndarray,
motion_anchor_pos_b: np.ndarray,
motion_anchor_ori_b: np.ndarray,
noisy_linvel: np.ndarray,
noisy_gyro: np.ndarray,
noisy_joint_pos_rel: np.ndarray,
noisy_dof_vel: np.ndarray,
last_actions: np.ndarray,
) -> np.ndarray:
return np.concatenate(
[
command,
motion_anchor_ori_b,
noisy_gyro,
noisy_joint_pos_rel,
noisy_dof_vel,
last_actions,
],
axis=1,
dtype=get_global_dtype(),
)