TensorDictPrioritizedReplayBuffer

class torchrl.data.TensorDictPrioritizedReplayBuffer(*, alpha: float, beta: float, priority_key: str = 'td_error', eps: float = 1e-08, storage: Storage | None = None, collate_fn: Callable | None = None, pin_memory: bool = False, prefetch: int | None = None, transform: 'Transform' | None = None, reduction: str = 'max', batch_size: int | None = None, dim_extend: int | None = None, generator: torch.Generator | None = None, shared: bool = False)

TensorDict-specific wrapper around the PrioritizedReplayBuffer class.

This class returns tensordicts with an additional "index" key that holds the index of each element in the replay buffer. It also provides the update_tensordict_priority() method, which only needs to be passed the tensordict carrying its new priority values.

Keyword Arguments:
  • alpha (float) – exponent α that determines how much prioritization is used, with α = 0 corresponding to uniform sampling.

  • beta (float) – importance sampling negative exponent.

  • eps (float) – small constant added to the priorities to ensure that the buffer does not contain zero priorities. Defaults to 1e-08.

  • storage (Storage, optional) – the storage to be used. If none is provided a default ListStorage with max_size of 1_000 will be created.

  • collate_fn (callable, optional) – merges a list of samples to form a mini-batch of Tensor(s)/outputs. Used when using batched loading from a map-style dataset. The default value will be decided based on the storage type.

  • pin_memory (bool) – whether pin_memory() should be called on the rb samples.

  • prefetch (int, optional) – number of next batches to be prefetched using multithreading. Defaults to None (no prefetching).

  • transform (Transform, optional) – Transform to be executed when sample() is called. To chain transforms use the Compose class. Transforms should be used with tensordict.TensorDict content. If used with other structures, the transforms should be encoded with a "data" leading key that will be used to construct a tensordict from the non-tensordict content.

  • batch_size (int, optional) –

    the batch size to be used when sample() is called.

    Note

    The batch size can be specified at construction time via the batch_size argument, or at sampling time. The former should be preferred whenever the batch size is consistent across the experiment. If the batch size is likely to change, it can be passed to the sample() method. Passing it at sampling time is incompatible with prefetching (since prefetching requires knowing the batch size in advance) as well as with samplers that have a drop_last argument.

  • priority_key (str, optional) – the key at which priority is assumed to be stored within TensorDicts added to this ReplayBuffer. This is to be used when the sampler is of type PrioritizedSampler. Defaults to "td_error".

  • reduction (str, optional) – the reduction method for multidimensional tensordicts (i.e., stored trajectories). Can be one of "max", "min", "median" or "mean". Defaults to "max".

  • dim_extend (int, optional) –

    indicates the dim to consider for extension when calling extend(). Defaults to storage.ndim-1. When using dim_extend > 0, we recommend using the ndim argument in the storage instantiation if that argument is available, to let storages know that the data is multi-dimensional and keep consistent notions of storage-capacity and batch-size during sampling.

    Note

    This argument has no effect on add() and therefore should be used with caution when both add() and extend() are used in a codebase. For example:

    >>> import torch
    >>> from torchrl.data import LazyTensorStorage, ReplayBuffer
    >>> data = torch.zeros(3, 4)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(10, ndim=2),
    ...     dim_extend=1)
    >>> # these two approaches are equivalent:
    >>> for d in data.unbind(1):
    ...     rb.add(d)
    >>> rb.extend(data)
    

  • generator (torch.Generator, optional) –

    a generator to use for sampling. Using a dedicated generator for the replay buffer allows fine-grained control over seeding, for instance keeping the global seed different but the RB seed identical for distributed jobs. Defaults to None (global default generator).

    Warning

    As of now, the generator has no effect on the transforms.

  • shared (bool, optional) – whether the buffer will be shared using multiprocessing or not. Defaults to False.
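
To make the roles of alpha, beta and eps more concrete, the following is a minimal pure-Python sketch of the commonly used prioritized experience replay formulas. It is an illustration under stated assumptions, not TorchRL's actual implementation: the helper names sampling_probabilities and importance_weights are hypothetical, and the buffer's internal bookkeeping may differ in its details. The weights computed here play the same role as the "_weight" entry that appears in the sampled tensordicts in the Examples section.

```python
# Illustrative sketch only: plain-Python versions of the usual prioritized
# replay formulas. The helper names below are hypothetical, not TorchRL API.


def sampling_probabilities(priorities, alpha, eps=1e-8):
    """P(i) = (|p_i| + eps)**alpha / sum_k (|p_k| + eps)**alpha."""
    scaled = [(abs(p) + eps) ** alpha for p in priorities]
    total = sum(scaled)
    return [s / total for s in scaled]


def importance_weights(probabilities, beta):
    """w_i = (N * P(i))**(-beta), rescaled so the largest weight is 1."""
    n = len(probabilities)
    raw = [(n * p) ** (-beta) for p in probabilities]
    largest = max(raw)
    return [w / largest for w in raw]


priorities = [1.0, 1.0, 2.0, 4.0]

# alpha = 0 ignores the priorities: every element is equally likely.
uniform = sampling_probabilities(priorities, alpha=0.0)

# alpha = 1 samples proportionally to the priorities.
proportional = sampling_probabilities(priorities, alpha=1.0)

# beta = 1 fully compensates for the non-uniform sampling: elements that
# are sampled more often receive proportionally smaller weights.
weights = importance_weights(proportional, beta=1.0)
```

In this sketch, eps keeps every priority strictly positive, so an element with a zero TD error can still be sampled. Intermediate values of alpha and beta interpolate between uniform sampling with equal weights and fully proportional sampling with full correction.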

Examples

>>> import torch
>>>
>>> from torchrl.data import LazyTensorStorage, TensorDictPrioritizedReplayBuffer
>>> from tensordict import TensorDict
>>>
>>> torch.manual_seed(0)
>>>
>>> rb = TensorDictPrioritizedReplayBuffer(alpha=0.7, beta=1.1, storage=LazyTensorStorage(10), batch_size=5)
>>> data = TensorDict({"a": torch.ones(10, 3), ("b", "c"): torch.zeros(10, 3, 1)}, [10])
>>> rb.extend(data)
>>> print("len of rb", len(rb))
len of rb 10
>>> sample = rb.sample(5)
>>> print(sample)
TensorDict(
    fields={
        _weight: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False),
        a: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([5, 3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([5]),
            device=cpu,
            is_shared=False),
        index: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.int64, is_shared=False)},
    batch_size=torch.Size([5]),
    device=cpu,
    is_shared=False)
>>> print("index", sample["index"])
index tensor([9, 5, 2, 2, 7])
>>> # give a high priority to these samples...
>>> sample.set("td_error", 100*torch.ones(sample.shape))
>>> # and update priority
>>> rb.update_tensordict_priority(sample)
>>> # the new sample should have a high overlap with the previous one
>>> sample = rb.sample(5)
>>> print(sample)
TensorDict(
    fields={
        _weight: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False),
        a: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([5, 3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([5]),
            device=cpu,
            is_shared=False),
        index: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.int64, is_shared=False)},
    batch_size=torch.Size([5]),
    device=cpu,
    is_shared=False)
>>> print("index", sample["index"])
index tensor([2, 5, 5, 9, 7])

add(data: TensorDictBase) → int

Add a single element to the replay buffer.

Parameters:

data (Any) – data to be added to the replay buffer

Returns:

index where the data lives in the replay buffer.

append_transform(transform: Transform, *, invert: bool = False) → ReplayBuffer

Appends transform at the end.

Transforms are applied in order when sample is called.

Parameters:

transform (Transform) – The transform to be appended

Keyword Arguments:

invert (bool, optional) – if True, the transform will be inverted (forward calls will be called during writing and inverse calls during reading). Defaults to False.

Example

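>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data import LazyMemmapStorage, ReplayBuffer
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=4)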
>>> data = TensorDict({"a": torch.zeros(10)}, [10])
>>> def t(data):
...     data += 1
...     return data
>>> rb.append_transform(t, invert=True)
>>> rb.extend(data)
>>> assert (data == 1).all()

dump(*args, **kwargs)

Alias for dumps().

dumps(path)

Saves the replay buffer on disk at the specified path.

Parameters:

path (Path or str) – path where to save the replay buffer.

Examples

>>> import tempfile
>>> import tqdm
>>> from torchrl.data import LazyMemmapStorage, TensorDictReplayBuffer
>>> from torchrl.data.replay_buffers.samplers import PrioritizedSampler, RandomSampler
>>> import torch
>>> from tensordict import TensorDict
>>> # Build and populate the replay buffer
>>> S = 1_000_000
>>> sampler = PrioritizedSampler(S, 1.1, 1.0)
>>> # sampler = RandomSampler()
>>> storage = LazyMemmapStorage(S)
>>> rb = TensorDictReplayBuffer(storage=storage, sampler=sampler)
>>>
>>> for _ in tqdm.tqdm(range(100)):
...     td = TensorDict({"obs": torch.randn(100, 3, 4), "next": {"obs": torch.randn(100, 3, 4)}, "td_error": torch.rand(100)}, [100])
...     rb.extend(td)
...     sample = rb.sample(32)
...     rb.update_tensordict_priority(sample)
>>> # save and load the buffer
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     rb.dumps(tmpdir)
...
...     sampler = PrioritizedSampler(S, 1.1, 1.0)
...     # sampler = RandomSampler()
...     storage = LazyMemmapStorage(S)
...     rb_load = TensorDictReplayBuffer(storage=storage, sampler=sampler)
...     rb_load.loads(tmpdir)
...     assert len(rb) == len(rb_load)

empty()

Empties the replay buffer and resets the cursor to 0.

extend(tensordicts: TensorDictBase) → Tensor

Extends the replay buffer with one or more elements contained in an iterable.

If present, the inverse transforms will be called.

Parameters:

tensordicts (iterable) – collection of data to be added to the replay buffer.

Returns:

Indices of the data added to the replay buffer.

Warning

extend() can have an ambiguous signature when dealing with lists of values, which could be interpreted either as a PyTree (in which case all elements in the list will be put in a slice in the stored PyTree in the storage) or as a list of values to add one at a time. To resolve this, TorchRL makes a clear-cut distinction between list and tuple: a tuple will be viewed as a PyTree, while a list (at the root level) will be interpreted as a stack of values to add one at a time to the buffer. For ListStorage instances, only unbound elements can be provided (no PyTrees).

insert_transform(index: int, transform: Transform, *, invert: bool = False) → ReplayBuffer

Inserts a transform at the given position.

Transforms are executed in order when sample is called.

Parameters:
  • index (int) – Position to insert the transform.

  • transform (Transform) – The transform to be inserted

Keyword Arguments:

invert (bool, optional) – if True, the transform will be inverted (forward calls will be called during writing and inverse calls during reading). Defaults to False.

load(*args, **kwargs)

Alias for loads().

loads(path)

Loads a replay buffer state at the given path.

The buffer should have matching components and be saved using dumps().

Parameters:

path (Path or str) – path where the replay buffer was saved.

See dumps() for more info.

register_load_hook(hook: Callable[[Any], Any])

Registers a load hook for the storage.

Note

Hooks are currently not serialized when saving a replay buffer: they must be manually re-initialized every time the buffer is created.

register_save_hook(hook: Callable[[Any], Any])

Registers a save hook for the storage.

Note

Hooks are currently not serialized when saving a replay buffer: they must be manually re-initialized every time the buffer is created.

sample(batch_size: int | None = None, return_info: bool = False, include_info: bool = None) → TensorDictBase

Samples a batch of data from the replay buffer.

Uses Sampler to sample indices, and retrieves them from Storage.

Parameters:
  • batch_size (int, optional) – size of data to be collected. If none is provided, this method will sample a batch of the size indicated by the sampler.

  • return_info (bool) – whether to return info. If True, the result is a tuple (data, info). If False, the result is the data.

Returns:

A tensordict containing a batch of data selected in the replay buffer, or a tuple containing this tensordict and info if the return_info flag is set to True.

property sampler

The sampler of the replay buffer.

The sampler must be an instance of Sampler.

save(*args, **kwargs)

Alias for dumps().

set_sampler(sampler: Sampler)

Sets a new sampler in the replay buffer and returns the previous sampler.

set_storage(storage: Storage, collate_fn: Callable | None = None)

Sets a new storage in the replay buffer and returns the previous storage.

Parameters:
  • storage (Storage) – the new storage for the buffer.

  • collate_fn (callable, optional) – if provided, the collate_fn is set to this value. Otherwise it is reset to a default value.

set_writer(writer: Writer)

Sets a new writer in the replay buffer and returns the previous writer.

property storage

The storage of the replay buffer.

The storage must be an instance of Storage.

property write_count

The total number of items written so far in the buffer through add and extend.

property writer

The writer of the replay buffer.

The writer must be an instance of Writer.
