unilab.ipc

IPC primitives for multi-process RL training.

class unilab.ipc.SharedWeightSync

Bases: object

Synchronize actor weights between learner and collector.

__init__(param_shapes, *, create=True, shm_name=None, lock=None)
property name: str
property version: int
classmethod from_state_dict(state_dict, **kwargs)
write_weights(state_dict)
Return type:

None

read_weights_into(state_dict)
Return type:

int

cleanup()
Return type:

None

close()
Return type:

None
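The sketch below makes the write/read contract concrete with a minimal versioned weight mailbox in shared memory. It is not unilab's implementation. The following are all assumptions made for illustration:

- the class name ToyWeightSync;
- the layout, a flat float64 payload behind an int64 version header;
- plain Python lists instead of tensors;
- close() only detaching the mapping while cleanup() also unlinks the segment.

It uses only the standard library (multiprocessing.shared_memory and struct).

```python
import struct
import threading
from multiprocessing import shared_memory
from typing import Dict, List

# Assumed layout: an int64 version counter followed by float64 parameter values.
HEADER = struct.Struct("<q")


class ToyWeightSync:
    """Single-writer weight mailbox in shared memory (illustrative, not unilab's code)."""

    def __init__(self, param_sizes: Dict[str, int]) -> None:
        self.param_sizes = dict(param_sizes)  # name -> number of float64 values
        total = sum(self.param_sizes.values())
        self.shm = shared_memory.SharedMemory(create=True, size=HEADER.size + 8 * total)
        # Across real processes this would be a multiprocessing.Lock shared with the reader.
        self.lock = threading.Lock()
        HEADER.pack_into(self.shm.buf, 0, 0)

    @property
    def name(self) -> str:
        # Another process could attach with shared_memory.SharedMemory(name=...).
        return self.shm.name

    @property
    def version(self) -> int:
        return HEADER.unpack_from(self.shm.buf, 0)[0]

    def write_weights(self, weights: Dict[str, List[float]]) -> None:
        with self.lock:
            offset = HEADER.size
            for key, size in self.param_sizes.items():
                struct.pack_into(f"<{size}d", self.shm.buf, offset, *weights[key])
                offset += 8 * size
            # Readers compare this counter with the last version they loaded.
            HEADER.pack_into(self.shm.buf, 0, self.version + 1)

    def read_weights_into(self, out: Dict[str, List[float]]) -> int:
        with self.lock:
            offset = HEADER.size
            for key, size in self.param_sizes.items():
                out[key][:] = struct.unpack_from(f"<{size}d", self.shm.buf, offset)
                offset += 8 * size
            return self.version

    def close(self) -> None:
        self.shm.close()  # detach this process's mapping only

    def cleanup(self) -> None:
        self.shm.close()
        self.shm.unlink()  # the owner frees the segment
```

A collector would typically remember the last version it loaded. It calls read_weights_into() only when version has advanced, so unchanged weights are not copied on every step.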

class unilab.ipc.RolloutRingBuffer

Bases: object

N-slot shared-memory ring buffer for raw rollout payloads.

__init__(num_envs, num_steps, obs_dim, action_dim, *, critic_dim=0, num_slots=4, create=True, shm_name_prefix=None)
property name: Dict[str, str]
property slot_shapes: Dict[str, tuple[int, ...]]
attach_sync_primitives(write_ptr, read_ptr)
Return type:

None

property write_slot: int
property write_buffer: Dict[str, ndarray]
signal_write_done()
Return type:

None

available()
Return type:

int

wait_for_data(timeout=60.0)
Parameters:

timeout (float)

Return type:

bool
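The slot bookkeeping implied by write_slot, signal_write_done(), available() and wait_for_data() can be modelled with two monotonically increasing counters. The sketch below is a hypothetical in-process model. ToyRing, can_write() and the plain int counters are assumptions. In the real class the counters presumably live in the shared objects passed to attach_sync_primitives(write_ptr, read_ptr), and the slots are shared-memory arrays.

```python
import time
from typing import Dict, List


class ToyRing:
    """In-process model of an N-slot ring (hypothetical; real slots live in shared memory)."""

    def __init__(self, num_slots: int = 4) -> None:
        self.num_slots = num_slots
        self.slots: List[Dict[str, list]] = [{} for _ in range(num_slots)]
        self.write_ptr = 0  # total number of slots published by the writer
        self.read_ptr = 0   # total number of slots consumed by the reader

    @property
    def write_slot(self) -> int:
        return self.write_ptr % self.num_slots

    @property
    def read_slot(self) -> int:
        return self.read_ptr % self.num_slots

    def available(self) -> int:
        # Number of published slots the reader has not consumed yet.
        return self.write_ptr - self.read_ptr

    def can_write(self) -> bool:
        # Full when every slot holds unread data; the writer should check this
        # before filling write_slot, otherwise it would clobber read_slot.
        return self.available() < self.num_slots

    def signal_write_done(self) -> None:
        if not self.can_write():
            raise RuntimeError("ring full: writer overran the reader")
        self.write_ptr += 1

    def wait_for_data(self, timeout: float = 60.0, poll_s: float = 0.001) -> bool:
        # Poll until a slot is readable; False means the timeout expired.
        deadline = time.monotonic() + timeout
        while self.available() == 0:
            if time.monotonic() >= deadline:
                return False
            time.sleep(poll_s)
        return True

    def advance_read(self) -> None:
        if self.available() == 0:
            raise RuntimeError("ring empty")
        self.read_ptr += 1
```

Keeping the counters monotonic and deriving slot indices with a modulo makes "empty" (available() == 0) and "full" (available() == num_slots) unambiguous. With a wrapping index alone, both states would look like write_slot == read_slot.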

property read_slot: int
read_numpy_views()

Return shared-memory views for the current read slot.

The returned arrays are borrowed views. Consumers must copy them into owned storage before calling advance_read().

Return type:

dict[str, ndarray]
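The copy matters because a borrowed view aliases the slot's memory. Once advance_read() lets the collector reuse the slot, the view silently shows new data. The sketch below demonstrates this with a bytearray and memoryview standing in for a shared-memory slot and its NumPy view; this setup is hypothetical. With NumPy arrays, the equivalent owned copy would be arr.copy(), or np.copyto() into preallocated storage.

```python
# A bytearray stands in for one shared-memory slot (hypothetical setup).
slot = bytearray([1, 2, 3, 4])

view = memoryview(slot)  # borrowed: aliases the slot, like read_numpy_views()
owned = bytes(view)      # owned: an independent copy taken before advance_read()

# After advance_read(), the collector is free to refill the same slot.
writer = memoryview(slot)
writer[:] = bytes([9, 9, 9, 9])

print(list(view))   # → [9, 9, 9, 9]  the borrowed view now shows the new data
print(list(owned))  # → [1, 2, 3, 4]  the copy is unaffected
```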

copy_read_slot_to_torch(destination)
Parameters:

destination (dict)

Return type:

None

read_torch(device)
Parameters:

device (str)

Return type:

dict

advance_read()
Return type:

None

cleanup()
Return type:

None

close()
Return type:

None

class unilab.ipc.AsyncRunner

Bases: ABC

Base class for async RL algorithms.

Manages:
  • Shared memory allocation/cleanup

  • Collector process lifecycle

  • Error propagation from the collector subprocess

  • Training loop skeleton
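Error propagation of this kind commonly works as follows:

  • The child catches its own exception.

  • It ships the formatted traceback over a queue.

  • The parent polls that queue and re-raises.

The sketch below shows the pattern with a thread and queue.Queue standing in for the collector subprocess and a multiprocessing queue. collector_main and poll_collector_error are hypothetical names, not the collector_error module's API.

```python
import queue
import threading
import traceback
from typing import Optional


def collector_main(error_q: "queue.Queue[str]") -> None:
    """Hypothetical collector body: report a crash instead of dying silently."""
    try:
        raise RuntimeError("env step failed")  # stand-in for a real collector failure
    except Exception:
        error_q.put(traceback.format_exc())


def poll_collector_error(error_q: "queue.Queue[str]") -> Optional[str]:
    """Learner side: non-blocking check that returns the child's traceback text, if any."""
    try:
        return error_q.get_nowait()
    except queue.Empty:
        return None


errors: "queue.Queue[str]" = queue.Queue()
worker = threading.Thread(target=collector_main, args=(errors,))
worker.start()
worker.join()

# A learner would check this once per iteration and raise, e.g. RuntimeError(tb).
tb = poll_collector_error(errors)
```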

__init__(env_name, env_cfg_overrides, rl_cfg, *, device=None, collector_device=None, sim_backend='mujoco', num_envs=4096)
abstract learn(max_iterations, save_interval=50, log_dir='logs')
Parameters:
  • max_iterations (int)

  • save_interval (int)

  • log_dir (str)

Return type:

None

close()
Return type:

None
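A concrete algorithm subclasses AsyncRunner and implements learn(). The base class needs an environment and shared memory to construct, so the sketch below uses a hypothetical stand-in base, RunnerBase. It reproduces only the abstract learn()/close() contract. The checkpoint naming and the per-iteration placeholder are assumptions.

```python
from abc import ABC, abstractmethod
from typing import List


class RunnerBase(ABC):
    """Hypothetical stand-in for AsyncRunner: only the abstract learn()/close() contract."""

    def __init__(self) -> None:
        self.closed = False

    @abstractmethod
    def learn(self, max_iterations: int, save_interval: int = 50, log_dir: str = "logs") -> None:
        """Run the training loop; subclasses supply the algorithm."""

    def close(self) -> None:
        # In AsyncRunner this is presumably where the collector and shared memory are torn down.
        self.closed = True


class ToyRunner(RunnerBase):
    def __init__(self) -> None:
        super().__init__()
        self.updates = 0
        self.checkpoints: List[str] = []

    def learn(self, max_iterations: int, save_interval: int = 50, log_dir: str = "logs") -> None:
        for it in range(1, max_iterations + 1):
            self.updates += 1  # placeholder for: read rollout, update policy, publish weights
            if it % save_interval == 0 or it == max_iterations:
                self.checkpoints.append(f"{log_dir}/model_{it}.pt")  # hypothetical naming


runner = ToyRunner()
try:
    runner.learn(120, save_interval=50)
finally:
    runner.close()
```

Wrapping learn() in try/finally ensures close() runs even if training raises. This matters when close() is what shuts down the collector process and releases shared memory.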

class unilab.ipc.SharedObsNormStats

Bases: object

Synchronize observation normalization statistics between learner and collector.

Uses a queue to pass (mean, std) tuples from learner to collector.

__init__(ctx)
put(stats)

Put new stats, clearing old ones first.

get()

Get the latest stats; returns None if no new stats are available.
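The put()/get() semantics above amount to a latest-value mailbox: put() drops stale stats, and get() is a non-blocking read that returns None when nothing is new. The sketch below assumes a single producer and a single consumer and uses queue.Queue in one process. The real class takes a multiprocessing context (ctx) and presumably uses a queue created from it. LatestStatsChannel is a hypothetical name.

```python
import queue
from typing import Optional, Tuple

Stats = Tuple[float, float]  # hypothetical (mean, std) payload


class LatestStatsChannel:
    """Keeps at most one pending stats tuple; the consumer only ever sees the newest."""

    def __init__(self) -> None:
        self._q: "queue.Queue[Stats]" = queue.Queue()

    def put(self, stats: Stats) -> None:
        # Drain stale entries first so an old (mean, std) is never applied late.
        while True:
            try:
                self._q.get_nowait()
            except queue.Empty:
                break
        self._q.put(stats)

    def get(self) -> Optional[Stats]:
        # Non-blocking: None means nothing new since the last get().
        try:
            return self._q.get_nowait()
        except queue.Empty:
            return None


channel = LatestStatsChannel()
channel.put((0.0, 1.0))
channel.put((0.5, 2.0))  # replaces the unread (0.0, 1.0)
latest = channel.get()
```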

class unilab.ipc.ReplayBuffer

Bases: SharedBufferBase

Shared replay buffer backed by authoritative packed CPU storage.

Device transfer is handled by the replay pipeline's transfer backends. The fallback sample() path copies each sampled packed batch to self.device and keeps no per-device replay cache.

Parameters:
  • capacity (int)

  • obs_dim (int)

  • action_dim (int)

  • device (str)

  • defer_gpu (bool)

  • critic_dim (int)

  • packed_cpu_storage (bool)

__init__(capacity, obs_dim, action_dim, device, defer_gpu=False, critic_dim=0, packed_cpu_storage=False)

add(obs, actions, rewards, next_obs, dones, truncated, terminal_mask=None, terminal_next_obs=None, critic=None, next_critic=None, terminal_next_critic=None)

Add a batch of transitions (called by the collector).

dones follows the UniLab env lifecycle contract: done = terminated | truncated. Learners must pair it with truncated when computing bootstrap masks.
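As an illustration of the bootstrap-mask rule, here is a one-step TD target that recovers terminated from dones and truncated. This is a sketch: the helper td_targets, the scalar lists and the one-step target itself are assumptions, and a real learner would operate on batched tensors. For truncated steps, the next value should come from the true final observation rather than the post-reset one, which is presumably what terminal_next_obs carries.

```python
from typing import List, Sequence


def td_targets(rewards: Sequence[float], next_values: Sequence[float],
               dones: Sequence[bool], truncated: Sequence[bool],
               gamma: float = 0.99) -> List[float]:
    targets = []
    for r, v, done, trunc in zip(rewards, next_values, dones, truncated):
        # done = terminated | truncated, so a done step that is not truncated was terminated.
        terminated = done and not trunc
        # Only true termination cuts the bootstrap; time-limit truncation still bootstraps.
        bootstrap = 0.0 if terminated else 1.0
        targets.append(r + gamma * bootstrap * v)
    return targets


print(td_targets([1.0, 1.0, 1.0], [10.0, 10.0, 10.0],
                 dones=[False, True, True], truncated=[False, False, True], gamma=0.5))
# → [6.0, 1.0, 6.0]
```

Using dones alone as the mask would zero the bootstrap on the third (truncated) step and give 1.0 instead of 6.0. That bias is what pairing dones with truncated avoids.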

sample(batch_size)

Sample a batch (called by the learner).

Parameters:

batch_size (int)

Return type:

Dict[str, Tensor]
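The sketch below shows a minimal packed ring-style replay buffer:

  • Each transition is stored as one flat row.

  • The write pointer wraps at capacity, so the oldest rows are overwritten.

  • sample() draws uniform indices with replacement.

The class, its row layout and the pure-Python storage are assumptions. It omits critic fields, terminal masks, shared memory, and the device transfer described above.

```python
import random
from typing import Dict, List


class ToyPackedReplay:
    """Hypothetical packed ring replay buffer (illustrative, not unilab's ReplayBuffer)."""

    def __init__(self, capacity: int, obs_dim: int, action_dim: int) -> None:
        self.capacity, self.obs_dim, self.action_dim = capacity, obs_dim, action_dim
        # Assumed row layout: obs | action | reward | next_obs | done | truncated
        self.row_width = 2 * obs_dim + action_dim + 3
        self.rows: List[List[float]] = [[0.0] * self.row_width for _ in range(capacity)]
        self.ptr = 0   # next row to overwrite
        self.size = 0  # number of valid rows, capped at capacity

    def add(self, obs, actions, rewards, next_obs, dones, truncated) -> None:
        # One row per env transition; wraps around and overwrites the oldest rows.
        for o, a, r, n, d, t in zip(obs, actions, rewards, next_obs, dones, truncated):
            self.rows[self.ptr] = [*o, *a, float(r), *n, float(d), float(t)]
            self.ptr = (self.ptr + 1) % self.capacity
            self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size: int, rng: random.Random) -> Dict[str, list]:
        if self.size == 0:
            raise ValueError("cannot sample from an empty buffer")
        o, a = self.obs_dim, self.action_dim
        keys = ("obs", "actions", "rewards", "next_obs", "dones", "truncated")
        batch: Dict[str, list] = {k: [] for k in keys}
        for _ in range(batch_size):
            row = self.rows[rng.randrange(self.size)]  # uniform, with replacement
            # Unpack the row by slicing it back into its fields.
            batch["obs"].append(row[:o])
            batch["actions"].append(row[o:o + a])
            batch["rewards"].append(row[o + a])
            batch["next_obs"].append(row[o + a + 1:2 * o + a + 1])
            batch["dones"].append(row[2 * o + a + 1])
            batch["truncated"].append(row[2 * o + a + 2])
        return batch
```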

Modules

async_runner

Base async runner for multi-process RL training.

collector_error

Cross-process error propagation for collector subprocesses.

memory_budget

Memory budget estimation for async RL training buffers.

replay_buffer

Packed shared-memory replay buffer for off-policy RL.

replay_pipelines

Replay pipeline abstraction.

rollout_ring_buffer

Shared rollout IPC ring buffer for APPO / async PPO.

shared_buffer

Base class for device-adaptive shared memory buffers.

shared_obs_stats

Shared observation normalization statistics for multi-process training.

weight_sync

Shared weight synchronization for actor networks.
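The memory_budget module's formula is not documented here, but a back-of-envelope estimate for a packed replay buffer is straightforward arithmetic. The sketch assumes the following:

  • float32 storage;

  • one row per transition holding obs, next_obs, optional critic and next_critic, the action, and three scalars (reward, done, truncated).

It ignores terminal-observation fields and allocator overhead. replay_bytes is a hypothetical helper.

```python
def replay_bytes(capacity: int, obs_dim: int, action_dim: int,
                 critic_dim: int = 0, bytes_per_value: int = 4) -> int:
    # Assumed row: obs + next_obs + critic + next_critic + action + (reward, done, truncated)
    row_values = 2 * obs_dim + 2 * critic_dim + action_dim + 3
    return capacity * row_values * bytes_per_value


b = replay_bytes(1_000_000, obs_dim=48, action_dim=12)
print(b, round(b / 2**20, 1))  # → 444000000 423.4
```

Under these assumptions, one million transitions with 48-dimensional observations and 12-dimensional actions need about 423 MiB of host memory, before any copy on the learner device.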