Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).
ThreadPoolMapper

class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)
Applies a function over each item from the source DataPipe concurrently using ThreadPoolExecutor (functional name: threadpool_map). The function can be any regular Python function or partial object. Lambda functions are not recommended as they are not supported by pickle; a partial object can carry extra arguments instead (see the sketch after the examples below).

Parameters:
- source_datapipe – Source IterDataPipe
- fn – Function being applied over each item
- input_col – Index or indices of data to which fn is applied, such as:
  - None (default) to apply fn to the data directly.
  - Integer(s) for list/tuple.
  - Key(s) for dict.
- output_col – Index of data where the result of fn is placed. output_col can be specified only when input_col is not None.
  - None (default) to replace the index that input_col specifies; for input_col with multiple indices, the left-most one is used, and the other indices are removed.
  - Integer for list/tuple. -1 appends the result at the end.
  - Key for dict. A new key is acceptable (a dict sketch follows this list).
- scheduled_tasks – How many tasks will be scheduled at any given time (default: 128)
- max_workers – Maximum number of threads used to execute function calls
- **threadpool_kwargs – Additional keyword arguments to be passed to the ThreadPoolExecutor
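To make the dict forms above concrete, here is a minimal sketch; the element layout and the add_ten helper are illustrative assumptions, not part of the documented API:

from torchdata.datapipes.iter import IterableWrapper

def add_ten(x):
    return x + 10

dp = IterableWrapper([{"id": i, "score": i} for i in range(3)])
# apply add_ten to the "score" value; store the result under the new key "boosted"
dp = dp.threadpool_map(add_ten, input_col="score", output_col="boosted")
print(list(dp))
# expected, given the documented semantics:
# [{'id': 0, 'score': 0, 'boosted': 10}, {'id': 1, 'score': 1, 'boosted': 11}, {'id': 2, 'score': 2, 'boosted': 12}]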
Note
For more information about max_workers and additional arguments for the ThreadPoolExecutor, please refer to: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
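As an illustration, those extra ThreadPoolExecutor arguments can be passed straight through threadpool_map; the sketch below uses thread_name_prefix (a standard ThreadPoolExecutor argument) with a hypothetical tag_with_thread helper to show which worker thread handled each item:

import threading
from torchdata.datapipes.iter import IterableWrapper

def tag_with_thread(x):
    # return the item together with the name of the worker thread that ran fn
    return x, threading.current_thread().name

dp = IterableWrapper(range(4))
dp = dp.threadpool_map(tag_with_thread, max_workers=2, thread_name_prefix="tpm")
print(list(dp))
# e.g. [(0, 'tpm_0'), (1, 'tpm_1'), (2, 'tpm_0'), (3, 'tpm_1')] -- the exact
# thread assignment varies from run to run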
Note
For optimal use of all threads, scheduled_tasks > max_workers is strongly recommended. The higher the variance of the time needed to finish execution of the given fn, the higher the value of scheduled_tasks needs to be to avoid threads sitting idle while waiting for the next result (as results are returned in correct order).
However, too high a value of scheduled_tasks might lead to a long waiting period until the first element is yielded, as next is called scheduled_tasks many times on source_datapipe before yielding.
We encourage you to try out different values of max_workers and scheduled_tasks in search of the optimal values for your use case.
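To observe this warm-up cost directly, one can time how long the first element takes when the source itself is slow to produce items; this is a minimal sketch with an artificial slow_read stage and illustrative parameter values:

import time
from torchdata.datapipes.iter import IterableWrapper

def slow_read(i):
    time.sleep(0.01)  # simulate a source that is slow to produce each item
    return i

dp = IterableWrapper(range(256)).map(slow_read)
dp = dp.threadpool_map(str, max_workers=8, scheduled_tasks=128)
start = time.perf_counter()
first = next(iter(dp))
print(f"first element after {time.perf_counter() - start:.2f}s")
# roughly scheduled_tasks * 0.01s (~1.3s here): 128 items are pulled from the
# slow source and scheduled before anything is yielded; lowering scheduled_tasks
# shortens this warm-up at the cost of less buffering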
Example:

import requests
from torchdata.datapipes.iter import IterableWrapper

# fetching html from remote
def fetch_html(url: str, **kwargs):
    r = requests.get(url, **kwargs)
    r.raise_for_status()
    return r.content

dp = IterableWrapper(urls)
dp = dp.threadpool_map(fetch_html, max_workers=16)
import time
from torchdata.datapipes.iter import IterableWrapper

def mul_ten(x):
    time.sleep(0.1)
    return x * 10

dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1)
print(list(dp))
[(0, 0), (1, 10), (2, 20), (3, 30), ...]
dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1)
print(list(dp))
[(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]
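Finally, since the description above recommends partial objects over lambdas for pickling reasons, here is a minimal sketch with a hypothetical scale helper:

import functools
from torchdata.datapipes.iter import IterableWrapper

def scale(x, factor):
    return x * factor

dp = IterableWrapper(range(5))
# functools.partial is picklable when it wraps a module-level function,
# unlike an equivalent `lambda x: x * 3`
dp = dp.threadpool_map(functools.partial(scale, factor=3), max_workers=4)
print(list(dp))
# [0, 3, 6, 9, 12]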