DataLoader2 Tutorial
=====================

This is the tutorial for users to create a ``DataPipe`` graph and load data via ``DataLoader2`` with different backend systems (``ReadingService``). A usage example can be found in `this colab notebook `_.

DataPipe
---------

Please refer to `DataPipe Tutorial `_ for more details. Here are the most important caveats necessary to make sure the data pipeline has a different order per epoch and that data shards are mutually exclusive and collectively exhaustive:

- Place ``sharding_filter`` or ``sharding_round_robin_dispatch`` as early as possible in the pipeline to avoid repeating expensive operations in worker/distributed processes.
- Add a ``shuffle`` DataPipe before sharding to achieve inter-shard shuffling. ``ReadingService`` will handle synchronization of those ``shuffle`` operations to ensure the order of data is the same before sharding so that all shards are mutually exclusive and collectively exhaustive.

Here is an example of a ``DataPipe`` graph:

.. code:: python

    datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
    datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
    datapipe = datapipe.shuffle().sharding_filter()
    datapipe = datapipe.map(fn).batch(8)

Multiprocessing
----------------

``MultiProcessingReadingService`` handles multiprocessing sharding at the point of ``sharding_filter`` and synchronizes the seeds across worker processes.

.. code:: python

    rs = MultiProcessingReadingService(num_workers=4)
    dl = DataLoader2(datapipe, reading_service=rs)
    for epoch in range(10):
        dl.seed(epoch)
        for d in dl:
            model(d)
    dl.shutdown()

Distributed
------------

``DistributedReadingService`` handles distributed sharding at the point of ``sharding_filter`` and synchronizes the seeds across distributed processes. In addition, to balance the data shards across distributed nodes, a ``fullsync`` ``DataPipe`` is attached to the ``DataPipe`` graph to align the number of batches across distributed ranks. This prevents the hanging issue caused by uneven shards in distributed training.

.. code:: python

    rs = DistributedReadingService()
    dl = DataLoader2(datapipe, reading_service=rs)
    for epoch in range(10):
        dl.seed(epoch)
        for d in dl:
            model(d)
    dl.shutdown()

Multiprocessing + Distributed
------------------------------

``SequentialReadingService`` can be used to combine both ``ReadingServices`` to achieve multiprocessing and distributed training at the same time.

.. code:: python

    mp_rs = MultiProcessingReadingService(num_workers=4)
    dist_rs = DistributedReadingService()
    rs = SequentialReadingService(dist_rs, mp_rs)

    dl = DataLoader2(datapipe, reading_service=rs)
    for epoch in range(10):
        dl.seed(epoch)
        for d in dl:
            model(d)
    dl.shutdown()
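
Putting the pieces together, below is a minimal end-to-end sketch that combines the ``DataPipe`` graph from above with the combined ``SequentialReadingService``. The CSV file names, ``fn``, and ``model`` are placeholders standing in for your own data, transform, and training step, and ``DistributedReadingService`` assumes the distributed process group has already been initialized.

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper
    from torchdata.dataloader2 import (
        DataLoader2,
        DistributedReadingService,
        MultiProcessingReadingService,
        SequentialReadingService,
    )

    # Placeholder transform and "model" -- substitute your own.
    def fn(row):
        return row

    def model(batch):
        pass

    # Build the DataPipe graph: shuffle before sharding_filter so that
    # shards stay mutually exclusive and collectively exhaustive.
    datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
    datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
    datapipe = datapipe.shuffle().sharding_filter()
    datapipe = datapipe.map(fn).batch(8)

    # Distributed sharding is applied first, then multiprocessing sharding
    # inside each rank's worker processes.
    mp_rs = MultiProcessingReadingService(num_workers=4)
    dist_rs = DistributedReadingService()
    rs = SequentialReadingService(dist_rs, mp_rs)

    dl = DataLoader2(datapipe, reading_service=rs)
    for epoch in range(10):
        dl.seed(epoch)  # re-seed each epoch; shuffle order changes per epoch but stays consistent across shards
        for d in dl:
            model(d)
    dl.shutdown()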