Introduction to TorchRL¶
This demo was presented at ICML 2022 on the industry demo day.
It gives a good overview of TorchRL functionalities. Feel free to reach out to vmoens@fb.com or submit issues if you have questions or comments about it.
TorchRL is an open-source Reinforcement Learning (RL) library for PyTorch.
The PyTorch ecosystem team (Meta) has decided to invest in this library to provide a leading platform for developing RL solutions in research settings.
It provides PyTorch- and Python-first, low- and high-level abstractions for RL that are intended to be efficient, documented, and properly tested. The code is aimed at supporting research in RL. Most of it is written in Python in a highly modular way, so that researchers can easily swap components, transform them, or write new ones with little effort.
This repo attempts to align with the existing PyTorch ecosystem libraries in that it has a dataset pillar (torchrl/envs), transforms, models, data utilities (e.g., collectors and containers), etc. TorchRL aims to have as few dependencies as possible (the Python standard library, NumPy, and PyTorch). Common environment libraries (e.g., OpenAI Gym) are only optional.
Unlike other domains, RL is less about media than algorithms. As such, it is harder to make truly independent components.
What TorchRL is not:
a collection of algorithms: we do not intend to provide SOTA implementations of RL algorithms; the algorithms we ship serve only as examples of how to use the library.
a research framework: modularity in TorchRL comes in two flavors. First, we try to build re-usable components, so that they can easily be swapped with each other. Second, we do our best to ensure that components can be used independently of the rest of the library.
TorchRL has very few core dependencies, predominantly PyTorch and numpy. All other dependencies (gym, torchvision, wandb / tensorboard) are optional.
Data¶
TensorDict¶
import torch
from tensordict import TensorDict
Let’s create a TensorDict. The constructor accepts many different input formats, such as a dict passed positionally or keyword arguments:
batch_size = 5
data = TensorDict(
key1=torch.zeros(batch_size, 3),
key2=torch.zeros(batch_size, 5, 6, dtype=torch.bool),
batch_size=[batch_size],
)
print(data)
TensorDict(
fields={
key1: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([5]),
device=None,
is_shared=False)
You can index a TensorDict along its batch_size, as well as query keys.
print(data[2])
print(data["key1"] is data.get("key1"))
TensorDict(
fields={
key1: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
True
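Entries can also be written with the same indexing syntax. A quick sketch (the "key3" entry is ours, not part of the original demo):
data["key3"] = torch.ones(batch_size, 2)  # equivalent to data.set("key3", ...)
print(data["key3"].shape)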
The following shows how to stack multiple TensorDicts. This is particularly useful when writing rollout loops!
data1 = TensorDict(
{
"key1": torch.zeros(batch_size, 1),
"key2": torch.zeros(batch_size, 5, 6, dtype=torch.bool),
},
batch_size=[batch_size],
)
data2 = TensorDict(
{
"key1": torch.ones(batch_size, 1),
"key2": torch.ones(batch_size, 5, 6, dtype=torch.bool),
},
batch_size=[batch_size],
)
data = torch.stack([data1, data2], 0)
data.batch_size, data["key1"]
(torch.Size([2, 5]), tensor([[[0.],
[0.],
[0.],
[0.],
[0.]],
[[1.],
[1.],
[1.],
[1.],
[1.]]]))
Here are some other functionalities of TensorDict: viewing, permuting, memory sharing, and expanding.
print(
"view(-1): ",
data.view(-1).batch_size,
data.view(-1).get("key1").shape,
)
print("to device: ", data.to("cpu"))
# print("pin_memory: ", data.pin_memory())
print("share memory: ", data.share_memory_())
print(
"permute(1, 0): ",
data.permute(1, 0).batch_size,
data.permute(1, 0).get("key1").shape,
)
print(
"expand: ",
data.expand(3, *data.batch_size).batch_size,
data.expand(3, *data.batch_size).get("key1").shape,
)
view(-1): torch.Size([10]) torch.Size([10, 1])
to device: TensorDict(
fields={
key1: Tensor(shape=torch.Size([2, 5, 1]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([2, 5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 5]),
device=cpu,
is_shared=False)
share memory: TensorDict(
fields={
key1: Tensor(shape=torch.Size([2, 5, 1]), device=cpu, dtype=torch.float32, is_shared=True),
key2: Tensor(shape=torch.Size([2, 5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=True)},
batch_size=torch.Size([2, 5]),
device=None,
is_shared=True)
permute(1, 0): torch.Size([5, 2]) torch.Size([5, 2, 1])
expand: torch.Size([3, 2, 5]) torch.Size([3, 2, 5, 1])
You can create nested TensorDicts as well.
data = TensorDict(
source={
"key1": torch.zeros(batch_size, 3),
"key2": TensorDict(
source={"sub_key1": torch.zeros(batch_size, 2, 1)},
batch_size=[batch_size, 2],
),
},
batch_size=[batch_size],
)
data
TensorDict(
fields={
key1: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: TensorDict(
fields={
sub_key1: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([5, 2]),
device=None,
is_shared=False)},
batch_size=torch.Size([5]),
device=None,
is_shared=False)
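Nested entries can be read (and written) with key tuples. A minimal sketch using the TensorDict defined above:
print(data["key2", "sub_key1"].shape)  # tuple indexing reaches into nested TensorDicts
print(data.get(("key2", "sub_key1")).shape)  # equivalent getter form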
Replay buffers¶
Replay buffers are a crucial component in many RL algorithms. TorchRL provides a range of replay buffer implementations. Most basic features will work with any data structure (lists, tuples, dicts), but to use the replay buffers to their full extent and with fast read and write access, the TensorDict APIs should be preferred.
from torchrl.data import PrioritizedReplayBuffer, ReplayBuffer
rb = ReplayBuffer(collate_fn=lambda x: x)
Adding can be done with add() (n=1) or extend() (n>1).
rb.add(1)
rb.sample(1)
rb.extend([2, 3])
rb.sample(3)
[2, 1, 3]
Prioritized Replay Buffers can also be used:
rb = PrioritizedReplayBuffer(alpha=0.7, beta=1.1, collate_fn=lambda x: x)
rb.add(1)
rb.sample(1)
rb.update_priority(1, 0.5)
Here are examples of using a replay buffer with TensorDict data. Using TensorDicts makes it easy to abstract away the behaviour of the replay buffer across multiple use cases.
collate_fn = torch.stack
rb = ReplayBuffer(collate_fn=collate_fn)
rb.add(TensorDict({"a": torch.randn(3)}, batch_size=[]))
len(rb)
rb.extend(TensorDict({"a": torch.randn(2, 3)}, batch_size=[2]))
print(len(rb))
print(rb.sample(10))
print(rb.sample(2).contiguous())
torch.manual_seed(0)
from torchrl.data import TensorDictPrioritizedReplayBuffer
rb = TensorDictPrioritizedReplayBuffer(alpha=0.7, beta=1.1, priority_key="td_error")
rb.extend(TensorDict({"a": torch.randn(2, 3)}, batch_size=[2]))
data_sample = rb.sample(2).contiguous()
print(data_sample)
print(data_sample["index"])
data_sample["td_error"] = torch.rand(2)
rb.update_tensordict_priority(data_sample)
for i, val in enumerate(rb._sampler._sum_tree):
    print(i, val)
    if i == len(rb):
        break
3
TensorDict(
fields={
a: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
TensorDict(
fields={
a: Tensor(shape=torch.Size([2, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([2]),
device=None,
is_shared=False)
TensorDict(
fields={
_weight: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, is_shared=False),
a: Tensor(shape=torch.Size([2, 3]), device=cpu, dtype=torch.float32, is_shared=False),
index: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([2]),
device=None,
is_shared=False)
tensor([0, 0])
0 0.28791671991348267
1 1.0
2 0.0
Envs¶
TorchRL provides a range of environment wrappers and utilities.
Gym Environment¶
try:
    import gymnasium as gym
except ModuleNotFoundError:
    import gym
from torchrl.envs.libs.gym import GymEnv, GymWrapper, set_gym_backend
gym_env = gym.make("Pendulum-v1")
env = GymWrapper(gym_env)
env = GymEnv("Pendulum-v1")
data = env.reset()
env.rand_step(data)
TensorDict(
fields={
action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
Changing environments config¶
env = GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
env.reset()
env.close()
del env
from torchrl.envs import (
Compose,
NoopResetEnv,
ObservationNorm,
ToTensorImage,
TransformedEnv,
)
base_env = GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
env = TransformedEnv(base_env, Compose(NoopResetEnv(3), ToTensorImage()))
env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
TransformedEnv(
env=GymEnv(env=Pendulum-v1, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels']),
ObservationNorm(loc=2.0000, scale=1.0000, keys=['pixels'])))
Environment Transforms¶
Transforms act like Gym wrappers, but with an API closer to torchvision’s transforms.
There is a wide range of transforms to choose from.
from torchrl.envs import (
Compose,
NoopResetEnv,
ObservationNorm,
StepCounter,
ToTensorImage,
TransformedEnv,
)
base_env = GymEnv("HalfCheetah-v4", frame_skip=3, from_pixels=True, pixels_only=False)
env = TransformedEnv(base_env, Compose(NoopResetEnv(3), ToTensorImage()))
env = env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
env.reset()
print("env: ", env)
print("last transform parent: ", env.transform[2].parent)
env: TransformedEnv(
env=GymEnv(env=HalfCheetah-v4, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels']),
ObservationNorm(loc=2.0000, scale=1.0000, keys=['pixels'])))
last transform parent: TransformedEnv(
env=GymEnv(env=HalfCheetah-v4, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels'])))
Vectorized Environments¶
Vectorized / parallel environments can provide some significant speed-ups.
from torchrl.envs import ParallelEnv
def make_env():
    # You can control whether to use gym or gymnasium for your env
    with set_gym_backend("gym"):
        return GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
base_env = ParallelEnv(
    4,
    make_env,
    mp_start_method="fork",  # "fork" breaks on Windows! There, remove this argument and wrap the script in an if __name__ == "__main__" guard
)
env = TransformedEnv(
base_env, Compose(StepCounter(), ToTensorImage())
) # applies transforms on batch of envs
env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
env.reset()
print(env.action_spec)
env.close()
del env
BoundedContinuous(
shape=torch.Size([4, 1]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous)
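Specs do more than describe the action space: for instance, they can produce random valid samples and check membership. A small sketch, assuming a fresh single-worker Pendulum environment (the parallel one above was closed):
env = GymEnv("Pendulum-v1")
random_action = env.action_spec.rand()  # a sample drawn within the bounded box
print(env.action_spec.is_in(random_action))  # membership check: True
env.close()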
Modules¶
Multiple modules (utils, models and wrappers) can be found in the library.
Models¶
Example of an MLP model:
from torch import nn
from torchrl.modules import ConvNet, MLP
from torchrl.modules.models.utils import SquashDims
net = MLP(num_cells=[32, 64], out_features=4, activation_class=nn.ELU)
print(net)
print(net(torch.randn(10, 3)).shape)
MLP(
(0): LazyLinear(in_features=0, out_features=32, bias=True)
(1): ELU(alpha=1.0)
(2): Linear(in_features=32, out_features=64, bias=True)
(3): ELU(alpha=1.0)
(4): Linear(in_features=64, out_features=4, bias=True)
)
torch.Size([10, 4])
Example of a CNN model:
cnn = ConvNet(
num_cells=[32, 64],
kernel_sizes=[8, 4],
strides=[2, 1],
aggregator_class=SquashDims,
)
print(cnn)
print(cnn(torch.randn(10, 3, 32, 32)).shape)  # the trailing dims are flattened by the SquashDims aggregator
ConvNet(
(0): LazyConv2d(0, 32, kernel_size=(8, 8), stride=(2, 2))
(1): ELU(alpha=1.0)
(2): Conv2d(32, 64, kernel_size=(4, 4), stride=(1, 1))
(3): ELU(alpha=1.0)
(4): SquashDims()
)
torch.Size([10, 6400])
TensorDictModules¶
Some modules are specifically designed to work with tensordict inputs.
from tensordict.nn import TensorDictModule
data = TensorDict({"key1": torch.randn(10, 3)}, batch_size=[10])
module = nn.Linear(3, 4)
td_module = TensorDictModule(module, in_keys=["key1"], out_keys=["key2"])
td_module(data)
print(data)
TensorDict(
fields={
key1: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key2: Tensor(shape=torch.Size([10, 4]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
Sequences of Modules¶
Making sequences of modules is made easy by TensorDictSequential:
from tensordict.nn import TensorDictSequential
backbone_module = nn.Linear(5, 3)
backbone = TensorDictModule(
backbone_module, in_keys=["observation"], out_keys=["hidden"]
)
actor_module = nn.Linear(3, 4)
actor = TensorDictModule(actor_module, in_keys=["hidden"], out_keys=["action"])
value_module = MLP(out_features=1, num_cells=[4, 5])
value = TensorDictModule(value_module, in_keys=["hidden", "action"], out_keys=["value"])
sequence = TensorDictSequential(backbone, actor, value)
print(sequence)
print(sequence.in_keys, sequence.out_keys)
data = TensorDict(
{"observation": torch.randn(3, 5)},
[3],
)
backbone(data)
actor(data)
value(data)
data = TensorDict(
{"observation": torch.randn(3, 5)},
[3],
)
sequence(data)
print(data)
TensorDictSequential(
module=ModuleList(
(0): TensorDictModule(
module=Linear(in_features=5, out_features=3, bias=True),
device=cpu,
in_keys=['observation'],
out_keys=['hidden'])
(1): TensorDictModule(
module=Linear(in_features=3, out_features=4, bias=True),
device=cpu,
in_keys=['hidden'],
out_keys=['action'])
(2): TensorDictModule(
module=MLP(
(0): LazyLinear(in_features=0, out_features=4, bias=True)
(1): Tanh()
(2): Linear(in_features=4, out_features=5, bias=True)
(3): Tanh()
(4): Linear(in_features=5, out_features=1, bias=True)
),
device=cpu,
in_keys=['hidden', 'action'],
out_keys=['value'])
),
device=cpu,
in_keys=['observation'],
out_keys=['hidden', 'action', 'value'])
['observation'] ['hidden', 'action', 'value']
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
hidden: Tensor(shape=torch.Size([3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
value: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
Functional Programming (Ensembling / Meta-RL)¶
Functional calls have never been easier. Extract the parameters with from_module(), and replace them with to_module():
from tensordict import from_module
params = from_module(sequence)
print("extracted params", params)
extracted params TensorDict(
fields={
module: TensorDict(
fields={
0: TensorDict(
fields={
module: TensorDict(
fields={
bias: Parameter(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
1: TensorDict(
fields={
module: TensorDict(
fields={
bias: Parameter(shape=torch.Size([4]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([4, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
2: TensorDict(
fields={
module: TensorDict(
fields={
0: TensorDict(
fields={
bias: Parameter(shape=torch.Size([4]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([4, 7]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
2: TensorDict(
fields={
bias: Parameter(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
4: TensorDict(
fields={
bias: Parameter(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([1, 5]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
Functional call using the TensorDict of parameters:
with params.to_module(sequence):
    data = sequence(data)
VMAP¶
Fast execution of multiple copies of a similar architecture is key to training your models quickly. vmap() is tailored to do just that:
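The original code block for this step is missing here; the following sketch is consistent with the output below, expanding the extracted parameters and vmapping the functional call over that dimension (the factor 4 is inferred from the output's leading batch dimension):
from torch import vmap

params_expand = params.expand(4)  # four stacked copies of the parameters
# map over the parameter dim (0) while broadcasting the input data (None)
tensordict_exp = vmap(sequence, (None, 0))(data, params_expand)
print(tensordict_exp)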
TensorDict(
fields={
action: Tensor(shape=torch.Size([4, 3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
hidden: Tensor(shape=torch.Size([4, 3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([4, 3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
value: Tensor(shape=torch.Size([4, 3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([4, 3]),
device=None,
is_shared=False)
Specialized Classes¶
TorchRL also provides some specialized modules that run checks on the output values.
torch.manual_seed(0)
from torchrl.data import Bounded
from torchrl.modules import SafeModule
spec = Bounded(-torch.ones(3), torch.ones(3))
base_module = nn.Linear(5, 3)
module = SafeModule(
module=base_module, spec=spec, in_keys=["obs"], out_keys=["action"], safe=True
)
data = TensorDict({"obs": torch.randn(5)}, batch_size=[])
module(data)["action"]
data = TensorDict({"obs": torch.randn(5) * 100}, batch_size=[])
module(data)["action"] # safe=True projects the result within the set
tensor([-1., 1., -1.], grad_fn=<AsStridedBackward0>)
The Actor class has a predefined output key ("action"):
from torchrl.modules import Actor
base_module = nn.Linear(5, 3)
actor = Actor(base_module, in_keys=["obs"])
data = TensorDict({"obs": torch.randn(5)}, batch_size=[])
actor(data)  # "action" is the default output key
from tensordict.nn import (
ProbabilisticTensorDictModule,
ProbabilisticTensorDictSequential,
)
Working with probabilistic models is also made easy thanks to the tensordict.nn API:
from torchrl.modules import NormalParamExtractor, TanhNormal
td = TensorDict({"input": torch.randn(3, 5)}, [3])
net = nn.Sequential(
nn.Linear(5, 4), NormalParamExtractor()
) # splits the output in loc and scale
module = TensorDictModule(net, in_keys=["input"], out_keys=["loc", "scale"])
td_module = ProbabilisticTensorDictSequential(
module,
ProbabilisticTensorDictModule(
in_keys=["loc", "scale"],
out_keys=["action"],
distribution_class=TanhNormal,
return_log_prob=False,
),
)
td_module(td)
print(td)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
input: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
loc: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
scale: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
# returning the log-probability
td = TensorDict({"input": torch.randn(3, 5)}, [3])
td_module = ProbabilisticTensorDictSequential(
module,
ProbabilisticTensorDictModule(
in_keys=["loc", "scale"],
out_keys=["action"],
distribution_class=TanhNormal,
return_log_prob=True,
),
)
td_module(td)
print(td)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
input: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
loc: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
sample_log_prob: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
scale: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
Controlling randomness and sampling strategies is achieved via the set_exploration_type context manager:
from torchrl.envs.utils import ExplorationType, set_exploration_type
td = TensorDict({"input": torch.randn(3, 5)}, [3])
torch.manual_seed(0)
with set_exploration_type(ExplorationType.RANDOM):
    td_module(td)
    print("random:", td["action"])
with set_exploration_type(ExplorationType.DETERMINISTIC):
    td_module(td)
    print("mode:", td["action"])
random: tensor([[ 0.8728, -0.1334],
[-0.9833, 0.3494],
[-0.6887, -0.6402]], grad_fn=<_SafeTanhNoEpsBackward>)
mode: tensor([[-0.1132, 0.1762],
[-0.3430, -0.2668],
[ 0.2918, 0.6239]], grad_fn=<_SafeTanhNoEpsBackward>)
Using Environments and Modules¶
Let us see how environments and modules can be combined:
from torchrl.envs.utils import step_mdp
env = GymEnv("Pendulum-v1")
action_spec = env.action_spec
actor_module = nn.Linear(3, 1)
actor = SafeModule(
actor_module, spec=action_spec, in_keys=["observation"], out_keys=["action"]
)
torch.manual_seed(0)
env.set_seed(0)
max_steps = 100
data = env.reset()
data_stack = TensorDict(batch_size=[max_steps])
for i in range(max_steps):
    actor(data)
    data_stack[i] = env.step(data)
    if data["done"].any():
        break
    data = step_mdp(data)  # roughly equivalent to obs = next_obs
tensordicts_prealloc = data_stack.clone()
print("total steps:", i)
print(data_stack)
total steps: 99
TensorDict(
fields={
action: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False)
# equivalent
torch.manual_seed(0)
env.set_seed(0)
max_steps = 100
data = env.reset()
data_stack = []
for i in range(max_steps):
    actor(data)
    data_stack.append(env.step(data))
    if data["done"].any():
        break
    data = step_mdp(data)  # roughly equivalent to obs = next_obs
tensordicts_stack = torch.stack(data_stack, 0)
print("total steps:", i)
print(tensordicts_stack)
total steps: 99
TensorDict(
fields={
action: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False)
(tensordicts_stack == tensordicts_prealloc).all()
True
torch.manual_seed(0)
env.set_seed(0)
tensordict_rollout = env.rollout(policy=actor, max_steps=max_steps)
tensordict_rollout
(tensordict_rollout == tensordicts_prealloc).all()
from tensordict.nn import TensorDictModule
Collectors¶
We also provide a set of data collectors that automatically gather as many frames per batch as required. They work in settings ranging from single-node, single-worker to multi-node, multi-worker.
from torchrl.collectors import MultiaSyncDataCollector, MultiSyncDataCollector
from torchrl.envs import EnvCreator, SerialEnv
from torchrl.envs.libs.gym import GymEnv
EnvCreator makes sure that we can send a lambda function from process to process. We use a SerialEnv for simplicity (single worker), but for larger jobs a ParallelEnv (multi-worker) would be better suited.
Note
Multiprocessed envs and multiprocessed collectors can be combined!
parallel_env = SerialEnv(
3,
EnvCreator(lambda: GymEnv("Pendulum-v1")),
)
create_env_fn = [parallel_env, parallel_env]
actor_module = nn.Linear(3, 1)
actor = TensorDictModule(actor_module, in_keys=["observation"], out_keys=["action"])
Sync multiprocessed data collector¶
devices = ["cpu", "cpu"]
collector = MultiSyncDataCollector(
create_env_fn=create_env_fn, # either a list of functions or a ParallelEnv
policy=actor,
total_frames=240,
max_frames_per_traj=-1, # envs are terminating, we don't need to stop them early
frames_per_batch=60, # we want 60 frames at a time (we have 3 envs per sub-collector)
device=devices,
)
for i, d in enumerate(collector):
    if i == 0:
        print(d)  # trajectories are split automatically in [6 workers x 10 steps]
    collector.update_policy_weights_()  # make sure that our policies have the latest weights if working on multiple devices
print(i)
collector.shutdown()
del collector
TensorDict(
fields={
action: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([2, 3, 10]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([2, 3, 10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([2, 3, 10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False)
3
Async multiprocessed data collector¶
This class allows you to collect data while the model is training. This is particularly useful in off-policy settings, as it decouples inference from model training. Data is delivered on a first-ready-first-served basis (workers queue their results):
collector = MultiaSyncDataCollector(
create_env_fn=create_env_fn, # either a list of functions or a ParallelEnv
policy=actor,
total_frames=240,
max_frames_per_traj=-1, # envs are terminating, we don't need to stop them early
frames_per_batch=60, # we want 60 frames at a time (we have 3 envs per sub-collector)
device=devices,
)
for i, d in enumerate(collector):
    if i == 0:
        print(d)  # with async collection, each batch comes from a single sub-collector: [3 envs x 20 steps]
    collector.update_policy_weights_()  # make sure that our policies have the latest weights if working on multiple devices
print(i)
collector.shutdown()
del collector
del create_env_fn
del parallel_env
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.float32, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([3, 20]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3, 20, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([3, 20, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False)
3
Objectives¶
Objectives are the main entry points when coding up a new algorithm.
from torchrl.objectives import DDPGLoss
actor_module = nn.Linear(3, 1)
actor = TensorDictModule(actor_module, in_keys=["observation"], out_keys=["action"])
class ConcatModule(nn.Linear):
    def forward(self, obs, action):
        return super().forward(torch.cat([obs, action], -1))
value_module = ConcatModule(4, 1)
value = TensorDictModule(
value_module, in_keys=["observation", "action"], out_keys=["state_action_value"]
)
loss_fn = DDPGLoss(actor, value)
loss_fn.make_value_estimator(loss_fn.default_value_estimator, gamma=0.99)
data = TensorDict(
{
"observation": torch.randn(10, 3),
"next": {
"observation": torch.randn(10, 3),
"reward": torch.randn(10, 1),
"done": torch.zeros(10, 1, dtype=torch.bool),
},
"action": torch.randn(10, 1),
},
batch_size=[10],
device="cpu",
)
loss_td = loss_fn(data)
print(loss_td)
print(data)
TensorDict(
fields={
loss_actor: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
loss_value: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
pred_value: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False),
pred_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
target_value: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False),
target_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
td_error: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
td_error: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False)
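A training step would then combine the loss terms and backpropagate. A minimal sketch; the optimizer choice and learning rate below are our assumptions, not part of the demo:
from torch.optim import Adam

optim = Adam(loss_fn.parameters(), lr=2e-4)  # hypothetical optimizer setup
loss = loss_td["loss_actor"] + loss_td["loss_value"]  # combine the loss components
loss.backward()
optim.step()
optim.zero_grad()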
Installing the Library¶
The library is on PyPI: pip install torchrl. See the README for more information.
Contributing¶
We are actively looking for contributors and early users. If you’re working in RL (or just curious), try it! Give us feedback: TorchRL's success depends on how well it covers researchers' needs, and for that we need your input! Since the library is nascent, it is a great time for you to shape it the way you want!
See the Contributing guide for more info.