PromptData¶
- class torchrl.data.PromptData(input_ids: 'torch.Tensor', attention_mask: 'torch.Tensor', prompt_rindex: 'torch.Tensor', labels: 'Optional[torch.Tensor]' = None, logits: 'Optional[torch.Tensor]' = None, loss: 'Optional[torch.Tensor]' = None, *, batch_size, device=None, names=None)[source]¶
- property batch_size: Size¶
Retrieves the batch size for the tensor class.
- Returns:
batch size (torch.Size)
- classmethod fields()¶
Return a tuple describing the fields of this dataclass.
Accepts a dataclass or an instance of one. Tuple elements are of type Field.
- classmethod from_dataset(split, dataset_name=None, max_length=550, root_dir=None, from_disk=False, num_workers: Optional[int] = None)[source]¶
Returns a
PromptData
from a dataset name.- Parameters:
split (str) –
"train"
or"valid"
depending on the data split needed.dataset_name (str, optional) – name of the dataset to be processed. Defaults to
"CarperAI/openai_summarize_comparisons"
.max_length (int, optional) – maximum length of the dataset sequenes. Defaults to 550.
root_dir (path, optional) – the path where the datasets are stored. Defaults to
"$HOME/.cache/torchrl/data"
from_disk (bool, optional) – if
True
,datasets.load_from_disk()
will be used. Otherwise,datasets.load_dataset()
will be used. Defaults toFalse
.num_workers (int, optional) – number of workers for
datasets.dataset.map()
which is called during tokenization. Defaults tomax(os.cpu_count() // 2, 1)
.
- Returns: a
PromptData
instance containing a memory-mapped version of the required dataset.
Examples
>>> data = PromptData.from_dataset("train") >>> print(data) PromptDataTLDR( attention_mask=MemoryMappedTensor(shape=torch.Size([116722, 550]), device=cpu, dtype=torch.int64, is_shared=False), input_ids=MemoryMappedTensor(shape=torch.Size([116722, 550]), device=cpu, dtype=torch.int64, is_shared=False), prompt_rindex=MemoryMappedTensor(shape=torch.Size([116722]), device=cpu, dtype=torch.int64, is_shared=False), labels=MemoryMappedTensor(shape=torch.Size([116722, 550]), device=cpu, dtype=torch.int64, is_shared=False), logits=None, loss=None, batch_size=torch.Size([116722]), device=None, is_shared=False) >>> # data can be sampled from using regular indexing >>> sub_data = data[:3]
- classmethod from_tensordict(tensordict, non_tensordict=None, safe=True)¶
Tensor class wrapper to instantiate a new tensor class object.
- Parameters:
tensordict (TensorDict) – Dictionary of tensor types
non_tensordict (dict) – Dictionary with non-tensor and nested tensor class objects
- get(key: NestedKey, default: Any = _NoDefault.ZERO)¶
Gets the value stored with the input key.
- Parameters:
key (str, tuple of str) – key to be queried. If tuple of str it is equivalent to chained calls of getattr.
default – default value if the key is not found in the tensorclass.
- Returns:
value stored with the input key
- classmethod load(prefix: str | pathlib.Path, *args, **kwargs) T ¶
Loads a tensordict from disk.
This class method is a proxy to
load_memmap()
.
- load_(prefix: str | pathlib.Path, *args, **kwargs)¶
Loads a tensordict from disk within the current tensordict.
This class method is a proxy to
load_memmap_()
.
- classmethod load_memmap(prefix: str | pathlib.Path, device: Optional[device] = None, non_blocking: bool = False, *, out: Optional[TensorDictBase] = None) T ¶
Loads a memory-mapped tensordict from disk.
- Parameters:
prefix (str or Path to folder) – the path to the folder where the saved tensordict should be fetched.
device (torch.device or equivalent, optional) – if provided, the data will be asynchronously cast to that device. Supports “meta” device, in which case the data isn’t loaded but a set of empty “meta” tensors are created. This is useful to get a sense of the total model size and structure without actually opening any file.
non_blocking (bool, optional) – if
True
, synchronize won’t be called after loading tensors on device. Defaults toFalse
.out (TensorDictBase, optional) – optional tensordict where the data should be written.
Examples
>>> from tensordict import TensorDict >>> td = TensorDict.fromkeys(["a", "b", "c", ("nested", "e")], 0) >>> td.memmap("./saved_td") >>> td_load = TensorDict.load_memmap("./saved_td") >>> assert (td == td_load).all()
This method also allows loading nested tensordicts.
Examples
>>> nested = TensorDict.load_memmap("./saved_td/nested") >>> assert nested["e"] == 0
A tensordict can also be loaded on “meta” device or, alternatively, as a fake tensor.
Examples
>>> import tempfile >>> td = TensorDict({"a": torch.zeros(()), "b": {"c": torch.zeros(())}}) >>> with tempfile.TemporaryDirectory() as path: ... td.save(path) ... td_load = TensorDict.load_memmap(path, device="meta") ... print("meta:", td_load) ... from torch._subclasses import FakeTensorMode ... with FakeTensorMode(): ... td_load = TensorDict.load_memmap(path) ... print("fake:", td_load) meta: TensorDict( fields={ a: Tensor(shape=torch.Size([]), device=meta, dtype=torch.float32, is_shared=False), b: TensorDict( fields={ c: Tensor(shape=torch.Size([]), device=meta, dtype=torch.float32, is_shared=False)}, batch_size=torch.Size([]), device=meta, is_shared=False)}, batch_size=torch.Size([]), device=meta, is_shared=False) fake: TensorDict( fields={ a: FakeTensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False), b: TensorDict( fields={ c: FakeTensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)}, batch_size=torch.Size([]), device=cpu, is_shared=False)}, batch_size=torch.Size([]), device=cpu, is_shared=False)
- load_state_dict(state_dict: dict[str, Any], strict=True, assign=False, from_flatten=False)¶
Loads a state_dict attemptedly in-place on the destination tensorclass.
- memmap(prefix: Optional[str] = None, copy_existing: bool = False, *, num_threads: int = 0, return_early: bool = False, share_non_tensor: bool = False, existsok: bool = True) T ¶
Writes all tensors onto a corresponding memory-mapped Tensor in a new tensordict.
- Parameters:
prefix (str) – directory prefix where the memory-mapped tensors will be stored. The directory tree structure will mimic the tensordict’s.
copy_existing (bool) – If False (default), an exception will be raised if an entry in the tensordict is already a tensor stored on disk with an associated file, but is not saved in the correct location according to prefix. If
True
, any existing Tensor will be copied to the new location.
- Keyword Arguments:
num_threads (int, optional) – the number of threads used to write the memmap tensors. Defaults to 0.
return_early (bool, optional) – if
True
andnum_threads>0
, the method will return a future of the tensordict.share_non_tensor (bool, optional) – if
True
, the non-tensor data will be shared between the processes and writing operation (such as inplace update or set) on any of the workers within a single node will update the value on all other workers. If the number of non-tensor leaves is high (e.g., sharing large stacks of non-tensor data) this may result in OOM or similar errors. Defaults toFalse
.existsok (bool, optional) – if
False
, an exception will be raised if a tensor already exists in the same path. Defaults toTrue
.
The TensorDict is then locked, meaning that any writing operations that isn’t in-place will throw an exception (eg, rename, set or remove an entry). Once the tensordict is unlocked, the memory-mapped attribute is turned to
False
, because cross-process identity is not guaranteed anymore.- Returns:
A new tensordict with the tensors stored on disk if
return_early=False
, otherwise aTensorDictFuture
instance.
Note
Serialising in this fashion might be slow with deeply nested tensordicts, so it is not recommended to call this method inside a training loop.
- memmap_(prefix: Optional[str] = None, copy_existing: bool = False, *, num_threads: int = 0, return_early: bool = False, share_non_tensor: bool = False, existsok: bool = True) T ¶
Writes all tensors onto a corresponding memory-mapped Tensor, in-place.
- Parameters:
prefix (str) – directory prefix where the memory-mapped tensors will be stored. The directory tree structure will mimic the tensordict’s.
copy_existing (bool) – If False (default), an exception will be raised if an entry in the tensordict is already a tensor stored on disk with an associated file, but is not saved in the correct location according to prefix. If
True
, any existing Tensor will be copied to the new location.
- Keyword Arguments:
num_threads (int, optional) – the number of threads used to write the memmap tensors. Defaults to 0.
return_early (bool, optional) – if
True
andnum_threads>0
, the method will return a future of the tensordict. The resulting tensordict can be queried using future.result().share_non_tensor (bool, optional) – if
True
, the non-tensor data will be shared between the processes and writing operation (such as inplace update or set) on any of the workers within a single node will update the value on all other workers. If the number of non-tensor leaves is high (e.g., sharing large stacks of non-tensor data) this may result in OOM or similar errors. Defaults toFalse
.existsok (bool, optional) – if
False
, an exception will be raised if a tensor already exists in the same path. Defaults toTrue
.
The TensorDict is then locked, meaning that any writing operations that isn’t in-place will throw an exception (eg, rename, set or remove an entry). Once the tensordict is unlocked, the memory-mapped attribute is turned to
False
, because cross-process identity is not guaranteed anymore.- Returns:
self if
return_early=False
, otherwise aTensorDictFuture
instance.
Note
Serialising in this fashion might be slow with deeply nested tensordicts, so it is not recommended to call this method inside a training loop.
- memmap_like(prefix: Optional[str] = None, copy_existing: bool = False, *, existsok: bool = True, num_threads: int = 0, return_early: bool = False, share_non_tensor: bool = False) T ¶
Creates a contentless Memory-mapped tensordict with the same shapes as the original one.
- Parameters:
prefix (str) – directory prefix where the memory-mapped tensors will be stored. The directory tree structure will mimic the tensordict’s.
copy_existing (bool) – If False (default), an exception will be raised if an entry in the tensordict is already a tensor stored on disk with an associated file, but is not saved in the correct location according to prefix. If
True
, any existing Tensor will be copied to the new location.
- Keyword Arguments:
num_threads (int, optional) – the number of threads used to write the memmap tensors. Defaults to 0.
return_early (bool, optional) – if
True
andnum_threads>0
, the method will return a future of the tensordict.share_non_tensor (bool, optional) – if
True
, the non-tensor data will be shared between the processes and writing operation (such as inplace update or set) on any of the workers within a single node will update the value on all other workers. If the number of non-tensor leaves is high (e.g., sharing large stacks of non-tensor data) this may result in OOM or similar errors. Defaults toFalse
.existsok (bool, optional) – if
False
, an exception will be raised if a tensor already exists in the same path. Defaults toTrue
.
The TensorDict is then locked, meaning that any writing operations that isn’t in-place will throw an exception (eg, rename, set or remove an entry). Once the tensordict is unlocked, the memory-mapped attribute is turned to
False
, because cross-process identity is not guaranteed anymore.- Returns:
A new
TensorDict
instance with data stored as memory-mapped tensors ifreturn_early=False
, otherwise aTensorDictFuture
instance.
Note
This is the recommended method to write a set of large buffers on disk, as
memmap_()
will copy the information, which can be slow for large content.Examples
>>> td = TensorDict({ ... "a": torch.zeros((3, 64, 64), dtype=torch.uint8), ... "b": torch.zeros(1, dtype=torch.int64), ... }, batch_size=[]).expand(1_000_000) # expand does not allocate new memory >>> buffer = td.memmap_like("/path/to/dataset")
- memmap_refresh_()¶
Refreshes the content of the memory-mapped tensordict if it has a
saved_path
.This method will raise an exception if no path is associated with it.
- save(prefix: Optional[str] = None, copy_existing: bool = False, *, num_threads: int = 0, return_early: bool = False, share_non_tensor: bool = False) T ¶
Saves the tensordict to disk.
This function is a proxy to
memmap()
.
- set(key: NestedKey, value: Any, inplace: bool = False, non_blocking: bool = False)¶
Sets a new key-value pair.
- Parameters:
key (str, tuple of str) – name of the key to be set. If tuple of str it is equivalent to chained calls of getattr followed by a final setattr.
value (Any) – value to be stored in the tensorclass
inplace (bool, optional) – if
True
, set will tentatively try to update the value in-place. IfFalse
or if the key isn’t present, the value will be simply written at its destination.
- Returns:
self
- state_dict(destination=None, prefix='', keep_vars=False, flatten=False) dict[str, Any] ¶
Returns a state_dict dictionary that can be used to save and load data from a tensorclass.
- to_tensordict(*, retain_none: Optional[bool] = None) TensorDict ¶
Convert the tensorclass into a regular TensorDict.
Makes a copy of all entries. Memmap and shared memory tensors are converted to regular tensors.
- Parameters:
retain_none (bool) –
if
True
, theNone
values will be written in the tensordict. Otherwise they will be discrarded. Default:True
.Note
from v0.8, the default value will be switched to
False
.- Returns:
A new TensorDict object containing the same values as the tensorclass.
- unbind(dim: int)¶
Returns a tuple of indexed tensorclass instances unbound along the indicated dimension.
Resulting tensorclass instances will share the storage of the initial tensorclass instance.