
Source code for torchrl.collectors.utils

# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations

from typing import Callable

import torch

from tensordict import NestedKey, pad, set_lazy_legacy, TensorDictBase

def _stack_output(fun) -> Callable:
    def stacked_output_fun(*args, **kwargs):
        out = fun(*args, **kwargs)
        return tuple(torch.stack(_o, 0) for _o in out)

    return stacked_output_fun

def _stack_output_zip(fun) -> Callable:
    def stacked_output_fun(*args, **kwargs):
        out = fun(*args, **kwargs)
        return tuple(torch.stack(_o, 0) for _o in zip(*out))

    return stacked_output_fun

[docs]@set_lazy_legacy(False) def split_trajectories( rollout_tensordict: TensorDictBase, *, prefix=None, trajectory_key: NestedKey | None = None, done_key: NestedKey | None = None, ) -> TensorDictBase: """A util function for trajectory separation. Takes a tensordict with a key traj_ids that indicates the id of each trajectory. From there, builds a B x T x ... zero-padded tensordict with B batches on max duration T Args: rollout_tensordict (TensorDictBase): a rollout with adjacent trajectories along the last dimension. prefix (NestedKey, optional): the prefix used to read and write meta-data, such as ``"traj_ids"`` (the optional integer id of each trajectory) and the ``"mask"`` entry indicating which data are valid and which aren't. Defaults to ``"collector"`` if the input has a ``"collector"`` entry, ``()`` (no prefix) otherwise. ``prefix`` is kept as a legacy feature and will be deprecated eventually. Prefer ``trajectory_key`` or ``done_key`` whenever possible. trajectory_key (NestedKey, optional): the key pointing to the trajectory ids. Supersedes ``done_key`` and ``prefix``. If not provided, defaults to ``(prefix, "traj_ids")``. done_key (NestedKey, optional): the key pointing to the ``"done""`` signal, if the trajectory could not be directly recovered. Defaults to ``"done"``. Returns: A new tensordict with a leading dimension corresponding to the trajectory. A ``"mask"`` boolean entry sharing the ``trajectory_key`` prefix and the tensordict shape is also added. It indicated the valid elements of the tensordict, as well as a ``"traj_ids"`` entry if ``trajectory_key`` could not be found. Examples: >>> from tensordict import TensorDict >>> import torch >>> from torchrl.collectors.utils import split_trajectories >>> obs =[torch.arange(10), torch.arange(5)]) >>> obs_ =[torch.arange(1, 11), torch.arange(1, 6)]) >>> done = torch.zeros(15, dtype=torch.bool) >>> done[9] = True >>> trajectory_id =[torch.zeros(10, dtype=torch.int32), ... torch.ones(5, dtype=torch.int32)]) >>> data = TensorDict({"obs": obs, ("next", "obs"): obs_, ("next", "done"): done, "trajectory": trajectory_id}, batch_size=[15]) >>> data_split = split_trajectories(data, done_key="done") >>> print(data_split) TensorDict( fields={ mask: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.bool, is_shared=False), next: TensorDict( fields={ done: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.bool, is_shared=False), obs: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.int64, is_shared=False)}, batch_size=torch.Size([2, 10]), device=None, is_shared=False), obs: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.int64, is_shared=False), traj_ids: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.int64, is_shared=False), trajectory: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.int32, is_shared=False)}, batch_size=torch.Size([2, 10]), device=None, is_shared=False) >>> # check that split_trajectory got the trajectories right with the done signal >>> assert (data_split["traj_ids"] == data_split["trajectory"]).all() >>> print(data_split["mask"]) tensor([[ True, True, True, True, True, True, True, True, True, True], [ True, True, True, True, True, False, False, False, False, False]]) >>> data_split = split_trajectories(data, trajectory_key="trajectory") >>> print(data_split) TensorDict( fields={ mask: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.bool, is_shared=False), next: TensorDict( fields={ done: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.bool, is_shared=False), obs: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.int64, is_shared=False)}, batch_size=torch.Size([2, 10]), device=None, is_shared=False), obs: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.int64, is_shared=False), trajectory: Tensor(shape=torch.Size([2, 10]), device=cpu, dtype=torch.int32, is_shared=False)}, batch_size=torch.Size([2, 10]), device=None, is_shared=False) """ mask_key = None if trajectory_key is not None: from torchrl.envs.utils import _replace_last traj_ids_key = trajectory_key mask_key = _replace_last(trajectory_key, "mask") else: if prefix is None and "collector" in rollout_tensordict.keys(): prefix = "collector" if prefix is None: traj_ids_key = "traj_ids" mask_key = "mask" else: traj_ids_key = (prefix, "traj_ids") mask_key = (prefix, "mask") rollout_tensordict = rollout_tensordict.copy() traj_ids = rollout_tensordict.get(traj_ids_key, None) if traj_ids is None: if done_key is None: done_key = "done" done_key = ("next", done_key) done = rollout_tensordict.get(done_key) idx = (slice(None),) * (rollout_tensordict.ndim - 1) + (slice(None, -1),) done_sel = done[idx] pads = [1, 0] pads = [0, 0] * (done.ndim - rollout_tensordict.ndim) + pads done_sel = torch.nn.functional.pad(done_sel, pads) if done_sel.shape != done.shape: raise RuntimeError( f"done and done_sel have different shape {done.shape} - {done_sel.shape} " ) traj_ids = done_sel.cumsum(rollout_tensordict.ndim - 1) traj_ids = traj_ids.squeeze(-1) if rollout_tensordict.ndim > 1: for i in range(1, rollout_tensordict.shape[0]): traj_ids[i] += traj_ids[i - 1].max() + 1 rollout_tensordict.set(traj_ids_key, traj_ids) splits = traj_ids.reshape(-1) splits = [(splits == i).sum().item() for i in splits.unique_consecutive()] # if all splits are identical then we can skip this function if len(set(splits)) == 1 and splits[0] == traj_ids.shape[-1]: rollout_tensordict.set( mask_key, torch.ones( rollout_tensordict.shape, device=rollout_tensordict.device, dtype=torch.bool, ), ) if rollout_tensordict.ndimension() == 1: rollout_tensordict = rollout_tensordict.unsqueeze(0) return rollout_tensordict out_splits = rollout_tensordict.reshape(-1).split(splits, 0) for out_split in out_splits: out_split.set( mask_key, torch.ones( out_split.shape, dtype=torch.bool, device=out_split.device, ), ) if len(out_splits) > 1: MAX = max(*[out_split.shape[0] for out_split in out_splits]) else: MAX = out_splits[0].shape[0] td = torch.stack( [pad(out_split, [0, MAX - out_split.shape[0]]) for out_split in out_splits], 0 ) return td


Access comprehensive developer documentation for PyTorch

View Docs


Get in-depth tutorials for beginners and advanced developers

View Tutorials


Find development resources and get your questions answered

View Resources