Attention

June 2024 Status Update: Removing DataPipes and DataLoader V2

We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).

DataLoader2

A new, light-weight DataLoader2 is introduced to decouple the overloaded data-manipulation functionalities from torch.utils.data.DataLoader into DataPipe operations. In addition, certain features, such as snapshotting and switching backend services to perform high-performance operations, are only available through DataLoader2.

DataLoader2

class torchdata.dataloader2.DataLoader2(datapipe: Optional[Union[IterDataPipe, MapDataPipe]], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)

DataLoader2 is used to optimize and execute the given DataPipe graph based on ReadingService and Adapter functions, with support for

  • Dynamic sharding for multiprocess and distributed data loading

  • Multiple backend ReadingServices

  • In-place modification of the DataPipe graph, e.g. shuffle control, memory pinning, etc.

  • Snapshotting the state of the data-preprocessing pipeline (WIP)

Parameters:
  • datapipe (IterDataPipe or MapDataPipe) – DataPipe from which to load the data. A deepcopy of this datapipe will be made during initialization, allowing the input to be re-used in a different DataLoader2 without sharing states. Input None can only be used if load_state_dict is called right after the creation of the DataLoader2.

  • datapipe_adapter_fn (Iterable[Adapter] or Adapter, optional) – Adapter function(s) that will be applied to the DataPipe (default: None).

  • reading_service (ReadingServiceInterface, optional) – defines how DataLoader2 should execute operations over the DataPipe, e.g. multiprocessing/distributed (default: None). A deepcopy of this will be created during initialization, allowing the ReadingService to be re-used in a different DataLoader2 without sharing states.
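For illustration, here is a minimal sketch of constructing and iterating a DataLoader2 with a multiprocessing backend (the datapipe, map function, and worker count are arbitrary, and torchdata < 0.9 is assumed, per the deprecation notice above):

    from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
    from torchdata.datapipes.iter import IterableWrapper

    def double(x):
        return x * 2  # a named function pickles cleanly across worker processes

    # shuffle + sharding_filter let the ReadingService shard work across workers
    datapipe = IterableWrapper(range(10)).shuffle().sharding_filter().map(double)
    rs = MultiProcessingReadingService(num_workers=2)
    dl = DataLoader2(datapipe, reading_service=rs)

    for x in dl:
        print(x)
    dl.shutdown()  # release the worker processes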

Note

When a MapDataPipe is passed into DataLoader2, in order to iterate through the data, DataLoader2 will attempt to create an iterator via iter(datapipe). If the datapipe’s indices are not zero-based integers, this may fail. Consider using .shuffle() (which converts the MapDataPipe to an IterDataPipe) or datapipe.to_iter_datapipe(custom_indices).
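For example, a brief sketch of the two workarounds mentioned above (the indices passed to to_iter_datapipe are illustrative):

    from torchdata.dataloader2 import DataLoader2
    from torchdata.datapipes.map import SequenceWrapper

    map_dp = SequenceWrapper(["a", "b", "c"])

    # Option 1: .shuffle() turns the MapDataPipe into an IterDataPipe
    dl = DataLoader2(map_dp.shuffle())

    # Option 2: convert explicitly, supplying the indices to iterate over
    dl = DataLoader2(map_dp.to_iter_datapipe([0, 1, 2]))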

__iter__() → DataLoader2Iterator[T_co]

Return a singleton iterator from the DataPipe graph adapted by the ReadingService. The DataPipe graph will be restored if a serialized state was provided to construct this DataLoader2, and initialize_iteration and finalize_iterator will be invoked at the beginning and end of the iteration, respectively.

classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) → DataLoader2[T_co]

Create a new DataLoader2 with the DataPipe graph and ReadingService restored from the serialized state.

load_state_dict(state_dict: Dict[str, Any]) → None

For an existing DataLoader2, load the serialized state to restore the DataPipe graph and reset the internal state of its ReadingService.

seed(seed: int) → None

Set random seed for DataLoader2 to control determinism.

Parameters:

seed – Random uint64 seed
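For example (the seed value is arbitrary):

    from torchdata.dataloader2 import DataLoader2
    from torchdata.datapipes.iter import IterableWrapper

    dl = DataLoader2(IterableWrapper(range(10)).shuffle())
    dl.seed(2024)  # call before iterating; shuffle order is now reproducible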

shutdown() → None

Shuts down the ReadingService and cleans up the iterator.

state_dict() → Dict[str, Any]

Return a dictionary representing the state of the data-processing pipeline, with the following keys:

  • serialized_datapipe: serialized DataPipe before ReadingService adaptation.

  • reading_service_state: the state of ReadingService and adapted DataPipe.
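Putting state_dict and load_state_dict together, a rough sketch of the (work-in-progress) checkpoint/restore flow follows; exactly what state is captured depends on the ReadingService in use:

    from torchdata.dataloader2 import DataLoader2
    from torchdata.datapipes.iter import IterableWrapper

    dl = DataLoader2(IterableWrapper(range(100)))
    it = iter(dl)
    for _ in range(10):
        next(it)  # consume part of the data

    state = dl.state_dict()  # keys: serialized_datapipe, reading_service_state

    # Later: restore into a fresh DataLoader2. Passing datapipe=None is only
    # valid when load_state_dict is called right after construction.
    restored = DataLoader2(datapipe=None)
    restored.load_state_dict(state)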

Note: DataLoader2 doesn’t support torch.utils.data.Dataset or torch.utils.data.IterableDataset. Please wrap each of them with the corresponding DataPipe:

  • torch.utils.data.Dataset → torchdata.datapipes.map.SequenceWrapper

  • torch.utils.data.IterableDataset → torchdata.datapipes.iter.IterableWrapper
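A minimal sketch of wrapping both legacy dataset types (the dataset classes are placeholders):

    from torch.utils.data import Dataset, IterableDataset
    from torchdata.datapipes.iter import IterableWrapper
    from torchdata.datapipes.map import SequenceWrapper

    class MyMapDataset(Dataset):  # placeholder map-style dataset
        def __getitem__(self, idx):
            return idx * 2

        def __len__(self):
            return 8

    class MyIterDataset(IterableDataset):  # placeholder iterable-style dataset
        def __iter__(self):
            return iter(range(8))

    map_dp = SequenceWrapper(MyMapDataset())    # -> MapDataPipe
    iter_dp = IterableWrapper(MyIterDataset())  # -> IterDataPipe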

ReadingService

ReadingService specifies the execution backend for the data-processing graph. There are four types of ReadingService provided in TorchData:

DistributedReadingService

DistributedReadingService handles distributed sharding on the graph of DataPipe and guarantees randomness by sharing the same seed across the distributed processes.

InProcessReadingService

Default ReadingService to serve the DataPipe graph in the main process, and apply graph settings like determinism control to the graph.

MultiProcessingReadingService

Spawns multiple worker processes to load data from the DataPipe graph.

SequentialReadingService

Composes multiple ReadingServices and applies them to the DataPipe graph in sequence (for example, DistributedReadingService followed by MultiProcessingReadingService).

Each ReadingService takes the DataPipe graph and rewrites it to achieve features such as dynamic sharding, shared random seeds, and snapshotting for multi-process/distributed execution. For more detail about those features, please refer to the documentation.
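For example, a sketch of chaining two ReadingServices with SequentialReadingService, assuming a torch.distributed process group has already been initialized:

    from torchdata.dataloader2 import (
        DataLoader2,
        DistributedReadingService,
        MultiProcessingReadingService,
        SequentialReadingService,
    )
    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(range(1000)).shuffle().sharding_filter()
    dist_rs = DistributedReadingService()                 # shard across ranks first
    mp_rs = MultiProcessingReadingService(num_workers=4)  # then across local workers
    dl = DataLoader2(dp, reading_service=SequentialReadingService(dist_rs, mp_rs))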

Adapter

Adapter is used to configure, modify and extend the DataPipe graph in DataLoader2. It allows in-place modification of, or replacement of, the pre-assembled DataPipe graph provided by PyTorch domains. For example, Shuffle(False) can be provided to DataLoader2, which would disable any shuffle operations in the DataPipe graph.

class torchdata.dataloader2.adapter.Adapter

Adapter base class that follows the Python Callable protocol.

abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]

Callable function that either runs in-place modification of the DataPipe graph, or returns a new DataPipe graph.

Parameters:

datapipe – DataPipe that needs to be adapted.

Returns:

Adapted DataPipe or new DataPipe.

Here is the list of Adapters provided by TorchData in torchdata.dataloader2.adapter:

Shuffle

Shuffle DataPipes adapter allows control over all existing Shuffler (shuffle) DataPipes in the graph.
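For example, a quick sketch of using the Shuffle adapter to disable every Shuffler in the graph:

    from torchdata.dataloader2 import DataLoader2
    from torchdata.dataloader2.adapter import Shuffle
    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(range(10)).shuffle()
    dl = DataLoader2(dp, datapipe_adapter_fn=Shuffle(False))
    print(list(dl))  # 0..9 in their original order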

CacheTimeout

CacheTimeout DataPipes adapter allows control over timeouts of all existing EndOnDiskCacheHolder (end_caching) in the graph.

We plan to provide more Adapters to cover additional data-processing options:

  • PinMemory: Attach a DataPipe at the end of the data-processing graph that converts output data to torch.Tensor in pinned memory.

  • FullSync: Attach a DataPipe to keep the data-processing graph synchronized between distributed processes and prevent hanging.

  • ShardingPolicy: Modify the sharding policy if a sharding_filter is present in the DataPipe graph.

  • PrefetchPolicy, InvalidateCache, etc.

If you have feature requests for Adapters you’d like to see provided, please open a GitHub issue. For specific needs, DataLoader2 also accepts any custom Adapter as long as it inherits from the Adapter class.
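As an illustration, here is a sketch of a hypothetical custom Adapter (AppendMap is not part of torchdata) that appends a map step to the end of the graph:

    from torchdata.dataloader2 import DataLoader2
    from torchdata.dataloader2.adapter import Adapter
    from torchdata.datapipes.iter import IterableWrapper

    class AppendMap(Adapter):  # hypothetical example, not provided by torchdata
        def __init__(self, fn):
            self.fn = fn

        def __call__(self, datapipe):
            # return a new graph; in-place modification is also allowed
            return datapipe.map(self.fn)

    dl = DataLoader2(IterableWrapper(range(5)), datapipe_adapter_fn=AppendMap(str))
    print(list(dl))  # ['0', '1', '2', '3', '4']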
