unilab.ipc.replay_pipelines

Replay pipeline abstraction.

class unilab.ipc.replay_pipelines.ReplayPipeline

Bases: Protocol

start_prepare(tick_id, sample_count, min_snapshot_ptr=None)
Parameters:
  • tick_id (int)

  • sample_count (int)

  • min_snapshot_ptr (default None)

Return type:

bool

batch_ready(tick_id, sample_count)
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

bool

wait_ready()
Return type:

None

wait_until_ready(tick_id, sample_count)
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

bool

sample_large_batch(tick_id, sample_count)
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

Dict[str, Tensor]

after_tick()
Return type:

None

close()
Return type:

None

__init__(*args, **kwargs)
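Because ReplayPipeline derives from typing.Protocol, a class satisfies it structurally by providing matching methods; it does not have to inherit from it. The sketch below is a hypothetical, synchronous toy pipeline that shows that shape. It defines its own runtime-checkable stand-in protocol (ReplayPipelineLike, a name invented here), and it uses plain lists instead of torch Tensors so it runs without third-party dependencies. Both are assumptions for illustration, not the library's implementation.

```python
from typing import Dict, List, Optional, Protocol, Tuple, runtime_checkable


@runtime_checkable
class ReplayPipelineLike(Protocol):
    """Hypothetical stand-in mirroring the documented ReplayPipeline methods.

    Batch values are lists here rather than torch Tensors (assumption).
    """

    def start_prepare(self, tick_id: int, sample_count: int,
                      min_snapshot_ptr: Optional[int] = None) -> bool: ...
    def batch_ready(self, tick_id: int, sample_count: int) -> bool: ...
    def wait_ready(self) -> None: ...
    def wait_until_ready(self, tick_id: int, sample_count: int) -> bool: ...
    def sample_large_batch(self, tick_id: int,
                           sample_count: int) -> Dict[str, List[float]]: ...
    def after_tick(self) -> None: ...
    def close(self) -> None: ...


class SynchronousReplayPipeline:
    """Hypothetical pipeline that prepares batches eagerly on the caller thread."""

    def __init__(self, data: List[float]) -> None:
        if not data:
            raise ValueError("data must be non-empty")
        self._data = data
        self._prepared: Dict[Tuple[int, int], Dict[str, List[float]]] = {}

    def start_prepare(self, tick_id, sample_count, min_snapshot_ptr=None):
        key = (tick_id, sample_count)
        if key in self._prepared:
            return False  # already prepared: no new work launched
        # Take sample_count items, cycling through the stored data.
        batch = [self._data[i % len(self._data)] for i in range(sample_count)]
        self._prepared[key] = {"obs": batch}
        return True

    def batch_ready(self, tick_id, sample_count):
        return (tick_id, sample_count) in self._prepared

    def wait_ready(self):
        return None  # nothing runs asynchronously in this sketch

    def wait_until_ready(self, tick_id, sample_count):
        return self.batch_ready(tick_id, sample_count)

    def sample_large_batch(self, tick_id, sample_count):
        # Hand the prepared batch to the caller and forget it.
        return self._prepared.pop((tick_id, sample_count))

    def after_tick(self):
        return None

    def close(self):
        self._prepared.clear()
```

A real implementation would do the preparation on a background thread or device stream; the toy version only preserves the method names, arguments, and return types.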
class unilab.ipc.replay_pipelines.ReplayTickMetadata

Bases: object

ReplayTickMetadata(tick_id: 'int', snapshot_ptr: 'int', snapshot_size: 'int', sample_seed: 'int', sample_count: 'int', batch_host_slot: 'int | None' = None, batch_gpu_slot: 'int | None' = None)

Parameters:
  • tick_id (int)

  • snapshot_ptr (int)

  • snapshot_size (int)

  • sample_seed (int)

  • sample_count (int)

  • batch_host_slot (int | None)

  • batch_gpu_slot (int | None)

tick_id: int
snapshot_ptr: int
snapshot_size: int
sample_seed: int
sample_count: int
batch_host_slot: int | None = None
batch_gpu_slot: int | None = None
__init__(tick_id, snapshot_ptr, snapshot_size, sample_seed, sample_count, batch_host_slot=None, batch_gpu_slot=None)
Parameters:
  • tick_id (int)

  • snapshot_ptr (int)

  • snapshot_size (int)

  • sample_seed (int)

  • sample_count (int)

  • batch_host_slot (int | None)

  • batch_gpu_slot (int | None)
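The signature above has the form of an auto-generated dataclass docstring, so the sketch below assumes ReplayTickMetadata behaves like a standard dataclass. It uses a local replica with the documented fields (not an import from unilab) and placeholder values, and it shows how the two optional slot fields, which default to None, can be filled in later with dataclasses.replace.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class ReplayTickMetadata:
    """Local replica of the documented fields, for illustration only."""
    tick_id: int
    snapshot_ptr: int
    snapshot_size: int
    sample_seed: int
    sample_count: int
    batch_host_slot: Optional[int] = None
    batch_gpu_slot: Optional[int] = None


# Describe a tick before any buffer slot is assigned (placeholder values).
meta = ReplayTickMetadata(tick_id=7, snapshot_ptr=1024, snapshot_size=512,
                          sample_seed=42, sample_count=256)

# replace() returns a copy with the slot fields filled in; the original is
# unchanged. Slot indices 1 and 0 are arbitrary example values.
assigned = replace(meta, batch_host_slot=1, batch_gpu_slot=0)
```

Copying with replace rather than mutating in place also works if the real class turns out to be frozen.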

class unilab.ipc.replay_pipelines.CPUPinnedDoubleBufferReplayPipeline

Bases: object

Double-buffered packed replay batch pipeline.

On CUDA devices, packed batches take the pinned-host → GPU fast path. On MPS and CPU devices, the pipeline keeps the same collector-thread packing and hot/cold batch contract, but copies each batch into the learner's device slot with a portable torch copy.
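The hot/cold contract can be pictured as two slots: the learner reads the hot slot while the next batch is packed into the cold slot, and the roles swap at a tick boundary. The sketch below is a hypothetical model of that idea plus the documented device split. The class HotColdSlots, the function transfer_path, its return labels, and the assumption that the swap happens once per tick are all illustrative and not taken from the library.

```python
from typing import Dict, List, Optional


class HotColdSlots:
    """Hypothetical two-slot buffer: learner reads 'hot' while 'cold' is filled."""

    def __init__(self) -> None:
        self._slots: List[Optional[Dict[str, list]]] = [None, None]
        self._hot = 0  # index of the slot the learner consumes this tick

    @property
    def cold(self) -> int:
        return 1 - self._hot

    def fill_cold(self, batch: Dict[str, list]) -> None:
        # Producer side: pack the next batch without touching the hot slot.
        self._slots[self.cold] = batch

    def read_hot(self) -> Optional[Dict[str, list]]:
        # Consumer side: the batch that was prepared during the previous tick.
        return self._slots[self._hot]

    def swap(self) -> None:
        # At a tick boundary the freshly filled cold slot becomes hot.
        self._hot = self.cold


def transfer_path(device_type: str) -> str:
    """Mirror the documented split: CUDA fast path vs. portable copy elsewhere."""
    if device_type == "cuda":
        return "pinned_host_to_gpu"
    if device_type in ("mps", "cpu"):
        return "portable_torch_copy"
    raise ValueError(f"unsupported device type: {device_type!r}")
```

Keeping the producer strictly on the cold slot means the learner never observes a half-written batch, which is the main reason to pay for a second buffer.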

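__init__(replay_buffer, *, device, sample_count, base_seed=0, trace_recorder=None, trace_cuda_events=True, verbose=False, verbose_output_dir=None, collector_pack_request_queue=None, collector_pack_ready_queue=None, collector_pack_shared_slots=None)
Parameters:
  • replay_buffer

  • device

  • sample_count (int)

  • base_seed (default 0)

  • trace_recorder (default None)

  • trace_cuda_events (default True)

  • verbose (default False)

  • verbose_output_dir (default None)

  • collector_pack_request_queue (default None)

  • collector_pack_ready_queue (default None)

  • collector_pack_shared_slots (default None)

All parameters after replay_buffer are keyword-only.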
property h2d_submitter: str
property transfer_manifest: dict[str, object]
start_prepare(tick_id, sample_count, min_snapshot_ptr=None)

Start the CPU pack and device transfer for the current cold slot.

Returns True if this call launches new work, or False if the same tick is already pending or prepared.

Parameters:
  • tick_id (int)

  • sample_count (int)

  • min_snapshot_ptr (default None)

Return type:

bool
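The return-value contract makes start_prepare safe to call repeatedly for the same tick. The sketch below is a hypothetical bookkeeping class that reproduces only that contract: it tracks pending and prepared ticks and counts how often real work would have been launched. Keying on tick_id alone and the mark_done hook are assumptions; the real pipeline may track more state.

```python
from typing import Optional, Set


class PrepareTracker:
    """Hypothetical bookkeeping for the start_prepare return-value contract."""

    def __init__(self) -> None:
        self.pending: Set[int] = set()   # ticks whose pack/transfer is in flight
        self.prepared: Set[int] = set()  # ticks whose batch is ready
        self.launched = 0                # number of times work was actually started

    def start_prepare(self, tick_id: int, sample_count: int,
                      min_snapshot_ptr: Optional[int] = None) -> bool:
        if tick_id in self.pending or tick_id in self.prepared:
            return False  # same tick already pending or prepared: no new work
        self.pending.add(tick_id)
        self.launched += 1  # stands in for launching the CPU pack + transfer
        return True

    def mark_done(self, tick_id: int) -> None:
        # Called when the background work for tick_id finishes (assumed hook).
        self.pending.discard(tick_id)
        self.prepared.add(tick_id)
```

Callers can therefore issue an eager prefetch and a later "make sure it started" call without double-launching work, and use the boolean to count real launches.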

batch_ready(tick_id, sample_count)
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

bool

wait_ready()
Return type:

None

wait_until_ready(tick_id, sample_count)
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

bool

sample_large_batch(tick_id, sample_count)
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

Dict[str, Tensor]

after_tick()
Return type:

None

close()
Return type:

None
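The reference does not spell out a call order, so the loop below is only a plausible sketch of how a learner might drive these methods: prime tick 0, then on each tick wait for the batch, consume it, start preparing the next tick before training, and call after_tick; close always runs at the end. FakePipeline and run_ticks are hypothetical names, and the exact ordering (in particular prefetching before after_tick) is an assumption, not documented behavior.

```python
from typing import Callable, Dict, List, Optional


class FakePipeline:
    """Hypothetical stand-in using the documented method names; records calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def start_prepare(self, tick_id: int, sample_count: int,
                      min_snapshot_ptr: Optional[int] = None) -> bool:
        self.calls.append(f"start_prepare({tick_id})")
        return True

    def wait_until_ready(self, tick_id: int, sample_count: int) -> bool:
        self.calls.append(f"wait_until_ready({tick_id})")
        return True

    def sample_large_batch(self, tick_id: int, sample_count: int) -> Dict[str, List[int]]:
        self.calls.append(f"sample_large_batch({tick_id})")
        return {"idx": list(range(sample_count))}

    def after_tick(self) -> None:
        self.calls.append("after_tick")

    def close(self) -> None:
        self.calls.append("close")


def run_ticks(pipeline, num_ticks: int, sample_count: int,
              train_step: Optional[Callable[[Dict], None]] = None) -> int:
    """Hypothetical learner loop: prefetch tick t+1 while consuming tick t."""
    batches = 0
    try:
        pipeline.start_prepare(0, sample_count)  # prime the first batch
        for tick in range(num_ticks):
            if not pipeline.wait_until_ready(tick, sample_count):
                raise RuntimeError(f"batch for tick {tick} never became ready")
            batch = pipeline.sample_large_batch(tick, sample_count)
            # Overlap: start the next tick's pack/transfer before training.
            if tick + 1 < num_ticks:
                pipeline.start_prepare(tick + 1, sample_count)
            if train_step is not None:
                train_step(batch)
            batches += 1
            pipeline.after_tick()
    finally:
        pipeline.close()  # always release workers and buffers
    return batches
```

Wrapping the loop in try/finally ensures close() runs even if a batch never becomes ready.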

Modules

base

Base types for replay pipeline abstraction.

cpu_pinned_double_buffer

Double-buffer replay pipeline for packed CPU replay samples.

native_h2d

Optional native H2D submit helper with graceful fallback.

transfer

Device transfer backends for replay pipelines.