Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the DataPipes and DataLoaderV2 solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).
BucketBatcher
- class torchdata.datapipes.iter.BucketBatcher(datapipe: IterDataPipe[T_co], batch_size: int, drop_last: bool = False, batch_num: int = 100, bucket_num: int = 1, sort_key: Optional[Callable] = None, use_in_batch_shuffle: bool = True)
Creates mini-batches of data from a sorted bucket (functional name: bucketbatch). An outer dimension will be added as batch_size if drop_last is set to True, or length % batch_size for the last batch if drop_last is set to False.

The purpose of this DataPipe is to batch samples with some similarity according to the sorting function being passed. For example, in the text domain, it may batch examples with a similar number of tokens to minimize padding and increase throughput.
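The padding argument above can be illustrated without torchdata. The following is a minimal pure-Python sketch, not the library's implementation: the helper names (naive_batches, sorted_bucket_batches, padding_tokens) and the sample lengths are hypothetical, and shuffling is left out so the result is deterministic. It compares the number of pad tokens needed when batching in arrival order with the number needed when each bucket is sorted by length first.

```python
from typing import Callable, List, Sequence


def naive_batches(samples: Sequence[list], batch_size: int) -> List[List[list]]:
    """Batch samples in arrival order."""
    return [list(samples[i:i + batch_size]) for i in range(0, len(samples), batch_size)]


def sorted_bucket_batches(
    samples: Sequence[list],
    batch_size: int,
    batch_num: int,
    sort_key: Callable[[list], list],
) -> List[List[list]]:
    """Split samples into buckets of batch_size * batch_num, sort each bucket, then batch it."""
    bucket_size = batch_size * batch_num
    batches: List[List[list]] = []
    for start in range(0, len(samples), bucket_size):
        # sort_key receives the whole bucket (a list) and returns it sorted
        bucket = sort_key(list(samples[start:start + bucket_size]))
        batches.extend(naive_batches(bucket, batch_size))
    return batches


def padding_tokens(batches: List[List[list]]) -> int:
    """Count pad tokens needed to pad each sample to the longest sample in its batch."""
    return sum(max(len(s) for s in b) * len(b) - sum(len(s) for s in b) for b in batches)


# Hypothetical tokenized sentences, represented only by their lengths
lengths = [2, 9, 3, 8, 1, 10, 2, 9]
samples = [[0] * n for n in lengths]

naive = naive_batches(samples, batch_size=2)
bucketed = sorted_bucket_batches(
    samples, batch_size=2, batch_num=4,
    sort_key=lambda bucket: sorted(bucket, key=len),
)

print(padding_tokens(naive))     # 28
print(padding_tokens(bucketed))  # 4
```

In this toy input, sorting within the bucket puts samples of similar length into the same batch, so far fewer pad tokens are needed.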
- Parameters:
datapipe – Iterable DataPipe being batched
batch_size – The size of each batch
drop_last – Option to drop the last batch if it’s not full
batch_num – Number of batches within a bucket (i.e. bucket_size = batch_size * batch_num)
bucket_num – Number of buckets that make up a pool for shuffling (i.e. pool_size = bucket_size * bucket_num)
sort_key – Callable to sort a bucket (list)
use_in_batch_shuffle – If True, do in-batch shuffle; if False, do buffer shuffle
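The two size formulas in the parameter list can be worked through with concrete numbers. This is a small arithmetic sketch only; bucket_and_pool_size is a hypothetical helper, not part of torchdata, and its defaults simply mirror the defaults in the class signature.

```python
from typing import Tuple


def bucket_and_pool_size(batch_size: int, batch_num: int = 100, bucket_num: int = 1) -> Tuple[int, int]:
    """Return (bucket_size, pool_size) using the formulas from the parameter list."""
    bucket_size = batch_size * batch_num    # samples sorted together
    pool_size = bucket_size * bucket_num    # samples shuffled together
    return bucket_size, pool_size


default_sizes = bucket_and_pool_size(batch_size=3)
larger_sizes = bucket_and_pool_size(batch_size=32, batch_num=100, bucket_num=4)

print(default_sizes)  # (300, 300)
print(larger_sizes)   # (3200, 12800)
```

With batch_size=3 and the default batch_num and bucket_num, a bucket can hold up to 300 samples, so a 10-element source fits in a single bucket.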
Example
>>> from torchdata.datapipes.iter import IterableWrapper
>>> source_dp = IterableWrapper(range(10))
>>> batch_dp = source_dp.bucketbatch(batch_size=3, drop_last=True)
>>> list(batch_dp)
[[5, 6, 7], [9, 0, 1], [4, 3, 2]]
>>> def sort_bucket(bucket):
...     return sorted(bucket)
>>> batch_dp = source_dp.bucketbatch(
...     batch_size=3, drop_last=True, batch_num=100,
...     bucket_num=1, use_in_batch_shuffle=False, sort_key=sort_bucket
... )
>>> list(batch_dp)
[[3, 4, 5], [6, 7, 8], [0, 1, 2]]