ShardingRoundRobinDispatcher
- class torchdata.datapipes.iter.ShardingRoundRobinDispatcher(source_datapipe: IterDataPipe, sharding_group_filter: Optional[SHARDING_PRIORITIES] = None)
Wrapper that indicates the prior section of the DataPipe graph is non-replicable and will be iterated in a separate, single dispatching process to distribute data to worker processes in a round-robin manner when multiprocessing is being used (functional name: sharding_round_robin_dispatch).
- Parameters:
  - source_datapipe – Iterable DataPipe that will be sharded
  - sharding_group_filter – Optional SHARDING_PRIORITIES value
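As a quick reference, the constructor above and the functional form used in the examples below attach the same DataPipe; a minimal sketch (the source pipeline here is illustrative):

>>> from torchdata.datapipes.iter import IterableWrapper, ShardingRoundRobinDispatcher
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> source_dp = IterableWrapper(range(10))
>>> # Functional form
>>> dp = source_dp.sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> # Equivalent class form
>>> dp = ShardingRoundRobinDispatcher(source_dp, sharding_group_filter=SHARDING_PRIORITIES.MULTIPROCESSING)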
Note
sharding_group_filter only accepts SHARDING_PRIORITIES.MULTIPROCESSING for now.
When using distributed training, you can add a sharding_filter() prior to this DataPipe to distribute samples among worker nodes.
Examples
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> dp = IterableWrapper(range(10))
>>> # `.shuffle()` will be executed in a single dispatching process, then the samples are distributed
>>> # to the worker processes
>>> dp = dp.shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> # `.map()` will be executed within each worker process
>>> dp = dp.map(lambda x: x + 1)
>>> # Distributed case: the 10 samples will be distributed among the nodes
>>> dp = IterableWrapper(range(10)).sharding_filter()
>>> # `.map()` will be executed in a single dispatching process on each node
>>> # You may apply further transformations within each worker process afterwards
>>> dp = dp.map(lambda x: x + 1).sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
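The examples above only build the DataPipe graph; the dispatching process and worker processes are created when the pipeline is consumed through a multiprocessing-aware reading service. A minimal sketch of one way to do this, using DataLoader2 with MultiProcessingReadingService (the pipeline and worker count are illustrative):

>>> from torchdata.datapipes.iter import IterableWrapper
>>> from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> dp = IterableWrapper(range(10)).shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> dp = dp.map(lambda x: x + 1)
>>> rs = MultiProcessingReadingService(num_workers=2)
>>> dl = DataLoader2(dp, reading_service=rs)
>>> # `.shuffle()` runs once in the dispatching process; `.map()` runs in each of the 2 workers
>>> samples = list(dl)
>>> dl.shutdown()

For the distributed case, the same pattern applies with a combined reading service (e.g. SequentialReadingService wrapping a DistributedReadingService and a MultiProcessingReadingService).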