
DataLoader2

A light-weight DataLoader2 is introduced to decouple the overloaded data-manipulation functionalities from torch.utils.data.DataLoader into DataPipe operations. In addition, certain features, such as snapshotting and switching backend services for high-performance operations, can only be achieved with DataLoader2.

DataLoader2

class torchdata.dataloader2.DataLoader2(datapipe: Union[IterDataPipe, MapDataPipe], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)

DataLoader2 is used to optimize and execute the given DataPipe graph based on ReadingService and Adapter functions, with support for

  • Dynamic sharding for multi-process and distributed data loading

  • Multiple backend ReadingServices

  • DataPipe graph in-place modification like shuffle control, memory pinning, etc.

  • Snapshotting the state of the data-preprocessing pipeline (WIP)

Parameters:
  • datapipe (IterDataPipe or MapDataPipe) – DataPipe from which to load the data. A deepcopy of this will be made during initialization, allowing the input to be re-used in a different DataLoader2 without sharing states.

  • datapipe_adapter_fn (Iterable[Adapter] or Adapter, optional) – Adapter function(s) that will be applied to the DataPipe (default: None).

  • reading_service (ReadingServiceInterface, optional) – defines how DataLoader2 should execute operations over the DataPipe, e.g. multiprocessing/distributed (default: None). A deepcopy of this will be made during initialization, allowing the input to be re-used in a different DataLoader2 without sharing states.

__iter__() → DataLoader2Iterator[T_co]

Return a singleton iterator from the DataPipe graph adapted by ReadingService. The DataPipe graph will be restored if a serialized state was provided when constructing DataLoader2. Additionally, initialize_iteration and finalize_iteration will be invoked at the beginning and end of the iteration, respectively.

classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) → DataLoader2[T_co]

Create a new DataLoader2 with the DataPipe graph and ReadingService restored from the serialized state.

load_state_dict(state: Dict[str, Any]) → None

For an existing DataLoader2, load the serialized state to restore the DataPipe graph and reset the internal state of its ReadingService.

shutdown() → None

Shuts down ReadingService and cleans up the iterator.

state_dict() → Dict[str, Any]

Return a dictionary representing the state of the data-processing pipeline with keys:

  • serialized_datapipe: Serialized DataPipe before ReadingService adaptation.

  • reading_service_state: The state of ReadingService and adapted DataPipe.

Note: DataLoader2 doesn’t support torch.utils.data.Dataset or torch.utils.data.IterableDataset. Please wrap each of them with the corresponding DataPipe: SequenceWrapper for a map-style Dataset and IterableWrapper for an IterableDataset.

ReadingService

ReadingService specifies the execution backend for the data-processing graph. There are three types of ReadingServices in TorchData:

DistributedReadingService

DistributedReadingService handles distributed sharding on the graph of DataPipe and guarantees randomness by sharing the same seed across the distributed processes.

MultiProcessingReadingService

MultiProcessingReadingService utilizes torch.utils.data.DataLoader to launch subprocesses for the DataPipe graph.

PrototypeMultiProcessingReadingService

PrototypeMultiProcessingReadingService spawns multiple subprocesses to iterate over the DataPipe graph.

Each ReadingService takes the DataPipe graph and modifies it to achieve features such as dynamic sharding, shared random seeds, and snapshotting for multi-process/distributed execution.

This also allows easier transition of the data-preprocessing pipeline from research to production. After the DataPipe graph is created and validated with the ReadingServices, a different ReadingService that configures and connects to a production service/infrastructure such as AIStore can be provided to DataLoader2 as a drop-in replacement. The ReadingService could potentially search the graph, find DataPipe operations that can be delegated to the production service/infrastructure, and modify the graph accordingly to achieve higher-performance execution.

The following are the interfaces for a custom ReadingService.

class torchdata.dataloader2.ReadingServiceInterface

Interface for ReadingService. Please extend custom ReadingService based on this interface class.

finalize() → None

ReadingService cleans up internal states and fully shuts down the service. Called in DataLoader2’s shutdown and __del__.

finalize_iteration() → None

ReadingService ends service after an epoch is finished. Called when the iterator of DataLoader2 is depleted.

abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]

ReadingService takes a DataPipe graph and adapts it into a new DataPipe graph based on custom needs. Called once when the DataLoader2 iterator is first created.

Parameters:

datapipe – Original DataPipe graph.

Returns:

An adapted or a new DataPipe graph.

initialize_iteration() → None

ReadingService spins up service for an epoch. Called each time a DataLoader2 iterator is requested, at the beginning of the epoch.

The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):

class torchdata.dataloader2.CheckpointableReadingServiceInterface

Extend ReadingServiceInterface with two additional methods to save/restore the state of the data-processing graph.

abstract checkpoint() → bytes

ReadingService serializes the internal states. Called in DataLoader2.state_dict.

abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) → Union[IterDataPipe, MapDataPipe]

ReadingService adapts the DataPipe graph based on the serialized state. Called once when the DataLoader2 iterator is first created. Counterpart of initialize, which adapts the DataPipe graph from scratch.

Parameters:
  • datapipe – Original DataPipe graph before being adapted by ReadingService.

  • serialized_state – Serialized internal state used to restore the state of the adapted DataPipe graph.

Returns:

Adapted DataPipe generated from the serialized state.

In addition, graph utility functions are provided in torchdata.dataloader2.graph to help users define their own ReadingService and modify the graph:

traverse_dps

Traverse the DataPipes and their attributes to extract the DataPipe graph.

find_dps

Given the graph of DataPipe generated by traverse_dps function, return DataPipe instances with the provided DataPipe type.

replace_dp

Given the graph of DataPipe generated by the traverse_dps function, the DataPipe to be replaced, and the new DataPipe, return the new graph of DataPipe.

remove_dp

Given the graph of DataPipe generated by traverse_dps function and the DataPipe to be removed, return the new graph of DataPipe.

Adapter

Adapter is used to configure, modify, and extend the DataPipe graph in DataLoader2. It allows in-place modification of, or replacement of, the pre-assembled DataPipe graph provided by PyTorch domains. For example, Shuffle(False) can be provided to DataLoader2, which disables any shuffle operations in the DataPipe graph.

class torchdata.dataloader2.adapter.Adapter

Adapter base class that follows the Python Callable protocol.

abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]

Callable function that either runs in-place modification of the DataPipe graph, or returns a new DataPipe graph.

Parameters:

datapipe – DataPipe that needs to be adapted.

Returns:

Adapted DataPipe or new DataPipe.

Here is the list of Adapters provided by TorchData in torchdata.dataloader2.adapter:

Shuffle

Shuffle DataPipes adapter allows control over all existing Shuffler (shuffle) DataPipes in the graph.

CacheTimeout

CacheTimeout DataPipes adapter allows control over timeouts of all existing EndOnDiskCacheHolder (end_caching) in the graph.

More Adapters will be provided to cover additional data-processing options:

  • PinMemory: Attach a DataPipe at the end of the data-processing graph that converts output data to torch.Tensor in pinned memory.

  • FullSync: Attach a DataPipe to make sure the data-processing graph is synchronized between distributed processes to prevent hanging.

  • ShardingPolicy: Modify the sharding policy if a sharding_filter is present in the DataPipe graph.

  • PrefetchPolicy, InvalidateCache, etc.

If you have feature requests about the Adapters you’d like to be provided, please open a GitHub issue. For specific needs, DataLoader2 also accepts any custom Adapter as long as it inherits from the Adapter class.
