Source code for unilab.envs.locomotion.go2w.joystick

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, cast

import numpy as np

from unilab.assets import ASSETS_ROOT_PATH
from unilab.base import registry
from unilab.base.backend import create_backend
from unilab.base.np_env import NpEnvState
from unilab.base.scene import SceneCfg
from unilab.dr import DomainRandomizationCapabilities, ResetPlan, ResetRandomizationPayload
from unilab.dr.dr_utils import (
    build_interval_push_plan,
    validate_interval_push_support,
    zero_actions,
)
from unilab.dtype_config import get_global_dtype
from unilab.envs.common.rotation import (
    np_quat_mul,
    np_yaw_to_quat,
)
from unilab.envs.locomotion.common import rewards
from unilab.envs.locomotion.common.commands import (
    Commands,
    apply_heading_yaw_feedback,
    zero_small_xy_commands,
)
from unilab.envs.locomotion.common.commands import (
    sample_heading_commands as sample_go2w_heading_commands,
)
from unilab.envs.locomotion.common.domain_rand import DomainRandConfig
from unilab.envs.locomotion.common.dr_provider import LocomotionDRProvider
from unilab.envs.locomotion.common.rewards import RewardContext
from unilab.envs.locomotion.go2w.base import (
    DEFAULT_GO2W_ANGLES,
    NUM_GO2W_ACTIONS,
    NUM_LEG_ACTIONS,
    NUM_WHEEL_ACTIONS,
    Go2WBaseCfg,
    Go2WBaseEnv,
    compute_go2w_motor_ctrl,
    stack_joint_sensors,
)

GO2W_HIP_INDICES = np.asarray([0, 3, 6, 9], dtype=np.int32)


[docs] @dataclass class InitState: pos = [0.0, 0.0, 0.42]
[docs] @dataclass class Go2WDomainRandConfig(DomainRandConfig): randomize_init_yaw: bool = True init_z_range: list[float] = field(default_factory=lambda: [0.0, 0.2]) init_roll_range: list[float] = field(default_factory=lambda: [0.0, 0.0]) init_pitch_range: list[float] = field(default_factory=lambda: [0.0, 0.0]) init_yaw_range: list[float] = field(default_factory=lambda: [-np.pi, np.pi]) randomize_kp: bool = True kp_multiplier_range: list[float] = field(default_factory=lambda: [0.9, 1.1]) randomize_kd: bool = True kd_multiplier_range: list[float] = field(default_factory=lambda: [0.9, 1.1])
[docs] @dataclass class RewardConfig: scales: dict[str, float] tracking_sigma: float base_height_target: float only_positive_rewards: bool = False joint_pos_penalty_stand_still_scale: float = 5.0 joint_pos_penalty_velocity_threshold: float = 0.5 joint_pos_penalty_command_threshold: float = 0.1
[docs] @dataclass class JoystickSensor: local_linvel = "local_linvel" gyro = "gyro" gravity = "upvector"
[docs] @registry.envcfg("Go2WJoystickFlat") @dataclass class Go2WJoystickCfg(Go2WBaseCfg): scene: SceneCfg = field( default_factory=lambda: SceneCfg( model_file=str(ASSETS_ROOT_PATH / "robots" / "go2w" / "scene_flat.xml") ) ) max_episode_seconds: float = 20.0 init_state: InitState = field(default_factory=InitState) commands: Commands = field(default_factory=Commands) reward_config: RewardConfig | None = None sensor: JoystickSensor = field(default_factory=JoystickSensor) # type: ignore[assignment] domain_rand: Go2WDomainRandConfig = field(default_factory=Go2WDomainRandConfig)
[docs] def build_go2w_backend_reset_randomization( env: Any, num_reset: int ) -> ResetRandomizationPayload | None: """Build reset DR payloads that are valid for a motor-actuator Go2W model. kp/kd are intentionally excluded here. Go2W samples them through the same config path as Go2, but applies them inside its owner pre-step motor control. """ domain_rand = getattr(env.cfg, "domain_rand", None) if domain_rand is None: return None payload = ResetRandomizationPayload() if getattr(domain_rand, "randomize_base_mass", False): low, high = domain_rand.added_mass_range payload.base_mass_delta = np.random.uniform(low, high, size=(num_reset,)) if getattr(domain_rand, "random_com", False): low, high = domain_rand.com_offset_x base_com_offset = np.zeros((num_reset, 3), dtype=np.float64) base_com_offset[:, 0] = np.random.uniform(low, high, size=(num_reset,)) payload.base_com_offset = base_com_offset if getattr(domain_rand, "randomize_gravity", False): gravity_range = np.asarray(domain_rand.gravity_range, dtype=np.float64) if gravity_range.shape != (2, 3): raise ValueError( f"domain_rand.gravity_range must have shape (2, 3), got {gravity_range.shape}" ) low = np.minimum(gravity_range[0], gravity_range[1]) high = np.maximum(gravity_range[0], gravity_range[1]) payload.gravity = np.random.uniform(low=low, high=high, size=(num_reset, 3)) return None if payload.is_empty() else payload
[docs] def sample_go2w_reset_yaw(domain_rand: Go2WDomainRandConfig, num_reset: int) -> np.ndarray: if not domain_rand.randomize_init_yaw: return np.zeros((num_reset,), dtype=get_global_dtype()) yaw_range = np.asarray(domain_rand.init_yaw_range, dtype=np.float64) if yaw_range.shape != (2,): raise ValueError(f"domain_rand.init_yaw_range must have shape (2,), got {yaw_range.shape}") low, high = float(np.min(yaw_range)), float(np.max(yaw_range)) return np.asarray(np.random.uniform(low, high, size=(num_reset,)), dtype=get_global_dtype())
[docs] class Go2WJoystickDomainRandomizationProvider(LocomotionDRProvider):
[docs] def validate(self, env: Any, capabilities: DomainRandomizationCapabilities) -> None: payload = build_go2w_backend_reset_randomization(env, num_reset=1) if payload is not None: unsupported = capabilities.get_unsupported_reset_terms(payload.requested_terms()) if unsupported: names = ", ".join(sorted(unsupported)) raise NotImplementedError( f"{env._backend.backend_type} backend does not support Go2W reset randomization terms: {names}" ) validate_interval_push_support(env, capabilities)
[docs] def build_interval_randomization_plan(self, env: Any, step_counter: int): return build_interval_push_plan(env, step_counter)
[docs] def build_reset_plan(self, env: Any, env_ids: np.ndarray) -> ResetPlan: num_reset = len(env_ids) qpos = np.tile(env._init_qpos, (num_reset, 1)) qvel = np.tile(env._init_qvel, (num_reset, 1)) qpos[:, 0:2] += np.random.uniform(-0.5, 0.5, (num_reset, 2)) qpos[:, 0:3] += env._spawn.origins_for(env_ids) yaw = sample_go2w_reset_yaw(env.cfg.domain_rand, num_reset) qpos[:, 3:7] = np_quat_mul(qpos[:, 3:7], np_yaw_to_quat(yaw)) qvel[:, 0:6] = np.asarray( np.random.uniform(-0.5, 0.5, size=(num_reset, 6)), dtype=get_global_dtype() ) motor_kp, motor_kd = env.sample_reset_motor_gains(num_reset) env.set_motor_gains(env_ids, motor_kp, motor_kd) commands = self._sample_commands(env, num_reset) info_updates: dict[str, Any] = { "commands": commands, "current_actions": zero_actions(num_reset, env._num_action), "last_actions": zero_actions(num_reset, env._num_action), "motor_kp": motor_kp.astype(get_global_dtype()), "motor_kd": motor_kd.astype(get_global_dtype()), "torques": np.zeros((num_reset, env._num_action), dtype=get_global_dtype()), } if getattr(env.cfg.commands, "heading_command", False): info_updates["heading_commands"] = sample_go2w_heading_commands(env, num_reset) return ResetPlan( env_ids=env_ids, qpos=qpos, qvel=qvel, info_updates=info_updates, randomization=build_go2w_backend_reset_randomization(env, num_reset), )
def _compute_reset_obs( self, env: Any, env_ids: Any, info_updates: Any, linvel: Any, gyro: Any, gravity: Any, dof_pos: Any, dof_vel: Any, ) -> dict[str, np.ndarray]: del env_ids return cast( dict[str, np.ndarray], env._compute_obs(info_updates, linvel, gyro, gravity, dof_pos, dof_vel), ) def _sample_commands(self, env: Any, num_reset: int) -> np.ndarray: commands = super()._sample_commands(env, num_reset) zero_small_xy_commands(commands) standing_prob = float(getattr(env.cfg.commands, "rel_standing_envs", 0.0)) if standing_prob > 0.0: standing = np.random.uniform(size=(num_reset,)) < min(standing_prob, 1.0) commands[standing] = 0.0 if getattr(env.cfg.commands, "heading_command", False): commands[:, 2] = 0.0 return commands
[docs] @registry.env("Go2WJoystickFlat", sim_backend="mujoco") class Go2WJoystickEnv(Go2WBaseEnv): _cfg: Go2WJoystickCfg
[docs] def __init__(self, cfg: Go2WJoystickCfg, num_envs=1, backend_type="mujoco"): if cfg.reward_config is None: raise ValueError("reward_config must be provided via Hydra configuration") backend = create_backend( backend_type, cfg.scene, num_envs, cfg.sim_dt, base_name=cfg.asset.base_name, push_body_name=cfg.domain_rand.push_body_name, motrix_max_iterations=cfg.motrix_max_iterations, post_step_forward_sensor=cfg.post_step_forward_sensor, ) super().__init__(cfg, backend, num_envs) self._np_dtype = get_global_dtype() self._leg_action_scale = self._build_leg_action_scale() self._reward_cfg = cfg.reward_config self._enable_reward_log = True ctrl_range = np.asarray(self._backend.get_actuator_ctrl_range(), dtype=np.float64) self._validate_motor_control_contract(ctrl_range, num_envs) self._ctrl_lower = ctrl_range[:, 0].astype(self._np_dtype) self._ctrl_upper = ctrl_range[:, 1].astype(self._np_dtype) joint_range = self._backend.get_joint_range() self._leg_joint_range = ( np.asarray(joint_range[:NUM_LEG_ACTIONS], dtype=get_global_dtype()) if joint_range is not None else None ) self._base_motor_kp = np.full((NUM_LEG_ACTIONS,), cfg.control_config.Kp, dtype=np.float64) self._base_motor_kd = np.full((NUM_LEG_ACTIONS,), cfg.control_config.Kd, dtype=np.float64) self._base_wheel_kd = np.full( (NUM_WHEEL_ACTIONS,), cfg.control_config.wheel_Kd, dtype=np.float64 ) self._motor_kp = np.broadcast_to(self._base_motor_kp, (num_envs, NUM_LEG_ACTIONS)).copy() self._motor_kd = np.broadcast_to(self._base_motor_kd, (num_envs, NUM_LEG_ACTIONS)).copy() self._wheel_kd = np.broadcast_to(self._base_wheel_kd, (num_envs, NUM_WHEEL_ACTIONS)).copy() self._last_motor_ctrl = np.zeros((num_envs, NUM_GO2W_ACTIONS), dtype=self._np_dtype) self._last_dof_vel_for_acc = np.zeros( (num_envs, NUM_GO2W_ACTIONS), dtype=get_global_dtype() ) self._backend.set_pre_step_control(self._pre_step_motor_control) self._init_reward_functions() self._init_domain_randomization(Go2WJoystickDomainRandomizationProvider())
@property def obs_groups_spec(self) -> dict[str, int]: return {"obs": 53, "critic": 72}
[docs] def reset(self, env_indices: np.ndarray) -> tuple[dict[str, np.ndarray], dict]: env_ids = np.asarray(env_indices, dtype=np.int32) obs, info = super().reset(env_ids) dof_vel = self.get_dof_vel() if dof_vel.shape[0] == self._num_envs: self._last_dof_vel_for_acc[env_ids] = dof_vel[env_ids] return obs, info
def _validate_motor_control_contract(self, ctrl_range: np.ndarray, num_envs: int) -> None: if self._backend.num_actuators != NUM_GO2W_ACTIONS: raise ValueError( f"Go2W requires {NUM_GO2W_ACTIONS} motor actuators, got {self._backend.num_actuators}" ) if ctrl_range.shape != (NUM_GO2W_ACTIONS, 2): raise ValueError( f"Go2W actuator ctrl_range must have shape ({NUM_GO2W_ACTIONS}, 2), " f"got {ctrl_range.shape}" ) pos = stack_joint_sensors(self._backend, "pos", dtype=self.default_angles.dtype) vel = stack_joint_sensors(self._backend, "vel", dtype=self.default_angles.dtype) expected_shape = (num_envs, NUM_GO2W_ACTIONS) if pos.shape != expected_shape: raise ValueError(f"Go2W joint position sensor stack must have shape {expected_shape}") if vel.shape != expected_shape: raise ValueError(f"Go2W joint velocity sensor stack must have shape {expected_shape}") def _build_leg_action_scale(self) -> np.ndarray: scale = np.full( (NUM_LEG_ACTIONS,), float(self._cfg.control_config.action_scale), dtype=self._np_dtype, ) hip_action_scale = self._cfg.control_config.hip_action_scale if hip_action_scale is not None: scale[GO2W_HIP_INDICES] = float(hip_action_scale) return scale def _init_reward_functions(self) -> None: self._reward_fns: dict[str, Any] = { "tracking_lin_vel": rewards.tracking_lin_vel, "tracking_ang_vel": rewards.tracking_ang_vel, "lin_vel_z": rewards.lin_vel_z, "ang_vel_xy": rewards.ang_vel_xy, "base_height": rewards.base_height, "action_rate": rewards.action_rate, "similar_to_default": rewards.similar_to_default, "orientation": rewards.orientation, "torques": self._reward_torques_l2, "joint_torques_l2": self._reward_joint_torques_l2, "energy": rewards.energy, "dof_vel": self._reward_dof_vel, "dof_acc": self._reward_dof_acc, "joint_acc_l2": self._reward_dof_acc, "wheel_acc": self._reward_wheel_acc, "joint_acc_wheel_l2": self._reward_wheel_acc, "stand_still": self._reward_stand_still, "hip_pos": self._reward_hip_pos, "dof_error": self._reward_dof_error, "joint_pos_penalty": self._reward_joint_pos_penalty, "joint_power": self._reward_joint_power, "joint_mirror": self._reward_joint_mirror, "alive": rewards.alive, "upward": rewards.upward, "wheel_vel": self._reward_wheel_vel, }
[docs] def sample_reset_motor_gains(self, num_reset: int) -> tuple[np.ndarray, np.ndarray]: kp = np.broadcast_to(self._base_motor_kp, (num_reset, NUM_LEG_ACTIONS)).copy() kd = np.broadcast_to(self._base_motor_kd, (num_reset, NUM_LEG_ACTIONS)).copy() domain_rand = self._cfg.domain_rand if domain_rand.randomize_kp: low, high = domain_rand.kp_multiplier_range kp *= np.random.uniform(low, high, size=(num_reset, 1)) if domain_rand.randomize_kd: low, high = domain_rand.kd_multiplier_range kd *= np.random.uniform(low, high, size=(num_reset, 1)) return kp, kd
[docs] def set_motor_gains(self, env_ids: np.ndarray, kp: np.ndarray, kd: np.ndarray) -> None: self._motor_kp[env_ids] = np.asarray(kp, dtype=np.float64) self._motor_kd[env_ids] = np.asarray(kd, dtype=np.float64)
[docs] def apply_action(self, actions: np.ndarray, state: NpEnvState) -> np.ndarray: clipped_actions = np.asarray( np.clip( actions, -self._cfg.control_config.clip_actions, self._cfg.control_config.clip_actions, ), dtype=self._np_dtype, ) state.info["last_actions"] = state.info.get( "current_actions", np.zeros_like(clipped_actions) ) state.info["current_actions"] = clipped_actions exec_actions = ( state.info["last_actions"] if self._cfg.control_config.simulate_action_latency else clipped_actions ) leg_targets = ( exec_actions[:, :NUM_LEG_ACTIONS] * self._leg_action_scale + self.default_angles[:NUM_LEG_ACTIONS] ) wheel_velocity_targets = ( exec_actions[:, NUM_LEG_ACTIONS:] * self._cfg.control_config.wheel_action_scale ) return np.concatenate([leg_targets, wheel_velocity_targets], axis=1, dtype=self._np_dtype)
def _pre_step_motor_control(self, backend: Any, policy_ctrl: np.ndarray) -> np.ndarray: joint_pos = stack_joint_sensors(backend, "pos", dtype=self.default_angles.dtype) joint_vel = stack_joint_sensors(backend, "vel", dtype=self.default_angles.dtype) motor_ctrl = compute_go2w_motor_ctrl( policy_ctrl, joint_pos, joint_vel, self._motor_kp, self._motor_kd, self._wheel_kd, self._ctrl_lower, self._ctrl_upper, self._last_motor_ctrl, ) return motor_ctrl
[docs] def update_state(self, state: NpEnvState) -> NpEnvState: self._update_commands(state.info) linvel = self.get_local_linvel() gyro = self.get_gyro() gravity = self._backend.get_sensor_data(self._cfg.sensor.gravity) dof_pos = self.get_dof_pos() dof_vel = self.get_dof_vel() state.info["torques"] = self._last_motor_ctrl.copy() state.info["qacc"] = self._estimate_dof_acc(dof_vel) terminated = self._compute_terminated(gravity) reward = self._compute_reward(state.info, linvel, gyro, gravity, dof_pos, dof_vel) obs = self._compute_obs(state.info, linvel, gyro, gravity, dof_pos, dof_vel) return state.replace(obs=obs, reward=reward, terminated=terminated)
def _compute_terminated(self, gravity: np.ndarray) -> np.ndarray: return gravity[:, 2] <= 0.5 def _compute_obs( self, info: dict, linvel: np.ndarray, gyro: np.ndarray, gravity: np.ndarray, dof_pos: np.ndarray, dof_vel: np.ndarray, ) -> dict[str, np.ndarray]: noise_cfg = self._cfg.noise_config leg_diff = dof_pos[:, :NUM_LEG_ACTIONS] - self.default_angles[:NUM_LEG_ACTIONS] leg_vel = dof_vel[:, :NUM_LEG_ACTIONS] wheel_vel = dof_vel[:, NUM_LEG_ACTIONS:] noisy_gyro = self._obs_noise(gyro, noise_cfg.scale_gyro) noisy_gravity = self._obs_noise(gravity, noise_cfg.scale_gravity) noisy_leg_diff = self._obs_noise(leg_diff, noise_cfg.scale_joint_angle) noisy_leg_vel = self._obs_noise(leg_vel, noise_cfg.scale_joint_vel) noisy_wheel_vel = self._obs_noise(wheel_vel, noise_cfg.scale_wheel_vel) num_obs = gyro.shape[0] last_actions = info.get("current_actions", np.zeros((num_obs, self._num_action))) motor_ctrl = info.get("torques", np.zeros((num_obs, self._num_action), dtype=dof_pos.dtype)) obs = np.concatenate( [ noisy_gyro, -noisy_gravity, noisy_leg_diff, noisy_leg_vel, noisy_wheel_vel, last_actions, info["commands"], ], axis=1, dtype=get_global_dtype(), ) critic = np.concatenate( [ gyro, -gravity, leg_diff, leg_vel, wheel_vel, last_actions, info["commands"], linvel, motor_ctrl, ], axis=1, dtype=get_global_dtype(), ) return {"obs": obs, "critic": critic} def _compute_reward(self, info: dict, linvel, gyro, gravity, dof_pos, dof_vel) -> np.ndarray: dtype = get_global_dtype() num_obs = linvel.shape[0] ctx = RewardContext( info=info, linvel=linvel, gyro=gyro, dof_pos=dof_pos[:, :NUM_LEG_ACTIONS], dof_vel=dof_vel, num_envs=num_obs, default_angles=DEFAULT_GO2W_ANGLES[:NUM_LEG_ACTIONS].astype(dtype), tracking_sigma=self._reward_cfg.tracking_sigma, base_height_target=self._reward_cfg.base_height_target, base_height=self._reward_base_height_values(num_obs), gravity=gravity, joint_range=self._leg_joint_range, ) return rewards.run_reward_dispatch( scales=self._reward_cfg.scales, fns=self._reward_fns, ctx=ctx, info=info, enable_log=self._enable_reward_log, ctrl_dt=self._cfg.ctrl_dt, only_positive=self._reward_cfg.only_positive_rewards, ) def _update_commands(self, info: dict) -> None: commands = info.get("commands") if commands is None: return commands_arr = np.asarray(commands, dtype=get_global_dtype()) resampling_time = float(getattr(self._cfg.commands, "resampling_time", 0.0)) if resampling_time > 0.0: interval_steps = max(int(round(resampling_time / self._cfg.ctrl_dt)), 1) steps = np.asarray(info.get("steps", np.zeros((self._num_envs,), dtype=np.uint32))) resample_mask = (steps > 0) & ((steps % interval_steps) == 0) if np.any(resample_mask): num_resample = int(np.count_nonzero(resample_mask)) low = np.asarray(self._cfg.commands.vel_limit[0], dtype=get_global_dtype()) high = np.asarray(self._cfg.commands.vel_limit[1], dtype=get_global_dtype()) sampled = np.random.uniform(low=low, high=high, size=(num_resample, 3)).astype( get_global_dtype() ) zero_small_xy_commands(sampled) standing_prob = float(getattr(self._cfg.commands, "rel_standing_envs", 0.0)) if standing_prob > 0.0: standing = np.random.uniform(size=(num_resample,)) < min(standing_prob, 1.0) sampled[standing] = 0.0 commands_arr[resample_mask] = sampled if getattr(self._cfg.commands, "heading_command", False): heading_commands = self._ensure_heading_commands(info, commands_arr.shape[0]) heading_commands[resample_mask] = sample_go2w_heading_commands( self, num_resample ) info["heading_commands"] = heading_commands if getattr(self._cfg.commands, "heading_command", False): heading_commands = self._ensure_heading_commands(info, commands_arr.shape[0]) base_quat = np.asarray(self._backend.get_base_quat(), dtype=get_global_dtype()) if base_quat.shape[0] == commands_arr.shape[0]: stiffness = float(getattr(self._cfg.commands, "heading_control_stiffness", 0.5)) apply_heading_yaw_feedback( commands_arr, base_quat, heading_commands, stiffness=stiffness ) info["commands"] = commands_arr def _ensure_heading_commands(self, info: dict, num_obs: int) -> np.ndarray: heading_commands = info.get("heading_commands") if heading_commands is None or np.asarray(heading_commands).shape != (num_obs,): heading_commands = sample_go2w_heading_commands(self, num_obs) info["heading_commands"] = heading_commands heading_commands = np.asarray(heading_commands, dtype=get_global_dtype()) info["heading_commands"] = heading_commands return heading_commands def _estimate_dof_acc(self, dof_vel: np.ndarray) -> np.ndarray: qacc = np.asarray((dof_vel - self._last_dof_vel_for_acc) / self._cfg.ctrl_dt) self._last_dof_vel_for_acc[:] = dof_vel return np.asarray(qacc, dtype=get_global_dtype()) def _reward_base_height_values(self, num_obs: int) -> np.ndarray: base_pos = np.asarray(self._backend.get_base_pos(), dtype=get_global_dtype()) if base_pos.shape[0] != num_obs: return np.zeros((num_obs,), dtype=get_global_dtype()) return np.asarray(base_pos[:, 2], dtype=get_global_dtype()) def _reward_wheel_vel(self, ctx: RewardContext) -> np.ndarray: assert ctx.dof_vel is not None wheel_vel = ctx.dof_vel[:, NUM_LEG_ACTIONS:] return np.asarray(np.sum(np.square(wheel_vel), axis=1), dtype=get_global_dtype()) def _reward_torques_l2(self, ctx: RewardContext) -> np.ndarray: torques = np.asarray( ctx.info.get("torques", np.zeros((ctx.num_envs, self._num_action))), dtype=get_global_dtype(), ) return np.asarray(np.sum(np.square(torques), axis=1), dtype=get_global_dtype()) def _reward_joint_torques_l2(self, ctx: RewardContext) -> np.ndarray: torques = np.asarray( ctx.info.get("torques", np.zeros((ctx.num_envs, self._num_action))), dtype=get_global_dtype(), ) return np.asarray( np.sum(np.square(torques[:, :NUM_LEG_ACTIONS]), axis=1), dtype=get_global_dtype(), ) def _reward_dof_vel(self, ctx: RewardContext) -> np.ndarray: assert ctx.dof_vel is not None return np.asarray( np.sum(np.square(ctx.dof_vel[:, :NUM_LEG_ACTIONS]), axis=1), dtype=get_global_dtype(), ) def _reward_dof_acc(self, ctx: RewardContext) -> np.ndarray: qacc = np.asarray( ctx.info.get("qacc", np.zeros((ctx.num_envs, NUM_GO2W_ACTIONS))), dtype=get_global_dtype(), ) return np.asarray(np.sum(np.square(qacc[:, :NUM_LEG_ACTIONS]), axis=1), dtype=qacc.dtype) def _reward_wheel_acc(self, ctx: RewardContext) -> np.ndarray: qacc = np.asarray( ctx.info.get("qacc", np.zeros((ctx.num_envs, NUM_GO2W_ACTIONS))), dtype=get_global_dtype(), ) return np.asarray(np.sum(np.square(qacc[:, NUM_LEG_ACTIONS:]), axis=1), dtype=qacc.dtype) def _reward_stand_still(self, ctx: RewardContext) -> np.ndarray: commands = ctx.info["commands"] stopped = np.linalg.norm(commands[:, :2], axis=1) < 0.1 dof_error = np.sum(np.abs(ctx.dof_pos - DEFAULT_GO2W_ANGLES[:NUM_LEG_ACTIONS]), axis=1) return np.asarray(dof_error * stopped, dtype=get_global_dtype()) def _reward_hip_pos(self, ctx: RewardContext) -> np.ndarray: diff = ctx.dof_pos[:, GO2W_HIP_INDICES] - DEFAULT_GO2W_ANGLES[GO2W_HIP_INDICES] return np.asarray(np.sum(np.square(diff), axis=1), dtype=get_global_dtype()) def _reward_dof_error(self, ctx: RewardContext) -> np.ndarray: diff = ctx.dof_pos - DEFAULT_GO2W_ANGLES[:NUM_LEG_ACTIONS] return np.asarray(np.sum(np.square(diff), axis=1), dtype=get_global_dtype()) def _reward_joint_pos_penalty(self, ctx: RewardContext) -> np.ndarray: return rewards.joint_pos_penalty( ctx, stand_still_scale=self._reward_cfg.joint_pos_penalty_stand_still_scale, velocity_threshold=self._reward_cfg.joint_pos_penalty_velocity_threshold, command_threshold=self._reward_cfg.joint_pos_penalty_command_threshold, ) def _reward_joint_power(self, ctx: RewardContext) -> np.ndarray: assert ctx.dof_vel is not None torques = np.asarray( ctx.info.get("torques", np.zeros((ctx.num_envs, self._num_action))), dtype=get_global_dtype(), ) return np.asarray( np.sum(np.abs(ctx.dof_vel[:, :NUM_LEG_ACTIONS] * torques[:, :NUM_LEG_ACTIONS]), axis=1), dtype=get_global_dtype(), ) def _reward_joint_mirror(self, ctx: RewardContext) -> np.ndarray: fr_rl = ctx.dof_pos[:, 0:3] - ctx.dof_pos[:, 9:12] fl_rr = ctx.dof_pos[:, 3:6] - ctx.dof_pos[:, 6:9] mirror = 0.5 * (np.sum(np.square(fr_rl), axis=1) + np.sum(np.square(fl_rr), axis=1)) return np.asarray(mirror, dtype=get_global_dtype())
registry.register_env("Go2WJoystickFlat", Go2WJoystickEnv, sim_backend="motrix")