Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on
continuing development or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from
the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release
torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing
users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent
releases will not include DataPipes or DataLoaderV2.
Please reach out if you have suggestions or comments (use this issue for feedback).
ShardingRoundRobinDispatcher
class torchdata.datapipes.iter.ShardingRoundRobinDispatcher(source_datapipe: IterDataPipe, sharding_group_filter: Optional[SHARDING_PRIORITIES] = None)
Wrapper that indicates the prior section of the DataPipe graph is non-replicable and will be iterated in a separate, single dispatching process to distribute data to worker processes in a round-robin manner when multiprocessing is being used (functional name: sharding_round_robin_dispatch).
- Parameters:
  - source_datapipe – the IterDataPipe whose preceding graph section is iterated in the single dispatching process
  - sharding_group_filter – optional SHARDING_PRIORITIES value selecting the sharding group (see the note below)
Note
- sharding_group_filter only accepts SHARDING_PRIORITIES.MULTIPROCESSING for now.
- When using distributed training, you can add a sharding_filter() prior to this DataPipe to distribute samples among worker nodes.
Examples
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> dp = IterableWrapper(range(10))
>>> # `.shuffle()` will be executed in a single dispatching process, then the samples are distributed
>>> # to worker processes
>>> dp = dp.shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> # `.map()` will be executed within each worker process
>>> dp = dp.map(lambda x: x + 1)
>>> # Distributed case: the 10 samples will be distributed among the nodes
>>> dp = IterableWrapper(range(10)).sharding_filter()
>>> # `.map()` will be executed in a single dispatching process on each node
>>> # You may apply further transformations afterwards within each worker process
>>> dp = dp.map(lambda x: x + 1).sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
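The examples above only build the DataPipe graph; the round-robin dispatching takes effect when the graph is executed with a multiprocessing-aware loader. The sketch below is a minimal, illustrative way to run the multiprocessing case with DataLoader2 and MultiProcessingReadingService from torchdata.dataloader2 (the DataLoaderV2 stack that the notice above schedules for removal, so it applies to torchdata<=0.8.0); num_workers=2 and the variable names are arbitrary choices for illustration, not part of this class's API.
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
>>> # `.shuffle()` runs in the single dispatching process; `.map()` runs in each worker process
>>> dp = IterableWrapper(range(10)).shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> dp = dp.map(lambda x: x + 1)
>>> rs = MultiProcessingReadingService(num_workers=2)  # assumed worker count, chosen for illustration
>>> dl = DataLoader2(dp, reading_service=rs)
>>> # Each of the 10 samples is yielded exactly once, in worker-dependent order
>>> results = list(dl)
>>> dl.shutdown()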