unilab.ipc.rollout_ring_buffer.RolloutRingBuffer

class unilab.ipc.rollout_ring_buffer.RolloutRingBuffer[source]

Bases: object

N-slot shared-memory ring buffer for raw rollout payloads.

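Parameters:

num_envs
num_steps
obs_dim
action_dim
critic_dim – keyword-only; defaults to 0
num_slots – keyword-only; defaults to 4
create – keyword-only; defaults to True
shm_name_prefix – keyword-only; defaults to None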

Methods

__init__(num_envs, num_steps, obs_dim, ...)

advance_read()

attach_sync_primitives(write_ptr, read_ptr)

available()

cleanup()

close()

copy_read_slot_to_torch(destination)

read_numpy_views()

Return shared-memory views for the current read slot.

read_torch(device)

signal_write_done()

wait_for_data([timeout])

Attributes

name

read_slot

slot_shapes

write_buffer

write_slot

__init__(num_envs, num_steps, obs_dim, action_dim, *, critic_dim=0, num_slots=4, create=True, shm_name_prefix=None)[source]
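Parameters:

num_envs
num_steps
obs_dim
action_dim
critic_dim – keyword-only; defaults to 0
num_slots – keyword-only; defaults to 4
create – keyword-only; defaults to True
shm_name_prefix – keyword-only; defaults to None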
property name: Dict[str, str]
property slot_shapes: Dict[str, tuple[int, ...]]
attach_sync_primitives(write_ptr, read_ptr)[source]
Return type:

None

property write_slot: int
property write_buffer: Dict[str, ndarray]
signal_write_done()[source]
Return type:

None

available()[source]
Return type:

int

wait_for_data(timeout=60.0)[source]
Parameters:

timeout (float)

Return type:

bool

property read_slot: int
read_numpy_views()[source]

Return shared-memory views for the current read slot.

The returned arrays are borrowed views into the shared memory of the current read slot, not copies. Consumers must copy them into owned storage before calling advance_read().

Return type:

dict[str, ndarray]

copy_read_slot_to_torch(destination)[source]
Parameters:

destination (dict)

Return type:

None

read_torch(device)[source]
Parameters:

device (str)

Return type:

dict

advance_read()[source]
Return type:

None

cleanup()[source]
Return type:

None

close()[source]
Return type:

None