Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).
MultiProcessingReadingService¶
- class torchdata.dataloader2.MultiProcessingReadingService(num_workers: int = 0, multiprocessing_context: Optional[str] = None, worker_prefetch_cnt: int = 10, main_prefetch_cnt: int = 10, worker_init_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo], Union[IterDataPipe, MapDataPipe]]] = None, worker_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo, SeedGenerator], Union[IterDataPipe, MapDataPipe]]] = None)¶
Spawns multiple worker processes to load data from the DataPipe graph. If any non-replicable DataPipe (sharding_round_robin_dispatch) is present in the graph, a separate dispatching process will be created to load data from the lowest common ancestor of all non-replicable DataPipes and distribute the data to each worker process in a round-robin manner. Then, the subsequent DataPipe graph in each worker process will process the data from the dispatching process and eventually return the result to the main process.

- Parameters:
num_workers (int) – How many subprocesses to use for data loading.
multiprocessing_context (str, optional) – Multiprocessing start method. If None, the default context is used; otherwise, it should be ‘fork’ or ‘spawn’.
worker_prefetch_cnt (int, 10 by default) – Number of elements to be prefetched at the end of each worker process.
main_prefetch_cnt (int, 10 by default) – Number of elements to be prefetched at the end of the whole pipeline in the main process.
worker_init_fn (Callable, optional) – Function to be called when each worker process launches, with DataPipe and WorkerInfo as the expected arguments.
worker_reset_fn (Callable, optional) – Function to be called at the beginning of each epoch in each worker process, with DataPipe, WorkerInfo, and SeedGenerator as the expected arguments (see the sketch after this list).
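A minimal sketch of custom worker hooks follows. Both callables must return the (possibly modified) DataPipe, per the signatures above; the WorkerInfo attribute names used here (worker_id, num_workers) are assumptions for illustration.

    from torchdata.dataloader2 import MultiProcessingReadingService

    def log_worker_init(datapipe, worker_info):
        # Runs once in each worker process at launch; must return the DataPipe
        # that this worker will iterate over.
        print(f"worker {worker_info.worker_id}/{worker_info.num_workers} launched")  # assumed attributes
        return datapipe

    def reset_worker_state(datapipe, worker_info, seed_generator):
        # Runs at the beginning of every epoch in each worker; per-epoch state
        # can be reset here before returning the DataPipe.
        return datapipe

    rs = MultiProcessingReadingService(
        num_workers=2,
        worker_init_fn=log_worker_init,
        worker_reset_fn=reset_worker_state,
    )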
- finalize() None ¶
MultiProcessingReadingService invalidates states and properly exits all subprocesses.
- initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe] ¶
MultiProcessingReadingService finds information about sharding, separates the graph into multiple pieces, reconnects them using queues, and creates subprocesses.
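Users normally do not call initialize() or finalize() directly; DataLoader2 drives them. A minimal sketch of the assumed ordering:

    from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(range(100)).shuffle().sharding_filter()
    rs = MultiProcessingReadingService(num_workers=2)
    dl = DataLoader2(dp, reading_service=rs)

    for batch in dl:   # initialize() splits the graph and spawns the workers
        pass           # data flows from the workers back to the main process

    dl.shutdown()      # finalize() invalidates states and exits all subprocesses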
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] ¶
ReadingService spins up the service for an epoch. Called every time a DataLoader2 iterator is obtained, at the beginning of the epoch.

- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it governs the determinism of all random operations within the graph of DataPipes.
iter_reset_fn – Optional reset function from the prior ReadingService, used when SequentialReadingService chains multiple ReadingServices.
- Returns:
A new iter_reset_fn to be used by the subsequent ReadingService
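As a hedged sketch of the chaining mentioned above, SequentialReadingService can combine a DistributedReadingService with a MultiProcessingReadingService; the iter_reset_fn returned here is then passed to the subsequent ReadingService. The snippet assumes a torch.distributed process group is already initialized.

    from torchdata.dataloader2 import (
        DataLoader2,
        DistributedReadingService,
        MultiProcessingReadingService,
        SequentialReadingService,
    )
    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(range(1000)).shuffle().sharding_filter()

    dist_rs = DistributedReadingService()          # requires an initialized process group
    mp_rs = MultiProcessingReadingService(num_workers=2)
    rs = SequentialReadingService(dist_rs, mp_rs)  # distributed sharding first, then worker processes

    dl = DataLoader2(dp, reading_service=rs)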
Example
MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.
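A minimal sketch of such a setup, assuming a simple in-memory DataPipe and the per-epoch dl.seed(...) call that feeds the SeedGenerator described above:

    from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
    from torchdata.datapipes.iter import IterableWrapper

    # Replicable graph: each worker receives its own shard via sharding_filter.
    dp = IterableWrapper(range(100)).shuffle().sharding_filter().batch(4)

    rs = MultiProcessingReadingService(
        num_workers=2,
        worker_prefetch_cnt=10,   # prefetch at the end of each worker process
        main_prefetch_cnt=10,     # prefetch at the end of the pipeline in the main process
    )
    dl = DataLoader2(dp, reading_service=rs)

    for epoch in range(2):
        dl.seed(epoch)            # single source of randomness for this epoch
        for batch in dl:
            pass                  # training step goes here

    dl.shutdown()                 # tears down the worker subprocesses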