Iterable-style DataPipes
An iterable-style dataset is an instance of a subclass of IterableDataset that implements the __iter__() protocol and represents an iterable over data samples. This type of dataset is particularly suitable when random reads are expensive or even impractical, and when the batch size depends on the fetched data.
For example, such a dataset, when iter(iterdatapipe) is called, could return a stream of data read from a database, a remote server, or even logs generated in real time.
This is an updated version of IterableDataset in torch.
- class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)
Iterable-style DataPipe.
All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipe is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized, and its elements are computed only when next() is called on the iterator of an IterDataPipe.
All subclasses should override __iter__(), which returns an iterator over the samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its reset() method, which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. Common uses include resetting buffers, pointers, and various state variables within the custom IterDataPipe.
Note
Only one iterator can be valid for each IterDataPipe at a time, and the creation of a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipes have internal buffers, whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see GitHub IterDataPipe Single Iterator Issue.
These DataPipes can be invoked in two ways: using the class constructor, or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipes together to form a pipeline that performs multiple operations in succession.
Note
When a subclass is used with DataLoader, each item in the DataPipe will be yielded from the DataLoader iterator. When num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desirable to configure each copy independently to avoid having duplicate data returned from the workers. get_worker_info(), when called in a worker process, returns information about the worker. It can be used in either the dataset's __iter__() method or the DataLoader's worker_init_fn option to modify each copy's behavior.
Examples
- General Usage:
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> dp = IterableWrapper(range(10))
>>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
>>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
>>> list(map_dp_1)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> list(map_dp_2)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
>>> list(filter_dp)
[2, 4, 6, 8, 10]
- Single Iterator Constraint Example:
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> source_dp = IterableWrapper(range(10))
>>> it1 = iter(source_dp)
>>> list(it1)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> it1 = iter(source_dp)
>>> it2 = iter(source_dp)  # The creation of a new iterator invalidates `it1`
>>> next(it2)
0
>>> next(it1)  # Further usage of `it1` will raise a `RuntimeError`
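The note above on num_workers > 0 can be illustrated without a DataLoader. The following is a minimal plain-Python sketch, not torchdata's implementation: the hypothetical helper `shard_for_worker` keeps every `num_workers`-th sample, starting at `worker_id`. In a real worker process these two values would typically be read from the object returned by get_worker_info() (its `id` and `num_workers` attributes); here they are passed in by hand.

```python
from itertools import islice


def shard_for_worker(samples, worker_id, num_workers):
    """Yield only the samples that belong to one worker (hypothetical helper)."""
    # Worker k takes items k, k + n, k + 2n, ... so the shards never overlap.
    return islice(samples, worker_id, None, num_workers)


# Simulate three workers, each holding its own copy of the same source.
shards = [list(shard_for_worker(range(10), worker_id, 3)) for worker_id in range(3)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Taken together the shards cover every sample exactly once, which is the property that avoids duplicate data across workers.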
We have different types of Iterable DataPipes:
Archive - open and decompress archive files of different formats.
Augmenting - augment your samples (e.g. adding index, or cycle through indefinitely).
Combinatorial - perform combinatorial operations (e.g. sampling, shuffling).
Combining/Splitting - interact with multiple DataPipes by combining them or splitting one to many.
Grouping - group samples within a DataPipe.
IO - interact with file systems or remote servers (e.g. downloading, opening, saving files, and listing the files in directories).
Mapping - apply a given function to each element in the DataPipe.
Others - perform a miscellaneous set of operations.
Selecting - select specific samples within a DataPipe.
Text - parse, read, and transform text files and data.
Archive DataPipes
These DataPipes help you open and decompress archive files of different formats.
Decompresses bz2 binary streams from an Iterable DataPipe which contains tuples of path name and bz2 binary streams, and yields a tuple of path name and extracted binary stream (functional name: |
|
Takes tuples of path and compressed stream of data, and returns tuples of path and decompressed stream of data (functional name: |
|
Decompresses rar binary streams from an input Iterable DataPipe which contains tuples of path name and rar binary stream, and yields a tuple of path name and extracted binary stream (functional name: |
|
Opens/decompresses tar binary streams from an Iterable DataPipe which contains tuples of path name and tar binary stream, and yields a tuple of path name and extracted binary stream (functional name: |
|
Opens/decompresses tfrecord binary streams from an Iterable DataPipe which contains tuples of path name and tfrecord binary stream, and yields the stored records (functional name: |
|
Iterable DataPipe that accepts a stream of (path, data) tuples, usually representing the pathnames and files of a tar archive (functional name: |
|
Decompresses xz (lzma) binary streams from an Iterable DataPipe which contains tuples of path name and xz binary streams, and yields a tuple of path name and extracted binary stream (functional name: |
|
Opens/decompresses zip binary streams from an Iterable DataPipe which contains a tuple of path name and zip binary stream, and yields a tuple of path name and extracted binary stream (functional name: |
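The rows above share one contract: tuples of (path name, compressed binary stream) go in, and tuples of (path name, extracted binary stream) come out. A minimal sketch of that contract for bz2, using only the Python standard library, might look as follows. The helper name `open_bz2_pairs` and the in-memory test data are assumptions made for illustration; this is not torchdata's code.

```python
import bz2
import io


def open_bz2_pairs(path_stream_pairs):
    """Turn (path, bz2 stream) tuples into (path, decompressed stream) tuples."""
    for path, stream in path_stream_pairs:
        # BZ2File accepts a file object and decompresses lazily on read().
        yield path, bz2.BZ2File(stream)


# Build one in-memory "file" so that the sketch needs no data on disk.
compressed = io.BytesIO(bz2.compress(b"hello datapipe"))
extracted = [
    (path, stream.read())
    for path, stream in open_bz2_pairs([("a.txt.bz2", compressed)])
]
print(extracted)  # [('a.txt.bz2', b'hello datapipe')]
```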
Augmenting DataPipes
These DataPipes help to augment your samples.
Cycles the specified input in perpetuity by default, or for the specified number of times (functional name: |
|
Adds an index to an existing DataPipe through enumeration, with the index starting from 0 by default (functional name: |
|
Adds an index to an existing Iterable DataPipe with (functional name: |
|
Repeatedly yield each element of source DataPipe for the specified number of times before moving onto the next element (functional name: |
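Cycling and repeating are easy to confuse, so here is a small plain-Python sketch of the difference, together with enumeration. The helpers `cycle_n` and `repeat_each` are hypothetical names that only mimic the behavior described in the rows above; they assume the source can be iterated more than once (a list here).

```python
from itertools import chain, repeat


def cycle_n(source, count):
    """Replay the whole source `count` times: a, b, c, a, b, c, ..."""
    for _ in range(count):
        yield from source


def repeat_each(source, times):
    """Yield every element `times` times before moving on: a, a, b, b, ..."""
    return chain.from_iterable(repeat(item, times) for item in source)


letters = ["a", "b", "c"]
print(list(cycle_n(letters, 2)))      # ['a', 'b', 'c', 'a', 'b', 'c']
print(list(repeat_each(letters, 2)))  # ['a', 'a', 'b', 'b', 'c', 'c']
print(list(enumerate(letters)))       # [(0, 'a'), (1, 'b'), (2, 'c')]
```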
Combinatorial DataPipes
These DataPipes help to perform combinatorial operations.
Shuffles each mini-batch from the prior DataPipe (functional name: |
|
Generate sample elements using the provided |
|
Shuffle the input DataPipe with a buffer (functional name: |
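Shuffling "with a buffer" means that a stream can be shuffled approximately without loading it all into memory. The sketch below shows one common way to do this in plain Python; the function name, its `seed` parameter, and the exact swap strategy are assumptions for illustration and are not claimed to match torchdata's implementation.

```python
import random


def buffered_shuffle(source, buffer_size, seed=None):
    """Shuffle a stream while holding at most `buffer_size` items in memory."""
    rng = random.Random(seed)
    buffer = []
    for item in source:
        if len(buffer) < buffer_size:
            buffer.append(item)  # Fill phase: nothing is yielded yet.
        else:
            # Swap the new item with a random buffered one and emit the latter.
            index = rng.randrange(buffer_size)
            buffer[index], item = item, buffer[index]
            yield item
    # Source exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(buffered_shuffle(range(10), buffer_size=4, seed=0))
print(sorted(shuffled) == list(range(10)))  # True: every item appears exactly once
```

A larger buffer gives a more thorough shuffle at the cost of memory; with `buffer_size=1` the stream passes through in its original order.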
Combining/Splitting DataPipes
These tend to involve multiple DataPipes, combining them or splitting one to many.
Concatenates multiple Iterable DataPipes (functional name: |
|
Splits the input DataPipe into multiple child DataPipes, using the given classification function (functional name: |
|
Creates multiple instances of the same Iterable DataPipe (functional name: |
|
Zips two IterDataPipes together based on the matching key (functional name: |
|
Joins the items from the source IterDataPipe with items from a MapDataPipe (functional name: |
|
Yields one element at a time from each of the input Iterable DataPipes (functional name: |
|
Yields one element at a time from each of the input Iterable DataPipes (functional name: |
|
Splits the input DataPipe into multiple child DataPipes in the round-robin order (functional name: |
|
Takes a Dict of (IterDataPipe, Weight), and yields items by sampling from these DataPipes with respect to their weights. |
|
Takes in a DataPipe of Sequences, unpacks each Sequence, and returns the elements in separate DataPipes based on their position in the Sequence (functional name: |
|
Aggregates elements into a tuple from each of the input DataPipes (functional name: |
|
Aggregates elements into a tuple from each of the input DataPipes (functional name: |
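Several rows above appear in pairs with identical descriptions. The sketch below assumes that each pair differs only in when it stops: one variant ends as soon as the shortest input is exhausted, the other continues until every input is exhausted. That reading, and the helper names, are assumptions for illustration; the code is plain Python and does not use torchdata.

```python
from itertools import zip_longest

_MISSING = object()  # Sentinel marking "this input is already exhausted".


def interleave_shortest(*sources):
    """Round-robin over the inputs; stop once any input runs out."""
    for group in zip(*sources):
        yield from group


def interleave_longest(*sources):
    """Round-robin over the inputs; skip exhausted ones until all run out."""
    for group in zip_longest(*sources, fillvalue=_MISSING):
        yield from (item for item in group if item is not _MISSING)


a, b = [0, 1, 2], [10, 11]
print(list(interleave_shortest(a, b)))  # [0, 10, 1, 11]
print(list(interleave_longest(a, b)))   # [0, 10, 1, 11, 2]
print(list(zip(a, b)))                  # [(0, 10), (1, 11)]
```

The last line shows the contrast with aggregation: interleaving yields single elements in turn, while zipping yields one tuple per round.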
Grouping DataPipes
These DataPipes help you group samples within a DataPipe.
Creates mini-batches of data (functional name: |
|
Creates mini-batches of data from sorted bucket (functional name: |
|
Collates samples from DataPipe to Tensor(s) by a custom collate function (functional name: |
|
Groups data from IterDataPipe by keys from |
|
Creates mini-batches of data from a min-heap with limited size, and the total length of samples returned by |
|
Undoes batching of data (functional name: |
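Batching and its inverse can be sketched in a few lines of plain Python. The helpers `make_batches` and `flatten_batches` and the `drop_last` flag are illustrative assumptions that mimic the behavior described above, not torchdata's code.

```python
def make_batches(source, batch_size, drop_last=False):
    """Group consecutive items into lists of length `batch_size`."""
    current = []
    for item in source:
        current.append(item)
        if len(current) == batch_size:
            yield current
            current = []
    # Emit the final, shorter batch unless the caller asked to drop it.
    if current and not drop_last:
        yield current


def flatten_batches(batches):
    """Undo one level of batching."""
    for current in batches:
        yield from current


batches = list(make_batches(range(7), batch_size=3))
print(batches)                         # [[0, 1, 2], [3, 4, 5], [6]]
print(list(flatten_batches(batches)))  # [0, 1, 2, 3, 4, 5, 6]
```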
IO DataPipes
These DataPipes help you interact with file systems or remote servers (e.g. downloading, opening, saving files, and listing the files in directories).
Iterable DataPipe that lists files from the AIStore backends with the given URL prefixes (functional name: |
|
Iterable DataPipe that loads files from AIStore with the given URLs (functional name: |
|
Lists the contents of the directory at the provided |
|
Opens files from input datapipe which contains fsspec paths and yields a tuple of pathname and opened file stream (functional name: |
|
Takes in a DataPipe of tuples of metadata and data, saves the data to the target path (generated by the filepath_fn and metadata), and yields the resulting fsspec path (functional name: |
|
Given path(s) to the root directory, yields file pathname(s) (path + filename) of files within the root directory. |
|
Given pathnames, opens files and yields pathname and file stream in a tuple (functional name: |
|
Takes URLs pointing at GDrive files, and yields tuples of file name and IO stream (functional name: |
|
Takes file URLs (HTTP URLs pointing to files), and yields tuples of file URL and IO stream (functional name: |
|
Takes in dataset names and returns an Iterable HuggingFace dataset. |
|
Lists the contents of the directory at the provided |
|
Opens files from input datapipe which contains pathnames or URLs, and yields a tuple of pathname and opened file stream (functional name: |
|
Takes in a DataPipe of tuples of metadata and data, saves the data to the target path which is generated by the |
|
Takes file URLs (can be HTTP URLs pointing to files or URLs to GDrive files), and yields tuples of file URL and IO stream (functional name: |
|
Takes in paths to Parquet files and returns a TorchArrow DataFrame for each row group within a Parquet file (functional name: |
|
Iterable DataPipe that lists Amazon S3 file URLs with the given prefixes (functional name: |
|
Iterable DataPipe that loads Amazon S3 files from the given S3 URLs (functional name: |
|
Takes in a DataPipe of tuples of metadata and data, saves the data to the target path generated by the |
Mapping DataPipes
These DataPipes apply a given function to each element in the DataPipe.
Combines elements from the source DataPipe into batches and applies a coroutine function over each element within the batch concurrently, then flattens the outputs to a single, unnested IterDataPipe (functional name: |
|
Combines elements from the source DataPipe to batches and applies a function over each batch, then flattens the outputs to a single, unnested IterDataPipe (functional name: |
|
Applies a function over each item from the source DataPipe, then flattens the outputs to a single, unnested IterDataPipe (functional name: |
|
Applies a function over each item from the source DataPipe (functional name: |
|
Applies a function over each item from the source DataPipe, then collects the iterables returned in a buffer, then, at every iteration, chooses at random one of the iterables in the buffer and yields one item from this iterable (functional name: |
|
Applies a function over each item from the source DataPipe concurrently using |
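The difference between applying a function and applying it "then flattening the outputs" is easiest to see side by side. This is a plain-Python sketch with hypothetical helper names; it only mirrors the behavior described in the rows above.

```python
def map_items(source, fn):
    """Yield fn(item) for every item, keeping one output per input."""
    for item in source:
        yield fn(item)


def flatmap_items(source, fn):
    """Apply fn (which returns an iterable) and yield its elements one by one."""
    for item in source:
        yield from fn(item)


words = ["ab", "cd"]
print(list(map_items(words, list)))      # [['a', 'b'], ['c', 'd']]
print(list(flatmap_items(words, list)))  # ['a', 'b', 'c', 'd']
```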
Other DataPipes
A miscellaneous set of DataPipes with different functionalities.
Takes rows of data, batches a number of them together and creates TorchArrow DataFrames (functional name: |
|
Indicates when the result of the prior DataPipe will be saved to local files specified by |
|
Synchronizes data across distributed processes to prevent hanging during training, which is caused by uneven sharded data (functional name: |
|
Computes and checks the hash of each file, from an input DataPipe of tuples of file name and data/stream (functional name: |
|
Stores elements from the source DataPipe in memory, up to a size limit if specified (functional name: |
|
Wraps an iterable object to create an IterDataPipe. |
|
Set the length attribute of the DataPipe, which is returned by |
|
Convert a |
|
Caches the outputs of multiple DataPipe operations to local files; these operations are typically performance bottlenecks, such as downloading and decompressing (functional name: |
|
Prefetches one element from the source DataPipe and moves it to pinned memory (functional name: |
|
Prefetches elements from the source DataPipe and puts them into a buffer (functional name: |
|
Randomly split samples from a source DataPipe into groups (functional name: |
|
Expands incoming shard strings into shards. |
|
Wrapper that allows DataPipe to be sharded (functional name: |
|
Wrapper that indicates the prior section of |
Selecting DataPipes
These DataPipes help you select specific samples within a DataPipe.
Filters out elements from the source datapipe according to input |
|
Yields elements from the source DataPipe from the start, up to the specified limit (functional name: |
|
Drop columns/elements in input DataPipe via its indices (functional name: |
|
Returns a slice of elements in input DataPipe via start/stop/step or indices (functional name: |
|
Returns a flattened copy of the input DataPipe at the per sample/element level based on provided indices (functional name: |
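Filtering and limiting compose naturally on a stream. The sketch below uses plain Python with hypothetical helper names; note that, as in the General Usage example earlier on this page, the elements that are kept are those for which the predicate returns True.

```python
from itertools import islice


def keep_if(source, predicate):
    """Yield only the items for which predicate(item) is true."""
    return (item for item in source if predicate(item))


def take_first(source, limit):
    """Yield items from the start of the source, up to `limit` of them."""
    return islice(source, limit)


evens = keep_if(range(10), lambda x: x % 2 == 0)
first_three_evens = list(take_first(evens, 3))
print(first_three_evens)  # [0, 2, 4]
```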
Text DataPipes
These DataPipes help you parse, read, and transform text files and data.
Accepts a DataPipe consisting of tuples of file name and CSV data stream, reads and returns the contents within the CSV files one row at a time (functional name: |
|
Accepts a DataPipe consisting of tuples of file name and CSV data stream, reads and returns the contents within the CSV files one row at a time (functional name: |
|
Reads from JSON data streams and yields a tuple of file name and JSON data (functional name: |
|
Accepts a DataPipe consisting of tuples of file name and string data stream, and for each line in the stream, yields a tuple of file name and the line (functional name: |
|
Aggregates lines of text from the same file into a single paragraph (functional name: |
|
Decodes binary streams from input DataPipe, yields pathname and decoded data in a tuple. |
|
Accepts an input DataPipe with batches of data, and processes one batch at a time and yields a Dict for each batch, with |
|
Given IO streams and their label names, yields bytes with the label name as a tuple. |
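Most of the text rows above consume tuples of (file name, data stream). A minimal standard-library sketch of line reading and CSV parsing over such tuples is shown below. The helper names and the sample data are assumptions for illustration, and details such as newline stripping may differ from torchdata's behavior.

```python
import csv
import io


def read_lines(name_stream_pairs):
    """For each line in each stream, yield (file name, line without newline)."""
    for name, stream in name_stream_pairs:
        for line in stream:
            yield name, line.rstrip("\n")


def parse_csv(name_stream_pairs):
    """Yield the rows of every CSV stream, one row (a list of str) at a time."""
    for _name, stream in name_stream_pairs:
        yield from csv.reader(stream)


text = "id,label\n1,cat\n2,dog\n"
lines = list(read_lines([("pets.csv", io.StringIO(text))]))
rows = list(parse_csv([("pets.csv", io.StringIO(text))]))
print(lines)  # [('pets.csv', 'id,label'), ('pets.csv', '1,cat'), ('pets.csv', '2,dog')]
print(rows)   # [['id', 'label'], ['1', 'cat'], ['2', 'dog']]
```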