
Iterable-style DataPipes

An iterable-style dataset is an instance of a subclass of IterableDataset that implements the __iter__() protocol and represents an iterable over data samples. This type of dataset is particularly suitable for cases where random reads are expensive or even impossible, and where the batch size depends on the fetched data.

For example, calling iter(iterdatapipe) on such a dataset could return a stream of data read from a database, a remote server, or even logs generated in real time.

This is an updated version of IterableDataset in torch.

class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)

Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.

All subclasses should override __iter__(), which should return an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. Common uses include resetting buffers, pointers, and various state variables within the custom IterDataPipe.
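As a rough illustration of the reset() hook, here is a plain-Python stand-in for a custom IterDataPipe that pairs up consecutive elements. The class name BufferedPairs is hypothetical, and because it does not subclass IterDataPipe, its __iter__ calls reset() explicitly, whereas IterDataPipe invokes reset() automatically. Without the reset, an odd element left over from one pass would leak into the next.

```python
from typing import Iterator, List


class BufferedPairs:
    """Hypothetical stand-in for a custom IterDataPipe that yields pairs."""

    def __init__(self, source: List[int]) -> None:
        self.source = source
        self._buffer: List[int] = []

    def reset(self) -> None:
        # Clear per-pass state so a new pass never sees leftovers.
        self._buffer = []

    def __iter__(self) -> Iterator[List[int]]:
        # A real IterDataPipe would have reset() invoked for it; we call it here.
        self.reset()
        for item in self.source:
            self._buffer.append(item)
            if len(self._buffer) == 2:
                yield self._buffer
                self._buffer = []
        # An odd trailing element stays in self._buffer after the pass ends.
```

With the source [1, 2, 3], every pass yields [[1, 2]], even though 3 is still buffered when the previous pass finishes.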

Note

Only one iterator can be valid for each IterDataPipe at a time, and the creation of a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipes have internal buffers, whose states can become invalid if there are multiple iterators. The code example below shows how this constraint looks in practice. If you have any feedback related to this constraint, please see GitHub IterDataPipe Single Iterator Issue.

These DataPipes can be invoked in two ways: using the class constructor, or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipes together to form a pipeline that performs multiple operations in succession.

Note

When a subclass is used with DataLoader, each item in the DataPipe will be yielded from the DataLoader iterator. When num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desirable to configure each copy independently to avoid having duplicate data returned from the workers. get_worker_info(), when called in a worker process, returns information about the worker. It can be used in either the dataset's __iter__() method or the DataLoader's worker_init_fn option to modify each copy's behavior.
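One common way to keep worker copies from returning duplicates is for each copy to keep only the elements whose index modulo the number of workers equals its own worker ID. The function shard below is a hypothetical sketch of that scheme; inside a real worker process, the two numbers would come from the num_workers and id fields of the object returned by get_worker_info().

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shard(source: Iterable[T], num_workers: int, worker_id: int) -> Iterator[T]:
    """Yield only the elements whose index maps to this worker."""
    if not 0 <= worker_id < num_workers:
        raise ValueError("worker_id must be in the range [0, num_workers)")
    for index, item in enumerate(source):
        if index % num_workers == worker_id:
            yield item


# Simulate two workers, each holding its own copy of the same source.
per_worker: List[List[int]] = [list(shard(range(10), 2, wid)) for wid in range(2)]
```

Each worker iterates the full source but emits a disjoint subset, so together the workers cover every element exactly once.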

Examples

General Usage:
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> dp = IterableWrapper(range(10))
>>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
>>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
>>> list(map_dp_1)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> list(map_dp_2)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
>>> list(filter_dp)
[2, 4, 6, 8, 10]
Single Iterator Constraint Example:
>>> from torchdata.datapipes.iter import IterableWrapper
>>> source_dp = IterableWrapper(range(10))
>>> it1 = iter(source_dp)
>>> list(it1)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> it1 = iter(source_dp)
>>> it2 = iter(source_dp)  # The creation of a new iterator invalidates `it1`
>>> next(it2)
0
>>> next(it1)  # Further usage of `it1` will raise a `RuntimeError`
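A simplified way to enforce the single-iterator rule is to give each new iterator an ID and have an older iterator raise as soon as it notices that a newer one exists. The class below is a hypothetical plain-Python illustration of that idea, not the torchdata implementation.

```python
from typing import Iterator, List


class SingleIteratorSource:
    """Hypothetical source that allows only one valid iterator at a time."""

    def __init__(self, data: List[int]) -> None:
        self.data = data
        self._valid_id = 0  # bumped every time a new iterator is created

    def __iter__(self) -> Iterator[int]:
        self._valid_id += 1
        # The ID is bound now, when the iterator is created.
        return self._generate(self._valid_id)

    def _generate(self, my_id: int) -> Iterator[int]:
        for item in self.data:
            # A stale iterator fails on its next request for data.
            if my_id != self._valid_id:
                raise RuntimeError("iterator invalidated by a newer iterator")
            yield item
```

Creating it1 and then it2 from the same source leaves it2 working normally, while the next call to next(it1) raises RuntimeError.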

We have different types of Iterable DataPipes:

  1. Archive - open and decompress archive files of different formats.

  2. Augmenting - augment your samples (e.g. adding an index, or cycling through indefinitely).

  3. Combinatorial - perform combinatorial operations (e.g. sampling, shuffling).

  4. Combining/Splitting - interact with multiple DataPipes by combining them or splitting one into many.

  5. Grouping - group samples within a DataPipe.

  6. IO - interact with file systems or remote servers (e.g. downloading, opening, saving files, and listing the files in directories).

  7. Mapping - apply a given function to each element in the DataPipe.

  8. Others - perform a miscellaneous set of operations.

  9. Selecting - select specific samples within a DataPipe.

  10. Text - parse, read, and transform text files and data.

Archive DataPipes

These DataPipes help open and decompress archive files of different formats.

Bz2FileLoader

Decompresses bz2 binary streams from an Iterable DataPipe which contains tuples of path name and bz2 binary streams, and yields a tuple of path name and extracted binary stream (functional name: load_from_bz2).

Decompressor

Takes tuples of path and compressed stream of data, and returns tuples of path and decompressed stream of data (functional name: decompress).
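The "(path, compressed stream) in, (path, decompressed stream) out" contract can be sketched with the standard library's bz2, gzip, and lzma modules. This is a sketch under assumptions: the function and the suffix-to-opener table are hypothetical, and the real Decompressor's file-type detection is not reproduced here.

```python
import bz2
import gzip
import io
import lzma
from typing import BinaryIO, Iterable, Iterator, Tuple

# Hypothetical suffix-to-opener table; each opener wraps a binary file object.
_OPENERS = {
    ".bz2": lambda f: bz2.BZ2File(f),
    ".gz": lambda f: gzip.GzipFile(fileobj=f),
    ".xz": lambda f: lzma.LZMAFile(f),
}


def decompress(pairs: Iterable[Tuple[str, BinaryIO]]) -> Iterator[Tuple[str, BinaryIO]]:
    """Map (path, compressed stream) to (path, decompressed stream)."""
    for path, stream in pairs:
        for suffix, opener in _OPENERS.items():
            if path.endswith(suffix):
                # The stream is wrapped lazily; nothing is read until the consumer reads.
                yield path, opener(stream)
                break
        else:
            raise ValueError(f"unsupported compression for {path!r}")


payload = b"hello datapipes"
sample = [
    ("a.bz2", io.BytesIO(bz2.compress(payload))),
    ("b.gz", io.BytesIO(gzip.compress(payload))),
    ("c.xz", io.BytesIO(lzma.compress(payload))),
]
```

Reading each yielded stream returns the original payload, while the path is passed through unchanged.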

RarArchiveLoader

Decompresses rar binary streams from an input Iterable DataPipe which contains tuples of path name and rar binary stream, and yields a tuple of path name and extracted binary stream (functional name: load_from_rar).

TarArchiveLoader

Opens/decompresses tar binary streams from an Iterable DataPipe which contains tuples of path name and tar binary stream, and yields a tuple of path name and extracted binary stream (functional name: load_from_tar).

TFRecordLoader

Opens/decompresses tfrecord binary streams from an Iterable DataPipe which contains tuples of path name and tfrecord binary stream, and yields the stored records (functional name: load_from_tfrecord).

WebDataset

Iterable DataPipe that accepts a stream of (path, data) tuples, usually representing the pathnames and files of a tar archive (functional name: webdataset).

XzFileLoader

Decompresses xz (lzma) binary streams from an Iterable DataPipe which contains tuples of path name and xz binary streams, and yields a tuple of path name and extracted binary stream (functional name: load_from_xz).

ZipArchiveLoader

Opens/decompresses zip binary streams from an Iterable DataPipe which contains a tuple of path name and zip binary stream, and yields a tuple of path name and extracted binary stream (functional name: load_from_zip).

Augmenting DataPipes

These DataPipes help to augment your samples.

Cycler

Cycles the specified input in perpetuity by default, or for the specified number of times (functional name: cycle).

Enumerator

Adds an index to an existing DataPipe through enumeration, with the index starting from 0 by default (functional name: enumerate).

IndexAdder

Adds an index to an existing Iterable DataPipe (functional name: add_index).

Repeater

Repeatedly yields each element of the source DataPipe the specified number of times before moving on to the next element (functional name: repeat).
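The three augmenting behaviors above can be approximated with plain generators. The helper names are hypothetical, and the sketch assumes the source is re-iterable (for example a list or a range), since cycling requires iterating it more than once.

```python
import itertools
from typing import Iterable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def cycle(source: Iterable[T], count: Optional[int] = None) -> Iterator[T]:
    """Replay the whole source forever, or ``count`` times if given."""
    passes = itertools.count() if count is None else range(count)
    for _ in passes:
        yield from source  # assumes ``source`` can be iterated repeatedly


def enumerate_from(source: Iterable[T], start: int = 0) -> Iterator[Tuple[int, T]]:
    """Pair each element with an index, starting at ``start``."""
    return enumerate(source, start)


def repeat_each(source: Iterable[T], times: int) -> Iterator[T]:
    """Yield each element ``times`` times before moving on to the next one."""
    for item in source:
        for _ in range(times):
            yield item


# An infinite cycle must be cut off by the consumer, here with islice.
first_five: List[int] = list(itertools.islice(cycle([1, 2]), 5))
```

Note the difference between cycling and repeating: cycle replays the whole sequence ([1, 2, 1, 2]), while repeat_each repeats every element in place ([1, 1, 2, 2]).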

Combinatorial DataPipes

These DataPipes help to perform combinatorial operations.

InBatchShuffler

Shuffles each mini-batch from the prior DataPipe (functional name: in_batch_shuffle).

Sampler

Generate sample elements using the provided Sampler (defaults to SequentialSampler).

Shuffler

Shuffle the input DataPipe with a buffer (functional name: shuffle).
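Buffer-based shuffling trades randomness for bounded memory: the sketch below keeps at most buffer_size elements, and once the buffer is full it emits a randomly chosen buffered element and stores the incoming one in its place. The function is a hypothetical illustration of the technique rather than Shuffler's code. A buffer of size 1 leaves the order unchanged, while a buffer at least as large as the input produces a full shuffle.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffer_shuffle(source: Iterable[T], buffer_size: int, seed: Optional[int] = None) -> Iterator[T]:
    """Approximately shuffle a stream while holding at most buffer_size items."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be positive")
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in source:
        if len(buffer) < buffer_size:
            buffer.append(item)
            continue
        # Buffer is full: emit a random buffered element, put the new one in its slot.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Drain whatever is left in random order.
    rng.shuffle(buffer)
    yield from buffer
```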

Combining/Splitting DataPipes

These tend to involve multiple DataPipes, combining them or splitting one to many.

Concater

Concatenates multiple Iterable DataPipes (functional name: concat).

Demultiplexer

Splits the input DataPipe into multiple child DataPipes, using the given classification function (functional name: demux).

Forker

Creates multiple instances of the same Iterable DataPipe (functional name: fork).

IterKeyZipper

Zips two IterDataPipes together based on the matching key (functional name: zip_with_iter).

MapKeyZipper

Joins the items from the source IterDataPipe with items from a MapDataPipe (functional name: zip_with_map).

Multiplexer

Yields one element at a time from each of the input Iterable DataPipes in turn, stopping as soon as the shortest input DataPipe is exhausted (functional name: mux).

MultiplexerLongest

Yields one element at a time from each of the input Iterable DataPipes in turn, skipping exhausted inputs and continuing until all of them are exhausted (functional name: mux_longest).
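The difference between mux and mux_longest is easiest to see side by side. In this hypothetical plain-Python sketch, mux emits complete rounds only and stops at the first exhausted input, while mux_longest drops exhausted inputs and keeps cycling through the rest.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def mux(*sources: Iterable[T]) -> Iterator[T]:
    """Take one element from each source per round; stop at the first exhausted source."""
    iterators = [iter(s) for s in sources]
    while iterators:
        round_items: List[T] = []
        for it in iterators:
            try:
                round_items.append(next(it))
            except StopIteration:
                return  # an incomplete round is discarded
        yield from round_items


def mux_longest(*sources: Iterable[T]) -> Iterator[T]:
    """Take one element from each live source per round until all are exhausted."""
    iterators = [iter(s) for s in sources]
    while iterators:
        still_live = []
        for it in iterators:
            try:
                value = next(it)
            except StopIteration:
                continue  # drop this source from later rounds
            still_live.append(it)
            yield value
        iterators = still_live
```

With inputs range(3), range(10, 15), and range(20, 25), mux stops after 2, 12, 22, whereas mux_longest continues with 13, 23, 14, 24.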

RoundRobinDemultiplexer

Splits the input DataPipe into multiple child DataPipes in the round-robin order (functional name: round_robin_demux).

SampleMultiplexer

Takes a Dict of (IterDataPipe, Weight), and yields items by sampling from these DataPipes with respect to their weights.

UnZipper

Takes in a DataPipe of Sequences, unpacks each Sequence, and returns the elements in separate DataPipes based on their position in the Sequence (functional name: unzip).

Zipper

Aggregates elements into a tuple from each of the input DataPipes, stopping as soon as the shortest input DataPipe is exhausted (functional name: zip).

ZipperLongest

Aggregates elements into a tuple from each of the input DataPipes, continuing until all of them are exhausted and filling in missing values for the shorter ones (functional name: zip_longest).

Grouping DataPipes

These DataPipes help you group samples within a DataPipe.

Batcher

Creates mini-batches of data (functional name: batch).

BucketBatcher

Creates mini-batches of data from a sorted bucket (functional name: bucketbatch).

Collator

Collates samples from DataPipe to Tensor(s) by a custom collate function (functional name: collate).

Grouper

Groups data from IterDataPipe by keys from group_key_fn, yielding a DataChunk with batch size up to group_size.
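A minimal sketch of key-based grouping, assuming groups are emitted as soon as they reach group_size and any partial groups are flushed when the input ends. The function is hypothetical and leaves out Grouper's buffer limits and other options.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def group_by_key(
    source: Iterable[T], key_fn: Callable[[T], Hashable], group_size: int
) -> Iterator[List[T]]:
    """Yield a group as soon as it reaches group_size; flush partial groups at the end."""
    buffers: Dict[Hashable, List[T]] = defaultdict(list)
    for item in source:
        key = key_fn(item)
        buffers[key].append(item)
        if len(buffers[key]) == group_size:
            yield buffers.pop(key)
    # The input is exhausted: emit whatever partial groups remain.
    for group in buffers.values():
        yield group
```

For example, grouping range(10) by x % 3 with group_size=2 yields [0, 3], [1, 4], [2, 5], [6, 9], and finally the partial groups [7] and [8].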

MaxTokenBucketizer

Creates mini-batches of data from a min-heap with limited size, and the total length of samples returned by len_fn within each batch will be limited by max_token_count (functional name: max_token_bucketize).
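The min-heap approach can be sketched as follows: keep up to buffer_size pending items in a heap ordered by length, repeatedly pop the shortest one, and start a new batch whenever adding it would push the batch total past max_token_count. The function name and the buffer_size default are assumptions for this sketch, and it simply raises on any single item that is longer than the token limit.

```python
import heapq
from typing import Callable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def max_token_batches(
    source: Iterable[T],
    max_token_count: int,
    len_fn: Callable[[T], int] = len,
    buffer_size: int = 1000,
) -> Iterator[List[T]]:
    """Batch items so that each batch's total len_fn() stays <= max_token_count."""
    # Heap entries are (length, arrival order, item); the order breaks ties so
    # items themselves are never compared.
    heap: List[Tuple[int, int, T]] = []
    source_iter = enumerate(source)
    exhausted = False
    batch: List[T] = []
    batch_tokens = 0
    while True:
        # Top the heap up until it holds buffer_size items or the source ends.
        while not exhausted and len(heap) < buffer_size:
            try:
                order, item = next(source_iter)
            except StopIteration:
                exhausted = True
                break
            length = len_fn(item)
            if length > max_token_count:
                raise ValueError(f"item of length {length} exceeds max_token_count")
            heapq.heappush(heap, (length, order, item))
        if not heap:
            break
        length, _, item = heapq.heappop(heap)
        # Close the current batch if this item would overflow it.
        if batch and batch_tokens + length > max_token_count:
            yield batch
            batch, batch_tokens = [], 0
        batch.append(item)
        batch_tokens += length
    if batch:
        yield batch
```

Because short items are popped first, similar-length samples tend to land in the same batch, which reduces padding.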

UnBatcher

Undoes batching of data (functional name: unbatch).

IO DataPipes

These DataPipes help you interact with file systems or remote servers (e.g. downloading, opening, saving files, and listing the files in directories).

AISFileLister

Iterable DataPipe that lists files from the AIStore backends with the given URL prefixes (functional name: list_files_by_ais).

AISFileLoader

Iterable DataPipe that loads files from AIStore with the given URLs (functional name: load_files_by_ais).

FSSpecFileLister

Lists the contents of the directory at the provided root pathname or URL, and yields the full pathname or URL for each file within the directory (functional name: list_files_by_fsspec).

FSSpecFileOpener

Opens files from an input DataPipe which contains fsspec paths and yields a tuple of pathname and opened file stream (functional name: open_files_by_fsspec).

FSSpecSaver

Takes in a DataPipe of tuples of metadata and data, saves the data to the target path (generated by the filepath_fn and metadata), and yields the resulting fsspec path (functional name: save_by_fsspec).

FileLister

Given path(s) to the root directory, yields file pathname(s) (path + filename) of files within the root directory.

FileOpener

Given pathnames, opens files and yields pathname and file stream in a tuple (functional name: open_files).

GDriveReader

Takes URLs pointing at GDrive files, and yields tuples of file name and IO stream (functional name: read_from_gdrive).

HttpReader

Takes file URLs (HTTP URLs pointing to files), and yields tuples of file URL and IO stream (functional name: read_from_http).

HuggingFaceHubReader

Takes in dataset names and returns an Iterable HuggingFace dataset.

IoPathFileLister

Lists the contents of the directory at the provided root pathname or URL, and yields the full pathname or URL for each file within the directory (functional name: list_files_by_iopath).

IoPathFileOpener

Opens files from an input DataPipe which contains pathnames or URLs, and yields a tuple of pathname and opened file stream (functional name: open_files_by_iopath).

IoPathSaver

Takes in a DataPipe of tuples of metadata and data, saves the data to the target path which is generated by the filepath_fn and metadata, and yields the resulting path in iopath format (functional name: save_by_iopath).

OnlineReader

Takes file URLs (can be HTTP URLs pointing to files or URLs to GDrive files), and yields tuples of file URL and IO stream (functional name: read_from_remote).

ParquetDataFrameLoader

Takes in paths to Parquet files and returns a TorchArrow DataFrame for each row group within a Parquet file (functional name: load_parquet_as_df).

S3FileLister

Iterable DataPipe that lists Amazon S3 file URLs with the given prefixes (functional name: list_files_by_s3).

S3FileLoader

Iterable DataPipe that loads Amazon S3 files from the given S3 URLs (functional name: load_files_by_s3).

Saver

Takes in a DataPipe of tuples of metadata and data, saves the data to the target path generated by the filepath_fn and metadata, and yields file path on local file system (functional name: save_to_disk).

Mapping DataPipes

These DataPipes apply a given function to each element in the DataPipe.

BatchAsyncMapper

Combines elements from the source DataPipe into batches and applies a coroutine function over each element within the batch concurrently, then flattens the outputs to a single, unnested IterDataPipe (functional name: async_map_batches).

BatchMapper

Combines elements from the source DataPipe to batches and applies a function over each batch, then flattens the outputs to a single, unnested IterDataPipe (functional name: map_batches).

FlatMapper

Applies a function over each item from the source DataPipe, then flattens the outputs to a single, unnested IterDataPipe (functional name: flatmap).

Mapper

Applies a function over each item from the source DataPipe (functional name: map).

ShuffledFlatMapper

Applies a function over each item from the source DataPipe and collects the returned iterables in a buffer; then, at every iteration, it chooses one of the buffered iterables at random and yields one item from it (functional name: shuffled_flatmap).
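The buffering step is the part that is easiest to misread, so here is a hypothetical plain-Python sketch of it. Each buffered iterable keeps its own internal order; only the interleaving between iterables is random.

```python
import random
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def shuffled_flatmap(
    source: Iterable[T],
    fn: Callable[[T], Iterable[U]],
    buffer_size: int,
    seed: Optional[int] = None,
) -> Iterator[U]:
    """Interleave the iterables returned by fn, picking one at random per step."""
    rng = random.Random(seed)
    buffer: List[Iterator[U]] = []
    source_iter = iter(source)
    exhausted = False
    while True:
        # Refill the buffer with fresh iterables while there is room.
        while not exhausted and len(buffer) < buffer_size:
            try:
                item = next(source_iter)
            except StopIteration:
                exhausted = True
                break
            buffer.append(iter(fn(item)))
        if not buffer:
            return
        idx = rng.randrange(len(buffer))
        try:
            value = next(buffer[idx])
        except StopIteration:
            buffer.pop(idx)  # this iterable is used up; choose again
            continue
        yield value
```

With buffer_size=1 the sketch degenerates to a plain flatmap, since only one iterable is ever available to choose from.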

ThreadPoolMapper

Applies a function over each item from the source DataPipe concurrently using ThreadPoolExecutor (functional name: threadpool_map).
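With the standard library, the same idea can be sketched using concurrent.futures.ThreadPoolExecutor. Executor.map submits every input up front, so this hypothetical helper only suits finite, modestly sized inputs; it does not describe how ThreadPoolMapper manages its in-flight work.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def threadpool_map(source: Iterable[T], fn: Callable[[T], U], max_workers: int = 4) -> Iterator[U]:
    """Apply fn on worker threads; results come back in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Executor.map submits all inputs immediately and yields results in order.
        yield from executor.map(fn, source)
```

Threads help mainly when fn spends its time waiting on I/O (for example network reads), since the GIL limits parallelism for pure-Python CPU-bound work.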

Other DataPipes

A miscellaneous set of DataPipes with different functionalities.

DataFrameMaker

Takes rows of data, batches a number of them together and creates TorchArrow DataFrames (functional name: dataframe).

EndOnDiskCacheHolder

Indicates when the result of the prior DataPipe will be saved to local files specified by filepath_fn (functional name: end_caching).

FullSync

Synchronizes data across distributed processes to prevent hanging during training caused by unevenly sharded data (functional name: fullsync).

HashChecker

Computes and checks the hash of each file, from an input DataPipe of tuples of file name and data/stream (functional name: check_hash).
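A minimal sketch of hash checking with hashlib, assuming the data arrives as bytes and the expected digests are given as a dict from file name to hex digest. The function is hypothetical; it passes matching pairs through unchanged and raises on a mismatch.

```python
import hashlib
from typing import Dict, Iterable, Iterator, Tuple


def check_hash(
    pairs: Iterable[Tuple[str, bytes]],
    expected: Dict[str, str],
    hash_type: str = "sha256",
) -> Iterator[Tuple[str, bytes]]:
    """Pass (file name, data) through unchanged if its digest matches ``expected``."""
    for name, data in pairs:
        digest = hashlib.new(hash_type, data).hexdigest()
        if digest != expected[name]:
            raise RuntimeError(f"hash mismatch for {name!r}")
        yield name, data
```

Checking each file as it flows through catches corrupted or tampered downloads before they reach later stages of the pipeline.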

InMemoryCacheHolder

Stores elements from the source DataPipe in memory, up to a size limit if specified (functional name: in_memory_cache).

IterableWrapper

Wraps an iterable object to create an IterDataPipe.

LengthSetter

Set the length attribute of the DataPipe, which is returned by __len__ (functional name: set_length).

MapToIterConverter

Convert a MapDataPipe to an IterDataPipe (functional name: to_iter_datapipe).

OnDiskCacheHolder

Caches the outputs of multiple DataPipe operations, typically performance bottlenecks such as downloading and decompressing, to local files (functional name: on_disk_cache).

PinMemory

Prefetches one element from the source DataPipe and moves it to pinned memory (functional name: pin_memory).

Prefetcher

Prefetches elements from the source DataPipe and puts them into a buffer (functional name: prefetch).

RandomSplitter

Randomly split samples from a source DataPipe into groups (functional name: random_split).

ShardExpander

Expands incoming shard strings into shards.

ShardingFilter

Wrapper that allows DataPipe to be sharded (functional name: sharding_filter).

ShardingRoundRobinDispatcher

Wrapper that indicates the prior section of DataPipe graph is non-replicable and will be iterated in a separate, single dispatching process to distribute data to worker processes in a round-robin manner when multiprocessing is being used.

Selecting DataPipes

These DataPipes help you select specific samples within a DataPipe.

Filter

Filters out elements from the source datapipe according to input filter_fn (functional name: filter).

Header

Yields elements from the source DataPipe from the start, up to the specified limit (functional name: header).

Dropper

Drop columns/elements in input DataPipe via its indices (functional name: drop).

Slicer

Returns a slice of elements in the input DataPipe via start/stop/step or indices (functional name: slice).

Flattener

Returns a flattened copy of the input DataPipe at the per-sample/element level based on the provided indices (functional name: flatten).

Text DataPipes

These DataPipes help you parse, read, and transform text files and data.

CSVDictParser

Accepts a DataPipe consisting of tuples of file name and CSV data stream, reads the CSV files, and returns their contents one row at a time, with each row as a dict keyed by the header row (functional name: parse_csv_as_dict).

CSVParser

Accepts a DataPipe consisting of tuples of file name and CSV data stream, reads the CSV files, and returns their contents one row at a time (functional name: parse_csv).
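The two CSV parsers differ mainly in row type, which the standard csv module makes easy to see. In this hypothetical sketch, parse_csv yields every row (header included) as a list of strings, while parse_csv_as_dict uses the header row as keys; options such as skipping lines or returning the file name are not modeled.

```python
import csv
import io
from typing import Dict, Iterable, Iterator, List, TextIO, Tuple


def parse_csv(pairs: Iterable[Tuple[str, TextIO]]) -> Iterator[List[str]]:
    """Yield every row, header included, as a list of strings."""
    for _file_name, stream in pairs:
        yield from csv.reader(stream)


def parse_csv_as_dict(pairs: Iterable[Tuple[str, TextIO]]) -> Iterator[Dict[str, str]]:
    """Yield each data row as a dict keyed by the header row."""
    for _file_name, stream in pairs:
        yield from csv.DictReader(stream)


text = "id,label\n1,cat\n2,dog\n"
rows = list(parse_csv([("pets.csv", io.StringIO(text))]))
records = list(parse_csv_as_dict([("pets.csv", io.StringIO(text))]))
```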

JsonParser

Reads from JSON data streams and yields a tuple of file name and JSON data (functional name: parse_json_files).

LineReader

Accepts a DataPipe consisting of tuples of file name and string data stream, and for each line in the stream, yields a tuple of file name and the line (functional name: readlines).

ParagraphAggregator

Aggregates lines of text from the same file into a single paragraph (functional name: lines_to_paragraphs).

RoutedDecoder

Decodes binary streams from input DataPipe, yields pathname and decoded data in a tuple.

Rows2Columnar

Accepts an input DataPipe with batches of data, and processes one batch at a time and yields a Dict for each batch, with column_names as keys and lists of corresponding values from each row as values (functional name: rows2columnar).
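Here is a hypothetical plain-Python sketch of the row-to-column conversion, assuming each batch is a list of dicts. When column_names is not given, the sketch takes the column order from the first row, and missing values become None. Both behaviors are assumptions of the sketch, not documented Rows2Columnar behavior.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional


def rows_to_columnar(
    batches: Iterable[List[Dict[str, Any]]],
    column_names: Optional[List[str]] = None,
) -> Iterator[Dict[str, List[Any]]]:
    """Turn each batch of row dicts into one dict of column lists."""
    for batch in batches:
        if column_names is not None:
            names = column_names
        else:
            # Assumed default: take the column order from the first row.
            names = list(batch[0]) if batch else []
        # Rows lacking a column contribute None for it.
        yield {name: [row.get(name) for row in batch] for name in names}
```

For example, the batch [{"a": 1, "b": 2}, {"a": 3, "b": 4}] becomes {"a": [1, 3], "b": [2, 4]}.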

StreamReader

Given IO streams and their label names, yields bytes with the label name in a tuple.
