D4RLExperienceReplay¶
- class torchrl.data.datasets.D4RLExperienceReplay(dataset_id, batch_size: int, sampler: Sampler | None = None, writer: Writer | None = None, collate_fn: Callable | None = None, pin_memory: bool = False, prefetch: int | None = None, transform: 'torchrl.envs.Transform' | None = None, split_trajs: bool = False, from_env: bool = False, use_truncated_as_done: bool = True, direct_download: bool = None, terminate_on_end: bool = None, download: bool = True, root: str | Path | None = None, **env_kwargs)[source]¶
An Experience replay class for D4RL.
To install D4RL, follow the instructions on the official repo.
The data format follows the TED convention. The replay buffer contains the env specs under D4RLExperienceReplay.specs.
If present, metadata will be written in
D4RLExperienceReplay.metadata
and excluded from the dataset.The transitions are reconstructed using
done = terminated | truncated
and the("next", "observation")
of"done"
states are zeroed.- Parameters:
dataset_id (str) – the dataset_id of the D4RL env to get the data from.
batch_size (int) – the batch size to use during sampling.
sampler (Sampler, optional) – the sampler to be used. If none is provided a default RandomSampler() will be used.
writer (Writer, optional) – the writer to be used. If none is provided a default
ImmutableDatasetWriter
will be used.collate_fn (callable, optional) – merges a list of samples to form a mini-batch of Tensor(s)/outputs. Used when using batched loading from a map-style dataset.
pin_memory (bool) – whether pin_memory() should be called on the rb samples.
prefetch (int, optional) – number of next batches to be prefetched using multithreading.
transform (Transform, optional) – Transform to be executed when sample() is called. To chain transforms use the
Compose
class.split_trajs (bool, optional) – if
True
, the trajectories will be split along the first dimension and padded to have a matching shape. To split the trajectories, the"done"
signal will be used, which is recovered viadone = truncated | terminated
. In other words, it is assumed that anytruncated
orterminated
signal is equivalent to the end of a trajectory. For some datasets fromD4RL
, this may not be true. It is up to the user to make accurate choices regarding this usage ofsplit_trajs
. Defaults toFalse
.from_env (bool, optional) –
if
True
,env.get_dataset()
will be used to retrieve the dataset. Otherwised4rl.qlearning_dataset()
will be used. Defaults toTrue
.Note
Using
from_env=False
will provide fewer data thanfrom_env=True
. For instance, the info keys will be left out. Usually,from_env=False
withterminate_on_end=True
will lead to the same result asfrom_env=True
, with the latter containing meta-data and info entries that the former does not possess.Note
The keys in
from_env=True
andfrom_env=False
may unexpectedly differ. In particular, the"truncated"
key (used to determine the end of an episode) may be absent whenfrom_env=False
but present otherwise, leading to a different slicing whentraj_splits
is enabled.direct_download (bool) – if
True
, the data will be downloaded without requiring D4RL. IfNone
, ifd4rl
is present in the env it will be used to download the dataset, otherwise the download will fall back ondirect_download=True
. This is not compatible withfrom_env=True
. Defaults toNone
.use_truncated_as_done (bool, optional) – if
True
,done = terminated | truncated
. Otherwise, only theterminated
key is used. Defaults toTrue
.terminate_on_end (bool, optional) – Set
done=True
on the last timestep in a trajectory. Default isFalse
, and will discard the last timestep in each trajectory. This is to be used only withdirect_download=False
.root (Path or str, optional) – The D4RL dataset root directory. The actual dataset memory-mapped files will be saved under <root>/<dataset_id>. If none is provided, it defaults to ``~/.cache/torchrl/d4rl`.
download (bool, optional) – Whether the dataset should be downloaded if not found. Defaults to
True
.**env_kwargs (key-value pairs) – additional kwargs for
d4rl.qlearning_dataset()
.
Examples
>>> from torchrl.data.datasets.d4rl import D4RLExperienceReplay >>> from torchrl.envs import ObservationNorm >>> data = D4RLExperienceReplay("maze2d-umaze-v1", 128) >>> # we can append transforms to the dataset >>> data.append_transform(ObservationNorm(loc=-1, scale=1.0, in_keys=["observation"])) >>> data.sample(128)
- add(data: TensorDictBase) int ¶
Add a single element to the replay buffer.
- Parameters:
data (Any) – data to be added to the replay buffer
- Returns:
index where the data lives in the replay buffer.
- append_transform(transform: Transform, *, invert: bool = False) ReplayBuffer ¶
Appends transform at the end.
Transforms are applied in order when sample is called.
- Parameters:
transform (Transform) – The transform to be appended
- Keyword Arguments:
invert (bool, optional) – if
True
, the transform will be inverted (forward calls will be called during writing and inverse calls during reading). Defaults toFalse
.
Example
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=4) >>> data = TensorDict({"a": torch.zeros(10)}, [10]) >>> def t(data): ... data += 1 ... return data >>> rb.append_transform(t, invert=True) >>> rb.extend(data) >>> assert (data == 1).all()
- property data_path: Path¶
Path to the dataset, including split.
- property data_path_root: Path¶
Path to the dataset root.
- delete()¶
Deletes a dataset storage from disk.
- dumps(path)¶
Saves the replay buffer on disk at the specified path.
- Parameters:
path (Path or str) – path where to save the replay buffer.
Examples
>>> import tempfile >>> import tqdm >>> from torchrl.data import LazyMemmapStorage, TensorDictReplayBuffer >>> from torchrl.data.replay_buffers.samplers import PrioritizedSampler, RandomSampler >>> import torch >>> from tensordict import TensorDict >>> # Build and populate the replay buffer >>> S = 1_000_000 >>> sampler = PrioritizedSampler(S, 1.1, 1.0) >>> # sampler = RandomSampler() >>> storage = LazyMemmapStorage(S) >>> rb = TensorDictReplayBuffer(storage=storage, sampler=sampler) >>> >>> for _ in tqdm.tqdm(range(100)): ... td = TensorDict({"obs": torch.randn(100, 3, 4), "next": {"obs": torch.randn(100, 3, 4)}, "td_error": torch.rand(100)}, [100]) ... rb.extend(td) ... sample = rb.sample(32) ... rb.update_tensordict_priority(sample) >>> # save and load the buffer >>> with tempfile.TemporaryDirectory() as tmpdir: ... rb.dumps(tmpdir) ... ... sampler = PrioritizedSampler(S, 1.1, 1.0) ... # sampler = RandomSampler() ... storage = LazyMemmapStorage(S) ... rb_load = TensorDictReplayBuffer(storage=storage, sampler=sampler) ... rb_load.loads(tmpdir) ... assert len(rb) == len(rb_load)
- empty()¶
Empties the replay buffer and reset cursor to 0.
- extend(tensordicts: TensorDictBase) Tensor ¶
Extends the replay buffer with one or more elements contained in an iterable.
If present, the inverse transforms will be called.`
- Parameters:
data (iterable) – collection of data to be added to the replay buffer.
- Returns:
Indices of the data added to the replay buffer.
Warning
extend()
can have an ambiguous signature when dealing with lists of values, which should be interpreted either as PyTree (in which case all elements in the list will be put in a slice in the stored PyTree in the storage) or a list of values to add one at a time. To solve this, TorchRL makes the clear-cut distinction between list and tuple: a tuple will be viewed as a PyTree, a list (at the root level) will be interpreted as a stack of values to add one at a time to the buffer. ForListStorage
instances, only unbound elements can be provided (no PyTrees).
- insert_transform(index: int, transform: Transform, *, invert: bool = False) ReplayBuffer ¶
Inserts transform.
Transforms are executed in order when sample is called.
- Parameters:
index (int) – Position to insert the transform.
transform (Transform) – The transform to be appended
- Keyword Arguments:
invert (bool, optional) – if
True
, the transform will be inverted (forward calls will be called during writing and inverse calls during reading). Defaults toFalse
.
- loads(path)¶
Loads a replay buffer state at the given path.
The buffer should have matching components and be saved using
dumps()
.- Parameters:
path (Path or str) – path where the replay buffer was saved.
See
dumps()
for more info.
- preprocess(fn: Callable[[TensorDictBase], TensorDictBase], dim: int = 0, num_workers: int | None = None, *, chunksize: int | None = None, num_chunks: int | None = None, pool: mp.Pool | None = None, generator: torch.Generator | None = None, max_tasks_per_child: int | None = None, worker_threads: int = 1, index_with_generator: bool = False, pbar: bool = False, mp_start_method: str | None = None, num_frames: int | None = None, dest: str | Path) TensorStorage ¶
Preprocesses a dataset and returns a new storage with the formatted data.
The data transform must be unitary (work on a single sample of the dataset).
Args and Keyword Args are forwarded to
map()
.The dataset can subsequently be deleted using
delete()
.- Keyword Arguments:
dest (path or equivalent) – a path to the location of the new dataset.
num_frames (int, optional) – if provided, only the first num_frames will be transformed. This is useful to debug the transform at first.
Returns: A new storage to be used within a
ReplayBuffer
instance.Examples
>>> from torchrl.data.datasets import MinariExperienceReplay >>> >>> data = MinariExperienceReplay( ... list(MinariExperienceReplay.available_datasets)[0], ... batch_size=32 ... ) >>> print(data) MinariExperienceReplay( storages=TensorStorage(TensorDict( fields={ action: MemoryMappedTensor(shape=torch.Size([1000000, 8]), device=cpu, dtype=torch.float32, is_shared=True), episode: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.int64, is_shared=True), info: TensorDict( fields={ distance_from_origin: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), forward_reward: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), goal: MemoryMappedTensor(shape=torch.Size([1000000, 2]), device=cpu, dtype=torch.float64, is_shared=True), qpos: MemoryMappedTensor(shape=torch.Size([1000000, 15]), device=cpu, dtype=torch.float64, is_shared=True), qvel: MemoryMappedTensor(shape=torch.Size([1000000, 14]), device=cpu, dtype=torch.float64, is_shared=True), reward_ctrl: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), reward_forward: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), reward_survive: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), success: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.bool, is_shared=True), x_position: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), x_velocity: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), y_position: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), y_velocity: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False), next: TensorDict( fields={ done: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.bool, is_shared=True), info: TensorDict( fields={ distance_from_origin: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), forward_reward: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), goal: MemoryMappedTensor(shape=torch.Size([1000000, 2]), device=cpu, dtype=torch.float64, is_shared=True), qpos: MemoryMappedTensor(shape=torch.Size([1000000, 15]), device=cpu, dtype=torch.float64, is_shared=True), qvel: MemoryMappedTensor(shape=torch.Size([1000000, 14]), device=cpu, dtype=torch.float64, is_shared=True), reward_ctrl: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), reward_forward: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), reward_survive: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), success: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.bool, is_shared=True), x_position: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), x_velocity: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), y_position: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True), y_velocity: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.float64, is_shared=True)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False), observation: TensorDict( fields={ achieved_goal: MemoryMappedTensor(shape=torch.Size([1000000, 2]), device=cpu, dtype=torch.float64, is_shared=True), desired_goal: MemoryMappedTensor(shape=torch.Size([1000000, 2]), device=cpu, dtype=torch.float64, is_shared=True), observation: MemoryMappedTensor(shape=torch.Size([1000000, 27]), device=cpu, dtype=torch.float64, is_shared=True)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False), reward: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.float64, is_shared=True), terminated: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.bool, is_shared=True), truncated: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.bool, is_shared=True)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False), observation: TensorDict( fields={ achieved_goal: MemoryMappedTensor(shape=torch.Size([1000000, 2]), device=cpu, dtype=torch.float64, is_shared=True), desired_goal: MemoryMappedTensor(shape=torch.Size([1000000, 2]), device=cpu, dtype=torch.float64, is_shared=True), observation: MemoryMappedTensor(shape=torch.Size([1000000, 27]), device=cpu, dtype=torch.float64, is_shared=True)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False)), samplers=RandomSampler, writers=ImmutableDatasetWriter(), batch_size=32, transform=Compose( ), collate_fn=<function _collate_id at 0x120e21dc0>) >>> from torchrl.envs import CatTensors, Compose >>> from tempfile import TemporaryDirectory >>> >>> cat_tensors = CatTensors( ... in_keys=[("observation", "observation"), ("observation", "achieved_goal"), ... ("observation", "desired_goal")], ... out_key="obs" ... ) >>> cat_next_tensors = CatTensors( ... in_keys=[("next", "observation", "observation"), ... ("next", "observation", "achieved_goal"), ... ("next", "observation", "desired_goal")], ... out_key=("next", "obs") ... ) >>> t = Compose(cat_tensors, cat_next_tensors) >>> >>> def func(td): ... td = td.select( ... "action", ... "episode", ... ("next", "done"), ... ("next", "observation"), ... ("next", "reward"), ... ("next", "terminated"), ... ("next", "truncated"), ... "observation" ... ) ... td = t(td) ... return td >>> with TemporaryDirectory() as tmpdir: ... new_storage = data.preprocess(func, num_workers=4, pbar=True, mp_start_method="fork", dest=tmpdir) ... rb = ReplayBuffer(storage=new_storage) ... print(rb) ReplayBuffer( storage=TensorStorage( data=TensorDict( fields={ action: MemoryMappedTensor(shape=torch.Size([1000000, 8]), device=cpu, dtype=torch.float32, is_shared=True), episode: MemoryMappedTensor(shape=torch.Size([1000000]), device=cpu, dtype=torch.int64, is_shared=True), next: TensorDict( fields={ done: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.bool, is_shared=True), obs: MemoryMappedTensor(shape=torch.Size([1000000, 31]), device=cpu, dtype=torch.float64, is_shared=True), observation: TensorDict( fields={ }, batch_size=torch.Size([1000000]), device=cpu, is_shared=False), reward: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.float64, is_shared=True), terminated: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.bool, is_shared=True), truncated: MemoryMappedTensor(shape=torch.Size([1000000, 1]), device=cpu, dtype=torch.bool, is_shared=True)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False), obs: MemoryMappedTensor(shape=torch.Size([1000000, 31]), device=cpu, dtype=torch.float64, is_shared=True), observation: TensorDict( fields={ }, batch_size=torch.Size([1000000]), device=cpu, is_shared=False)}, batch_size=torch.Size([1000000]), device=cpu, is_shared=False), shape=torch.Size([1000000]), len=1000000, max_size=1000000), sampler=RandomSampler(), writer=RoundRobinWriter(cursor=0, full_storage=True), batch_size=None, collate_fn=<function _collate_id at 0x168406fc0>)
- register_load_hook(hook: Callable[[Any], Any])¶
Registers a load hook for the storage.
Note
Hooks are currently not serialized when saving a replay buffer: they must be manually re-initialized every time the buffer is created.
- register_save_hook(hook: Callable[[Any], Any])¶
Registers a save hook for the storage.
Note
Hooks are currently not serialized when saving a replay buffer: they must be manually re-initialized every time the buffer is created.
- sample(batch_size: Optional[int] = None, return_info: bool = False, include_info: Optional[bool] = None) TensorDictBase ¶
Samples a batch of data from the replay buffer.
Uses Sampler to sample indices, and retrieves them from Storage.
- Parameters:
batch_size (int, optional) – size of data to be collected. If none is provided, this method will sample a batch-size as indicated by the sampler.
return_info (bool) – whether to return info. If True, the result is a tuple (data, info). If False, the result is the data.
- Returns:
A tensordict containing a batch of data selected in the replay buffer. A tuple containing this tensordict and info if return_info flag is set to True.
- set_sampler(sampler: Sampler)¶
Sets a new sampler in the replay buffer and returns the previous sampler.
- set_storage(storage: Storage, collate_fn: Optional[Callable] = None)¶
Sets a new storage in the replay buffer and returns the previous storage.
- Parameters:
storage (Storage) – the new storage for the buffer.
collate_fn (callable, optional) – if provided, the collate_fn is set to this value. Otherwise it is reset to a default value.
- property write_count¶
The total number of items written so far in the buffer through add and extend.