MultiProcessingReadingService

class torchdata.dataloader2.MultiProcessingReadingService(num_workers: int = 0, multiprocessing_context: Optional[str] = None, worker_prefetch_cnt: int = 10, main_prefetch_cnt: int = 10, worker_init_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo], Union[IterDataPipe, MapDataPipe]]] = None, worker_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo, SeedGenerator], Union[IterDataPipe, MapDataPipe]]] = None)

Spawns multiple worker processes to load data from the DataPipe graph. If any non-replicable DataPipe (sharding_round_robin_dispatch) is present in the graph, a separate dispatching process is created to load data from the lowest common ancestor of all non-replicable DataPipes and distribute it to the worker processes in round-robin order. The subsequent DataPipe graph in each worker process then processes the data from the dispatching process and eventually returns the result to the main process.
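The round-robin distribution described above can be illustrated with a small standard-library sketch. It is not torchdata's implementation: the function name round_robin_dispatch and the use of plain lists in place of inter-process queues are assumptions made for illustration only.

```python
from itertools import cycle


def round_robin_dispatch(items, num_workers):
    """Assign items to workers in turn: 0, 1, ..., n-1, 0, 1, ...

    Hypothetical helper: each inner list stands in for the queue that a
    dispatching process would fill for one worker process.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    queues = [[] for _ in range(num_workers)]
    # cycle() yields worker ids endlessly; zip() stops when items run out.
    for worker_id, item in zip(cycle(range(num_workers)), items):
        queues[worker_id].append(item)
    return queues


print(round_robin_dispatch(range(7), num_workers=3))
# [[0, 3, 6], [1, 4], [2, 5]]
```

Each worker receives every num_workers-th item, so the per-worker loads differ by at most one item.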

Parameters:
  • num_workers (int) – How many subprocesses to use for data loading.

  • multiprocessing_context (str, optional) – Multiprocessing start method. If None, the default context is used. Otherwise, it should be ‘fork’ or ‘spawn’.

  • worker_prefetch_cnt – (int, 10 by default): Number of data items to prefetch at the end of each worker process.

  • main_prefetch_cnt – (int, 10 by default): Number of data items to prefetch at the end of the whole pipeline in the main process.

  • worker_init_fn – (Callable, optional): Function to be called when each worker process launches with DataPipe and WorkerInfo as the expected arguments.

  • worker_reset_fn – (Callable, optional): Function to be called at the beginning of each epoch in each worker process with DataPipe, WorkerInfo and SeedGenerator as the expected arguments.
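A minimal usage sketch of the parameters above, assuming a torchdata release that still ships torchdata.dataloader2. The names log_worker_init and build_loader are hypothetical, and the torchdata imports are deferred into build_loader so that the worker function can be defined and inspected without the library installed.

```python
def log_worker_init(datapipe, worker_info):
    """Hypothetical worker_init_fn: report which worker started.

    Receives the worker's DataPipe and a WorkerInfo, and returns the
    DataPipe that the worker should use (here, unchanged).
    """
    print(f"worker {worker_info.worker_id} of {worker_info.num_workers} started")
    return datapipe


def build_loader(num_workers=2):
    """Build a DataLoader2 backed by MultiProcessingReadingService."""
    # Deferred imports: only needed when a loader is actually built.
    from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
    from torchdata.datapipes.iter import IterableWrapper

    # sharding_filter lets each worker process keep only its own shard.
    datapipe = IterableWrapper(range(10)).shuffle().sharding_filter()
    reading_service = MultiProcessingReadingService(
        num_workers=num_workers,
        worker_prefetch_cnt=10,  # the documented default, shown explicitly
        main_prefetch_cnt=10,    # the documented default, shown explicitly
        worker_init_fn=log_worker_init,
    )
    return DataLoader2(datapipe, reading_service=reading_service)
```

To run it, call build_loader() under an `if __name__ == "__main__":` guard (needed with the ‘spawn’ start method), iterate over the returned loader, and call its shutdown() method when finished so the worker processes exit.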

finalize() → None

MultiProcessingReadingService invalidates its state and properly exits all subprocesses.

initialize(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]

MultiProcessingReadingService finds information about sharding, separates the graph into multiple pieces, reconnects them using queues, and creates the subprocesses.

initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) → Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]

ReadingService spins up the service for an epoch. Called each time a DataLoader2 iterator is requested, at the start of the epoch.

Parameters:
  • seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it governs the determinism of all random operations within the graph of DataPipes.

  • iter_reset_fn – Optional reset function from the prior ReadingService when SequentialReadingService chains multiple ReadingServices.

Returns:

A new iter_reset_fn to be used by the subsequent ReadingService.

Example

MultiProcessingReadingService sets the worker seeds in each process and starts prefetching items from the graph.
