unilab.ipc.replay_pipelines.cpu_pinned_double_buffer

Double-buffer replay pipeline for packed CPU replay samples.

CUDA uses the original CPU-pinned shared slot + native async H2D fast path. Non-CUDA devices use the same collector-thread packing contract and a portable torch copy into the device batch slots.

Classes

CPUPinnedDoubleBufferReplayPipeline

Double-buffered packed replay batch pipeline.

class unilab.ipc.replay_pipelines.cpu_pinned_double_buffer.CPUPinnedDoubleBufferReplayPipeline[source]

Bases: object

Double-buffered packed replay batch pipeline.

CUDA keeps the pinned-host → GPU fast path. MPS/CPU keep the same collector-thread pack and hot/cold batch contract with a portable torch copy into the learner device slot.

Parameters:
__init__(replay_buffer, *, device, sample_count, base_seed=0, trace_recorder=None, trace_cuda_events=True, verbose=False, verbose_output_dir=None, collector_pack_request_queue=None, collector_pack_ready_queue=None, collector_pack_shared_slots=None)[source]
Parameters:
property h2d_submitter: str
property transfer_manifest: dict[str, object]
start_prepare(tick_id, sample_count, min_snapshot_ptr=None)[source]

Start CPU pack + device transfer for the current cold slot.

Returns True when this call launches new work. If the same tick is already pending or prepared, returns False.

Parameters:
Return type:

bool

batch_ready(tick_id, sample_count)[source]
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

bool

wait_ready()[source]
Return type:

None

wait_until_ready(tick_id, sample_count)[source]
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

bool

sample_large_batch(tick_id, sample_count)[source]
Parameters:
  • tick_id (int)

  • sample_count (int)

Return type:

Dict[str, Tensor]

after_tick()[source]
Return type:

None

close()[source]
Return type:

None