DataLoader2 Tutorial¶
This tutorial shows how to create a DataPipe graph and load data via DataLoader2 with different backend systems (ReadingService). A usage example can be found in this Colab notebook.
DataPipe¶
Please refer to the DataPipe Tutorial for more details. Here are the most important caveats for making sure the data pipeline has a different order per epoch and that the data shards are mutually exclusive and collectively exhaustive:

- Place sharding_filter or sharding_round_robin_dispatch as early as possible in the pipeline to avoid repeating expensive operations in worker/distributed processes (a sketch using the latter follows the example below).
- Add a shuffle DataPipe before sharding to achieve inter-shard shuffling. ReadingService will handle synchronization of those shuffle operations to ensure the order of data is the same before sharding, so that all shards are mutually exclusive and collectively exhaustive.
Here is an example of a DataPipe graph:
from torchdata.datapipes.iter import IterableWrapper

datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
# Shuffle before sharding so that shards stay mutually exclusive and collectively exhaustive
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)  # fn is a user-defined row transform
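The example above replicates the pipeline in every worker and lets sharding_filter drop the elements that belong to other shards. As an alternative sketch (reusing the same file list and the user-defined fn), sharding_round_robin_dispatch can be used instead when the steps before it should run in a single dispatching process and only the steps after it should run in the workers; the SHARDING_PRIORITIES import path below is an assumption based on recent PyTorch/TorchData releases:

from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
from torchdata.datapipes.iter import IterableWrapper

datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
# Shuffle file names, then dispatch them round-robin to worker processes;
# everything up to and including this DataPipe runs in one dispatching process.
datapipe = datapipe.shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.map(fn).batch(8)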
Multiprocessing¶
MultiProcessingReadingService handles multiprocessing sharding at the point of sharding_filter and synchronizes the seeds across worker processes.
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)  # Synchronize shuffle seeds across worker processes for this epoch
    for d in dl:
        model(d)
dl.shutdown()
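As a rough sanity check of the per-epoch seeding (a sketch, assuming the small CSV pipeline above and that a fresh iterator is created per epoch), the first batch of each epoch can be recorded and compared; different seeds generally produce different shuffle orders, while re-running with the same seed reproduces the same order:

from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)

first_batches = {}
for epoch in range(2):
    dl.seed(epoch)  # A different seed per epoch yields a different shuffle order
    for batch in dl:
        first_batches[epoch] = batch
        break  # Only the first batch is needed for the comparison
dl.shutdown()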
Distributed¶
DistributedReadingService handles distributed sharding at the point of sharding_filter and synchronizes the seeds across distributed processes. In addition, to balance the data shards across distributed nodes, a fullsync DataPipe is attached to the DataPipe graph to align the number of batches across distributed ranks. This prevents the hanging issue caused by uneven shards in distributed training.
from torchdata.dataloader2 import DataLoader2, DistributedReadingService

# Assumes the default torch.distributed process group has already been initialized
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)  # Synchronize shuffle seeds across distributed ranks for this epoch
    for d in dl:
        model(d)
dl.shutdown()
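DistributedReadingService relies on the default torch.distributed process group, so the group has to be initialized before DataLoader2 is created. A minimal sketch of that setup (assuming a script launched with torchrun and the datapipe/model from above) could look like this:

import torch.distributed as dist
from torchdata.dataloader2 import DataLoader2, DistributedReadingService

# torchrun sets RANK, WORLD_SIZE and MASTER_ADDR/MASTER_PORT in the environment
dist.init_process_group(backend="gloo")

rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()
dist.destroy_process_group()

Launched, for example, with torchrun --nproc_per_node=2 train.py.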
Multiprocessing + Distributed¶
SequentialReadingService can be used to combine both ReadingServices to achieve multiprocessing and distributed training at the same time.
from torchdata.dataloader2 import DataLoader2, DistributedReadingService, MultiProcessingReadingService, SequentialReadingService

mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
# Pass the distributed ReadingService first, then the multiprocessing one
rs = SequentialReadingService(dist_rs, mp_rs)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()
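Since shutdown() is what releases worker processes and distributed coordination state, it is worth making sure it runs even if training fails part-way. A small robustness sketch (reusing dl, model, and the epoch loop from above) wraps the loop in try/finally:

try:
    for epoch in range(10):
        dl.seed(epoch)
        for d in dl:
            model(d)
finally:
    # Always release the ReadingService resources, even if model(d) raises
    dl.shutdown()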