DataLoader2¶
A light-weight DataLoader2 is introduced to decouple the overloaded data-manipulation functionalities from torch.utils.data.DataLoader to DataPipe operations. Besides, certain features can only be achieved with DataLoader2, such as snapshotting and switching backend services to perform high-performance operations.
DataLoader2¶
- class torchdata.dataloader2.DataLoader2(datapipe: Union[IterDataPipe, MapDataPipe], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)¶
DataLoader2 is used to optimize and execute the given DataPipe graph based on ReadingService and Adapter functions, with support for:
- Dynamic sharding for multi-process and distributed data loading
- Multiple backend ReadingServices
- DataPipe graph in-place modification, e.g. shuffle control, memory pinning
- Snapshotting the state of the data-preprocessing pipeline (WIP)
- Parameters:
  - datapipe (IterDataPipe or MapDataPipe) – DataPipe from which to load the data. A deepcopy of this will be made during initialization, allowing the input to be re-used in a different DataLoader2 without sharing states.
  - datapipe_adapter_fn (Iterable[Adapter] or Adapter, optional) – Adapter function(s) that will be applied to the DataPipe (default: None).
  - reading_service (ReadingServiceInterface, optional) – defines how DataLoader2 should execute operations over the DataPipe, e.g. multiprocessing/distributed (default: None). A deepcopy of this will be made during initialization, allowing the input to be re-used in a different DataLoader2 without sharing states.
- __iter__() → DataLoader2Iterator[T_co]¶
Return a singleton iterator from the DataPipe graph adapted by the ReadingService. The DataPipe will be restored if a serialized state was provided to construct the DataLoader2. initialize_iteration and finalize_iterator will be invoked at the beginning and end of the iteration, respectively.
- classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) → DataLoader2[T_co]¶
Create a new DataLoader2 with the DataPipe graph and ReadingService restored from the serialized state.
- load_state_dict(state: Dict[str, Any]) → None¶
For an existing DataLoader2, load the serialized state to restore the DataPipe graph and reset the internal state of the ReadingService.
- shutdown() → None¶
Shuts down the ReadingService and cleans up the iterator.
- state_dict() → Dict[str, Any]¶
Return a dictionary to represent the state of the data-processing pipeline with keys:
- serialized_datapipe: serialized DataPipe before ReadingService adaptation.
- reading_service_state: the state of ReadingService and the adapted DataPipe.
Note:
DataLoader2 doesn't support torch.utils.data.Dataset or torch.utils.data.IterableDataset. Please wrap each of them with the corresponding DataPipe below:
- torchdata.datapipes.map.SequenceWrapper: torch.utils.data.Dataset
- torchdata.datapipes.iter.IterableWrapper: torch.utils.data.IterableDataset
ReadingService¶
ReadingService specifies the execution backend for the data-processing graph. There are three types of ReadingServices in TorchData.
Each ReadingService takes the DataPipe graph and modifies it to achieve features such as dynamic sharding, shared random seeds, and snapshotting for multi-process/distributed processes.
This also allows an easier transition of the data-preprocessing pipeline from research to production. After the DataPipe graph is created and validated with a ReadingService, a different ReadingService that configures and connects to a production service/infra such as AIStore can be provided to DataLoader2 as a drop-in replacement. The ReadingService could potentially search the graph, find DataPipe operations that can be delegated to the production service/infra, and then modify the graph correspondingly to achieve higher-performance execution.
The following are the interfaces for a custom ReadingService.
- class torchdata.dataloader2.ReadingServiceInterface¶
Interface for ReadingService. Please extend custom ReadingServices based on this interface class.
- finalize() → None¶
ReadingService cleans up internal states and fully shuts down the service. Called in DataLoader2's shutdown and __del__.
- finalize_iteration() → None¶
ReadingService ends service after an epoch is finished. Called when the iterator of DataLoader2 is depleted.
- abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]¶
ReadingService takes a DataPipe graph and adapts it into a new DataPipe graph based on the custom need. Called once when creating the DataLoader2 iterator for the first time.
- Parameters:
  - datapipe – Original DataPipe graph.
- Returns:
  - An adapted or new DataPipe graph.
- initialize_iteration() → None¶
ReadingService spins up the service for an epoch. Called at the beginning of every epoch, when the DataLoader2 iterator is created.
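To make the call order concrete, here is a torchdata-free toy (a structural sketch only; the class and driver loop are stand-ins, not the real DataLoader2) that mimics the lifecycle DataLoader2 drives on a ReadingService: initialize once, initialize_iteration/finalize_iteration around each epoch, and finalize at shutdown:

```python
class LoggingReadingService:
    """Toy stand-in for a custom ReadingService; records each hook call."""

    def __init__(self):
        self.calls = []

    def initialize(self, datapipe):
        self.calls.append("initialize")
        return datapipe  # a real service would adapt the graph here

    def initialize_iteration(self):
        self.calls.append("initialize_iteration")

    def finalize_iteration(self):
        self.calls.append("finalize_iteration")

    def finalize(self):
        self.calls.append("finalize")

# Simplified stand-in for the sequence DataLoader2 drives:
rs = LoggingReadingService()
graph = rs.initialize(range(3))   # once, on first iterator creation
for _epoch in range(2):
    rs.initialize_iteration()     # start of each epoch
    _ = list(graph)               # consume the adapted graph
    rs.finalize_iteration()       # iterator depleted
rs.finalize()                     # DataLoader2.shutdown / __del__
```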
The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
Extends ReadingServiceInterface with two additional methods to save/restore the state of the data-processing graph.
- abstract checkpoint() → bytes¶
ReadingService serializes its internal states. Called in DataLoader2.state_dict.
- abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) → Union[IterDataPipe, MapDataPipe]¶
ReadingService adapts the DataPipe graph based on the serialized state. Called once when creating the DataLoader2 iterator for the first time. Counterpart of initialize, which adapts the DataPipe graph from scratch.
- Parameters:
  - datapipe – Original DataPipe graph before adaptation by the ReadingService.
  - serialized_state – The serialized internal state used to restore the state of the adapted DataPipe graph.
- Returns:
  - Adapted DataPipe generated from the serialized state.
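The checkpoint/restore contract can be sketched without torchdata. In this toy (pickle is an implementation choice for the sketch, not mandated by the interface), checkpoint serializes whatever internal state the service needs, and restore rebuilds the adapted graph from that blob instead of calling initialize:

```python
import pickle

class ToyCheckpointableService:
    """Toy sketch of the checkpoint/restore contract; not the real interface."""

    def __init__(self):
        self.num_consumed = 0  # internal state worth snapshotting

    def checkpoint(self) -> bytes:
        # Serialize internal state; a real service decides what to include.
        return pickle.dumps({"num_consumed": self.num_consumed})

    def restore(self, datapipe, serialized_state: bytes):
        # Counterpart of initialize: adapt the graph from the saved state,
        # here by skipping the elements already consumed.
        state = pickle.loads(serialized_state)
        self.num_consumed = state["num_consumed"]
        return list(datapipe)[self.num_consumed:]

service = ToyCheckpointableService()
service.num_consumed = 2          # pretend two elements were consumed
blob = service.checkpoint()

resumed = ToyCheckpointableService()
remaining = resumed.restore(range(5), blob)  # resume mid-stream
```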
And, graph utility functions are provided in torchdata.dataloader2.graph to help users define their own ReadingService and modify the graph: one utility traverses the DataPipes and their attributes to extract the DataPipe graph, and companion utilities take the traversed graph and find, remove, or replace specific DataPipes within it.
Adapter¶
Adapter is used to configure, modify, and extend the DataPipe graph in DataLoader2. It allows in-place modification or replacement of the pre-assembled DataPipe graph provided by PyTorch domains. For example, Shuffle(False) can be provided to DataLoader2, which would disable any shuffle operations in the DataPipe graph.
- class torchdata.dataloader2.adapter.Adapter¶
Adapter base class that follows the Python Callable protocol.
- abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]¶
Callable function that either runs an in-place modification of the DataPipe graph, or returns a new DataPipe graph.
- Parameters:
  - datapipe – DataPipe that needs to be adapted.
- Returns:
  - Adapted DataPipe or new DataPipe.
Here is the list of Adapters provided by TorchData in torchdata.dataloader2.adapter:
- Shuffle: allows control over all existing Shuffler DataPipes in the graph.
- CacheTimeout: allows control over the timeouts of all existing EndOnDiskCacheHolder DataPipes in the graph.
And, we will provide more Adapters to cover data-processing options:
- PinMemory: attach a DataPipe at the end of the data-processing graph that converts output data to torch.Tensor in pinned memory.
- FullSync: attach a DataPipe to make sure the data-processing graph is synchronized between distributed processes to prevent hanging.
- ShardingPolicy: modify the sharding policy if sharding_filter is present in the DataPipe graph.
- PrefetchPolicy, InvalidateCache, etc.
If there are Adapters you would like to see provided, please open a GitHub issue. For specific needs, DataLoader2 also accepts any custom Adapter as long as it inherits from the Adapter class.