Source code for unilab.terrains.terrain_generator

from __future__ import annotations

import abc
from dataclasses import dataclass, field
from pathlib import Path

import numpy as np

_BORDER_BASE_THICKNESS = 0.05


[docs] @dataclass class FlatPatchSamplingCfg: """Configuration for sampling flat patches on a heightfield surface.""" num_patches: int = 10 """Number of flat patches to sample per sub-terrain.""" patch_radius: float = 0.5 """Radius of the circular footprint used to test flatness, in meters.""" max_height_diff: float = 0.05 """Maximum allowed height variation within the patch footprint, in meters.""" x_range: tuple[float, float] = (-1e6, 1e6) """Allowed range of x coordinates for sampled patches, in meters.""" y_range: tuple[float, float] = (-1e6, 1e6) """Allowed range of y coordinates for sampled patches, in meters.""" z_range: tuple[float, float] = (-1e6, 1e6) """Allowed range of z coordinates (world height) for sampled patches, in meters.""" grid_resolution: float | None = None """Resolution of the grid used for flat-patch detection, in meters. When ``None`` (default), the terrain's own ``horizontal_scale`` is used. Set to a smaller value (e.g. 0.025) for finer boundary precision at the cost of a larger intermediate grid."""
[docs] @dataclass class TerrainHeightField: """Backend-agnostic heightfield data for one sub-terrain patch.""" noise: np.ndarray """Quantized height units before normalization.""" size: tuple[float, float] """Patch size as ``(x, y)`` in meters.""" horizontal_scale: float vertical_scale: float elevation_min: int elevation_max: int max_physical_height: float base_thickness: float z_offset: float
[docs] def physical_heights_xy(self) -> np.ndarray: """Return world-space surface heights as an ``(x, y)`` matrix.""" return ( self.noise.astype(np.float64) - self.elevation_min ) * self.vertical_scale + self.z_offset
[docs] def normalized_elevation(self) -> np.ndarray: """Return normalized hfield values in ``[0, 1]``.""" elevation_range = self.elevation_max - self.elevation_min if elevation_range <= 0: return np.zeros_like(self.noise, dtype=np.float64) return (self.noise.astype(np.float64) - self.elevation_min) / elevation_range
[docs] @dataclass class TerrainOutput: origin: np.ndarray """Spawn origin position (x, y, z) in the sub-terrain's local frame.""" heightfield: TerrainHeightField """Backend-agnostic heightfield data.""" flat_patches: dict[str, np.ndarray] | None = None """Named sets of flat patch positions, each an (N, 3) array. None if not configured."""
[docs] @dataclass class GeneratedTerrain: """Merged terrain heightfield ready to be exported as a single PNG asset.""" heights_yx: np.ndarray """World-space surface heights in image convention: rows=y, cols=x.""" horizontal_scale: float z_min: float z_max: float base_thickness: float terrain_origins: np.ndarray @property def size(self) -> tuple[float, float]: rows_y, cols_x = self.heights_yx.shape return (cols_x * self.horizontal_scale, rows_y * self.horizontal_scale) @property def height_extent(self) -> float: return max(self.z_max - self.z_min, self.horizontal_scale * 0.02) @property def hfield_size(self) -> tuple[float, float, float, float]: size_x, size_y = self.size return (size_x / 2, size_y / 2, self.height_extent, self.base_thickness) @property def geom_pos(self) -> tuple[float, float, float]: return (0.0, 0.0, self.z_min)
[docs] def to_uint16(self) -> np.ndarray: span = self.z_max - self.z_min if span <= 0.0: return np.zeros_like(self.heights_yx, dtype=np.uint16) normalized = (self.heights_yx - self.z_min) / span return np.rint(np.clip(normalized, 0.0, 1.0) * np.iinfo(np.uint16).max).astype(np.uint16)
[docs] def surface_sampler(self) -> "HeightfieldSurfaceSampler": return HeightfieldSurfaceSampler( heights_uint16=self.to_uint16(), horizontal_scale=float(self.horizontal_scale), z_min=float(self.z_min), height_extent=float(self.height_extent), )
[docs] def write_png(self, path: Path) -> None: """Write the merged hfield as a 16-bit grayscale PNG.""" import imageio.v3 as iio path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) iio.imwrite(path, self.to_uint16())
[docs] def hfield_size_xml(self) -> str: return " ".join(f"{value:.9g}" for value in self.hfield_size)
[docs] def geom_pos_xml(self) -> str: return " ".join(f"{value:.9g}" for value in self.geom_pos)
[docs] @dataclass class HeightfieldSurfaceSampler: """Compact world-space sampler for a generated hfield surface.""" heights_uint16: np.ndarray horizontal_scale: float z_min: float height_extent: float @property def size(self) -> tuple[float, float]: rows_y, cols_x = self.heights_uint16.shape return (cols_x * self.horizontal_scale, rows_y * self.horizontal_scale)
[docs] def sample_height(self, xy: np.ndarray) -> np.ndarray: points = np.asarray(xy, dtype=np.float64) if points.ndim < 1 or points.shape[-1] != 2: raise ValueError(f"xy must have shape (..., 2), got {points.shape}") flat = points.reshape(-1, 2) size_x, size_y = self.size rows_y, cols_x = self.heights_uint16.shape cols = np.rint((flat[:, 0] + size_x * 0.5) / self.horizontal_scale).astype(np.intp) rows = np.rint((-flat[:, 1] + size_y * 0.5) / self.horizontal_scale).astype(np.intp) cols = np.clip(cols, 0, cols_x - 1) rows = np.clip(rows, 0, rows_y - 1) raw = self.heights_uint16[rows, cols].astype(np.float64) normalized = raw / float(np.iinfo(np.uint16).max) heights = self.z_min + normalized * self.height_extent return heights.reshape(points.shape[:-1])
[docs] @dataclass class SubTerrainCfg(abc.ABC): proportion: float = 1.0 """Robot spawning weight for this terrain type. In curriculum mode, controls how many robots are spawned on this terrain's column relative to other terrain types. Each terrain type always gets exactly one column; proportion only affects spawning distribution. In random mode, controls the sampling probability for each patch. """ size: tuple[float, float] = (10.0, 10.0) """Width and length of the terrain patch, in meters.""" flat_patch_sampling: dict[str, FlatPatchSamplingCfg] | None = None """Named flat-patch sampling configurations, or None to disable."""
[docs] @abc.abstractmethod def function(self, difficulty: float, rng: np.random.Generator) -> TerrainOutput: """Generate backend-agnostic terrain data. Returns: TerrainOutput containing spawn origin and heightfield data. """ raise NotImplementedError
[docs] @dataclass(kw_only=True) class TerrainGeneratorCfg: seed: int | None = None """Random seed for terrain generation. None uses a random seed.""" curriculum: bool = False """Controls terrain allocation mode: - curriculum=True: Each terrain type gets exactly ONE column. The generator uses ``len(sub_terrains)`` columns regardless of ``num_cols``. Difficulty increases along rows. The ``proportion`` field controls how many robots are spawned per column, not column count. - curriculum=False: Every patch is randomly sampled from all terrain types. Proportions control sampling probability. Use this for random variety. """ size: tuple[float, float] """Width and length of each sub-terrain patch, in meters. Both components must be integer multiples of ``horizontal_scale``.""" horizontal_scale: float = 0.05 """Heightfield grid resolution along x and y, in meters per cell. Shared by every sub-terrain (overwritten in :class:`TerrainGenerator` ``__init__``). All length-like sub-terrain parameters (step_width, platform_width, border_width, etc.) must be integer multiples of this value.""" vertical_scale: float = 0.005 """Heightfield height resolution, in meters per integer unit of the noise array. Shared by every sub-terrain (overwritten in :class:`TerrainGenerator` ``__init__``).""" border_width: float = 0.0 """Width of the flat border around the entire terrain grid, in meters. Must be an integer multiple of ``horizontal_scale`` if non-zero. The border is a flat hfield slab whose top surface is flush with the inner-terrain floor at z=0; it is NOT a wall.""" num_rows: int = 1 """Number of sub-terrain rows in the grid. Represents difficulty levels in curriculum mode. Note: Environments are randomly assigned to rows, so multiple envs can share the same patch.""" num_cols: int = 1 """Number of sub-terrain columns in the grid. In curriculum mode the generator ignores this value and uses one column per terrain type (``len(sub_terrains)``). In random mode it is used as-is.""" sub_terrains: dict[str, SubTerrainCfg] = field(default_factory=dict) """Named sub-terrain configurations to populate the grid.""" difficulty_range: tuple[float, float] = (0.0, 1.0) """Min and max difficulty values used when generating sub-terrains.""" add_lights: bool = False """If True, adds a directional light above the terrain grid."""
[docs] class TerrainGenerator: """Generates procedural terrain grids with configurable difficulty. Creates a grid of terrain patches where each patch can be a different terrain type. Supports two modes: - **Random mode** (curriculum=False): Every patch independently samples a terrain type weighted by proportions. Results in random variety across all patches. - **Curriculum mode** (curriculum=True): Each terrain type gets exactly one column (the generator uses ``len(sub_terrains)`` columns regardless of ``num_cols``). Difficulty increases along rows. The ``proportion`` field controls robot spawning distribution, not column count. Terrain types are weighted by proportion and their geometry is generated based on a difficulty value in the configured range. The grid is centered at the world origin. A border can be added around the entire grid along with optional overhead lighting. """
[docs] def __init__(self, cfg: TerrainGeneratorCfg, device: str = "cpu") -> None: if len(cfg.sub_terrains) == 0: raise ValueError("At least one sub_terrain must be specified.") self.cfg = cfg self.device = device # In curriculum mode, one column per terrain type. if self.cfg.curriculum: self._num_cols = len(self.cfg.sub_terrains) else: self._num_cols = self.cfg.num_cols for sub_cfg in self.cfg.sub_terrains.values(): sub_cfg.size = self.cfg.size self._propagate_resolution() self._validate_resolution() if self.cfg.seed is not None: seed = self.cfg.seed else: seed = np.random.randint(0, 10000) self.np_rng = np.random.default_rng(seed) self.terrain_origins = np.zeros((self.cfg.num_rows, self._num_cols, 3)) # Pre-allocate flat patch storage by scanning all sub-terrain configs. self.flat_patches: dict[str, np.ndarray] = {} self.flat_patch_radii: dict[str, float] = {} patch_names: dict[str, int] = {} for sub_cfg in self.cfg.sub_terrains.values(): if sub_cfg.flat_patch_sampling is not None: for name, patch_cfg in sub_cfg.flat_patch_sampling.items(): if name in patch_names: patch_names[name] = max(patch_names[name], patch_cfg.num_patches) else: patch_names[name] = patch_cfg.num_patches self.flat_patch_radii[name] = max( self.flat_patch_radii.get(name, 0.0), patch_cfg.patch_radius ) for name, max_num_patches in patch_names.items(): self.flat_patches[name] = np.zeros( (self.cfg.num_rows, self._num_cols, max_num_patches, 3) )
[docs] def generate(self) -> GeneratedTerrain: """Generate the full terrain as one backend-agnostic merged hfield.""" tile_x_px = int(round(self.cfg.size[0] / self.cfg.horizontal_scale)) tile_y_px = int(round(self.cfg.size[1] / self.cfg.horizontal_scale)) border_px = int(round(self.cfg.border_width / self.cfg.horizontal_scale)) rows_y = self._num_cols * tile_y_px + 2 * border_px cols_x = self.cfg.num_rows * tile_x_px + 2 * border_px heights_yx = np.zeros((rows_y, cols_x), dtype=np.float64) max_base_thickness = _BORDER_BASE_THICKNESS if self.cfg.border_width > 0.0 else 0.0 self.terrain_origins.fill(0.0) def place_output(output: TerrainOutput, sub_row: int, sub_col: int) -> None: nonlocal max_base_thickness patch_heights_xy = output.heightfield.physical_heights_xy() if patch_heights_xy.shape != (tile_x_px, tile_y_px): raise ValueError( "Sub-terrain heightfield shape does not match TerrainGeneratorCfg.size: " f"{patch_heights_xy.shape} != {(tile_x_px, tile_y_px)}" ) x0 = border_px + sub_row * tile_x_px y0 = border_px + (self._num_cols - 1 - sub_col) * tile_y_px heights_yx[y0 : y0 + tile_y_px, x0 : x0 + tile_x_px] = patch_heights_xy.T max_base_thickness = max(max_base_thickness, output.heightfield.base_thickness) world_position = self._get_sub_terrain_position(sub_row, sub_col) spawn_origin = output.origin + world_position self.terrain_origins[sub_row, sub_col] = spawn_origin for name, arr in self.flat_patches.items(): if output.flat_patches is not None and name in output.flat_patches: patches = output.flat_patches[name] arr[sub_row, sub_col, : len(patches)] = patches + world_position arr[sub_row, sub_col, len(patches) :] = spawn_origin else: arr[sub_row, sub_col] = spawn_origin if self.cfg.curriculum: sub_terrains_cfgs = list(self.cfg.sub_terrains.values()) for sub_col in range(self._num_cols): for sub_row in range(self.cfg.num_rows): lower, upper = self.cfg.difficulty_range difficulty = (sub_row + self.np_rng.uniform()) / self.cfg.num_rows difficulty = lower + (upper - lower) * difficulty output = sub_terrains_cfgs[sub_col].function(difficulty, self.np_rng) place_output(output, sub_row, sub_col) else: proportions = np.array( [sub_cfg.proportion for sub_cfg in self.cfg.sub_terrains.values()] ) proportions /= np.sum(proportions) sub_terrains_cfgs = list(self.cfg.sub_terrains.values()) for index in range(self.cfg.num_rows * self._num_cols): sub_row, sub_col = np.unravel_index(index, (self.cfg.num_rows, self._num_cols)) sub_row = int(sub_row) sub_col = int(sub_col) sub_index = self.np_rng.choice(len(proportions), p=proportions) difficulty = self.np_rng.uniform(*self.cfg.difficulty_range) output = sub_terrains_cfgs[sub_index].function(difficulty, self.np_rng) place_output(output, sub_row, sub_col) z_min = float(np.min(heights_yx)) z_max = float(np.max(heights_yx)) return GeneratedTerrain( heights_yx=heights_yx, horizontal_scale=self.cfg.horizontal_scale, z_min=z_min, z_max=z_max, base_thickness=max(max_base_thickness, 1e-3), terrain_origins=self.terrain_origins.copy(), )
[docs] def write_png(self, path: Path) -> GeneratedTerrain: terrain = self.generate() terrain.write_png(path) return terrain
def _get_sub_terrain_position(self, row: int, col: int) -> np.ndarray: """Get the world position for a sub-terrain at the given grid indices. This returns the position of the sub-terrain's corner (not center). The entire grid is centered at the world origin. """ # Calculate position relative to grid corner. rel_x = row * self.cfg.size[0] rel_y = col * self.cfg.size[1] # Offset to center the entire grid at world origin. grid_offset_x = -self.cfg.num_rows * self.cfg.size[0] * 0.5 grid_offset_y = -self._num_cols * self.cfg.size[1] * 0.5 return np.array([grid_offset_x + rel_x, grid_offset_y + rel_y, 0.0]) def _propagate_resolution(self) -> None: """Force every sub-terrain config to share the generator's resolution.""" hs = self.cfg.horizontal_scale vs = self.cfg.vertical_scale for sub_cfg in self.cfg.sub_terrains.values(): if hasattr(sub_cfg, "horizontal_scale"): setattr(sub_cfg, "horizontal_scale", hs) if hasattr(sub_cfg, "vertical_scale"): setattr(sub_cfg, "vertical_scale", vs) def _validate_resolution(self) -> None: """Check that all length-like config values divide evenly by horizontal_scale.""" hs = self.cfg.horizontal_scale if hs <= 0: raise ValueError(f"horizontal_scale must be positive, got {hs}.") if self.cfg.vertical_scale <= 0: raise ValueError(f"vertical_scale must be positive, got {self.cfg.vertical_scale}.") def _is_multiple(value: float) -> bool: return abs(round(value / hs) * hs - value) <= 1e-9 for axis, sz in zip(("size[0]", "size[1]"), self.cfg.size): if not _is_multiple(sz): raise ValueError( f"TerrainGeneratorCfg.{axis}={sz} must be an integer multiple of " f"horizontal_scale={hs}." ) if self.cfg.border_width > 0 and not _is_multiple(self.cfg.border_width): raise ValueError( f"TerrainGeneratorCfg.border_width={self.cfg.border_width} must be " f"an integer multiple of horizontal_scale={hs}." ) for name, sub_cfg in self.cfg.sub_terrains.items(): for fld in ("step_width", "platform_width", "border_width"): if not hasattr(sub_cfg, fld): continue value = getattr(sub_cfg, fld) if value is None or value == 0: continue if not _is_multiple(value): raise ValueError( f"Sub-terrain '{name}' field '{fld}'={value} must be an " f"integer multiple of horizontal_scale={hs}." )