Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the DataPipes and DataLoaderV2 solutions, and they will be removed from the torchdata repo. We'll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).
ReadingService¶
ReadingService handles in-place modification of the DataPipe graph based on different use cases.
Features¶
Dynamic Sharding¶
Dynamic sharding is achieved by MultiProcessingReadingService and DistributedReadingService, which shard the pipeline based on the information of the corresponding multiprocessing and distributed workers. TorchData offers two types of DataPipe that let users define where sharding takes place within the pipeline.
- sharding_filter (ShardingFilter): When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the DataPipe graph, while skipping samples that do not belong to the corresponding worker at the point where sharding_filter is placed.
- sharding_round_robin_dispatch (ShardingRoundRobinDispatcher): When there is any sharding_round_robin_dispatch DataPipe in the pipeline, that branch (i.e. all DataPipes prior to sharding_round_robin_dispatch) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distribute data to the subsequent worker processes.
The following is an example of using both types of sharding strategies in the same pipeline.
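The original code sample is not reproduced here; below is a minimal, illustrative sketch instead. The IterableWrapper sources, the zip combination, and the worker count are assumptions, not part of the original example:

```python
from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

# Non-replicable branch: a single dispatching process loads this branch and
# round-robins its output to the worker processes.
dispatch_dp = IterableWrapper(range(100)).shuffle().sharding_round_robin_dispatch(
    SHARDING_PRIORITIES.MULTIPROCESSING
)

# Replicable branch: each worker holds its own replica of this branch and
# keeps only the samples assigned to it at the sharding_filter point.
filter_dp = IterableWrapper(range(100)).shuffle().sharding_filter()

# Combine both branches and hand the graph to DataLoader2.
dp = dispatch_dp.zip(filter_dp)
dl = DataLoader2(dp, reading_service=MultiProcessingReadingService(num_workers=2))
```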
When multiprocessing takes place, the graph is rewritten accordingly. Client in the rewritten graph is a DataPipe that sends a request to, and receives a response from, multiprocessing queues.
Determinism¶
In DataLoader2, a SeedGenerator becomes a single source of randomness and each ReadingService would access it via initialize_iteration() and generate corresponding random seeds for random DataPipe operations.
In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive on multiprocessing processes and distributed nodes, MultiProcessingReadingService and DistributedReadingService would help DataLoader2 to synchronize random states for any random DataPipe operation prior to sharding_filter or sharding_round_robin_dispatch. For the remaining DataPipe operations after sharding, unique random states are generated based on the distributed rank and worker process id by each ReadingService, in order to perform different random transformations.
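To illustrate, here is a minimal sketch (the pipeline and worker count are illustrative assumptions): because shuffle is placed before sharding_filter, all workers use the synchronized seed, agree on the shuffled order, and split it into disjoint shards, while operations after the filter receive per-worker seeds.

```python
from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

# Shuffle before sharding_filter: workers share a synchronized seed, agree on
# the shuffled order, and then split it into disjoint shards.
datapipe = IterableWrapper(range(100)).shuffle().sharding_filter()

if __name__ == "__main__":
    dl = DataLoader2(
        datapipe, reading_service=MultiProcessingReadingService(num_workers=2)
    )
    for epoch in range(2):
        # initialize_iteration() hands out fresh seeds from the SeedGenerator
        # each epoch; sorted output is 0..99, i.e. no duplicated or missing samples.
        print(sorted(dl))
    dl.shutdown()
```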
Graph Mode¶
This also allows easier transition of data-preprocessing pipelines from research to production. After the DataPipe graph is created and validated with the ReadingServices, a different ReadingService that configures and connects to the production service/infrastructure, such as AIStore, can be provided to DataLoader2 as a drop-in replacement. The ReadingService could potentially search the graph, find DataPipe operations that can be delegated to the production service/infrastructure, and then modify the graph correspondingly to achieve more performant execution.
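For instance, the swap can be as small as changing the reading_service argument. MyProductionReadingService below is a hypothetical placeholder for such a production-backed ReadingService, not an API provided by torchdata, and the pipeline and worker count are illustrative assumptions:

```python
from torchdata.dataloader2 import (
    DataLoader2,
    MultiProcessingReadingService,
    ReadingServiceInterface,
)
from torchdata.datapipes.iter import IterableWrapper

class MyProductionReadingService(ReadingServiceInterface):
    """Hypothetical stand-in for a ReadingService backed by production infrastructure."""

    def __init__(self, endpoint: str):
        self.endpoint = endpoint

    def initialize(self, datapipe):
        # A real implementation would search the graph and delegate eligible
        # DataPipe operations to the remote service; here the graph is returned as-is.
        return datapipe

dp = IterableWrapper(range(10)).shuffle().sharding_filter()

# Research: run the validated graph locally with worker processes.
dl = DataLoader2(dp, reading_service=MultiProcessingReadingService(num_workers=8))

# Production: swap in the production-backed ReadingService; the DataPipe graph is unchanged.
dl = DataLoader2(dp, reading_service=MyProductionReadingService(endpoint="..."))
```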
Extend ReadingService¶
The following are the interfaces for a custom ReadingService.
- class torchdata.dataloader2.ReadingServiceInterface¶
Interface for ReadingService. Please extend a custom ReadingService based on this interface class.
ReadingService must be picklable prior to initialize being called. This is because a copy of it will be created by DataLoader2 to avoid the situation where the same ReadingService object is used by multiple DataLoader2 instances, and its internal state will be modifiable by each of them.
As a result of this constraint, certain initialization steps may need to take place within the initialize method rather than __init__ of the ReadingService class.
- finalize() → None¶
ReadingService cleans up internal states and fully shuts down the service. Called in DataLoader2's shutdown and __del__.
- finalize_iteration() → None¶
ReadingService ends service after an epoch is finished. Called when the iterator of DataLoader2 is depleted.
- abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]¶
ReadingService takes a DataPipe graph and adapts it into a new DataPipe graph based on the custom need. Called once when creating the DataLoader2 iterator for the first time. Prior to calling this method, the ReadingService object must be picklable.
- Parameters:
datapipe – Original DataPipe graph.
- Returns:
An adapted or a new DataPipe graph.
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) → Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]¶
ReadingService spins up service for an epoch. Called at the beginning of every time the DataLoader2 iterator is obtained.
- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it will govern the determinism of all random operations within the graph of DataPipes.
iter_reset_fn – Optional reset function from the prior ReadingService when SequentialReadingService chains multiple ReadingServices.
- Returns:
A new iter_reset_fn to be used by the subsequent ReadingService.
Example
MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.
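As a minimal sketch of extending this interface, the hypothetical PrefetchingReadingService below adapts the graph in initialize by appending a prefetcher; the class name, buffer size, and pipeline are illustrative assumptions:

```python
from torchdata.dataloader2 import DataLoader2, ReadingServiceInterface
from torchdata.datapipes.iter import IterableWrapper

class PrefetchingReadingService(ReadingServiceInterface):
    """Hypothetical ReadingService that appends a prefetcher to the graph."""

    def __init__(self, buffer_size: int = 10):
        # Keep __init__ lightweight so the object stays picklable;
        # heavier setup belongs in initialize.
        self.buffer_size = buffer_size

    def initialize(self, datapipe):
        # Adapt the graph: attach a prefetching DataPipe at the end.
        return datapipe.prefetch(self.buffer_size)

    def finalize(self) -> None:
        # Nothing to shut down in this minimal sketch.
        pass

dp = IterableWrapper(range(10)).map(str)
dl = DataLoader2(dp, reading_service=PrefetchingReadingService())
```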
The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
Extend ReadingServiceInterface with two additional methods to save/restore the state of the data-processing graph.
- abstract checkpoint() → bytes¶
ReadingService serializes the internal states. Called in DataLoader2.state_dict.
- abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) → Union[IterDataPipe, MapDataPipe]¶
ReadingService adapts the DataPipe graph based on the serialized state. Called once when creating the DataLoader2 iterator for the first time. Counterpart of initialize, which adapts the DataPipe graph from scratch.
- Parameters:
datapipe – Original DataPipe graph before being adapted by ReadingService.
serialized_state – The serialized internal state used to restore the state of the adapted DataPipe graph.
- Returns:
Adapted DataPipe generated from the serialized state.
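A minimal sketch of implementing these two methods might look like the following; CountingReadingService and its epoch counter are illustrative assumptions, not part of torchdata:

```python
import pickle
from torchdata.dataloader2 import CheckpointableReadingServiceInterface

class CountingReadingService(CheckpointableReadingServiceInterface):
    """Hypothetical service that only tracks how many epochs have started."""

    def __init__(self):
        self.epochs_started = 0

    def initialize(self, datapipe):
        return datapipe

    def initialize_iteration(self, seed_generator, iter_reset_fn=None):
        self.epochs_started += 1
        return iter_reset_fn

    def checkpoint(self) -> bytes:
        # Serialize internal state; invoked from DataLoader2.state_dict.
        return pickle.dumps({"epochs_started": self.epochs_started})

    def restore(self, datapipe, serialized_state: bytes):
        # Restore internal state, then re-adapt the graph accordingly.
        self.epochs_started = pickle.loads(serialized_state)["epochs_started"]
        return datapipe
```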
Graph Functions¶
Graph utility functions are provided in torchdata.dataloader2.graph to help users rewrite the DataPipe graph for a custom ReadingService:
| Function | Description |
| --- | --- |
| traverse_dps | Traverse the DataPipes and their attributes to extract the DataPipe graph. |
| find_dps | Given the graph of DataPipe generated by traverse_dps function, return DataPipe instances with the provided DataPipe type. |
| list_dps | Given the graph of DataPipe generated by traverse_dps function, return a list of all DataPipe instances without duplication. |
| remove_dp | Given the graph of DataPipe generated by traverse_dps function, and the DataPipe to be removed, return the new graph of DataPipe. |
| replace_dp | Given the graph of DataPipe generated by traverse_dps function, and the DataPipe to be replaced and the new DataPipe, return the new graph of DataPipe. |
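For example, a custom ReadingService could inspect the graph with these utilities; the pipeline below is an illustrative assumption:

```python
from torchdata.dataloader2.graph import traverse_dps, list_dps, find_dps
from torchdata.datapipes.iter import IterableWrapper, Shuffler

dp = IterableWrapper(range(10)).shuffle().sharding_filter().batch(4)

graph = traverse_dps(dp)                # nested graph structure of the pipeline
all_dps = list_dps(graph)               # every DataPipe instance, without duplication
shufflers = find_dps(graph, Shuffler)   # all Shuffler instances in the graph
```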