:tocdepth: 3 .. currentmodule:: torchdata.datapipes.iter ReadingService =============== ``ReadingService`` handles in-place modification of ``DataPipe`` graph based on different use cases. Features --------- Dynamic Sharding ^^^^^^^^^^^^^^^^ Dynamic sharding is achieved by ``MultiProcessingReadingService`` and ``DistributedReadingService`` to shard the pipeline based on the information of corresponding multiprocessing and distributed workers. And, TorchData offers two types of ``DataPipe`` letting users define the sharding place within the pipeline. - ``sharding_filter`` (:class:`ShardingFilter`): When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the ``DataPipe`` graph, while skipping samples that do not belong to the corresponding worker at the point where ``sharding_filter`` is placed. - ``sharding_round_robin_dispatch`` (:class:`ShardingRoundRobinDispatcher`): When there is any ``sharding_round_robin_dispatch`` ``DataPipe`` in the pipeline, that branch (i.e. all DataPipes prior to ``sharding_round_robin_dispatch``) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distribute data to the subsequent worker processes. The following is an example of having two types of sharding strategies in the pipeline. .. graphviz:: digraph Example { subgraph cluster_replicable { label="Replicable" a -> b -> c -> d -> l; color=blue; } subgraph cluster_non_replicable { style=filled; color=lightgrey; node [style=filled,color=white]; label="Non-Replicable" e -> f -> g -> k; h -> i -> j -> k; } k -> l -> fullsync -> end; a [label="DP1"]; b [label="shuffle"]; c [label="sharding_filter", color=blue]; d [label="DP4"]; e [label="DP2"]; f [label="shuffle"]; g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white]; h [label="DP3"]; i [label="shuffle"]; j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white]; k [label="DP5 (Lowest common ancestor)"]; l [label="DP6"]; fullsync; end [shape=box]; } When multiprocessing takes place, the graph becomes: .. graphviz:: digraph Example { subgraph cluster_worker_0 { label="Worker 0" a0 -> b0 -> c0 -> d0 -> l0; m0 -> l0; color=blue; } subgraph cluster_worker_1 { label="Worker 1" a1 -> b1 -> c1 -> d1 -> l1; m1 -> l1; color=blue; } subgraph cluster_non_replicable { style=filled; color=lightgrey; node [style=filled,color=white]; label="Non-Replicable" e -> f -> g -> k; h -> i -> j -> k; k -> round_robin_demux; } round_robin_demux -> m0; round_robin_demux -> m1; l0 -> n; l1 -> n; n -> fullsync -> end; a0 [label="DP1"]; b0 [label="shuffle"]; c0 [label="sharding_filter", color=blue]; d0 [label="DP4"]; a1 [label="DP1"]; b1 [label="shuffle"]; c1 [label="sharding_filter", color=blue]; d1 [label="DP4"]; e [label="DP2"]; f [label="shuffle"]; g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white]; h [label="DP3"]; i [label="shuffle"]; j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white]; k [label="DP5 (Lowest common ancestor)"]; fullsync; l0 [label="DP6"]; l1 [label="DP6"]; m0 [label="Client"] m1 [label="Client"] n [label="Client"] end [shape=box]; } ``Client`` in the graph is a ``DataPipe`` that sends a request and receives a response from multiprocessing queues. .. module:: torchdata.dataloader2 Determinism ^^^^^^^^^^^^ In ``DataLoader2``, a ``SeedGenerator`` becomes a single source of randomness and each ``ReadingService`` would access it via ``initialize_iteration()`` and generate corresponding random seeds for random ``DataPipe`` operations. In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive on multiprocessing processes and distributed nodes, ``MultiProcessingReadingService`` and ``DistributedReadingService`` would help :class:`DataLoader2` to synchronize random states for any random ``DataPipe`` operation prior to ``sharding_filter`` or ``sharding_round_robin_dispatch``. For the remaining ``DataPipe`` operations after sharding, unique random states are generated based on the distributed rank and worker process id by each ``ReadingService``, in order to perform different random transformations. Graph Mode ^^^^^^^^^^^ This also allows easier transition of data-preprocessing pipeline from research to production. After the ``DataPipe`` graph is created and validated with the ``ReadingServices``, a different ``ReadingService`` that configures and connects to the production service/infrastructure such as ``AIStore`` can be provided to :class:`DataLoader2` as a drop-in replacement. The ``ReadingService`` could potentially search the graph, and find ``DataPipe`` operations that can be delegated to the production service/infrastructure, then modify the graph correspondingly to achieve higher-performant execution. Extend ReadingService ---------------------- The followings are interfaces for custom ``ReadingService``. .. autoclass:: ReadingServiceInterface :members: The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely): .. autoclass:: CheckpointableReadingServiceInterface :members: Graph Functions ^^^^^^^^^^^^^^^^ And, graph utility functions are provided in ``torchdata.dataloader.graph`` to help users to do ``DataPipe`` graph rewrite for custom ``ReadingService``: .. module:: torchdata.dataloader2.graph .. autosummary:: :nosignatures: :toctree: generated/ :template: function.rst traverse_dps find_dps list_dps remove_dp replace_dp