Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the DataPipes and DataLoaderV2 solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).
DataLoader2
A new, lightweight DataLoader2 is introduced to move the overloaded data-manipulation functionality out of torch.utils.data.DataLoader and into DataPipe operations. In addition, certain features can only be achieved with DataLoader2, such as snapshotting and switching backend services to perform high-performance operations.
DataLoader2
- class torchdata.dataloader2.DataLoader2(datapipe: Optional[Union[IterDataPipe, MapDataPipe]], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)
DataLoader2 is used to optimize and execute the given DataPipe graph based on ReadingService and Adapter functions, with support for:
  - Dynamic sharding for multiprocess and distributed data loading
  - Multiple backend ReadingServices
  - DataPipe graph in-place modification like shuffle control, memory pinning, etc.
  - Snapshotting the state of the data-preprocessing pipeline (WIP)
- Parameters:
  - datapipe (IterDataPipe or MapDataPipe) – DataPipe from which to load the data. A deepcopy of this datapipe will be made during initialization, allowing the input to be reused in a different DataLoader2 without sharing states. Input None can only be used if load_state_dict is called right after the creation of the DataLoader.
  - datapipe_adapter_fn (Iterable[Adapter] or Adapter, optional) – Adapter function(s) that will be applied to the DataPipe (default: None).
  - reading_service (ReadingServiceInterface, optional) – defines how DataLoader2 should execute operations over the DataPipe, e.g. multiprocessing/distributed (default: None). A deepcopy of this will be created during initialization, allowing the ReadingService to be reused in a different DataLoader2 without sharing states.
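The deepcopy rule for `datapipe` and `reading_service` can be shown without torchdata. The sketch below is a stdlib-only toy model, not the torchdata API. `ToyPipe` and `ToyLoader` are hypothetical stand-ins that only mimic copy-on-construction, so two loaders built from the same input never share iteration state.

```python
import copy


class ToyPipe:
    """Hypothetical stateful pipeline stage (stands in for a DataPipe)."""

    def __init__(self, items):
        self.items = list(items)
        self.cursor = 0  # mutable iteration state

    def next_item(self):
        item = self.items[self.cursor]
        self.cursor += 1
        return item


class ToyLoader:
    """Hypothetical loader that deep-copies its input on construction."""

    def __init__(self, datapipe):
        # The loader owns a private copy, so the caller's object is never mutated.
        self.datapipe = copy.deepcopy(datapipe)


source = ToyPipe(["a", "b", "c"])
loader_1 = ToyLoader(source)
loader_2 = ToyLoader(source)

first = loader_1.datapipe.next_item()   # advances only loader_1's copy
second = loader_1.datapipe.next_item()
other = loader_2.datapipe.next_item()   # loader_2 still starts at the beginning
```

After these calls, `source.cursor` is still 0. Each loader progresses independently.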
Note
When a MapDataPipe is passed into DataLoader2, DataLoader2 will attempt to create an iterator via iter(datapipe) in order to iterate through the data. If the object has non-zero-indexed indices, this may fail. Consider using .shuffle() (which converts MapDataPipe to IterDataPipe) or datapipe.to_iter_datapipe(custom_indices).
- __iter__() → DataLoader2Iterator[T_co]
Return a singleton iterator from the DataPipe graph adapted by ReadingService. The DataPipe will be restored if a serialized state was provided to construct DataLoader2. In addition, initialize_iteration and finalize_iteration will be invoked at the beginning and end of the iteration, respectively.
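The failure described in the MapDataPipe note above matches Python's legacy sequence-iteration protocol. When an object defines `__getitem__` but no `__iter__`, `iter(obj)` requests `obj[0]`, `obj[1]`, and so on, and stops only when `IndexError` is raised.

The stdlib-only sketch below makes these assumptions:
- `ToyMapPipe` is a hypothetical dict-backed container whose keys start at 1.
- `iterate_with_indices` is a hypothetical helper that mimics the idea behind `to_iter_datapipe(custom_indices)`.

Neither is the torchdata implementation.

```python
class ToyMapPipe:
    """Hypothetical map-style container (stands in for a MapDataPipe).

    It defines __getitem__ but not __iter__, so iter() falls back to the
    legacy protocol: request keys 0, 1, 2, ... until IndexError is raised.
    """

    def __init__(self, mapping):
        self._mapping = dict(mapping)

    def __getitem__(self, key):
        return self._mapping[key]  # a missing key raises KeyError, not IndexError

    def __len__(self):
        return len(self._mapping)


def iterate_with_indices(map_pipe, indices):
    """Hypothetical helper: iterate over explicit keys instead of 0, 1, 2, ..."""
    for key in indices:
        yield map_pipe[key]


pipe = ToyMapPipe({1: "a", 2: "b", 3: "c"})

try:
    list(iter(pipe))  # the first request is pipe[0], which does not exist
    failed_key = None
except KeyError as exc:
    failed_key = exc.args[0]

recovered = list(iterate_with_indices(pipe, [1, 2, 3]))
```

Supplying the indices explicitly avoids the implicit 0-based probing altogether.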
- classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) → DataLoader2[T_co]
Create a new DataLoader2 with the DataPipe graph and ReadingService restored from the serialized state.
- load_state_dict(state_dict: Dict[str, Any]) → None
For the existing DataLoader2, load the serialized state to restore the DataPipe graph and reset the internal state of ReadingService.
- seed(seed: int) → None
Set the random seed for DataLoader2 to control determinism.
- Parameters:
seed – Random uint64 seed
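`seed` is documented only as controlling determinism. A common way to get that property is to derive all randomness from one seeded generator, so the same seed reproduces the same order. The sketch below is a stdlib-only toy. `ToySeededLoader` is hypothetical, and the seeding scheme DataLoader2 uses internally is not described in this section.

```python
import random


class ToySeededLoader:
    """Hypothetical loader whose shuffling is driven by an explicit seed."""

    def __init__(self, items):
        self.items = list(items)
        self._seed = None

    def seed(self, seed: int) -> None:
        # Store the seed; each new iteration rebuilds its generator from it.
        self._seed = seed

    def __iter__(self):
        rng = random.Random(self._seed)  # private generator, no global state touched
        order = self.items[:]
        rng.shuffle(order)
        return iter(order)


loader = ToySeededLoader(range(10))
loader.seed(1234)
run_a = list(loader)
loader.seed(1234)
run_b = list(loader)
```

Using a private `random.Random` instance keeps the loader's determinism independent of any other code that calls the module-level `random` functions.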
- shutdown() → None
Shuts down the ReadingService and cleans up the iterator.
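The `__iter__` and `shutdown` descriptions imply a lifecycle:
1. The reading service is initialized when iteration starts.
2. It is finalized when iteration ends.
3. It is torn down on shutdown.

The stdlib-only sketch below models that ordering with a generator and `try`/`finally`. `ToyReadingService` and `ToyLoader` are hypothetical. Their hook names are loosely modeled on those mentioned for `__iter__`, not copied from torchdata's interface.

```python
class ToyReadingService:
    """Hypothetical reading service that records which hooks ran, in order."""

    def __init__(self):
        self.events = []

    def initialize_iteration(self):
        self.events.append("initialize_iteration")

    def finalize_iteration(self):
        self.events.append("finalize_iteration")

    def shutdown(self):
        self.events.append("shutdown")


class ToyLoader:
    """Hypothetical loader wiring the hooks around a plain list of items."""

    def __init__(self, items, reading_service):
        self.items = list(items)
        self.reading_service = reading_service

    def __iter__(self):
        self.reading_service.initialize_iteration()
        try:
            yield from self.items
        finally:
            # Runs when the items are exhausted or the generator is closed.
            self.reading_service.finalize_iteration()

    def shutdown(self):
        self.reading_service.shutdown()


service = ToyReadingService()
loader = ToyLoader([1, 2, 3], service)
collected = [x for x in loader]
loader.shutdown()
```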
- state_dict() → Dict[str, Any]
Return a dictionary representing the state of the data-processing pipeline, with keys:
  - serialized_datapipe: Serialized DataPipe before ReadingService adaption.
  - reading_service_state: The state of ReadingService and the adapted DataPipe.
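The state methods above can be modeled as a round trip through a plain dictionary that uses the two documented keys. The stdlib-only sketch below assumes a hypothetical `ToyCheckpointLoader`:
- Its pipeline is a picklable list.
- Its "reading service state" is just a position counter.

Real DataLoader2 serialization is more involved and is not reproduced here.

```python
import pickle
from typing import Any, Dict


class ToyCheckpointLoader:
    """Hypothetical loader exposing state_dict / load_state_dict / from_state."""

    def __init__(self, datapipe):
        self.datapipe = datapipe  # here: a plain list of items
        self.position = 0         # stands in for reading-service state

    def __iter__(self):
        while self.position < len(self.datapipe):
            item = self.datapipe[self.position]
            self.position += 1
            yield item

    def state_dict(self) -> Dict[str, Any]:
        return {
            "serialized_datapipe": pickle.dumps(self.datapipe),
            "reading_service_state": {"position": self.position},
        }

    def load_state_dict(self, state: Dict[str, Any]) -> None:
        self.datapipe = pickle.loads(state["serialized_datapipe"])
        self.position = state["reading_service_state"]["position"]

    @classmethod
    def from_state(cls, state: Dict[str, Any]) -> "ToyCheckpointLoader":
        loader = cls(None)           # start empty, as with a None datapipe
        loader.load_state_dict(state)
        return loader


loader = ToyCheckpointLoader(["a", "b", "c", "d"])
it = iter(loader)
consumed = [next(it), next(it)]      # read two items, then checkpoint
snapshot = loader.state_dict()

resumed = ToyCheckpointLoader.from_state(snapshot)
remaining = list(resumed)            # continues from where the snapshot was taken
```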
Note:
DataLoader2 doesn’t support torch.utils.data.Dataset or torch.utils.data.IterableDataset. Please wrap each of them with the corresponding DataPipe below:
- torchdata.datapipes.map.SequenceWrapper: torch.utils.data.Dataset
- torchdata.datapipes.iter.IterableWrapper: torch.utils.data.IterableDataset
ReadingService
ReadingService
specifies the execution backend for the data-processing graph. There are three types of ReadingServices
provided in TorchData:
- Default ReadingService to serve the …
- Spawns multiple worker processes to load data from the …
Each ReadingService takes the DataPipe graph and rewrites it to achieve features such as dynamic sharding, sharing random seeds, and snapshotting for multiprocess and distributed data loading. For more detail about those features, please refer to the documentation.
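Dynamic sharding and seed sharing work together. If every worker shuffles with the same seed before sharding, the workers agree on one global order and each takes a disjoint slice of it. The stdlib-only sketch below is a toy model of that idea. `shard_for_worker` is a hypothetical helper, and the round-robin split (`index % num_workers`) is an assumption chosen for illustration. It is not necessarily the exact policy a ReadingService applies.

```python
import random
from typing import List, Sequence


def shard_for_worker(items: Sequence[int], seed: int, worker_id: int, num_workers: int) -> List[int]:
    """Hypothetical helper: shared-seed shuffle followed by round-robin sharding."""
    order = list(items)
    random.Random(seed).shuffle(order)  # same seed -> same order in every worker
    # Keep every num_workers-th element, starting at this worker's offset.
    return [x for i, x in enumerate(order) if i % num_workers == worker_id]


data = range(12)
shards = [shard_for_worker(data, seed=7, worker_id=w, num_workers=3) for w in range(3)]
flattened = [x for shard in shards for x in shard]
```

Suppose instead each worker used a different seed. The slices would then be cut from different orders and could overlap or miss items, which is why the seed is shared.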
Adapter
Adapter is used to configure, modify and extend the DataPipe graph in DataLoader2. It allows in-place modification or replacement of the pre-assembled DataPipe graph provided by PyTorch domains. For example, Shuffle(False) can be provided to DataLoader2, which would disable any shuffle operations in the DataPipe graph.
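The `Shuffle(False)` example can be pictured as a walk over the graph that flips a flag on every shuffle node. The stdlib-only sketch below makes these assumptions:
- The "graph" is a hypothetical linked chain of `ToyStage` and `ToyShuffler` nodes.
- `disable_shuffle` is a hypothetical function.

It mimics the in-place style of modification described above, not torchdata's actual graph traversal.

```python
import random


class ToyStage:
    """Hypothetical pipeline stage that applies fn to every item from its source."""

    def __init__(self, source, fn=lambda x: x):
        self.source = source
        self.fn = fn

    def __iter__(self):
        for item in self.source:
            yield self.fn(item)


class ToyShuffler:
    """Hypothetical shuffle stage with an on/off switch."""

    def __init__(self, source, seed=0):
        self.source = source
        self.seed = seed
        self.enabled = True

    def __iter__(self):
        items = list(self.source)
        if self.enabled:
            random.Random(self.seed).shuffle(items)
        return iter(items)


def disable_shuffle(graph):
    """Hypothetical adapter-like function: switch off every shuffler in place."""
    node = graph
    # Follow .source links from the last stage back towards the raw data.
    while isinstance(node, (ToyStage, ToyShuffler)):
        if isinstance(node, ToyShuffler):
            node.enabled = False
        node = node.source
    return graph  # same object, modified in place


pipeline = ToyStage(ToyShuffler(range(5), seed=3), fn=lambda x: x * 10)
adapted = disable_shuffle(pipeline)
result = list(adapted)
```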
- class torchdata.dataloader2.adapter.Adapter
Adapter base class that follows the Python Callable protocol.
- abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]
Callable function that either runs in-place modification of the DataPipe graph, or returns a new DataPipe graph.
- Parameters:
  - datapipe – DataPipe that needs to be adapted.
- Returns:
  Adapted DataPipe or new DataPipe.
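The `__call__` contract above also allows an adapter to return a different graph instead of modifying its input. The stdlib-only sketch below mirrors that contract with `abc`:
- `ToyAdapter` is a hypothetical stand-in for the Adapter base class.
- `TakeFirst` is a hypothetical adapter that wraps the incoming pipeline in a new `ToyLimit` node rather than editing it.

```python
import itertools
from abc import ABC, abstractmethod
from typing import Iterable


class ToyAdapter(ABC):
    """Hypothetical stand-in for Adapter: a callable mapping a pipeline to a pipeline."""

    @abstractmethod
    def __call__(self, datapipe: Iterable) -> Iterable:
        ...


class ToyLimit:
    """Hypothetical pipeline node yielding at most n items from its source."""

    def __init__(self, source: Iterable, n: int):
        self.source = source
        self.n = n

    def __iter__(self):
        return itertools.islice(iter(self.source), self.n)


class TakeFirst(ToyAdapter):
    """Hypothetical adapter that returns a new, length-limited pipeline."""

    def __init__(self, n: int):
        self.n = n

    def __call__(self, datapipe: Iterable) -> Iterable:
        return ToyLimit(datapipe, self.n)  # a new node; the input is not modified


source = [1, 2, 3, 4, 5]
adapted = TakeFirst(2)(source)
limited = list(adapted)

try:
    ToyAdapter()  # abstract: __call__ is not implemented
    abstract_blocked = False
except TypeError:
    abstract_blocked = True
```

Declaring `__call__` abstract means a subclass that forgets to implement it fails at construction time rather than when the graph is first adapted.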
Here is the list of Adapters provided by TorchData in torchdata.dataloader2.adapter:
- Shuffle DataPipes adapter allows control over all existing Shuffler (…
- CacheTimeout DataPipes adapter allows control over timeouts of all existing EndOnDiskCacheHolder (…
We will also provide more Adapters to cover data-processing options:
- PinMemory: Attach a DataPipe at the end of the data-processing graph that converts output data to torch.Tensor in pinned memory.
- FullSync: Attach a DataPipe to make sure the data-processing graph is synchronized between distributed processes to prevent hanging.
- ShardingPolicy: Modify the sharding policy if sharding_filter is present in the DataPipe graph.
- PrefetchPolicy, InvalidateCache, etc.
If you have feature requests for Adapters you’d like to see provided, please open a GitHub issue. For specific needs, DataLoader2 also accepts any custom Adapter as long as it inherits from the Adapter class.
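Because custom Adapters are accepted, several can be combined through `datapipe_adapter_fn`, which takes either one Adapter or an iterable of them. The stdlib-only sketch below assumes the adapters are applied left to right, each receiving the previous result. It uses hypothetical callables (`apply_adapters`, `drop_odd`, `double`) over plain lists rather than torchdata classes.

```python
from typing import Callable, Iterable, List, Optional, Union

Pipeline = List[int]
AdapterFn = Callable[[Pipeline], Pipeline]


def apply_adapters(
    datapipe: Pipeline,
    adapter_fn: Optional[Union[AdapterFn, Iterable[AdapterFn]]],
) -> Pipeline:
    """Hypothetical helper: normalise one-or-many adapters, then apply them in order."""
    if adapter_fn is None:
        return datapipe
    adapters = [adapter_fn] if callable(adapter_fn) else list(adapter_fn)
    for adapter in adapters:
        datapipe = adapter(datapipe)  # each adapter sees the previous one's output
    return datapipe


def drop_odd(pipe: Pipeline) -> Pipeline:
    return [x for x in pipe if x % 2 == 0]


def double(pipe: Pipeline) -> Pipeline:
    return [x * 2 for x in pipe]


data = [1, 2, 3, 4]
single = apply_adapters(data, double)
chained = apply_adapters(data, [drop_odd, double])
reversed_order = apply_adapters(data, [double, drop_odd])
```

The last two calls differ only in ordering yet produce different pipelines, so the order of a list of Adapters is worth keeping deliberate.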