Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan to continue developing or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from the torchdata repo. We will also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.10.0 (late 2024) they will be deleted. Existing users are advised to pin to torchdata<=0.9.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).
Stateful DataLoader
`StatefulDataLoader` is a drop-in replacement for `torch.utils.data.DataLoader` which offers `state_dict` / `load_state_dict` methods for handling mid-epoch checkpointing; they operate on the previous and next iterator requested from the dataloader, respectively. By default, the state includes the number of batches yielded and uses this to naively fast-forward the sampler (map-style) or the dataset (iterable-style). However, if the sampler and/or dataset implement `state_dict` / `load_state_dict` methods, `StatefulDataLoader` will call them during its own `state_dict` / `load_state_dict` calls. Under the hood, `StatefulDataLoader` handles aggregation and distribution of state across multiprocess workers (but not across ranks).
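For example, here is a minimal sketch of saving and restoring mid-epoch state; the dataset class and step counts are illustrative assumptions, not part of the API:

```python
from torch.utils.data import Dataset
from torchdata.stateful_dataloader import StatefulDataLoader

class SquaresDataset(Dataset):  # hypothetical map-style dataset, for illustration only
    def __len__(self):
        return 100

    def __getitem__(self, idx):
        return idx * idx

if __name__ == "__main__":
    dataloader = StatefulDataLoader(SquaresDataset(), batch_size=4, num_workers=2)

    # Iterate part-way through the epoch, then snapshot the loader's state.
    state = None
    for step, batch in enumerate(dataloader):
        if step == 9:                        # 10 batches have been yielded
            state = dataloader.state_dict()  # operates on the previous iterator
            break

    # Later (e.g. after a job restart): build an identically configured loader,
    # restore the state, and the next iterator resumes at batch 10 instead of 0.
    resumed = StatefulDataLoader(SquaresDataset(), batch_size=4, num_workers=2)
    resumed.load_state_dict(state)
    for batch in resumed:
        pass  # continues mid-epoch
```

If the dataset or sampler implements its own `state_dict` / `load_state_dict`, those methods are called instead of relying on the naive fast-forward shown here.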
- class `torchdata.stateful_dataloader.StatefulDataLoader(dataset: Dataset[_T_co], batch_size: Optional[int] = 1, shuffle: Optional[bool] = None, sampler: Optional[Union[Sampler, Iterable]] = None, batch_sampler: Optional[Union[Sampler[List], Iterable[List]]] = None, num_workers: int = 0, collate_fn: Optional[Callable[[List[_T]], Any]] = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Optional[Callable[[int], None]] = None, multiprocessing_context=None, generator=None, *, prefetch_factor: Optional[int] = None, persistent_workers: bool = False, pin_memory_device: str = '', snapshot_every_n_steps: Optional[int] = 1)`
This is a drop-in replacement for `torch.utils.data.DataLoader` that implements `state_dict` and `load_state_dict` methods, enabling mid-epoch checkpointing. All arguments are identical to `torch.utils.data.DataLoader`, with one new kwarg: `snapshot_every_n_steps`.
- Parameters:
  - dataset (Dataset) – dataset from which to load the data.
  - batch_size (int, optional) – how many samples per batch to load (default: `1`).
  - shuffle (bool, optional) – set to `True` to have the data reshuffled at every epoch (default: `False`).
  - sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any `Iterable` with `__len__` implemented. If specified, `shuffle` must not be specified.
  - batch_sampler (Sampler or Iterable, optional) – like `sampler`, but returns a batch of indices at a time. Mutually exclusive with `batch_size`, `shuffle`, `sampler`, and `drop_last`.
  - num_workers (int, optional) – how many subprocesses to use for data loading. `0` means that the data will be loaded in the main process. (default: `0`)
  - collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
  - pin_memory (bool, optional) – If `True`, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your `collate_fn` returns a batch that is a custom type, see the example below.
  - drop_last (bool, optional) – set to `True` to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If `False` and the size of the dataset is not divisible by the batch size, then the last batch will be smaller. (default: `False`)
  - timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: `0`)
  - worker_init_fn (Callable, optional) – If not `None`, this will be called on each worker subprocess with the worker id (an int in `[0, num_workers - 1]`) as input, after seeding and before data loading. (default: `None`)
  - multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If `None`, the default multiprocessing context of your operating system will be used. (default: `None`)
  - generator (torch.Generator, optional) – If not `None`, this RNG will be used by RandomSampler to generate random indexes and by multiprocessing to generate `base_seed` for workers. (default: `None`)
  - prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. `2` means there will be a total of 2 * num_workers batches prefetched across all workers. (The default depends on `num_workers`: if `num_workers=0`, the default is `None`; otherwise the default is `2`.)
  - persistent_workers (bool, optional) – If `True`, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' `Dataset` instances alive. (default: `False`)
  - pin_memory_device (str, optional) – the device to `pin_memory` to if `pin_memory` is `True`.
  - snapshot_every_n_steps (int, optional) – Defines how often the state is transferred from the dataloader workers to the dataloader. By default, it is set to `1`, i.e., state is transferred every step. If the state is large, this value can be increased (and ideally set to the frequency of training checkpointing) to reduce the overhead of transferring state every step. See the sketch after this list.
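As referenced in the `snapshot_every_n_steps` entry above, here is a brief construction sketch; the dataset class and the 100-step checkpoint interval are illustrative assumptions, with the snapshot frequency aligned to the checkpointing frequency as suggested:

```python
from torch.utils.data import Dataset
from torchdata.stateful_dataloader import StatefulDataLoader

class ToyDataset(Dataset):  # hypothetical stand-in for real training data
    def __len__(self):
        return 10_000

    def __getitem__(self, idx):
        return idx

CHECKPOINT_EVERY = 100  # hypothetical training-checkpoint interval, in steps

if __name__ == "__main__":
    dataloader = StatefulDataLoader(
        ToyDataset(),
        batch_size=32,
        num_workers=4,                            # worker state is aggregated by the loader
        snapshot_every_n_steps=CHECKPOINT_EVERY,  # transfer worker state every 100 steps
    )

    saved_states = []
    for step, batch in enumerate(dataloader, start=1):
        # ... training step would go here ...
        if step % CHECKPOINT_EVERY == 0:
            # Capture dataloader state alongside model/optimizer state.
            saved_states.append(dataloader.state_dict())
```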
Warning

If the `spawn` start method is used, `worker_init_fn` cannot be an unpicklable object, e.g., a lambda function. See multiprocessing-best-practices for more details related to multiprocessing in PyTorch.

Warning
The `len(dataloader)` heuristic is based on the length of the sampler used. When `dataset` is an `IterableDataset`, it instead returns an estimate based on `len(dataset) / batch_size`, with proper rounding depending on `drop_last`, regardless of multi-process loading configurations. This represents the best guess PyTorch can make, because PyTorch trusts user `dataset` code to handle multi-process loading correctly and avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when `drop_last` is set. Unfortunately, PyTorch cannot detect such cases in general.

See Dataset Types for more details on these two types of datasets and how `IterableDataset` interacts with Multi-process data loading.
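As a rough sketch of that length estimate (the function name is ours, and it ignores the multi-worker sharding effects described above):

```python
import math

def estimated_num_batches(dataset_len: int, batch_size: int, drop_last: bool) -> int:
    # len(dataset) / batch_size, rounded down when drop_last=True and up otherwise.
    # Per the warning above, sharding across workers can make the true count differ.
    if drop_last:
        return dataset_len // batch_size
    return math.ceil(dataset_len / batch_size)

# e.g. 10 samples with batch_size=3 -> 4 batches (drop_last=False) or 3 (drop_last=True)
```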
Warning

See the Reproducibility, Dataloader-workers-random-seed, and Data-loading-randomness notes for random-seed-related questions.