Shortcuts

Attention

June 2024 Status Update: Removing DataPipes and DataLoader V2

We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you suggestions or comments (please use this issue for feedback)

ThreadPoolMapper

class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)

Applies a function over each item from the source DataPipe concurrently using ThreadPoolExecutor (functional name: threadpool_map). The function can be any regular Python function or partial object. Lambda function is not recommended as it is not supported by pickle.

Parameters:
  • source_datapipe – Source IterDataPipe

  • fn – Function being applied over each item

  • input_col

    Index or indices of data which fn is applied, such as:

    • None as default to apply fn to the data directly.

    • Integer(s) is used for list/tuple.

    • Key(s) is used for dict.

  • output_col

    Index of data where result of fn is placed. output_col can be specified only when input_col is not None

    • None as default to replace the index that input_col specified; For input_col with multiple indices, the left-most one is used, and other indices will be removed.

    • Integer is used for list/tuple. -1 represents to append result at the end.

    • Key is used for dict. New key is acceptable.

  • scheduled_tasks – How many tasks will be scheduled at any given time (Default value: 128)

  • max_workers – Maximum number of threads to execute function calls

  • **threadpool_kwargs – additional arguments to be given to the ThreadPoolExecutor

Note

For more information about max_workers and additional arguments for the ThreadPoolExecutor please refer to: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

Note

For optimal use of all threads, scheduled_tasks > max_workers is strongly recommended. The higher the variance of the time needed to finish execution of the given fn is, the higher the value of scheduled_tasks needs to be to avoid threads sitting idle while waiting for the next result (as results are returned in correct order).

However, too high value of scheduled_tasks might lead to long waiting period until the first element is yielded as next is called scheduled_tasks many times on source_datapipe before yielding.

We encourage you to try out different values of max_workers and scheduled_tasks in search for optimal values for your use-case.

Example:

# fetching html from remote
def fetch_html(url: str, **kwargs):
    r = requests.get(url, **kwargs)
    r.raise_for_status()
    return r.content
dp = IterableWrapper(urls)
dp = dp.threadpool_map(fetch_html,max_workers=16)
def mul_ten(x):
    time.sleep(0.1)
    return x * 10

dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1)
print(list(dp))
[(0, 0), (1, 10), (2, 20), (3, 30), ...]
dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1)
print(list(dp))
[(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources