Source code for unilab.dr.dr_utils

from __future__ import annotations

from typing import Any

import numpy as np

from unilab.dr.types import (
    DomainRandomizationCapabilities,
    IntervalRandomizationPlan,
    ResetRandomizationPayload,
)
from unilab.dtype_config import get_global_dtype


[docs] def zero_actions(num_reset: int, num_action: int) -> np.ndarray: return np.zeros((num_reset, num_action), dtype=get_global_dtype())
def _coerce_range(name: str, values: Any) -> tuple[float, float]: bounds = np.asarray(values, dtype=np.float64) if bounds.shape != (2,): raise ValueError(f"domain_rand.{name} must have shape (2,), got {bounds.shape}") low = float(bounds[0]) high = float(bounds[1]) if high < low: raise ValueError(f"domain_rand.{name} high must be >= low") return low, high
[docs] def build_common_reset_randomization( env: Any, num_reset: int, *, base_kp: np.ndarray | None = None, base_kd: np.ndarray | None = None, base_body_mass: np.ndarray | None = None, base_geom_friction: np.ndarray | None = None, ground_geom_id: int | None = None, base_dof_armature: np.ndarray | None = None, ) -> ResetRandomizationPayload | None: domain_rand = getattr(env.cfg, "domain_rand", None) if domain_rand is None: return None payload = ResetRandomizationPayload() if getattr(domain_rand, "randomize_base_mass", False): low, high = domain_rand.added_mass_range payload.base_mass_delta = np.random.uniform(low, high, size=(num_reset,)) if getattr(domain_rand, "randomize_body_mass", False): if base_body_mass is None: raise ValueError("body mass randomization requires a cached base body-mass table") body_mass_template = np.asarray(base_body_mass, dtype=np.float64) if body_mass_template.ndim != 1: raise ValueError( f"base_body_mass must have shape (nbody,), got {body_mass_template.shape}" ) low, high = _coerce_range( "body_mass_multiplier_range", domain_rand.body_mass_multiplier_range ) multipliers = np.random.uniform( low=low, high=high, size=(num_reset, body_mass_template.size) ) body_mass = np.broadcast_to(body_mass_template, multipliers.shape).copy() randomized = body_mass_template > 0.0 body_mass[:, randomized] *= multipliers[:, randomized] payload.body_mass = body_mass if getattr(domain_rand, "random_com", False): base_com_offset = np.zeros((num_reset, 3), dtype=np.float64) low, high = domain_rand.com_offset_x base_com_offset[:, 0] = np.random.uniform(low, high, size=(num_reset,)) com_offset_y = getattr(domain_rand, "com_offset_y", None) if com_offset_y is not None: low, high = com_offset_y base_com_offset[:, 1] = np.random.uniform(low, high, size=(num_reset,)) com_offset_z = getattr(domain_rand, "com_offset_z", None) if com_offset_z is not None: low, high = com_offset_z base_com_offset[:, 2] = np.random.uniform(low, high, size=(num_reset,)) payload.base_com_offset = base_com_offset if getattr(domain_rand, "randomize_gravity", False): gravity_range = np.asarray(domain_rand.gravity_range, dtype=np.float64) if gravity_range.shape != (2, 3): raise ValueError( f"domain_rand.gravity_range must have shape (2, 3), got {gravity_range.shape}" ) low = np.minimum(gravity_range[0], gravity_range[1]) high = np.maximum(gravity_range[0], gravity_range[1]) payload.gravity = np.random.uniform(low=low, high=high, size=(num_reset, 3)) if getattr(domain_rand, "randomize_ground_friction", False): if base_geom_friction is None or ground_geom_id is None: raise ValueError( "ground friction randomization requires cached geom friction and ground geom id" ) geom_friction_template = np.asarray(base_geom_friction, dtype=np.float64) if geom_friction_template.ndim != 2 or geom_friction_template.shape[1] != 3: raise ValueError( f"base_geom_friction must have shape (ngeom, 3), got {geom_friction_template.shape}" ) ground_id = int(ground_geom_id) if ground_id < 0 or ground_id >= geom_friction_template.shape[0]: raise ValueError( f"ground_geom_id must be in [0, {geom_friction_template.shape[0]}), got {ground_id}" ) low, high = _coerce_range( "ground_friction_multiplier_range", domain_rand.ground_friction_multiplier_range, ) geom_friction = np.broadcast_to( geom_friction_template, (num_reset, *geom_friction_template.shape) ).copy() geom_friction[:, ground_id, 0] = geom_friction_template[ground_id, 0] * np.random.uniform( low=low, high=high, size=(num_reset,) ) payload.geom_friction = geom_friction if getattr(domain_rand, "randomize_dof_armature", False): if base_dof_armature is None: raise ValueError("dof armature randomization requires a cached dof-armature table") dof_armature_template = np.asarray(base_dof_armature, dtype=np.float64) if dof_armature_template.ndim != 1: raise ValueError( f"base_dof_armature must have shape (nv,), got {dof_armature_template.shape}" ) low, high = _coerce_range( "dof_armature_multiplier_range", domain_rand.dof_armature_multiplier_range ) dof_armature = np.broadcast_to( dof_armature_template, (num_reset, dof_armature_template.size) ).copy() randomized = dof_armature_template > 0.0 dof_armature[:, randomized] *= np.random.uniform( low=low, high=high, size=(num_reset, int(np.count_nonzero(randomized))) ) payload.dof_armature = dof_armature num_actuators = getattr(env, "_num_action", None) need_kp = num_actuators is not None and getattr(domain_rand, "randomize_kp", False) need_kd = num_actuators is not None and getattr(domain_rand, "randomize_kd", False) if need_kp or need_kd: assert num_actuators is not None if need_kp: kp = ( base_kp if base_kp is not None else np.full(num_actuators, float(env.cfg.control_config.Kp)) ) low, high = domain_rand.kp_multiplier_range payload.kp = (kp * np.random.uniform(low, high, (num_reset, 1))).astype(np.float64) if need_kd: kd = ( base_kd if base_kd is not None else np.full(num_actuators, float(env.cfg.control_config.Kd)) ) low, high = domain_rand.kd_multiplier_range payload.kd = (kd * np.random.uniform(low, high, (num_reset, 1))).astype(np.float64) return None if payload.is_empty() else payload
[docs] def validate_common_reset_randomization( env: Any, capabilities: DomainRandomizationCapabilities, *, base_kp: np.ndarray | None = None, base_kd: np.ndarray | None = None, base_body_mass: np.ndarray | None = None, base_geom_friction: np.ndarray | None = None, ground_geom_id: int | None = None, base_dof_armature: np.ndarray | None = None, ) -> frozenset[str]: payload = build_common_reset_randomization( env, num_reset=1, base_kp=base_kp, base_kd=base_kd, base_body_mass=base_body_mass, base_geom_friction=base_geom_friction, ground_geom_id=ground_geom_id, base_dof_armature=base_dof_armature, ) if payload is None: return frozenset() return capabilities.get_unsupported_reset_terms(payload.requested_terms())
[docs] def build_interval_push_plan(env: Any, step_counter: int) -> IntervalRandomizationPlan | None: domain_rand = getattr(env.cfg, "domain_rand", None) if domain_rand is None or not getattr(domain_rand, "push_robots", False): return None if step_counter % domain_rand.push_interval != 0: return None return IntervalRandomizationPlan(push_perturbation_limit=domain_rand.max_force)
[docs] def validate_interval_push_support(env: Any, capabilities: DomainRandomizationCapabilities) -> None: domain_rand = getattr(env.cfg, "domain_rand", None) if domain_rand is None or not getattr(domain_rand, "push_robots", False): return if not capabilities.supports_interval_push: raise NotImplementedError( f"{env._backend.backend_type} backend does not support interval push" ) force_limit = np.asarray(domain_rand.max_force, dtype=np.float64) if force_limit.shape != (3,): raise ValueError(f"domain_rand.max_force must have shape (3,), got {force_limit.shape}")