MultiProcessingReadingService¶
- class torchdata.dataloader2.MultiProcessingReadingService(num_workers: int = 0, multiprocessing_context: Optional[str] = None, worker_prefetch_cnt: int = 10, main_prefetch_cnt: int = 10, worker_init_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo], Union[IterDataPipe, MapDataPipe]]] = None, worker_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo, SeedGenerator], Union[IterDataPipe, MapDataPipe]]] = None)¶
Spawns multiple worker processes to load data from the
DataPipe
graph. If any non-replicableDataPipe
(sharding_round_robin_dispatch
) is presented in the graph, a separate dispatching process will be created to load data from the lowest common ancestor of all non-replicableDataPipes
and distributes data to each worker process in the round-robin manner Then, the subsequentDataPipe
graph in each worker process will process the data from the dispatching process and eventually return the result to the main process.- Parameters:
num_workers (int) – How many subprocesses to use for data loading.
multiprocessing_context (str, optional) – Multiprocessing starting method. If method is None then the default context is returned. Otherwise, method should be ‘fork’, ‘spawn’.
worker_prefetch_cnt – (int, 10 by default): Number of data will be prefetched at the end of each worker process.
main_prefetch_cnt – (int, 10 by default): Number of data will be prefetched at the end of the whole pipeline in the main process.
worker_init_fn – (Callable, optional): Function to be called when each worker process launches with
DataPipe
andWorkerInfo
as the expected arguments.worker_reset_fn – (Callable, optional): Function to be called at the beginning of each epoch in each worker process with
DataPipe
,WorkerInfo
andSeedGenerator
as the expected arguments.
- finalize() None ¶
MultiProcessingReadingService
invalidate states & properly exits all subprocesses.
- initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe] ¶
MultiProcessingReadingService
finds information about sharding, separates graph by multiple pieces and reconnects it using queues. creates subprocesses.
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] ¶
ReadingService
spins up service for an epoch. Called at the beginning of every time gettingDataLoader2
iterator.- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it will govern the determinism for all of random operations with the graph of DataPipes.
iter_reset_fn – Optional reset function from the prior
ReadingServcie
whenSequentialReadingService
chains multipleReadingServices
- Returns:
A new
iter_reset_fn
to be used by subseqeuentReadingService
Example
MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.