Source code for torchrl.envs.libs.gym
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import collections
import importlib
import warnings
from copy import copy
from types import ModuleType
from typing import Dict, List, Tuple
from warnings import warn
import numpy as np
import torch
from packaging import version
from tensordict import TensorDict, TensorDictBase
from torch.utils._pytree import tree_map
from torchrl._utils import implement_for
from import (
from import numpy_to_torch_dtype_dict, torch_to_numpy_dtype_dict
from torchrl.envs.batched_envs import CloudpickleWrapper
from torchrl.envs.common import _EnvPostInit
from torchrl.envs.gym_like import default_info_dict_reader, GymLikeEnv
from torchrl.envs.utils import _classproperty
from torch.utils._contextlib import _DecoratorContextManager
except ModuleNotFoundError:
from torchrl._utils import _DecoratorContextManager
# check gym presence without importing it
_has_gym = importlib.util.find_spec("gym") is not None
if not _has_gym:
_has_gym = importlib.util.find_spec("gymnasium") is not None
_has_mo = importlib.util.find_spec("mo_gymnasium") is not None
_has_sb3 = importlib.util.find_spec("stable_baselines3") is not None
_has_minigrid = importlib.util.find_spec("minigrid") is not None
GYMNASIUM_1_ERROR = """RuntimeError: TorchRL does not support gymnasium 1.0 or later versions due to incompatible
changes in the Gym API.
Using gymnasium 1.0 with TorchRL would require significant modifications to your code and may result in:
* Inaccurate step counting, as the auto-reset feature can cause unpredictable numbers of steps to be executed.
* Potential data corruption, as the environment may require/produce garbage data during reset steps.
* Trajectory overlap during data collection.
* Increased computational overhead, as the library would need to handle the additional complexity of auto-resets.
* Manual filtering and boilerplate code to mitigate these issues, which would compromise the modularity and ease of
use of TorchRL.
To maintain the integrity and efficiency of our library, we cannot support this version of gymnasium at this time.
If you need to use gymnasium 1.0 or later, we recommend exploring alternative solutions or waiting for future updates
to TorchRL and gymnasium that may address this compatibility issue.
For more information, please refer to discussion in torchrl.
def _minigrid_lib():
assert _has_minigrid, "minigrid not found"
import minigrid
return minigrid
[docs]class set_gym_backend(_DecoratorContextManager):
"""Sets the gym-backend to a certain value.
backend (python module, string or callable returning a module): the
gym backend to use. Use a string or callable whenever you wish to
avoid importing gym at loading time.
>>> import gym
>>> import gymnasium
>>> with set_gym_backend("gym"):
... assert gym_backend() == gym
>>> with set_gym_backend(lambda: gym):
... assert gym_backend() == gym
>>> with set_gym_backend(gym):
... assert gym_backend() == gym
>>> with set_gym_backend("gymnasium"):
... assert gym_backend() == gymnasium
>>> with set_gym_backend(lambda: gymnasium):
... assert gym_backend() == gymnasium
>>> with set_gym_backend(gymnasium):
... assert gym_backend() == gymnasium
This class can also be used as a function decorator.
>>> @set_gym_backend("gym")
... def fun():
... gym = gym_backend()
... print(gym)
>>> fun()
<module 'gym' from '/path/to/env/site-packages/gym/'>
>>> @set_gym_backend("gymnasium")
... def fun():
... gym = gym_backend()
... print(gym)
>>> fun()
<module 'gymnasium' from '/path/to/env/site-packages/gymnasium/'>
def __init__(self, backend):
self.backend = backend
def _call(self):
"""Sets the backend as default."""
DEFAULT_GYM = self.backend
found_setters = collections.defaultdict(lambda: False)
for setter in copy(implement_for._setters):
check_module = (
and setter.module_name.__name__ == self.backend.__name__
) or setter.module_name == self.backend.__name__
check_version = setter.check_version(
self.backend.__version__, setter.from_version, setter.to_version
if check_module and check_version:
found_setter = True
elif check_module:
found_setter = False
found_setter = None
if found_setter is not None:
found_setters[setter.func_name] = (
found_setters[setter.func_name] or found_setter
# we keep only the setters we need. This is safe because a copy is saved under self._setters_saved
for func_name, found_setter in found_setters.items():
if not found_setter:
raise ImportError(
f"could not set anything related to gym backend "
f"{self.backend.__name__} with version={self.backend.__version__} for the function with name {func_name}. "
f"Check that the gym versions match!"
def set(self):
"""Irreversibly sets the gym backend in the script."""
def __enter__(self):
# we save a complete list of setters as well as whether they should be set.
# we want the full list becasue we want to be able to nest the calls to set_gym_backend.
# we also want to keep track of which ones are set to reproduce what was set before.
self._setters_saved = copy(implement_for._implementations)
def __exit__(self, exc_type, exc_val, exc_tb):
delattr(self, "_setters_saved")
def clone(self):
# override this method if your children class takes __init__ parameters
return self.__class__(self.backend)
def backend(self):
if isinstance(self._backend, str):
return importlib.import_module(self._backend)
elif callable(self._backend):
return self._backend()
return self._backend
def backend(self, value):
self._backend = value
[docs]def gym_backend(submodule=None):
"""Returns the gym backend, or a sumbodule of it.
submodule (str): the submodule to import. If ``None``, the backend
itself is returned.
>>> import mo_gymnasium
>>> with set_gym_backend("gym"):
... wrappers = gym_backend('wrappers')
... print(wrappers)
>>> with set_gym_backend("gymnasium"):
... wrappers = gym_backend('wrappers')
... print(wrappers)
if DEFAULT_GYM is None:
# rule of thumbs: gymnasium precedes
import gymnasium as gym
except ImportError as err:
import gym as gym
except ImportError as err:
gym = None
if submodule is not None:
if not submodule.startswith("."):
submodule = "." + submodule
submodule = importlib.import_module(submodule, package=DEFAULT_GYM.__name__)
return submodule
__all__ = ["GymWrapper", "GymEnv"]
def _gym_to_torchrl_spec_transform(
remap_state_to_observation: bool = True,
batch_size: tuple = (),
) -> TensorSpec:
"""Maps the gym specs to the TorchRL specs.
spec (gym.spaces member): the gym space to transform.
dtype (torch.dtype): a dtype to use for the spec.
Defaults to`spec.dtype`.
device (torch.device): the device for the spec.
Defaults to ``None`` (no device for composite and default device for specs).
categorical_action_encoding (bool): whether discrete spaces should be mapped to categorical or one-hot.
Defaults to ``False`` (one-hot).
remap_state_to_observation (bool): whether to rename the 'state' key of
Dict specs to "observation". Default is true.
batch_size (torch.Size): batch size to which expand the spec. Defaults to
if batch_size:
return _gym_to_torchrl_spec_transform(
gym_spaces = gym_backend("spaces")
if isinstance(spec, gym_spaces.tuple.Tuple):
result = torch.stack(
for s in spec
return result
if isinstance(spec, gym_spaces.discrete.Discrete):
action_space_cls = Categorical if categorical_action_encoding else OneHot
dtype = (
if categorical_action_encoding
else torch.long
return action_space_cls(spec.n, device=device, dtype=dtype)
elif isinstance(spec, gym_spaces.multi_binary.MultiBinary):
return Binary(
spec.n, device=device, dtype=numpy_to_torch_dtype_dict[spec.dtype]
# a spec type cannot be a string, so we're sure that versions of gym that don't have Sequence will just skip through this
elif isinstance(spec, getattr(gym_spaces, "Sequence", str)):
if not hasattr(spec, "stack"):
# gym does not have a stack attribute in sequence
raise ValueError(
"gymnasium should be used whenever a Sequence is present, as it needs to be stacked. "
"If you need the gym backend at all price, please raise an issue on the TorchRL GitHub repository."
if not getattr(spec, "stack", False):
raise ValueError(
"Sequence spaces must have the stack argument set to ``True``. "
space = spec.feature_space
out = _gym_to_torchrl_spec_transform(space, device=device, dtype=dtype)
out = out.unsqueeze(0)
return out
elif isinstance(spec, gym_spaces.multi_discrete.MultiDiscrete):
if len(spec.nvec.shape) == 1 and len(np.unique(spec.nvec)) > 1:
dtype = (
if categorical_action_encoding
else torch.long
return (
MultiCategorical(spec.nvec, device=device, dtype=dtype)
if categorical_action_encoding
else MultiOneHot(spec.nvec, device=device, dtype=dtype)
return torch.stack(
for i in range(len(spec.nvec))
elif isinstance(spec, gym_spaces.Box):
shape = spec.shape
if not len(shape):
shape = torch.Size([1])
if dtype is None:
dtype = numpy_to_torch_dtype_dict[spec.dtype]
low = torch.as_tensor(spec.low, device=device, dtype=dtype)
high = torch.as_tensor(spec.high, device=device, dtype=dtype)
is_unbounded = low.isinf().all() and high.isinf().all()
minval, maxval = _minmax_dtype(dtype)
minval = torch.as_tensor(minval).to(low.device, dtype)
maxval = torch.as_tensor(maxval).to(low.device, dtype)
is_unbounded = is_unbounded or (
torch.isclose(low, torch.as_tensor(minval, dtype=dtype)).all()
and torch.isclose(high, torch.as_tensor(maxval, dtype=dtype)).all()
return (
Unbounded(shape, device=device, dtype=dtype)
if is_unbounded
else Bounded(
elif isinstance(spec, (Dict,)):
spec_out = {}
for k in spec.keys():
key = k
if (
and k == "state"
and "observation" not in spec.keys()
# we rename "state" in "observation" as "observation" is the conventional name
# for single observation in torchrl.
# naming it 'state' will result in envs that have a different name for the state vector
# when queried with and without pixels
key = "observation"
spec_out[key] = _gym_to_torchrl_spec_transform(
# the batch-size must be set later
return Composite(spec_out, device=device)
elif isinstance(spec, gym_spaces.dict.Dict):
return _gym_to_torchrl_spec_transform(
elif _has_minigrid and isinstance(spec, _minigrid_lib().core.mission.MissionSpace):
return NonTensor((), device=device)
raise NotImplementedError(
f"spec of type {type(spec).__name__} is currently unaccounted for"
@implement_for("gym", None, "0.18")
def _box_convert(spec, gym_spaces, shape):
low = spec.low.detach().unique().cpu().item()
high = spec.high.detach().unique().cpu().item()
return gym_spaces.Box(low=low, high=high, shape=shape)
@implement_for("gym", "0.18")
def _box_convert(spec, gym_spaces, shape): # noqa: F811
low = spec.low.detach().cpu().numpy()
high = spec.high.detach().cpu().numpy()
return gym_spaces.Box(low=low, high=high, shape=shape)
@implement_for("gymnasium", None, "1.0.0")
def _box_convert(spec, gym_spaces, shape): # noqa: F811
low = spec.low.detach().cpu().numpy()
high = spec.high.detach().cpu().numpy()
return gym_spaces.Box(low=low, high=high, shape=shape)
@implement_for("gymnasium", "1.0.0")
def _box_convert(spec, gym_spaces, shape): # noqa: F811
raise ImportError(GYMNASIUM_1_ERROR)
@implement_for("gym", "0.21", None)
def _multidiscrete_convert(gym_spaces, spec):
return gym_spaces.multi_discrete.MultiDiscrete(
spec.nvec, dtype=torch_to_numpy_dtype_dict[spec.dtype]
@implement_for("gymnasium", None, "1.0.0")
def _multidiscrete_convert(gym_spaces, spec): # noqa: F811
return gym_spaces.multi_discrete.MultiDiscrete(
spec.nvec, dtype=torch_to_numpy_dtype_dict[spec.dtype]
@implement_for("gymnasium", "1.0.0")
def _multidiscrete_convert(gym_spaces, spec): # noqa: F811
raise ImportError(GYMNASIUM_1_ERROR)
@implement_for("gym", None, "0.21")
def _multidiscrete_convert(gym_spaces, spec): # noqa: F811
return gym_spaces.multi_discrete.MultiDiscrete(spec.nvec)
def _torchrl_to_gym_spec_transform(
) -> TensorSpec:
"""Maps TorchRL specs to gym spaces.
spec: the torchrl spec to transform.
categorical_action_encoding: whether discrete spaces should be mapped to categorical or one-hot.
Defaults to one-hot.
gym_spaces = gym_backend("spaces")
shape = spec.shape
if any(s == -1 for s in spec.shape):
if spec.shape[0] == -1:
spec = spec.clone()
spec = spec[0]
return gym_spaces.Sequence(_torchrl_to_gym_spec_transform(spec), stack=True)
return gym_spaces.Tuple(
tuple(_torchrl_to_gym_spec_transform(spec) for spec in spec.unbind(0))
if isinstance(spec, MultiCategorical):
return _multidiscrete_convert(gym_spaces, spec)
if isinstance(spec, MultiOneHot):
return gym_spaces.multi_discrete.MultiDiscrete(spec.nvec)
if isinstance(spec, Binary):
return gym_spaces.multi_binary.MultiBinary(spec.shape[-1])
if isinstance(spec, Categorical):
return gym_spaces.discrete.Discrete(
) # dtype=torch_to_numpy_dtype_dict[spec.dtype])
if isinstance(spec, OneHot):
return gym_spaces.discrete.Discrete(spec.n)
if isinstance(spec, Unbounded):
minval, maxval = _minmax_dtype(spec.dtype)
return gym_spaces.Box(
if isinstance(spec, Unbounded):
minval, maxval = _minmax_dtype(spec.dtype)
return gym_spaces.Box(
if isinstance(spec, Bounded):
return _box_convert(spec, gym_spaces, shape)
if isinstance(spec, Composite):
# remove batch size
while spec.shape:
spec = spec[0]
return gym_spaces.Dict(
key: _torchrl_to_gym_spec_transform(
for key, val in spec.items()
raise NotImplementedError(
f"spec of type {type(spec).__name__} is currently unaccounted for"
def _get_envs(to_dict=False) -> List:
if not _has_gym:
raise ImportError("Gym(nasium) could not be found in your virtual environment.")
envs = _get_gym_envs()
envs = list(envs)
envs = sorted(envs)
return envs
@implement_for("gym", None, "0.26.0")
def _get_gym_envs(): # noqa: F811
gym = gym_backend()
return gym.envs.registration.registry.env_specs.keys()
@implement_for("gym", "0.26.0", None)
def _get_gym_envs(): # noqa: F811
gym = gym_backend()
return gym.envs.registration.registry.keys()
@implement_for("gymnasium", None, "1.0.0")
def _get_gym_envs(): # noqa: F811
gym = gym_backend()
return gym.envs.registration.registry.keys()
@implement_for("gymnasium", "1.0.0")
def _get_gym_envs(): # noqa: F811
raise ImportError(GYMNASIUM_1_ERROR)
def _is_from_pixels(env):
observation_spec = env.observation_space
PixelObservationWrapper = gym_backend(
except ModuleNotFoundError:
class PixelObservationWrapper:
from torchrl.envs.libs.utils import (
GymPixelObservationWrapper as LegacyPixelObservationWrapper,
gDict = gym_backend("spaces").dict.Dict
Box = gym_backend("spaces").Box
if isinstance(observation_spec, (Dict,)):
if "pixels" in set(observation_spec.keys()):
return True
if isinstance(observation_spec, (gDict,)):
if "pixels" in set(observation_spec.spaces.keys()):
return True
elif (
isinstance(observation_spec, Box)
and (observation_spec.low == 0).all()
and (observation_spec.high == 255).all()
and observation_spec.low.shape[-1] == 3
and observation_spec.low.ndim == 3
return True
while True:
if isinstance(
env, (LegacyPixelObservationWrapper, PixelObservationWrapper)
return True
if hasattr(env, "env"):
env = env.env
return False
class _AsyncMeta(_EnvPostInit):
def __call__(cls, *args, **kwargs):
instance: GymWrapper = super().__call__(*args, **kwargs)
# before gym 0.22, there was no final_observation
if instance._is_batched:
gym_backend = instance.get_library_name(instance._env)
from torchrl.envs.transforms.transforms import (
if _has_sb3:
from stable_baselines3.common.vec_env.base_vec_env import VecEnv
if isinstance(instance._env, VecEnv):
backend = "sb3"
backend = "gym"
backend = "gym"
# we need 3 checks: the backend is not sb3 (if so, gymnasium is used),
# it is gym and not gymnasium and the version is before 0.22.0
add_info_dict = True
if backend == "gym" and gym_backend == "gym": # check gym against gymnasium
import gym
if version.parse(gym.__version__) < version.parse("0.22.0"):
"A batched gym environment is being wrapped in a GymWrapper with gym version < 0.22. "
"This implies that the next-observation is wrongly tracked (as the batched environment auto-resets "
"and discards the true next observation to return the result of the step). "
"This isn't compatible with TorchRL API and should be used with caution.",
add_info_dict = False
if add_info_dict:
# register terminal_obs_reader
instance.observation_spec, backend=backend
return TransformedEnv(instance, VecGymEnvTransform())
return instance
[docs]class GymWrapper(GymLikeEnv, metaclass=_AsyncMeta):
"""OpenAI Gym environment wrapper.
Works accross `gymnasium <>`_ and `OpenAI/gym <>`_.
env (gym.Env): the environment to wrap. Batched environments (:class:`~stable_baselines3.common.vec_env.base_vec_env.VecEnv`
or :class:`gym.VectorEnv`) are supported and the environment batch-size
will reflect the number of environments executed in parallel.
categorical_action_encoding (bool, optional): if ``True``, categorical
specs will be converted to the TorchRL equivalent (:class:``),
otherwise a one-hot encoding will be used (:class:``).
Defaults to ``False``.
Keyword Args:
from_pixels (bool, optional): if ``True``, an attempt to return the pixel
observations from the env will be performed. By default, these observations
will be written under the ``"pixels"`` entry.
The method being used varies
depending on the gym version and may involve a ``wrappers.pixel_observation.PixelObservationWrapper``.
Defaults to ``False``.
pixels_only (bool, optional): if ``True``, only the pixel observations will
be returned (by default under the ``"pixels"`` entry in the output tensordict).
If ``False``, observations (eg, states) and pixels will be returned
whenever ``from_pixels=True``. Defaults to ``True``.
frame_skip (int, optional): if provided, indicates for how many steps the
same action is to be repeated. The observation returned will be the
last observation of the sequence, whereas the reward will be the sum
of rewards across steps.
device (torch.device, optional): if provided, the device on which the data
is to be cast. Defaults to ``torch.device("cpu")``.
batch_size (torch.Size, optional): the batch size of the environment.
Should match the leading dimensions of all observations, done states,
rewards, actions and infos.
Defaults to ``torch.Size([])``.
allow_done_after_reset (bool, optional): if ``True``, it is tolerated
for envs to be ``done`` just after :meth:`~.reset` is called.
Defaults to ``False``.
convert_actions_to_numpy (bool, optional): if ``True``, actions will be
converted from tensors to numpy arrays and moved to CPU before being passed to the
env step function. Set this to ``False`` if the environment is evaluated
on GPU, such as IsaacLab.
Defaults to ``True``.
available_envs (List[str]): a list of environments to build.
.. note::
If an attribute cannot be found, this class will attempt to retrieve it from
the nested env:
>>> from torchrl.envs import GymWrapper
>>> import gymnasium as gym
>>> env = GymWrapper(gym.make("Pendulum-v1"))
>>> print(env.spec.max_episode_steps)
>>> import gymnasium as gym
>>> from torchrl.envs import GymWrapper
>>> base_env = gym.make("Pendulum-v1")
>>> env = GymWrapper(base_env)
>>> td = env.rand_step()
>>> print(td)
action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
next: TensorDict(
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
>>> print(env.available_envs)
['ALE/Adventure-ram-v5', 'ALE/Adventure-v5', 'ALE/AirRaid-ram-v5', 'ALE/AirRaid-v5', 'ALE/Alien-ram-v5', 'ALE/Alien-v5',
.. note::
info dictionaries will be read using :class:`~torchrl.envs.gym_like.default_info_dict_reader`
if no other reader is provided. To provide another reader, refer to
:meth:`~.set_info_dict_reader`. To automatically register the info_dict
content, refer to :meth:`torchrl.envs.GymLikeEnv.auto_register_info_dict`.
For parallel (Vectorized) environments, the info dictionary reader is automatically set and should
not be set manually.
.. note:: Gym spaces are not completely covered.
The following spaces are accounted for provided that they can be represented by a torch.Tensor, a nested tensor
and/or within a tensordict:
- spaces.Box
- spaces.Sequence
- spaces.Tuple
- spaces.Discrete
- spaces.MultiBinary
- spaces.MultiDiscrete
- spaces.Dict
Some considerations should be made when working with gym spaces. For instance, a tuple of spaces
can only be supported if the spaces are semantically identical (same dtype and same number of dimensions).
Ragged dimension can be supported through :func:`~torch.nested.nested_tensor`, but then there should be only
one level of tuple and data should be stacked along the first dimension (as nested_tensors can only be
stacked along the first dimension).
Check the example in examples/envs/ to know more!
git_url = ""
libname = "gym"
def available_envs(cls):
if not _has_gym:
return []
return list(_get_envs())
def get_library_name(env) -> str:
"""Given a gym environment, returns the backend name (either gym or gymnasium).
This can be used to set the appropriate backend when needed:
>>> env = gymnasium.make("Pendulum-v1")
>>> with set_gym_backend(env):
... env = GymWrapper(env)
:class:`~GymWrapper` and similar use this method to set their method
to the right backend during instantiation.
import gym
if isinstance(env.action_space,
return "gym"
except ImportError:
import gymnasium
if isinstance(env.action_space,
return "gymnasium"
except ImportError:
raise ImportError(
f"Could not find the library of env {env}. Please file an issue on torchrl github repo."
def __init__(self, env=None, categorical_action_encoding=False, **kwargs):
self._seed_calls_reset = None
self._categorical_action_encoding = categorical_action_encoding
if env is not None:
env_str = str(env)
except TypeError:
# MiniGrid has a bug where the __str__ method fails
if (
"EnvCompatibility" in env_str
): # a hacky way of knowing if EnvCompatibility is part of the wrappers of env
raise ValueError(
"GymWrapper does not support the gym.wrapper.compatibility.EnvCompatibility wrapper. "
"If this feature is needed, detail your use case in an issue of "
libname = self.get_library_name(env)
with set_gym_backend(libname):
kwargs["env"] = env
def _post_init(self):
# writes the functions that are gym-version specific to the instance
# once and for all. This is aimed at avoiding the need of decorating code
# with set_gym_backend + allowing for parallel execution (which would
# be troublesome when both an old version of gym and recent gymnasium
# are present within the same virtual env).
# These calls seemingly do nothing but they actually get rid of the @implement_for decorator.
# We execute them within the set_gym_backend context manager to make sure we get
# the right implementation.
# This method is executed by the metaclass of GymWrapper.
with set_gym_backend(self.get_library_name(self._env)):
self._reset_output_transform = self._reset_output_transform
self._output_transform = self._output_transform
def _is_batched(self):
if _has_sb3:
from stable_baselines3.common.vec_env.base_vec_env import VecEnv
tuple_of_classes = (VecEnv,)
tuple_of_classes = ()
return isinstance(
self._env, tuple_of_classes + (gym_backend("vector").VectorEnv,)
def _get_batch_size(self, env):
if hasattr(env, "num_envs"):
batch_size = torch.Size([env.num_envs, *self.batch_size])
batch_size = self.batch_size
return batch_size
@implement_for("gymnasium", None, "1.0.0") # gymnasium wants the unwrapped env
def _get_batch_size(self, env): # noqa: F811
env_unwrapped = env.unwrapped
if hasattr(env_unwrapped, "num_envs"):
batch_size = torch.Size([env_unwrapped.num_envs, *self.batch_size])
batch_size = self.batch_size
return batch_size
@implement_for("gymnasium", "1.0.0")
def _get_batch_size(self, env): # noqa: F811
raise ImportError(GYMNASIUM_1_ERROR)
def _check_kwargs(self, kwargs: Dict):
if "env" not in kwargs:
raise TypeError("Could not find environment key 'env' in kwargs.")
env = kwargs["env"]
if not (hasattr(env, "action_space") and hasattr(env, "observation_space")):
raise TypeError("env is not of type 'gym.Env'.")
def _build_env(
from_pixels: bool = False,
pixels_only: bool = False,
) -> "gym.core.Env": # noqa: F821
self.batch_size = self._get_batch_size(env)
env_from_pixels = _is_from_pixels(env)
from_pixels = from_pixels or env_from_pixels
self.from_pixels = from_pixels
self.pixels_only = pixels_only
if from_pixels and not env_from_pixels:
PixelObservationWrapper = gym_backend(
if isinstance(env, PixelObservationWrapper):
raise TypeError(
"PixelObservationWrapper cannot be used to wrap an environment"
"that is already a PixelObservationWrapper instance."
except ModuleNotFoundError:
env = self._build_gym_env(env, pixels_only)
return env
def read_action(self, action):
action = super().read_action(action)
if isinstance(self.action_spec, (OneHot, Categorical)) and action.size == 1:
# some envs require an integer for indexing
action = int(action)
return action
@implement_for("gym", None, "0.19.0")
def _build_gym_env(self, env, pixels_only): # noqa: F811
from .utils import GymPixelObservationWrapper as PixelObservationWrapper
return PixelObservationWrapper(env, pixels_only=pixels_only)
@implement_for("gym", "0.19.0", "0.26.0")
def _build_gym_env(self, env, pixels_only): # noqa: F811
pixel_observation = gym_backend("wrappers.pixel_observation")
return pixel_observation.PixelObservationWrapper(env, pixels_only=pixels_only)
@implement_for("gym", "0.26.0", None)
def _build_gym_env(self, env, pixels_only): # noqa: F811
compatibility = gym_backend("wrappers.compatibility")
pixel_observation = gym_backend("wrappers.pixel_observation")
if env.render_mode:
return pixel_observation.PixelObservationWrapper(
env, pixels_only=pixels_only
"Environments provided to GymWrapper that need to be wrapped in PixelObservationWrapper "
"should be created with `gym.make(env_name, render_mode=mode)` where possible,"
'where mode is either "rgb_array" or any other supported mode.'
# resetting as 0.26 comes with a very 'nice' OrderEnforcing wrapper
env = compatibility.EnvCompatibility(env)
from torchrl.envs.libs.utils import (
GymPixelObservationWrapper as LegacyPixelObservationWrapper,
return LegacyPixelObservationWrapper(env, pixels_only=pixels_only)
@implement_for("gymnasium", "1.0.0")
def _build_gym_env(self, env, pixels_only): # noqa: F811
raise ImportError(GYMNASIUM_1_ERROR)
@implement_for("gymnasium", None, "1.0.0")
def _build_gym_env(self, env, pixels_only): # noqa: F811
compatibility = gym_backend("wrappers.compatibility")
pixel_observation = gym_backend("wrappers.pixel_observation")
if env.render_mode:
return pixel_observation.PixelObservationWrapper(
env, pixels_only=pixels_only
"Environments provided to GymWrapper that need to be wrapped in PixelObservationWrapper "
"should be created with `gym.make(env_name, render_mode=mode)` where possible,"
'where mode is either "rgb_array" or any other supported mode.'
# resetting as 0.26 comes with a very 'nice' OrderEnforcing wrapper
env = compatibility.EnvCompatibility(env)
from torchrl.envs.libs.utils import (
GymPixelObservationWrapper as LegacyPixelObservationWrapper,
return LegacyPixelObservationWrapper(env, pixels_only=pixels_only)
def lib(self) -> ModuleType:
return gym_backend()
def _set_seed(self, seed: int) -> int: # noqa: F811
if self._seed_calls_reset is None:
# Determine basing on gym version whether `reset` is called when setting seed.
elif self._seed_calls_reset:
return seed
@implement_for("gym", None, "0.15.0")
def _set_seed_initial(self, seed: int) -> None: # noqa: F811
self._seed_calls_reset = False
@implement_for("gym", "0.15.0", "0.19.0")
def _set_seed_initial(self, seed: int) -> None: # noqa: F811
self._seed_calls_reset = False
@implement_for("gym", "0.19.0", None)
def _set_seed_initial(self, seed: int) -> None: # noqa: F811
self._seed_calls_reset = True
except TypeError as err:
f"reset with seed kwarg returned an exception: {err}.\n"
f"Calling env.seed from now on."
self._seed_calls_reset = False
except AttributeError as err2:
raise err from err2
@implement_for("gymnasium", "1.0.0")
def _set_seed_initial(self, seed: int) -> None: # noqa: F811
raise ImportError(GYMNASIUM_1_ERROR)
@implement_for("gymnasium", None, "1.0.0")
def _set_seed_initial(self, seed: int) -> None: # noqa: F811
self._seed_calls_reset = True
except TypeError as err:
f"reset with seed kwarg returned an exception: {err}.\n"
f"Calling env.seed from now on."
self._seed_calls_reset = False
def _reward_space(self, env):
if hasattr(env, "reward_space") and env.reward_space is not None:
return env.reward_space
@implement_for("gymnasium", "1.0.0")
def _reward_space(self, env): # noqa: F811
raise ImportError(GYMNASIUM_1_ERROR)
@implement_for("gymnasium", None, "1.0.0")
def _reward_space(self, env): # noqa: F811
env = env.unwrapped
if hasattr(env, "reward_space") and env.reward_space is not None:
rs = env.reward_space
return rs
def _make_specs(self, env: "gym.Env", batch_size=None) -> None: # noqa: F821
# If batch_size is provided, we se it to tell what batch size must be used
# instead of self.batch_size
cur_batch_size = self.batch_size if batch_size is None else torch.Size([])
action_spec = _gym_to_torchrl_spec_transform(
observation_spec = _gym_to_torchrl_spec_transform(
if not isinstance(observation_spec, Composite):
if self.from_pixels:
observation_spec = Composite(
pixels=observation_spec, shape=cur_batch_size
observation_spec = Composite(
observation=observation_spec, shape=cur_batch_size
elif observation_spec.shape[: len(cur_batch_size)] != cur_batch_size:
observation_spec.shape = cur_batch_size
reward_space = self._reward_space(env)
if reward_space is not None:
reward_spec = _gym_to_torchrl_spec_transform(
reward_spec = Unbounded(
if batch_size is not None:
action_spec = action_spec.expand(*batch_size, *action_spec.shape)
reward_spec = reward_spec.expand(*batch_size, *reward_spec.shape)
observation_spec = observation_spec.expand(
*batch_size, *observation_spec.shape
self.done_spec = self._make_done_spec()
self.action_spec = action_spec
if reward_spec.shape[: len(cur_batch_size)] != cur_batch_size:
self.reward_spec = reward_spec.expand(*cur_batch_size, *reward_spec.shape)
self.reward_spec = reward_spec
self.observation_spec = observation_spec
@implement_for("gym", None, "0.26")
def _make_done_spec(self): # noqa: F811
return Composite(
"done": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
"terminated": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
"truncated": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
@implement_for("gym", "0.26", None)
def _make_done_spec(self): # noqa: F811
return Composite(
"done": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
"terminated": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
"truncated": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
@implement_for("gymnasium", "0.27", None)
def _make_done_spec(self): # noqa: F811
return Composite(
"done": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
"terminated": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
"truncated": Categorical(
2, dtype=torch.bool, device=self.device, shape=(*self.batch_size, 1)
@implement_for("gym", None, "0.26")
def _reset_output_transform(self, reset_data): # noqa: F811
if (
isinstance(reset_data, tuple)
and len(reset_data) == 2
and isinstance(reset_data[1], dict)
return reset_data
return reset_data, None
@implement_for("gym", "0.26", None)
def _reset_output_transform(self, reset_data): # noqa: F811
return reset_data
@implement_for("gymnasium", "0.27", None)
def _reset_output_transform(self, reset_data): # noqa: F811
return reset_data
@implement_for("gym", None, "0.24")
def _output_transform(self, step_outputs_tuple): # noqa: F811
observations, reward, done, info = step_outputs_tuple
if self._is_batched:
# info needs to be flipped
info = _flip_info_tuple(info)
# The variable naming follows torchrl's convention here.
# A done is interpreted the union of terminated and truncated.
# (as in earlier versions of gym).
truncated = info.pop("TimeLimit.truncated", False)
if not isinstance(done, bool) and isinstance(truncated, bool):
# if bool is an array, make truncated an array
truncated = [truncated] * len(done)
truncated = np.array(truncated)
elif not isinstance(truncated, bool):
# make sure it's a boolean np.array
truncated = np.array(truncated, dtype=np.dtype("bool"))
terminated = done & ~truncated
if not isinstance(terminated, np.ndarray):
# if it's not a ndarray, we must return bool
# since it's not a bool, we make it so
terminated = bool(terminated)
if isinstance(observations, list) and len(observations) == 1:
# Until gym 0.25.2 we had rendered frames returned in lists of length 1
observations = observations[0]
return (observations, reward, terminated, truncated, done, info)
@implement_for("gym", "0.24", "0.26")
def _output_transform(self, step_outputs_tuple): # noqa: F811
observations, reward, done, info = step_outputs_tuple
# The variable naming follows torchrl's convention here.
# A done is interpreted the union of terminated and truncated.
# (as in earlier versions of gym).
truncated = info.pop("TimeLimit.truncated", False)
if not isinstance(done, bool) and isinstance(truncated, bool):
# if bool is an array, make truncated an array
truncated = [truncated] * len(done)
truncated = np.array(truncated)
elif not isinstance(truncated, bool):
# make sure it's a boolean np.array
truncated = np.array(truncated, dtype=np.dtype("bool"))
terminated = done & ~truncated
if not isinstance(terminated, np.ndarray):
# if it's not a ndarray, we must return bool
# since it's not a bool, we make it so
terminated = bool(terminated)
if isinstance(observations, list) and len(observations) == 1:
# Until gym 0.25.2 we had rendered frames returned in lists of length 1
observations = observations[0]
return (observations, reward, terminated, truncated, done, info)
@implement_for("gym", "0.26", None)
def _output_transform(self, step_outputs_tuple): # noqa: F811
# The variable naming follows torchrl's convention here.
observations, reward, terminated, truncated, info = step_outputs_tuple
return (
terminated | truncated,
@implement_for("gymnasium", "0.27", None)
def _output_transform(self, step_outputs_tuple): # noqa: F811
# The variable naming follows torchrl's convention here.
observations, reward, terminated, truncated, info = step_outputs_tuple
return (
terminated | truncated,
def _init_env(self):
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(env={self._env}, batch_size={self.batch_size})"
def rebuild_with_kwargs(self, **new_kwargs):
self._env = self._build_env(**self._constructor_kwargs)
def _reset(
self, tensordict: TensorDictBase | None = None, **kwargs
) -> TensorDictBase:
if self._is_batched:
# batched (aka 'vectorized') env reset is a bit special: envs are
# automatically reset. What we do here is just to check if _reset
# is present. If it is not, we just reset. Otherwise, we just skip.
if tensordict is None:
return super()._reset(tensordict, **kwargs)
reset = tensordict.get("_reset", None)
if reset is not None:
# we must copy the tensordict because the transform
# expects a tuple (tensordict, tensordict_reset) where the
# first still carries a _reset
tensordict = tensordict.exclude("_reset")
if reset is None or reset.all():
return super()._reset(tensordict, **kwargs)
return tensordict
return super()._reset(tensordict, **kwargs)
"render_mode": "__init__() got an unexpected keyword argument 'render_mode'",
"frame_skip": "unexpected keyword argument 'frameskip'",
[docs]class GymEnv(GymWrapper):
"""OpenAI Gym environment wrapper constructed by environment ID directly.
Works accross `gymnasium <>`_ and `OpenAI/gym <>`_.
env_name (str): the environment id registered in `gym.registry`.
categorical_action_encoding (bool, optional): if ``True``, categorical
specs will be converted to the TorchRL equivalent (:class:``),
otherwise a one-hot encoding will be used (:class:``).
Defaults to ``False``.
Keyword Args:
num_envs (int, optional): the number of envs to run in parallel. Defaults to
``None`` (a single env is to be run). :class:`~gym.vector.AsyncVectorEnv`
will be used by default.
disable_env_checker (bool, optional): for gym > 0.24 only. If ``True`` (default
for these versions), the environment checker won't be run.
from_pixels (bool, optional): if ``True``, an attempt to return the pixel
observations from the env will be performed. By default, these observations
will be written under the ``"pixels"`` entry.
The method being used varies
depending on the gym version and may involve a ``wrappers.pixel_observation.PixelObservationWrapper``.
Defaults to ``False``.
pixels_only (bool, optional): if ``True``, only the pixel observations will
be returned (by default under the ``"pixels"`` entry in the output tensordict).
If ``False``, observations (eg, states) and pixels will be returned
whenever ``from_pixels=True``. Defaults to ``False``.
frame_skip (int, optional): if provided, indicates for how many steps the
same action is to be repeated. The observation returned will be the
last observation of the sequence, whereas the reward will be the sum
of rewards across steps.
device (torch.device, optional): if provided, the device on which the data
is to be cast. Defaults to ``torch.device("cpu")``.
batch_size (torch.Size, optional): the batch size of the environment.
Should match the leading dimensions of all observations, done states,
rewards, actions and infos.
Defaults to ``torch.Size([])``.
allow_done_after_reset (bool, optional): if ``True``, it is tolerated
for envs to be ``done`` just after :meth:`~.reset` is called.
Defaults to ``False``.
available_envs (List[str]): the list of envs that can be built.
.. note::
If an attribute cannot be found, this class will attempt to retrieve it from
the nested env:
>>> from torchrl.envs import GymEnv
>>> env = GymEnv("Pendulum-v1")
>>> print(env.spec.max_episode_steps)
If a use-case is not covered by TorchRL, please submit an issue on GitHub.
>>> from torchrl.envs import GymEnv
>>> env = GymEnv("Pendulum-v1")
>>> td = env.rand_step()
>>> print(td)
action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
next: TensorDict(
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
>>> print(env.available_envs)
['ALE/Adventure-ram-v5', 'ALE/Adventure-v5', 'ALE/AirRaid-ram-v5', 'ALE/AirRaid-v5', 'ALE/Alien-ram-v5', 'ALE/Alien-v5',
.. note::
If both `OpenAI/gym` and `gymnasium` are present in the virtual environment,
one can swap backend using :func:`~torchrl.envs.libs.gym.set_gym_backend`:
>>> from torchrl.envs import set_gym_backend, GymEnv
>>> with set_gym_backend("gym"):
... env = GymEnv("Pendulum-v1")
... print(env._env)
<class 'gym.wrappers.time_limit.TimeLimit'>
>>> with set_gym_backend("gymnasium"):
... env = GymEnv("Pendulum-v1")
... print(env._env)
<class 'gymnasium.wrappers.time_limit.TimeLimit'>
.. note::
info dictionaries will be read using :class:`~torchrl.envs.gym_like.default_info_dict_reader`
if no other reader is provided. To provide another reader, refer to
:meth:`~.set_info_dict_reader`. To automatically register the info_dict
content, refer to :meth:`torchrl.envs.GymLikeEnv.auto_register_info_dict`.
.. note:: Gym spaces are not completely covered.
The following spaces are accounted for provided that they can be represented by a torch.Tensor, a nested tensor
and/or within a tensordict:
- spaces.Box
- spaces.Sequence
- spaces.Tuple
- spaces.Discrete
- spaces.MultiBinary
- spaces.MultiDiscrete
- spaces.Dict
Some considerations should be made when working with gym spaces. For instance, a tuple of spaces
can only be supported if the spaces are semantically identical (same dtype and same number of dimensions).
Ragged dimension can be supported through :func:`~torch.nested.nested_tensor`, but then there should be only
one level of tuple and data should be stacked along the first dimension (as nested_tensors can only be
stacked along the first dimension).
Check the example in examples/envs/ to know more!
def __init__(self, env_name, **kwargs):
kwargs["env_name"] = env_name
@implement_for("gym", None, "0.24.0")
def _set_gym_args(self, kwargs) -> None: # noqa: F811
disable_env_checker = kwargs.pop("disable_env_checker", None)
if disable_env_checker is not None:
raise RuntimeError(
"disable_env_checker should only be set if gym version is > 0.24"
@implement_for("gym", "0.24.0", None)
def _set_gym_args( # noqa: F811
) -> None:
kwargs.setdefault("disable_env_checker", True)
@implement_for("gymnasium", "1.0.0")
def _set_gym_args( # noqa: F811
) -> None:
raise ImportError(GYMNASIUM_1_ERROR)
@implement_for("gymnasium", None, "1.0.0")
def _set_gym_args( # noqa: F811
) -> None:
kwargs.setdefault("disable_env_checker", True)
def _async_env(self, *args, **kwargs):
return gym_backend("vector").AsyncVectorEnv(*args, **kwargs)
def _build_env(
env_name: str,
) -> "gym.core.Env": # noqa: F821
if not _has_gym:
raise RuntimeError(
f"gym not found, unable to create {env_name}. "
f"Consider downloading and installing gym from"
f" {self.git_url}"
from_pixels = kwargs.pop("from_pixels", False)
self._set_gym_default(kwargs, from_pixels)
pixels_only = kwargs.pop("pixels_only", True)
num_envs = kwargs.pop("num_envs", 0)
made_env = False
kwargs["frameskip"] = self.frame_skip
self.wrapper_frame_skip = 1
while not made_env:
# env.__init__ may not be compatible with all the kwargs that
# have been preset. We iterate through the various solutions
# to find the config that works.
with warnings.catch_warnings(record=True) as w:
# we catch warnings as they may cause silent bugs
env = self.lib.make(env_name, **kwargs)
if len(w) and "frameskip" in str(w[-1].message):
raise TypeError("unexpected keyword argument 'frameskip'")
made_env = True
except TypeError as err:
if ACCEPTED_TYPE_ERRORS["frame_skip"] in str(err):
# we can disable this, not strictly indispensable to know
# warn(
# "Discarding frameskip arg. This will be taken care of by TorchRL env wrapper."
# )
self.wrapper_frame_skip = kwargs.pop("frameskip")
elif ACCEPTED_TYPE_ERRORS["render_mode"] in str(err):
warn("Discarding render_mode from the env constructor.")
raise err
env = super()._build_env(env, pixels_only=pixels_only, from_pixels=from_pixels)
if num_envs > 0:
env = self._async_env([CloudpickleWrapper(lambda: env)] * num_envs)
except RuntimeError:
# It would fail if the environment is not pickable. In that case,
# delegating environment instantiation to each subprocess as a fallback.
env = self._async_env(
[lambda: self.lib.make(env_name, **kwargs)] * num_envs
self.batch_size = torch.Size([num_envs, *self.batch_size])
return env
@implement_for("gym", None, "0.25.1")
def _set_gym_default(self, kwargs, from_pixels: bool) -> None: # noqa: F811
# Do nothing for older gym versions.
@implement_for("gym", "0.25.1", None)
def _set_gym_default(self, kwargs, from_pixels: bool) -> None: # noqa: F811
if from_pixels:
kwargs.setdefault("render_mode", "rgb_array")
@implement_for("gymnasium", "0.27.0", None)
def _set_gym_default(self, kwargs, from_pixels: bool) -> None: # noqa: F811
if from_pixels:
kwargs.setdefault("render_mode", "rgb_array")
def env_name(self):
return self._constructor_kwargs["env_name"]
def _check_kwargs(self, kwargs: Dict):
if "env_name" not in kwargs:
raise TypeError("Expected 'env_name' to be part of kwargs")
def __repr__(self) -> str:
return f"{self.__class__.__name__}(env={self.env_name}, batch_size={self.batch_size}, device={self.device})"
[docs]class MOGymWrapper(GymWrapper):
"""FARAMA MO-Gymnasium environment wrapper.
>>> import mo_gymnasium as mo_gym
>>> env = MOGymWrapper(mo_gym.make('minecart-v0'), frame_skip=4)
>>> td = env.rand_step()
>>> print(td)
git_url = ""
libname = "mo-gymnasium"
_make_specs = set_gym_backend("gymnasium")(GymEnv._make_specs)
def available_envs(cls):
if not _has_mo:
return []
return [
[docs]class MOGymEnv(GymEnv):
"""FARAMA MO-Gymnasium environment wrapper.
>>> env = MOGymEnv(env_name="minecart-v0", frame_skip=4)
>>> td = env.rand_step()
>>> print(td)
>>> print(env.available_envs)
git_url = ""
libname = "mo-gymnasium"
available_envs = MOGymWrapper.available_envs
def lib(self) -> ModuleType:
if _has_mo:
import mo_gymnasium as mo_gym
return mo_gym
import mo_gymnasium # noqa: F401
except ImportError as err:
raise ImportError("MO-gymnasium not found, check installation") from err
_make_specs = set_gym_backend("gymnasium")(GymEnv._make_specs)
class terminal_obs_reader(default_info_dict_reader):
"""Terminal observation reader for 'vectorized' gym environments.
When running envs in parallel, Gym(nasium) writes the result of the true call
to `step` in `"final_observation"` entry within the `info` dictionary.
This breaks the natural flow and makes single-processed and multiprocessed envs
This class reads the info obs, removes the `"final_observation"` from
the env and writes its content in the data.
Next, a :class:`torchrl.envs.VecGymEnvTransform` transform will reorganise the
data by caching the result of the (implicit) reset and swap the true next
observation with the reset one. At reset time, the true reset data will be
observation_spec (Composite): The observation spec of the gym env.
backend (str, optional): the backend of the env. One of `"sb3"` for
stable-baselines3 or `"gym"` for gym/gymnasium.
.. note:: In general, this class should not be handled directly. It is
created whenever a vectorized environment is placed within a :class:`GymWrapper`.
backend_key = {
"sb3": "terminal_observation",
"gym": "final_observation",
backend_info_key = {
"sb3": "terminal_info",
"gym": "final_info",
def __init__(self, observation_spec: Composite, backend, name="final"):
super().__init__() = name
self._obs_spec = observation_spec.clone()
self.backend = backend
self._final_validated = False
def info_spec(self):
return self._info_spec
def _read_obs(self, obs, key, tensor, index):
if obs is None:
if isinstance(obs, np.ndarray):
# Simplest case: there is one observation,
# presented as a np.ndarray. The key should be pixels or observation.
# We just write that value at its location in the tensor
tensor[index] = torch.as_tensor(obs, device=tensor.device)
if isinstance(obs, torch.Tensor):
# Simplest case: there is one observation,
# presented as a np.ndarray. The key should be pixels or observation.
# We just write that value at its location in the tensor
tensor[index] =
elif isinstance(obs, dict):
if key not in obs:
raise KeyError(
f"The observation {key} could not be found in the final observation dict."
subobs = obs[key]
if subobs is not None:
# if the obs is a dict, we expect that the key points also to
# a value in the obs. We retrieve this value and write it in the
# tensor
tensor[index] = torch.as_tensor(subobs, device=tensor.device)
elif isinstance(obs, (list, tuple)):
# tuples are stacked along the first dimension when passing gym spaces
# to torchrl specs. As such, we can simply stack the tuple and set it
# at the relevant index (assuming stacking can be achieved)
tensor[index] = torch.as_tensor(obs, device=tensor.device)
raise NotImplementedError(
f"Observations of type {type(obs)} are not supported yet."
def __call__(self, info_dict, tensordict):
def replace_none(nparray):
if not isinstance(nparray, np.ndarray) or nparray.dtype != np.dtype("O"):
return nparray
is_none = np.array([info is None for info in nparray])
if is_none.any():
# Then it is a final observation and we delegate the registration to the appropriate reader
nz = (~is_none).nonzero()[0][0]
zero_like = tree_map(lambda x: np.zeros_like(x), nparray[nz])
for idx in is_none.nonzero()[0]:
nparray[idx] = zero_like
return tree_map(lambda *x: np.stack(x), *nparray)
info_dict = tree_map(replace_none, info_dict)
# convert info_dict to a tensordict
info_dict = TensorDict(info_dict)
# get the terminal observation
terminal_obs = info_dict.pop(self.backend_key[self.backend], None)
# get the terminal info dict
terminal_info = info_dict.pop(self.backend_info_key[self.backend], None)
if terminal_info is None:
terminal_info = {}
super().__call__(info_dict, tensordict)
if not self._final_validated:
self.info_spec[] = self._obs_spec.update(self.info_spec)
self._final_validated = True
final_info = terminal_info.copy()
if terminal_obs is not None:
final_info["observation"] = terminal_obs
for key in self.info_spec[].keys():
spec = self.info_spec[, key]
final_obs_buffer =
terminal_obs = final_info.get(key, None)
if terminal_obs is not None:
for i, obs in enumerate(terminal_obs):
# writes final_obs inplace with terminal_obs content
self._read_obs(obs, key, final_obs_buffer, index=i)
tensordict.set((, key), final_obs_buffer)
return tensordict
def reset(self):
self._final_validated = False
def _flip_info_tuple(info: Tuple[Dict]) -> Dict[str, tuple]:
# In Gym < 0.24, batched envs returned tuples of dict, and not dict of tuples.
# We patch this by flipping the tuple -> dict order.
info_example = set(info[0])
for item in info[1:]:
info_example = info_example.union(item)
result = {}
for key in info_example:
result[key] = tuple(_info.get(key, None) for _info in info)
return result