Source code for torchrl.modules.models.exploration
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
import warnings
from typing import Optional, Sequence, Union
import torch
from torch import distributions as d, nn
from torch.nn.modules.lazy import LazyModuleMixin
from torch.nn.parameter import UninitializedBuffer, UninitializedParameter
from torchrl._utils import prod
from torchrl.data.utils import DEVICE_TYPING, DEVICE_TYPING_ARGS
from torchrl.envs.utils import exploration_type, ExplorationType
from torchrl.modules.distributions.utils import _cast_transform_device
from torchrl.modules.utils import inv_softplus
[docs]class NoisyLinear(nn.Linear):
"""Noisy Linear Layer.
Presented in "Noisy Networks for Exploration", https://arxiv.org/abs/1706.10295v3
A Noisy Linear Layer is a linear layer with parametric noise added to the weights. This induced stochasticity can
be used in RL networks for the agent's policy to aid efficient exploration. The parameters of the noise are learned
with gradient descent along with any other remaining network weights. Factorized Gaussian
noise is the type of noise usually employed.
Args:
in_features (int): input features dimension
out_features (int): out features dimension
bias (bool, optional): if ``True``, a bias term will be added to the matrix multiplication: Ax + b.
Defaults to ``True``
device (DEVICE_TYPING, optional): device of the layer.
Defaults to ``"cpu"``
dtype (torch.dtype, optional): dtype of the parameters.
Defaults to ``None`` (default pytorch dtype)
std_init (scalar, optional): initial value of the Gaussian standard deviation before optimization.
Defaults to ``0.1``
"""
def __init__(
self,
in_features: int,
out_features: int,
bias: bool = True,
device: Optional[DEVICE_TYPING] = None,
dtype: Optional[torch.dtype] = None,
std_init: float = 0.1,
):
nn.Module.__init__(self)
self.in_features = int(in_features)
self.out_features = int(out_features)
self.std_init = std_init
self.weight_mu = nn.Parameter(
torch.empty(
out_features,
in_features,
device=device,
dtype=dtype,
requires_grad=True,
)
)
self.weight_sigma = nn.Parameter(
torch.empty(
out_features,
in_features,
device=device,
dtype=dtype,
requires_grad=True,
)
)
self.register_buffer(
"weight_epsilon",
torch.empty(out_features, in_features, device=device, dtype=dtype),
)
if bias:
self.bias_mu = nn.Parameter(
torch.empty(
out_features,
device=device,
dtype=dtype,
requires_grad=True,
)
)
self.bias_sigma = nn.Parameter(
torch.empty(
out_features,
device=device,
dtype=dtype,
requires_grad=True,
)
)
self.register_buffer(
"bias_epsilon",
torch.empty(out_features, device=device, dtype=dtype),
)
else:
self.bias_mu = None
self.reset_parameters()
self.reset_noise()
def reset_parameters(self) -> None:
mu_range = 1 / math.sqrt(self.in_features)
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.std_init / math.sqrt(self.in_features))
if self.bias_mu is not None:
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.std_init / math.sqrt(self.out_features))
def reset_noise(self) -> None:
epsilon_in = self._scale_noise(self.in_features)
epsilon_out = self._scale_noise(self.out_features)
self.weight_epsilon.copy_(epsilon_out.outer(epsilon_in))
if self.bias_mu is not None:
self.bias_epsilon.copy_(epsilon_out)
def _scale_noise(self, size: Union[int, torch.Size, Sequence]) -> torch.Tensor:
if isinstance(size, int):
size = (size,)
x = torch.randn(*size, device=self.weight_mu.device)
return x.sign().mul_(x.abs().sqrt_())
@property
def weight(self) -> torch.Tensor:
if self.training:
return self.weight_mu + self.weight_sigma * self.weight_epsilon
else:
return self.weight_mu
@property
def bias(self) -> Optional[torch.Tensor]:
if self.bias_mu is not None:
if self.training:
return self.bias_mu + self.bias_sigma * self.bias_epsilon
else:
return self.bias_mu
else:
return None
[docs]class NoisyLazyLinear(LazyModuleMixin, NoisyLinear):
"""Noisy Lazy Linear Layer.
This class makes the Noisy Linear layer lazy, in that the in_feature argument does not need to be passed at
initialization (but is inferred after the first call to the layer).
For more context on noisy layers, see the NoisyLinear class.
Args:
out_features (int): out features dimension
bias (bool, optional): if ``True``, a bias term will be added to the matrix multiplication: Ax + b.
Defaults to ``True``.
device (DEVICE_TYPING, optional): device of the layer.
Defaults to ``"cpu"``.
dtype (torch.dtype, optional): dtype of the parameters.
Defaults to the default PyTorch dtype.
std_init (scalar): initial value of the Gaussian standard deviation before optimization.
Defaults to 0.1
"""
def __init__(
self,
out_features: int,
bias: bool = True,
device: Optional[DEVICE_TYPING] = None,
dtype: Optional[torch.dtype] = None,
std_init: float = 0.1,
):
super().__init__(0, 0, False, device=device)
self.out_features = out_features
self.std_init = std_init
self.weight_mu = UninitializedParameter(device=device, dtype=dtype)
self.weight_sigma = UninitializedParameter(device=device, dtype=dtype)
self.register_buffer(
"weight_epsilon", UninitializedBuffer(device=device, dtype=dtype)
)
if bias:
self.bias_mu = UninitializedParameter(device=device, dtype=dtype)
self.bias_sigma = UninitializedParameter(device=device, dtype=dtype)
self.register_buffer(
"bias_epsilon", UninitializedBuffer(device=device, dtype=dtype)
)
else:
self.bias_mu = None
self.reset_parameters()
def reset_parameters(self) -> None:
if not self.has_uninitialized_params() and self.in_features != 0:
super().reset_parameters()
def reset_noise(self) -> None:
if not self.has_uninitialized_params() and self.in_features != 0:
super().reset_noise()
[docs] def initialize_parameters(self, input: torch.Tensor) -> None:
if self.has_uninitialized_params():
with torch.no_grad():
self.in_features = input.shape[-1]
self.weight_mu.materialize((self.out_features, self.in_features))
self.weight_sigma.materialize((self.out_features, self.in_features))
self.weight_epsilon.materialize((self.out_features, self.in_features))
if self.bias_mu is not None:
self.bias_mu.materialize((self.out_features,))
self.bias_sigma.materialize((self.out_features,))
self.bias_epsilon.materialize((self.out_features,))
self.reset_parameters()
self.reset_noise()
@property
def weight(self) -> torch.Tensor:
if not self.has_uninitialized_params() and self.in_features != 0:
return super().weight
@property
def bias(self) -> torch.Tensor:
if not self.has_uninitialized_params() and self.in_features != 0:
return super().bias
[docs]def reset_noise(layer: nn.Module) -> None:
"""Resets the noise of noisy layers."""
if hasattr(layer, "reset_noise"):
layer.reset_noise()
class gSDEModule(nn.Module):
"""A gSDE exploration module.
Presented in "Smooth Exploration for Robotic Reinforcement Learning" by Antonin Raffin, Jens Kober, Freek Stulp (https://arxiv.org/abs/2005.05719)
gSDEModule adds a state-dependent exploration noise to an input action.
It also outputs the mean, scale (standard deviation) of the normal
distribution, as well as the Gaussian noise used.
The noise input should be reset through a :obj:`torchrl.envs.transforms.gSDENoise`
instance: each time the environment is reset, the input noise will be set to
zero by the environment transform, indicating to gSDEModule that it has to be resampled.
This scheme allows us to have the environemt tell the module to resample a
noise only the latter knows the shape of.
A variable transform function can also be provided to map the noicy action
to the desired space (e.g. a SafeTanhTransform or similar).
Args:
policy_model (nn.Module): a model that reads observations and
outputs a distribution average.
action_dim (int): the dimension of the action.
state_dim (int): the state dimension.
sigma_init (float, optional): the initial value of the standard deviation. The
softplus non-linearity is used to map the log_sigma parameter to a
positive value. Defaults to ``1.0``.
scale_min (float, optional): min value of the scale. Defaults to ``0.01``.
scale_max (float, optional): max value of the scale. Defaults to ``10.0``.
learn_sigma (bool, optional): if ``True``, the value of the ``sigma``
variable will be included in the module parameters, making it learnable.
Defaults to ``True``.
transform (torch.distribution.Transform, optional): a transform to apply
to the sampled action. Defaults to ``None`` (no transform).
device (torch.device, optional): device to create the model on.
Defaults to ``"cpu"``.
Examples:
>>> from tensordict import TensorDict
>>> from torchrl.modules import ProbabilisticActor, TanhNormal
>>> from tensordict.nn import TensorDictModule, ProbabilisticTensorDictSequential
>>> batch, state_dim, action_dim = 3, 7, 5
>>> model = nn.Linear(state_dim, action_dim)
>>> deterministic_policy = TensorDictModule(model, in_keys=["obs"], out_keys=["action"])
>>> stochastic_part = TensorDictModule(
... gSDEModule(action_dim, state_dim),
... in_keys=["action", "obs", "_eps_gSDE"],
... out_keys=["loc", "scale", "action", "_eps_gSDE"])
>>> stochastic_part = ProbabilisticActor(stochastic_part,
... in_keys=["loc", "scale"],
... distribution_class=TanhNormal)
>>> stochastic_policy = ProbabilisticTensorDictSequential(deterministic_policy, *stochastic_part)
>>> tensordict = TensorDict({'obs': torch.randn(state_dim), '_epx_gSDE': torch.zeros(1)}, [])
>>> _ = stochastic_policy(tensordict)
>>> print(tensordict)
TensorDict(
fields={
_eps_gSDE: Tensor(shape=torch.Size([5, 7]), device=cpu, dtype=torch.float32, is_shared=False),
_epx_gSDE: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
action: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False),
loc: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False),
obs: Tensor(shape=torch.Size([7]), device=cpu, dtype=torch.float32, is_shared=False),
scale: Tensor(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
>>> action_first_call = tensordict.get("action").clone()
>>> dist = stochastic_policy.get_dist(tensordict)
>>> print(dist)
TanhNormal(loc: torch.Size([5]), scale: torch.Size([5]))
>>> _ = stochastic_policy(tensordict)
>>> action_second_call = tensordict.get("action").clone()
>>> assert (action_second_call == action_first_call).all() # actions are the same
>>> assert (action_first_call != dist.base_dist.base_dist.loc).all() # actions are truly stochastic
"""
def __init__(
self,
action_dim: int,
state_dim: int,
sigma_init: float = None,
scale_min: float = 0.01,
scale_max: float = 10.0,
learn_sigma: bool = True,
transform: Optional[d.Transform] = None,
device: Optional[DEVICE_TYPING] = None,
) -> None:
super().__init__()
self.action_dim = action_dim
self.state_dim = state_dim
self.scale_min = scale_min
self.scale_max = scale_max
self.transform = transform
self.learn_sigma = learn_sigma
if learn_sigma:
if sigma_init is None:
sigma_init = inv_softplus(math.sqrt((1.0 - scale_min) / state_dim))
self.register_parameter(
"log_sigma",
nn.Parameter(
torch.zeros(
(action_dim, state_dim), requires_grad=True, device=device
)
),
)
else:
if sigma_init is None:
sigma_init = math.sqrt((1.0 - scale_min) / state_dim)
self.register_buffer(
"_sigma",
torch.full((action_dim, state_dim), sigma_init, device=device),
)
if sigma_init != 0.0:
self.register_buffer(
"sigma_init", torch.as_tensor(sigma_init, device=device)
)
@property
def sigma(self):
if self.learn_sigma:
sigma = torch.nn.functional.softplus(self.log_sigma)
return sigma.clamp_min(self.scale_min)
else:
return self._sigma.clamp_min(self.scale_min)
def forward(self, mu, state, _eps_gSDE):
sigma = self.sigma.clamp_max(self.scale_max)
_err_explo = f"gSDE behaviour for exploration mode {exploration_type()} is not defined. Choose from 'random' or 'mode'."
if state.shape[:-1] != mu.shape[:-1]:
_err_msg = f"mu and state are expected to have matching batch size, got shapes {mu.shape} and {state.shape}"
raise RuntimeError(_err_msg)
if _eps_gSDE is not None and (
_eps_gSDE.shape[: state.ndimension() - 1] != state.shape[:-1]
):
_err_msg = f"noise and state are expected to have matching batch size, got shapes {_eps_gSDE.shape} and {state.shape}"
raise RuntimeError(_err_msg)
if _eps_gSDE is None and exploration_type() != ExplorationType.RANDOM:
# noise is irrelevant in with no exploration
_eps_gSDE = torch.zeros(
*state.shape[:-1], *sigma.shape, device=sigma.device, dtype=sigma.dtype
)
elif (_eps_gSDE is None and exploration_type() == ExplorationType.RANDOM) or (
_eps_gSDE is not None
and _eps_gSDE.numel() == prod(state.shape[:-1])
and (_eps_gSDE == 0).all()
):
_eps_gSDE = torch.randn(
*state.shape[:-1], *sigma.shape, device=sigma.device, dtype=sigma.dtype
)
elif _eps_gSDE is None:
raise RuntimeError(_err_explo)
gSDE_noise = sigma * _eps_gSDE
eps = (gSDE_noise @ state.unsqueeze(-1)).squeeze(-1)
if exploration_type() in (ExplorationType.RANDOM,):
action = mu + eps
elif exploration_type() in (
ExplorationType.MODE,
ExplorationType.MEAN,
ExplorationType.DETERMINISTIC,
):
action = mu
else:
raise RuntimeError(_err_explo)
sigma = (sigma * state.unsqueeze(-2)).pow(2).sum(-1).clamp_min(1e-5).sqrt()
if not torch.isfinite(sigma).all():
warnings.warn("inf sigma")
if self.transform is not None:
action = self.transform(action)
return mu, sigma, action, _eps_gSDE
def to(self, device_or_dtype: Union[torch.dtype, DEVICE_TYPING]):
if isinstance(device_or_dtype, DEVICE_TYPING_ARGS):
self.transform = _cast_transform_device(self.transform, device_or_dtype)
return super().to(device_or_dtype)
class LazygSDEModule(LazyModuleMixin, gSDEModule):
"""Lazy gSDE Module.
This module behaves exactly as gSDEModule except that it does not require the
user to specify the action and state dimension.
If the input state is multi-dimensional (i.e. more than one state is provided), the
sigma value is initialized such that the resulting variance will match ``sigma_init``
(or 1 if no ``sigma_init`` value is provided).
Args:
sigma_init (float, optional): the initial value of the standard deviation. The
softplus non-linearity is used to map the log_sigma parameter to a
positive value. Defaults to ``None`` (learned).
scale_min (float, optional): min value of the scale. Defaults to ``0.01``.
scale_max (float, optional): max value of the scale. Defaults to ``10.0``.
learn_sigma (bool, optional): if ``True``, the value of the ``sigma``
variable will be included in the module parameters, making it learnable.
Defaults to ``True``.
transform (torch.distribution.Transform, optional): a transform to apply
to the sampled action. Defaults to ``None`` (no transform).
device (torch.device, optional): device to create the model on.
Defaults to ``"cpu"``.
"""
cls_to_become = gSDEModule
log_sigma: UninitializedParameter
_sigma: UninitializedBuffer
sigma_init: UninitializedBuffer
def __init__(
self,
sigma_init: float = None,
scale_min: float = 0.01,
scale_max: float = 10.0,
learn_sigma: bool = True,
transform: Optional[d.Transform] = None,
device: Optional[DEVICE_TYPING] = None,
) -> None:
super().__init__(
0,
0,
sigma_init=0.0,
scale_min=scale_min,
scale_max=scale_max,
learn_sigma=learn_sigma,
transform=transform,
device=device,
)
factory_kwargs = {
"device": device,
"dtype": torch.get_default_dtype(),
}
self._sigma_init = sigma_init
self.sigma_init = UninitializedBuffer(**factory_kwargs)
if learn_sigma:
self.log_sigma = UninitializedParameter(**factory_kwargs)
else:
self._sigma = UninitializedBuffer(**factory_kwargs)
def reset_parameters(self) -> None:
pass
def initialize_parameters(
self, mu: torch.Tensor, state: torch.Tensor, _eps_gSDE: torch.Tensor
) -> None:
if self.has_uninitialized_params():
action_dim = mu.shape[-1]
state_dim = state.shape[-1]
with torch.no_grad():
if state.ndimension() > 2:
state = state.flatten(0, -2).squeeze(0)
if state.ndimension() == 1:
state_flatten_var = torch.ones(1, device=mu.device)
else:
state_flatten_var = state.pow(2).mean(dim=0).reciprocal()
self.sigma_init.materialize(state_flatten_var.shape)
if self.learn_sigma:
if self._sigma_init is None:
state_flatten_var.clamp_min_(self.scale_min)
self.sigma_init.data.copy_(
inv_softplus((state_flatten_var / state_dim).sqrt())
)
else:
self.sigma_init.data.copy_(
inv_softplus(
self._sigma_init
* (state_flatten_var / state_dim).sqrt()
)
)
self.log_sigma.materialize((action_dim, state_dim))
self.log_sigma.data.copy_(self.sigma_init.expand_as(self.log_sigma))
else:
if self._sigma_init is None:
self.sigma_init.data.copy_(
(state_flatten_var / state_dim).sqrt()
)
else:
self.sigma_init.data.copy_(
(state_flatten_var / state_dim).sqrt() * self._sigma_init
)
self._sigma.materialize((action_dim, state_dim))
self._sigma.data.copy_(self.sigma_init.expand_as(self._sigma))