Attention

June 2024 Status Update: Removing DataPipes and DataLoader V2

We are refocusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan to continue developing or maintaining the DataPipes and DataLoaderV2 solutions, and they will be removed from the torchdata repo. We'll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).

ShardingRoundRobinDispatcher

class torchdata.datapipes.iter.ShardingRoundRobinDispatcher(source_datapipe: IterDataPipe, sharding_group_filter: Optional[SHARDING_PRIORITIES] = None)

Wrapper that indicates that the prior section of the DataPipe graph is non-replicable. When multiprocessing is used, that section is iterated in a separate, single dispatching process, which distributes data to the worker processes in a round-robin manner (functional name: sharding_round_robin_dispatch).

Parameters:
  • source_datapipe – Iterable DataPipe that will be sharded

  • sharding_group_filter – Optional SHARDING_PRIORITIES value

Note

  • sharding_group_filter only accepts SHARDING_PRIORITIES.MULTIPROCESSING for now

  • When using distributed training, you can add a sharding_filter() prior to this DataPipe to distribute samples among worker nodes.

Examples

>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> dp = IterableWrapper(range(10))
>>> # `.shuffle()` will be executed in a single dispatching process, then the samples are distributed
>>> # to worker processes
>>> dp = dp.shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> # `.map()` will be executed within each worker process
>>> dp = dp.map(lambda x: x + 1)
>>> # Distributed case: the 10 samples will be distributed among the nodes
>>> dp = IterableWrapper(range(10)).sharding_filter()
>>> # `.map()` will be executed in a single dispatching process on each node
>>> # You may apply further transformations afterwards within each worker process
>>> dp = dp.map(lambda x: x + 1).sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
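
Because DataPipes are deprecated, it can help to see the distribution pattern without torchdata installed. The following is a minimal pure-Python sketch of the behavior described above, not the library's implementation: `shard_for_node` and `round_robin_dispatch` are hypothetical helpers, and the sketch assumes that node-level sharding keeps every n-th sample and that the dispatcher hands sample i to worker i modulo the number of workers.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shard_for_node(source: Iterable[T], num_nodes: int, node_rank: int) -> Iterator[T]:
    """Hypothetical helper: keep every `num_nodes`-th sample, starting at `node_rank`.

    Assumption: this mimics what a sharding filter does across nodes.
    """
    return islice(source, node_rank, None, num_nodes)


def round_robin_dispatch(source: Iterable[T], num_workers: int) -> List[List[T]]:
    """Hypothetical helper: iterate `source` once and give sample i to worker i % num_workers.

    The source is consumed in a single place, which is the point of a
    dispatching process: a non-replicable stage is never run once per worker.
    """
    per_worker: List[List[T]] = [[] for _ in range(num_workers)]
    for i, sample in enumerate(source):
        per_worker[i % num_workers].append(sample)
    return per_worker


# Single-node case: 10 samples dispatched to 3 workers.
single_node = round_robin_dispatch(range(10), num_workers=3)

# Distributed case: 2 nodes, each dispatching its own shard to 2 workers.
node0 = round_robin_dispatch(shard_for_node(range(10), num_nodes=2, node_rank=0), num_workers=2)
node1 = round_robin_dispatch(shard_for_node(range(10), num_nodes=2, node_rank=1), num_workers=2)

print(single_node)
print(node0, node1)
```

In this sketch every sample reaches exactly one worker, and samples from a node's shard alternate between that node's workers. The real dispatcher runs in a separate process and streams samples rather than building lists.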
