Shortcuts

Source code for torchrl.modules.tensordict_module.common

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from __future__ import annotations

import importlib.util
import inspect
import re
import warnings
from typing import Iterable, List, Optional, Type, Union

import torch

from tensordict import TensorDictBase, unravel_key_list

from tensordict.nn import dispatch, TensorDictModule, TensorDictModuleBase
from tensordict.utils import NestedKey

from torch import nn
from torch.nn import functional as F

from torchrl.data.tensor_specs import Composite, TensorSpec

from torchrl.data.utils import DEVICE_TYPING

_has_functorch = importlib.util.find_spec("functorch") is not None
if _has_functorch:
    from functorch import FunctionalModule, FunctionalModuleWithBuffers
else:
    warnings.warn(
        "failed to import functorch. TorchRL's features that do not require "
        "functional programming should work, but functionality and performance "
        "may be affected. Consider installing functorch and/or upgrating pytorch."
    )

    class FunctionalModule:  # noqa: D101
        pass

    class FunctionalModuleWithBuffers:  # noqa: D101
        pass


def _check_all_str(list_of_str, first_level=True):
    if isinstance(list_of_str, str) and first_level:
        raise RuntimeError(
            f"Expected a list of strings but got a string: {list_of_str}"
        )
    elif not isinstance(list_of_str, str):
        try:
            return [_check_all_str(item, False) for item in list_of_str]
        except Exception as err:
            raise TypeError(
                f"Expected a list of strings but got: {list_of_str}."
            ) from err


def _forward_hook_safe_action(module, tensordict_in, tensordict_out):
    try:
        spec = module.spec
        if len(module.out_keys) > 1 and not isinstance(spec, Composite):
            raise RuntimeError(
                "safe TensorDictModules with multiple out_keys require a Composite with matching keys. Got "
                f"keys {module.out_keys}."
            )
        elif not isinstance(spec, Composite):
            out_key = module.out_keys[0]
            keys = [out_key]
            values = [spec]
        else:
            keys = list(spec.keys(True, True))
            values = [spec[key] for key in keys]
        for _spec, _key in zip(values, keys):
            if _spec is None:
                continue
            item = tensordict_out.get(_key, None)
            if item is None:
                # this will happen when an exploration (e.g. OU) writes a key only
                # during exploration, but is missing otherwise.
                # it's fine since what we want here it to make sure that a key
                # is within bounds if it is present
                continue
            if not _spec.is_in(item):
                try:
                    tensordict_out.set_(
                        _key,
                        _spec.project(tensordict_out.get(_key)),
                    )
                except RuntimeError:
                    tensordict_out.set(
                        _key,
                        _spec.project(tensordict_out.get(_key)),
                    )
    except RuntimeError as err:
        if re.search(
            "attempting to use a Tensor in some data-dependent control flow", str(err)
        ):
            # "_is_stateless" in module.__dict__ and module._is_stateless:
            raise RuntimeError(
                "vmap cannot be used with safe=True, consider turning the safe mode off."
            ) from err
        else:
            raise err


[docs]class SafeModule(TensorDictModule): """:class:`tensordict.nn.TensorDictModule` subclass that accepts a :class:`~torchrl.data.TensorSpec` as argument to control the output domain. Args: module (nn.Module): a nn.Module used to map the input to the output parameter space. Can be a functional module (FunctionalModule or FunctionalModuleWithBuffers), in which case the :obj:`forward` method will expect the params (and possibly) buffers keyword arguments. in_keys (iterable of str): keys to be read from input tensordict and passed to the module. If it contains more than one element, the values will be passed in the order given by the in_keys iterable. out_keys (iterable of str): keys to be written to the input tensordict. The length of out_keys must match the number of tensors returned by the embedded module. Using "_" as a key avoid writing tensor to output. spec (TensorSpec, optional): specs of the output tensor. If the module outputs multiple output tensors, spec characterize the space of the first output tensor. safe (bool): if ``True``, the value of the output is checked against the input spec. Out-of-domain sampling can occur because of exploration policies or numerical under/overflow issues. If this value is out of bounds, it is projected back onto the desired space using the :obj:`TensorSpec.project` method. Default is ``False``. Embedding a neural network in a TensorDictModule only requires to specify the input and output keys. The domain spec can be passed along if needed. TensorDictModule support functional and regular :obj:`nn.Module` objects. In the functional case, the 'params' (and 'buffers') keyword argument must be specified: Examples: >>> import torch >>> from tensordict import TensorDict >>> from torchrl.data import Unbounded >>> from torchrl.modules import TensorDictModule >>> td = TensorDict({"input": torch.randn(3, 4), "hidden": torch.randn(3, 8)}, [3,]) >>> spec = Unbounded(8) >>> module = torch.nn.GRUCell(4, 8) >>> td_fmodule = TensorDictModule( ... module=module, ... spec=spec, ... in_keys=["input", "hidden"], ... out_keys=["output"], ... ) >>> params = TensorDict.from_module(td_fmodule) >>> with params.to_module(td_module): ... td_functional = td_fmodule(td.clone()) >>> print(td_functional) TensorDict( fields={ hidden: Tensor(torch.Size([3, 8]), dtype=torch.float32), input: Tensor(torch.Size([3, 4]), dtype=torch.float32), output: Tensor(torch.Size([3, 8]), dtype=torch.float32)}, batch_size=torch.Size([3]), device=None, is_shared=False) In the stateful case: >>> td_module = TensorDictModule( ... module=torch.nn.GRUCell(4, 8), ... spec=spec, ... in_keys=["input", "hidden"], ... out_keys=["output"], ... ) >>> td_stateful = td_module(td.clone()) >>> print(td_stateful) TensorDict( fields={ hidden: Tensor(torch.Size([3, 8]), dtype=torch.float32), input: Tensor(torch.Size([3, 4]), dtype=torch.float32), output: Tensor(torch.Size([3, 8]), dtype=torch.float32)}, batch_size=torch.Size([3]), device=None, is_shared=False) One can use a vmap operator to call the functional module. In this case the tensordict is expanded to match the batch size (i.e. the tensordict isn't modified in-place anymore): >>> # Model ensemble using vmap >>> from torch import vmap >>> params_repeat = params.expand(4, *params.shape) >>> td_vmap = vmap(td_fmodule, (None, 0))(td.clone(), params_repeat) >>> print(td_vmap) TensorDict( fields={ hidden: Tensor(torch.Size([4, 3, 8]), dtype=torch.float32), input: Tensor(torch.Size([4, 3, 4]), dtype=torch.float32), output: Tensor(torch.Size([4, 3, 8]), dtype=torch.float32)}, batch_size=torch.Size([4, 3]), device=None, is_shared=False) """ def __init__( self, module: Union[ FunctionalModule, FunctionalModuleWithBuffers, TensorDictModule, nn.Module ], in_keys: Iterable[str], out_keys: Iterable[str], spec: Optional[TensorSpec] = None, safe: bool = False, ): super().__init__(module, in_keys, out_keys) self.register_spec(safe=safe, spec=spec) def register_spec(self, safe, spec): if spec is not None: spec = spec.clone() if spec is not None and not isinstance(spec, TensorSpec): raise TypeError("spec must be a TensorSpec subclass") elif spec is not None and not isinstance(spec, Composite): if len(self.out_keys) > 1: raise RuntimeError( f"got more than one out_key for the TensorDictModule: {self.out_keys},\nbut only one spec. " "Consider using a Composite object or no spec at all." ) spec = Composite({self.out_keys[0]: spec}) elif spec is not None and isinstance(spec, Composite): if "_" in spec.keys() and spec["_"] is not None: warnings.warn('got a spec with key "_": it will be ignored') elif spec is None: spec = Composite() # unravel_key_list(self.out_keys) can be removed once 473 is merged in tensordict spec_keys = set(unravel_key_list(list(spec.keys(True, True)))) out_keys = set(unravel_key_list(self.out_keys)) if spec_keys != out_keys: # then assume that all the non indicated specs are None for key in out_keys: if key not in spec_keys: spec[key] = None spec_keys = set(unravel_key_list(list(spec.keys(True, True)))) if spec_keys != out_keys: raise RuntimeError( f"spec keys and out_keys do not match, got: {spec_keys} and {out_keys} respectively" ) self._spec = spec self.safe = safe if safe: if spec is None or ( isinstance(spec, Composite) and all(_spec is None for _spec in spec.values()) ): raise RuntimeError( "`TensorDictModule(spec=None, safe=True)` is not a valid configuration as the tensor " "specs are not specified" ) self.register_forward_hook(_forward_hook_safe_action) @property def spec(self) -> Composite: return self._spec @spec.setter def spec(self, spec: Composite) -> None: if not isinstance(spec, Composite): raise RuntimeError( f"Trying to set an object of type {type(spec)} as a tensorspec but expected a Composite instance." ) self._spec = spec
[docs] def random(self, tensordict: TensorDictBase) -> TensorDictBase: """Samples a random element in the target space, irrespective of any input. If multiple output keys are present, only the first will be written in the input :obj:`tensordict`. Args: tensordict (TensorDictBase): tensordict where the output value should be written. Returns: the original tensordict with a new/updated value for the output key. """ key0 = self.out_keys[0] tensordict.set(key0, self.spec.rand(tensordict.batch_size)) return tensordict
[docs] def random_sample(self, tensordict: TensorDictBase) -> TensorDictBase: """See :obj:`TensorDictModule.random(...)`.""" return self.random(tensordict)
[docs] def to(self, dest: Union[torch.dtype, DEVICE_TYPING]) -> TensorDictModule: if hasattr(self, "spec") and self.spec is not None: self.spec = self.spec.to(dest) out = super().to(dest) return out
def is_tensordict_compatible(module: Union[TensorDictModule, nn.Module]): """Returns `True` if a module can be used as a TensorDictModule, and False if it can't. If the signature is misleading an error is raised. Examples: >>> module = nn.Linear(3, 4) >>> is_tensordict_compatible(module) False >>> class CustomModule(nn.Module): ... def __init__(self, module): ... super().__init__() ... self.linear = module ... self.in_keys = ["x"] ... self.out_keys = ["y"] ... def forward(self, tensordict): ... tensordict["y"] = self.linear(tensordict["x"]) ... return tensordict >>> tensordict_module = CustomModule(module) >>> is_tensordict_compatible(tensordict_module) True >>> class CustomModule(nn.Module): ... def __init__(self, module): ... super().__init__() ... self.linear = module ... self.in_keys = ["x"] ... self.out_keys = ["y"] ... def forward(self, tensordict, other_key): ... tensordict["y"] = self.linear(tensordict["x"]) ... return tensordict >>> tensordict_module = CustomModule(module) >>> try: ... is_tensordict_compatible(tensordict_module) ... except TypeError: ... print("passing") passing """ sig = inspect.signature(module.forward) if isinstance(module, TensorDictModule) or ( len(sig.parameters) == 1 and hasattr(module, "in_keys") and hasattr(module, "out_keys") ): # if the module is a TensorDictModule or takes a single argument and defines # in_keys and out_keys then we assume it can already deal with TensorDict input # to forward and we return True return True elif not hasattr(module, "in_keys") and not hasattr(module, "out_keys"): # if it's not a TensorDictModule, and in_keys and out_keys are not defined then # we assume no TensorDict compatibility and will try to wrap it. return False # if in_keys or out_keys were defined but module is not a TensorDictModule or # accepts multiple arguments then it's likely the user is trying to do something # that will have undetermined behavior, we raise an error raise TypeError( "Received a module that defines in_keys or out_keys and also expects multiple " "arguments to module.forward. If the module is compatible with TensorDict, it " "should take a single argument of type TensorDict to module.forward and define " "both in_keys and out_keys. Alternatively, module.forward can accept " "arbitrarily many tensor inputs and leave in_keys and out_keys undefined and " "TorchRL will attempt to automatically wrap the module with a TensorDictModule." ) def ensure_tensordict_compatible( module: Union[ FunctionalModule, FunctionalModuleWithBuffers, TensorDictModule, nn.Module ], in_keys: Optional[List[NestedKey]] = None, out_keys: Optional[List[NestedKey]] = None, safe: bool = False, wrapper_type: Optional[Type] = TensorDictModule, **kwargs, ): """Ensures module is compatible with TensorDictModule and, if not, it wraps it.""" in_keys = unravel_key_list(in_keys) if in_keys else in_keys out_keys = unravel_key_list(out_keys) if out_keys else out_keys """Checks and ensures an object with forward method is TensorDict compatible.""" if is_tensordict_compatible(module): if in_keys is not None and set(in_keys) != set(module.in_keys): raise TypeError( f"Arguments to module.forward ({set(module.in_keys)}) doesn't match " f"with the expected TensorDict in_keys ({set(in_keys)})." ) if out_keys is not None and set(module.out_keys) != set(out_keys): raise TypeError( f"Outputs of module.forward ({set(module.out_keys)}) doesn't match " f"with the expected TensorDict out_keys ({set(out_keys)})." ) # return module itself if it's already tensordict compatible return module if not isinstance(module, nn.Module): raise TypeError( "Argument to ensure_tensordict_compatible should be either " "a TensorDictModule or an nn.Module" ) sig = inspect.signature(module.forward) if in_keys is not None and set(sig.parameters) != set(in_keys): raise TypeError( "Arguments to module.forward are incompatible with entries in " "env.observation_spec. If you want TorchRL to automatically " "wrap your module with a TensorDictModule then the arguments " "to module must correspond one-to-one with entries in " "in_keys. For more complex behavior and more control you can " "consider writing your own TensorDictModule." ) # TODO: Check whether out_keys match (at least in number) if they are provided. if in_keys is not None: kwargs["in_keys"] = in_keys if out_keys is not None: kwargs["out_keys"] = out_keys return wrapper_type(module, **kwargs)
[docs]class VmapModule(TensorDictModuleBase): """A TensorDictModule wrapper to vmap over the input. It is intended to be used with modules that accept data with one less batch dimension than the one provided. By using this wrapper, one can hide a batch dimension and satisfy the wrapped module. Args: module (TensorDictModuleBase): the module to vmap over. vmap_dim (int, optional): the vmap input and output dim. If none is provided, the last dimension of the tensordict is assumed. .. note:: Since vmap requires to have control over the batch size of the input this module does not support dispatched arguments Example: >>> lam = TensorDictModule(lambda x: x[0], in_keys=["x"], out_keys=["y"]) >>> sample_in = torch.ones((10,3,2)) >>> sample_in_td = TensorDict({"x":sample_in}, batch_size=[10]) >>> lam(sample_in) >>> vm = VmapModule(lam, 0) >>> vm(sample_in_td) >>> assert (sample_in_td["x"][:, 0] == sample_in_td["y"]).all() """ def __init__(self, module: TensorDictModuleBase, vmap_dim=None): if not _has_functorch: raise ImportError("VmapModule requires torch>=2.0.") super().__init__() self.in_keys = module.in_keys self.out_keys = module.out_keys self.module = module self.vmap_dim = vmap_dim if torch.__version__ >= "2.0": self._vmap = torch.vmap else: import functorch self._vmap = functorch.vmap
[docs] def forward(self, tensordict): # TODO: there is a risk of segfault if input is not a tensordict. # We should investigate (possibly prevent it c++ side?) vmap_dim = self.vmap_dim if vmap_dim is None: ndim = tensordict.ndim vmap_dim = ndim - 1 td = self._vmap(self.module, (vmap_dim,), (vmap_dim,))(tensordict) return tensordict.update(td)
[docs]class DistributionalDQNnet(TensorDictModuleBase): """Distributional Deep Q-Network softmax layer. This layer should be used in between a regular model that predicts the action values and a distribution which acts on logits values. Args: in_keys (list of str or tuples of str): input keys to the log-softmax operation. Defaults to ``["action_value"]``. out_keys (list of str or tuples of str): output keys to the log-softmax operation. Defaults to ``["action_value"]``. Examples: >>> import torch >>> from tensordict import TensorDict >>> net = DistributionalDQNnet() >>> td = TensorDict({"action_value": torch.randn(10, 5)}, batch_size=[10]) >>> net(td) TensorDict( fields={ action_value: Tensor(shape=torch.Size([10, 5]), device=cpu, dtype=torch.float32, is_shared=False)}, batch_size=torch.Size([10]), device=None, is_shared=False) """ _wrong_out_feature_dims_error = ( "DistributionalDQNnet requires dqn output to be at least " "2-dimensional, with dimensions *Batch x #Atoms x #Actions. Got {0} " "instead." ) def __init__(self, *, in_keys=None, out_keys=None): super().__init__() if in_keys is None: in_keys = ["action_value"] if out_keys is None: out_keys = ["action_value"] self.in_keys = in_keys self.out_keys = out_keys
[docs] @dispatch(auto_batch_size=False) def forward(self, tensordict): for in_key, out_key in zip(self.in_keys, self.out_keys): q_values = tensordict.get(in_key) if q_values.ndimension() < 2: raise RuntimeError( self._wrong_out_feature_dims_error.format(q_values.shape) ) tensordict.set(out_key, F.log_softmax(q_values, dim=-2)) return tensordict

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources