ThreadPoolMapper

class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)

Applies a function over each item from the source DataPipe concurrently using ThreadPoolExecutor (functional name: threadpool_map). The function can be any regular Python function or partial object. Lambda functions are not recommended because they cannot be pickled.

Parameters:
  • source_datapipe – Source IterDataPipe

  • fn – Function being applied over each item

  • input_col

    Index or indices of data to which fn is applied, such as:

    • None as default to apply fn to the data directly.

    • Integer(s) are used for list/tuple.

    • Key(s) are used for dict.

  • output_col

    Index of data where the result of fn is placed. output_col can be specified only when input_col is not None.

    • None as default to replace the index that input_col specifies; for input_col with multiple indices, the left-most one is used and the other indices are removed.

    • Integer is used for list/tuple. -1 appends the result to the end.

    • Key is used for dict. A new key is acceptable (see the dict sketch after this parameter list).

  • scheduled_tasks – How many tasks will be scheduled at any given time (Default value: 128)

  • max_workers – Maximum number of threads to execute function calls

  • **threadpool_kwargs – Additional arguments to be passed to the ThreadPoolExecutor
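
A minimal sketch of the dict conventions described above; the keys "id", "value", and "doubled" and the function double are illustrative, not part of the API:

import time

from torchdata.datapipes.iter import IterableWrapper

def double(x):
    time.sleep(0.01)  # stand-in for a slow, I/O-bound call
    return x * 2

# input_col picks the "value" key; output_col writes the result to a new "doubled" key.
dp = IterableWrapper([{"id": i, "value": i} for i in range(5)])
dp = dp.threadpool_map(double, input_col="value", output_col="doubled", max_workers=4)
print(list(dp))
# [{'id': 0, 'value': 0, 'doubled': 0}, {'id': 1, 'value': 1, 'doubled': 2}, ...]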

Note

For more information about max_workers and additional arguments for the ThreadPoolExecutor please refer to: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
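
A minimal sketch, assuming keyword arguments such as thread_name_prefix are forwarded to the underlying ThreadPoolExecutor via **threadpool_kwargs as described above:

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(range(10))
# thread_name_prefix is a standard ThreadPoolExecutor argument,
# passed through here via **threadpool_kwargs.
dp = dp.threadpool_map(str, max_workers=4, thread_name_prefix="threadpool_map_worker")
print(list(dp))
# ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']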

Note

For optimal use of all threads, scheduled_tasks > max_workers is strongly recommended. The higher the variance in the execution time of fn, the higher scheduled_tasks needs to be to avoid threads sitting idle while waiting for the next result (results are returned in order).

However, setting scheduled_tasks too high may lead to a long wait before the first element is yielded, because next is called scheduled_tasks times on source_datapipe before anything is yielded.

We encourage you to try out different values of max_workers and scheduled_tasks in search of optimal values for your use case.
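
A rough sketch of such an experiment; slow_square and the chosen settings are illustrative, and timings will vary by machine and by fn:

import time

from torchdata.datapipes.iter import IterableWrapper

def slow_square(x):
    time.sleep(0.05)  # stand-in for an I/O-bound call
    return x * x

def run(scheduled_tasks, max_workers):
    dp = IterableWrapper(range(200)).threadpool_map(
        slow_square, scheduled_tasks=scheduled_tasks, max_workers=max_workers
    )
    start = time.perf_counter()
    list(dp)
    return time.perf_counter() - start

# Compare a few settings; scheduled_tasks > max_workers keeps all threads busy.
for scheduled, workers in [(16, 16), (64, 16), (128, 16)]:
    print(scheduled, workers, f"{run(scheduled, workers):.2f}s")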

Example:

import time

import requests
from torchdata.datapipes.iter import IterableWrapper

# fetching html from remote
def fetch_html(url: str, **kwargs):
    r = requests.get(url, **kwargs)
    r.raise_for_status()
    return r.content

dp = IterableWrapper(urls)  # urls: an iterable of URL strings
dp = dp.threadpool_map(fetch_html, max_workers=16)

# applying a slow function to one column of each tuple
def mul_ten(x):
    time.sleep(0.1)
    return x * 10

dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1)
print(list(dp))
# [(0, 0), (1, 10), (2, 20), (3, 30), ...]

dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1)
print(list(dp))
# [(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]
