DataLoader2 Tutorial

This tutorial shows how to create a DataPipe graph and load data via DataLoader2 with different backend systems (ReadingService). A usage example can be found in this Colab notebook.

DataPipe

Please refer to the DataPipe Tutorial for more details. The following caveats are necessary to make sure the data pipeline has a different order per epoch and that data shards are mutually exclusive and collectively exhaustive:

  • Place sharding_filter or sharding_round_robin_dispatch as early as possible in the pipeline to avoid repeating expensive operations in worker/distributed processes.

  • Add a shuffle DataPipe before sharding to achieve inter-shard shuffling. The ReadingService will synchronize those shuffle operations so that the order of data is the same before sharding, which keeps all shards mutually exclusive and collectively exhaustive.

Here is an example of a DataPipe graph:

from torchdata.datapipes.iter import IterableWrapper

datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)  # fn is a user-defined row transform
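
The fn above stands for any user-defined transformation. As a purely hypothetical example, it could convert each parsed CSV row (a list of strings) into floats:

def fn(row):
    # Hypothetical transform: parse_csv yields each row as a list of strings;
    # convert every field to a float before batching.
    return [float(x) for x in row]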

Multiprocessing

MultiProcessingReadingService handles multiprocessing sharding at the point of sharding_filter and synchronizes the seeds across worker processes.

from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)  # re-seed each epoch so the shuffle order changes per epoch
    for d in dl:
        model(d)
dl.shutdown()
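
As a minimal sanity check, a small in-memory pipeline (used here as a sketch in place of the CSV graph above) confirms that the worker shards are mutually exclusive and collectively exhaustive:

from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

if __name__ == "__main__":
    # Each of the 4 workers receives a disjoint shard at sharding_filter;
    # iterating DataLoader2 yields the union of all worker outputs.
    dp = IterableWrapper(range(100)).shuffle().sharding_filter()
    dl = DataLoader2(dp, reading_service=MultiProcessingReadingService(num_workers=4))
    items = list(dl)
    assert sorted(items) == list(range(100))  # no duplicates, nothing missing
    dl.shutdown()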

Distributed

DistributedReadingService handles distributed sharding at the point of sharding_filter and synchronizes the seeds across distributed processes. In addition, to balance the data shards across distributed nodes, a fullsync DataPipe is attached to the DataPipe graph to align the number of batches across distributed ranks. This prevents the hanging issue caused by uneven shards in distributed training.

from torchdata.dataloader2 import DataLoader2, DistributedReadingService

# Requires an initialized distributed process group
# (e.g. torch.distributed.init_process_group) in each rank.
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()
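
The example above assumes the distributed environment is already set up. Below is a minimal single-node launch sketch, assuming a gloo process group, two ranks spawned with torch.multiprocessing.spawn, and a small in-memory pipeline in place of the CSV graph:

import os

import torch.distributed as dist
import torch.multiprocessing as mp

from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2, DistributedReadingService


def run(rank, world_size):
    # Assumption: single-node setup with a free local port.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    dp = IterableWrapper(range(100)).shuffle().sharding_filter()
    dl = DataLoader2(dp, reading_service=DistributedReadingService())
    for epoch in range(2):
        dl.seed(epoch)
        for d in dl:
            pass  # each rank sees its own shard; replace with model(d)
    dl.shutdown()
    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = 2
    mp.spawn(run, args=(world_size,), nprocs=world_size)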

Multiprocessing + Distributed

SequentialReadingService can be used to combine both ReadingServices to achieve multiprocessing and distributed training at the same time.

from torchdata.dataloader2 import (
    DataLoader2,
    DistributedReadingService,
    MultiProcessingReadingService,
    SequentialReadingService,
)

mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)

dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()
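
Note the order of composition above: the distributed ReadingService is listed first, so sharding across distributed ranks is applied before each rank's worker processes shard the data further.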
