ReadingService

ReadingService handles in-place modification of the DataPipe graph based on different use cases.

Features

Dynamic Sharding

Dynamic sharding is achieved by MultiProcessingReadingService and DistributedReadingService, which shard the pipeline based on the information of the corresponding multiprocessing and distributed workers. TorchData offers two types of DataPipe that let users define where sharding takes place within the pipeline.

  • sharding_filter (ShardingFilter): When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the DataPipe graph, while skipping samples that do not belong to the corresponding worker at the point where sharding_filter is placed.

  • sharding_round_robin_dispatch (ShardingRoundRobinDispatcher): When there is any sharding_round_robin_dispatch DataPipe in the pipeline, that branch (i.e. all DataPipes prior to sharding_round_robin_dispatch) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distribute data to the subsequent worker processes.

The following is an example of a pipeline that uses both types of sharding strategies.

digraph Example {
    subgraph cluster_replicable {
        label="Replicable"
        a -> b -> c -> d -> l;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
    }

    k -> l -> fullsync -> end;

    a [label="DP1"];
    b [label="shuffle"];
    c [label="sharding_filter", color=blue];
    d [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    l [label="DP6"];
    fullsync;
    end [shape=box];
}

When multiprocessing takes place, the graph becomes:

digraph Example {
    subgraph cluster_worker_0 {
        label="Worker 0"
        a0 -> b0 -> c0 -> d0 -> l0;
        m0 -> l0;
        color=blue;
    }

    subgraph cluster_worker_1 {
        label="Worker 1"
        a1 -> b1 -> c1 -> d1 -> l1;
        m1 -> l1;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
        k -> round_robin_demux;
    }

    round_robin_demux -> m0;
    round_robin_demux -> m1;
    l0 -> n;
    l1 -> n;
    n -> fullsync -> end;

    a0 [label="DP1"];
    b0 [label="shuffle"];
    c0 [label="sharding_filter", color=blue];
    d0 [label="DP4"];
    a1 [label="DP1"];
    b1 [label="shuffle"];
    c1 [label="sharding_filter", color=blue];
    d1 [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    fullsync;
    l0 [label="DP6"];
    l1 [label="DP6"];
    m0 [label="Client"]
    m1 [label="Client"]
    n [label="Client"]
    end [shape=box];
}

Client in the graph is a DataPipe that sends requests to and receives responses from multiprocessing queues.
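
A minimal code sketch of building such a two-branch pipeline is shown below. It does not reproduce the exact graph from the diagrams (DP1–DP6 stand in for arbitrary user DataPipes), and the final zip combining the branches is an assumption made for illustration.

from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterableWrapper

# Replicable branch: every worker holds its own replica of this branch, and the
# sharding_filter skips samples that do not belong to that worker.
replicable = IterableWrapper(range(100)).shuffle().sharding_filter()

# Non-replicable branch: a single dispatching process loads this branch and
# round-robin distributes the data to the worker processes.
non_replicable = (
    IterableWrapper(range(100))
    .shuffle()
    .sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
)

# Combine the two branches after sharding.
pipe = replicable.zip(non_replicable)

dl = DataLoader2(pipe, reading_service=MultiProcessingReadingService(num_workers=2))
for sample in dl:
    ...
dl.shutdown()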

Determinism

In DataLoader2, a SeedGenerator becomes the single source of randomness, and each ReadingService accesses it via initialize_iteration() to generate corresponding random seeds for random DataPipe operations.

In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive across multiprocessing processes and distributed nodes, MultiProcessingReadingService and DistributedReadingService help DataLoader2 synchronize random states for any random DataPipe operation prior to sharding_filter or sharding_round_robin_dispatch. For the remaining DataPipe operations after sharding, each ReadingService generates unique random states based on the distributed rank and worker process id, so that different random transformations are performed.
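
For instance, placing shuffle before sharding_filter keeps the shards mutually exclusive and collectively exhaustive, while seeding DataLoader2 makes an epoch reproducible. The sketch below assumes DataLoader2's seed method and MultiProcessingReadingService; it is illustrative rather than a recommended recipe.

from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterableWrapper

# Shuffle before sharding_filter: the reading service synchronizes the shuffle
# seed across workers, keeping the shards mutually exclusive and collectively
# exhaustive; random operations after the filter receive per-worker seeds.
pipe = IterableWrapper(range(1000)).shuffle().sharding_filter()

dl = DataLoader2(pipe, reading_service=MultiProcessingReadingService(num_workers=2))
dl.seed(123)        # feeds the SeedGenerator consumed by initialize_iteration()
epoch_1 = list(dl)
dl.seed(123)
epoch_2 = list(dl)  # with the same seed, the shuffle order should repeat
dl.shutdown()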

Graph Mode

This also allows an easier transition of the data-preprocessing pipeline from research to production. After the DataPipe graph is created and validated with the ReadingServices, a different ReadingService that configures and connects to a production service/infrastructure such as AIStore can be provided to DataLoader2 as a drop-in replacement. The ReadingService could potentially search the graph, find DataPipe operations that can be delegated to the production service/infrastructure, and then modify the graph accordingly to achieve higher-performance execution.

Extend ReadingService

The following are the interfaces for a custom ReadingService.

class torchdata.dataloader2.ReadingServiceInterface

Interface for ReadingService. Please extend custom ReadingService based on this interface class.

ReadingService must be picklable prior to initialize being called. This is because DataLoader2 creates a copy of it to avoid the situation where the same ReadingService object is used by multiple DataLoader2 instances, each of which could modify its internal state.

As a result of this constraint, certain initialization steps may need to take place within the initialize method rather than __init__ of the ReadingService class.

finalize() → None

ReadingService cleans up internal states and fully shuts down the service. Called in DataLoader2’s shutdown and __del__.

finalize_iteration() → None

ReadingService ends service after an epoch is finished. Called when the iterator of DataLoader2 is depleted.

abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]

ReadingService takes a DataPipe graph and adapts it into a new DataPipe graph based on custom needs. Called once, when the DataLoader2 iterator is created for the first time. Prior to calling this method, the ReadingService object must be picklable.

Parameters:

datapipe – Original DataPipe graph.

Returns:

An adapted or a new DataPipe graph.

initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) → Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]

ReadingService spins up the service for an epoch. Called at the beginning of every epoch, when the DataLoader2 iterator is obtained.

Parameters:
  • seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it governs determinism for all random operations within the graph of DataPipes.

  • iter_reset_fn – Optional reset function from the prior ReadingService when SequentialReadingService chains multiple ReadingServices.

Returns:

A new iter_reset_fn to be used by the subsequent ReadingService.

Example

MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.
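
Putting the interface together, the sketch below shows a hypothetical in-process ReadingService. The class name, the prefetch adaptation, and the pass-through hooks are illustrative assumptions, not part of the library.

from torchdata.dataloader2 import DataLoader2, ReadingServiceInterface
from torchdata.datapipes.iter import IterableWrapper


class PrefetchReadingService(ReadingServiceInterface):
    def __init__(self, buffer_size: int = 10) -> None:
        # Keep __init__ light and picklable; heavy setup belongs in initialize().
        self.buffer_size = buffer_size

    def initialize(self, datapipe):
        # Called once, when the first DataLoader2 iterator is created.
        return datapipe.prefetch(self.buffer_size)

    def initialize_iteration(self, seed_generator, iter_reset_fn=None):
        # Called at the start of every epoch; a real service would derive
        # per-worker seeds from seed_generator here.
        return iter_reset_fn

    def finalize_iteration(self) -> None:
        pass  # per-epoch cleanup

    def finalize(self) -> None:
        pass  # full shutdown of the service


dl = DataLoader2(IterableWrapper(range(10)).shuffle(), reading_service=PrefetchReadingService())
print(list(dl))
dl.shutdown()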

The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):

class torchdata.dataloader2.CheckpointableReadingServiceInterface

Extend ReadingServiceInterface with two additional methods to save/restore the state of the data-processing graph.

abstract checkpoint() → bytes

ReadingService serializes the internal states. Called in DataLoader2.state_dict.

abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) → Union[IterDataPipe, MapDataPipe]

ReadingService adapts the DataPipe graph based on the serialized state. Called once, when the DataLoader2 iterator is created for the first time. Counterpart of initialize, which adapts the DataPipe graph from scratch.

Parameters:
  • datapipe – Original DataPipe graph before being adapted by ReadingService.

  • serialized_state – The serialized internal state used to restore the state of the adapted DataPipe graph.

Returns:

Adapted DataPipe generated from the serialized state.
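
As a rough sketch of how these two methods might be implemented (the class and its epoch-counter state are hypothetical, and the interface itself may still change):

import pickle

from torchdata.dataloader2 import CheckpointableReadingServiceInterface


class CountingCheckpointService(CheckpointableReadingServiceInterface):
    def __init__(self) -> None:
        self.epochs_started = 0

    def initialize(self, datapipe):
        # No graph adaptation in this toy example.
        return datapipe

    def initialize_iteration(self, seed_generator, iter_reset_fn=None):
        self.epochs_started += 1
        return iter_reset_fn

    def checkpoint(self) -> bytes:
        # Serialize whatever internal state is needed to resume later.
        return pickle.dumps({"epochs_started": self.epochs_started})

    def restore(self, datapipe, serialized_state: bytes):
        # Counterpart of initialize(): rebuild the adapted graph from the state.
        self.epochs_started = pickle.loads(serialized_state)["epochs_started"]
        return datapipe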

Graph Functions

Graph utility functions are provided in torchdata.dataloader2.graph to help users rewrite the DataPipe graph for a custom ReadingService:

traverse_dps

Traverse the DataPipes and their attributes to extract the DataPipe graph.

find_dps

Given the graph of DataPipes generated by the traverse_dps function, return DataPipe instances of the provided DataPipe type.

list_dps

Given the graph of DataPipes generated by the traverse_dps function, return a list of all DataPipe instances without duplication.

remove_dp

Given the graph of DataPipes generated by the traverse_dps function and the DataPipe to be removed, return the new graph of DataPipes.

replace_dp

Given the graph of DataPipes generated by the traverse_dps function, the DataPipe to be replaced, and the new DataPipe, return the new graph of DataPipes.
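
For example, a custom ReadingService could rewrite the graph inside initialize as sketched below; swapping each Shuffler for one with a larger buffer is an arbitrary illustration.

from torchdata.dataloader2.graph import find_dps, list_dps, replace_dp, traverse_dps
from torchdata.datapipes.iter import IterableWrapper, Shuffler

pipe = IterableWrapper(range(10)).shuffle().sharding_filter()

graph = traverse_dps(pipe)       # extract the DataPipe graph
print(len(list_dps(graph)))      # all unique DataPipes in the graph

for shuffler in find_dps(graph, Shuffler):   # locate DataPipes by type
    # Replace each Shuffler with one using a larger buffer; replace_dp returns
    # the rewritten graph.
    graph = replace_dp(graph, shuffler, Shuffler(shuffler.datapipe, buffer_size=100))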
