ThreadPoolMapper
- class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)
Applies a function over each item from the source DataPipe concurrently using ThreadPoolExecutor (functional name: threadpool_map). The function can be any regular Python function or partial object. Lambda functions are not recommended, as they are not supported by pickle.
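Because partial objects are picklable (when the wrapped function is defined at module level), they are a convenient way to bind extra arguments to fn. The snippet below is a minimal sketch; scale and its factor argument are made-up names, not part of the API:

```python
from functools import partial

from torchdata.datapipes.iter import IterableWrapper

def scale(x, factor):
    return x * factor

dp = IterableWrapper(range(5))
# Bind factor up front; the resulting partial is picklable, unlike a lambda.
dp = dp.threadpool_map(partial(scale, factor=3), max_workers=4)
print(list(dp))
# [0, 3, 6, 9, 12]
```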
- Parameters:
  - source_datapipe – Source IterDataPipe
  - fn – Function being applied over each item
  - input_col – Index or indices of data to which fn is applied, such as:
    - None as default to apply fn to the data directly.
    - Integer(s) is used for list/tuple.
    - Key(s) is used for dict.
  - output_col – Index of data where the result of fn is placed. output_col can be specified only when input_col is not None.
    - None as default to replace the index that input_col specified; for input_col with multiple indices, the left-most one is used, and the other indices will be removed.
    - Integer is used for list/tuple. -1 represents to append the result at the end.
    - Key is used for dict. A new key is acceptable.
  - scheduled_tasks – How many tasks will be scheduled at any given time (Default value: 128)
  - max_workers – Maximum number of threads to execute function calls
  - **threadpool_kwargs – Additional arguments to be passed to the ThreadPoolExecutor
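To make the dict handling above concrete, here is a minimal sketch; the keys "a" and "b" and the double function are hypothetical, and the behavior is assumed to follow the input_col/output_col rules listed above:

```python
from torchdata.datapipes.iter import IterableWrapper

def double(x):
    return x * 2

# input_col="a" selects the value under key "a"; output_col="b" stores the
# result under a new key, leaving "a" in place.
dp = IterableWrapper([{"a": i} for i in range(3)])
dp = dp.threadpool_map(double, input_col="a", output_col="b")
print(list(dp))
# [{'a': 0, 'b': 0}, {'a': 1, 'b': 2}, {'a': 2, 'b': 4}]
```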
Note

For more information about max_workers and additional arguments for the ThreadPoolExecutor, please refer to: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
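For instance, ThreadPoolExecutor's standard thread_name_prefix keyword should be forwardable through **threadpool_kwargs; the sketch below assumes this, and tag_with_thread is a made-up helper:

```python
import threading

from torchdata.datapipes.iter import IterableWrapper

def tag_with_thread(x):
    # The worker thread's name reflects thread_name_prefix below.
    return (x, threading.current_thread().name)

dp = IterableWrapper(range(4))
# thread_name_prefix is a standard ThreadPoolExecutor keyword argument,
# forwarded here via **threadpool_kwargs.
dp = dp.threadpool_map(tag_with_thread, max_workers=2, thread_name_prefix="tpm")
print(list(dp))
# e.g. [(0, 'tpm_0'), (1, 'tpm_1'), (2, 'tpm_0'), (3, 'tpm_1')]
```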
Note

For optimal use of all threads, scheduled_tasks > max_workers is strongly recommended. The higher the variance of the time needed to finish execution of the given fn, the higher the value of scheduled_tasks needs to be to avoid threads sitting idle while waiting for the next result (as results are returned in correct order).

However, too high a value of scheduled_tasks might lead to a long waiting period until the first element is yielded, as next is called scheduled_tasks many times on source_datapipe before yielding.

We encourage you to try out different values of max_workers and scheduled_tasks in search of optimal values for your use-case (a small timing sketch follows the examples below).

Example:
```python
# Fetching html from remote; urls is an iterable of URL strings.
import requests

from torchdata.datapipes.iter import IterableWrapper

def fetch_html(url: str, **kwargs):
    r = requests.get(url, **kwargs)
    r.raise_for_status()
    return r.content

dp = IterableWrapper(urls)
dp = dp.threadpool_map(fetch_html, max_workers=16)
```

```python
import time

from torchdata.datapipes.iter import IterableWrapper

def mul_ten(x):
    time.sleep(0.1)
    return x * 10

dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1)
print(list(dp))
# [(0, 0), (1, 10), (2, 20), (3, 30), ...]
```

```python
dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1)
print(list(dp))
# [(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]
```
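Following the tuning note above, a small timing harness like this sketch can help compare settings; the workload, worker count, and candidate values are arbitrary assumptions:

```python
import time

from torchdata.datapipes.iter import IterableWrapper

def slow_square(x):
    time.sleep(0.05)  # simulate I/O-bound or uneven work
    return x * x

for scheduled_tasks in (16, 64, 128):
    dp = IterableWrapper(range(200)).threadpool_map(
        slow_square, max_workers=16, scheduled_tasks=scheduled_tasks
    )
    start = time.perf_counter()
    _ = list(dp)
    print(f"scheduled_tasks={scheduled_tasks}: {time.perf_counter() - start:.2f}s")
```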