Source code for torchrl.envs.libs.isaacgym

# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations

import importlib.util

import itertools
import warnings
from typing import Any, Dict, Tuple, Union

import numpy as np
import torch

from tensordict import TensorDictBase
from torchrl.envs.libs.gym import GymWrapper
from torchrl.envs.utils import _classproperty, make_composite_from_td

# True when an importable ``isaacgym`` module spec can be found. The check is
# done through ``find_spec`` so this module can be loaded even when IsaacGym
# is not installed; code that needs the library consults this flag first.
_has_isaac = importlib.util.find_spec("isaacgym") is not None

class IsaacGymWrapper(GymWrapper):
    """Wrapper for IsaacGymEnvs environments.

    The original library is IsaacGymEnvs. It is based on IsaacGym, which can
    be downloaded through NVIDIA's webpage.

    .. note:: IsaacGym environments cannot be executed consecutively, ie.
        instantiating one environment after another (even if it has been cleared)
        will cause CUDA memory issues. We recommend creating one environment
        per process only. If you need more than one environment, the best way to
        achieve that is to spawn them across processes.

    .. note:: IsaacGym works on CUDA devices by essence. Make sure your machine
        has GPUs available and the required setup for IsaacGym (eg, Ubuntu 20.04).

    """

    @property
    def lib(self):
        """The ``isaacgym`` module, imported lazily on first access."""
        import isaacgym

        return isaacgym

    def __init__(
        self, env: "isaacgymenvs.tasks.base.vec_task.Env", **kwargs  # noqa: F821
    ):
        """Wrap an already-built IsaacGymEnvs environment.

        Args:
            env: the environment to wrap. Its ``num_envs`` attribute is used as
                the (one-dimensional) batch size and its ``device`` attribute
                as the device passed to the parent constructor.
            **kwargs: forwarded unchanged to the parent constructor.
        """
        warnings.warn(
            "IsaacGym environment support is an experimental feature that may change in the future."
        )
        num_envs = env.num_envs
        super().__init__(
            env, torch.device(env.device), batch_size=torch.Size([num_envs]), **kwargs
        )
        if not hasattr(self, "task"):
            # by convention in IsaacGymEnvs
            self.task = env.__name__

    def _make_specs(self, env: "gym.Env") -> None:  # noqa: F821
        """Build the specs, then complete the observation spec from a short rollout.

        After the parent has built the specs, the trailing dimension of every
        done spec is squeezed and the ``"observation"`` entry is renamed to
        ``"obs"``. A 3-step rollout is then executed and every entry of its
        ``"next"`` sub-tensordict other than the reward and the done keys is
        turned into a spec that is merged into the observation spec.
        """
        super()._make_specs(env, batch_size=self.batch_size)
        self.full_done_spec = {
            key: spec.squeeze(-1)
            for key, spec in self.full_done_spec.items(True, True)
        }
        self.observation_spec["obs"] = self.observation_spec["observation"]
        del self.observation_spec["observation"]

        # Index 0 along the last batch dimension of the rollout output
        # (presumably the time dimension added by ``rollout`` - TODO confirm).
        data = self.rollout(3).get("next")[..., 0]
        del data[self.reward_key]
        for done_key in self.done_keys:
            try:
                del data[done_key]
            except KeyError:
                continue
        specs = make_composite_from_td(data)

        obs_spec = self.observation_spec
        obs_spec.unlock_()
        obs_spec.update(specs)
        obs_spec.lock_()
        # Written straight into ``__dict__`` rather than through attribute
        # assignment, as in the original implementation.
        self.__dict__["full_observation_spec"] = obs_spec

    @classmethod
    def _make_envs(cls, *, task, num_envs, device, seed=None, headless=True, **kwargs):
        """Create the environments through ``isaacgymenvs.make``.

        ``device`` is converted to a string and used for both ``sim_device``
        and ``rl_device``. Any extra keyword argument is forwarded as-is.
        """
        import isaacgym  # noqa
        import isaacgymenvs  # noqa

        envs = isaacgymenvs.make(
            seed=seed,
            task=task,
            num_envs=num_envs,
            sim_device=str(device),
            rl_device=str(device),
            headless=headless,
            **kwargs,
        )
        return envs

    def _set_seed(self, seed: int) -> int:
        """Return ``seed`` unchanged; nothing is seeded here."""
        # as of #665c32170d84b4be66722eea405a1e08b6e7f761 the seed points nowhere in gym.make for IsaacGymEnvs
        return seed

    def read_action(self, action):
        """Reads the action obtained from the input TensorDict and transforms it in the format expected by the contained environment.

        Args:
            action (Tensor or TensorDict): an action to be taken in the environment

        Returns: an action in a format compatible with the contained environment.

        """
        return action

    def read_done(
        self,
        terminated: torch.Tensor | None = None,
        truncated: torch.Tensor | None = None,
        done: torch.Tensor | None = None,
    ) -> Tuple[
        torch.Tensor | None,
        torch.Tensor | None,
        torch.Tensor | None,
        torch.Tensor | bool,
    ]:
        """Casts the end-of-episode flags to ``bool`` and computes a break flag.

        Args:
            terminated: termination flag(s), or ``None`` if not provided.
            truncated: truncation flag(s), or ``None`` if not provided.
            done: done flag(s), or ``None`` if not provided.

        Returns:
            A 4-tuple ``(terminated, truncated, done, do_break)``. The first
            three are the inputs cast with ``.bool()`` (``None`` is passed
            through). ``do_break`` is ``done.any()`` when ``done`` is given;
            otherwise it is the "any" of whichever of ``terminated`` and
            ``truncated`` is given, and ``False`` when none is.

        """
        if terminated is not None:
            terminated = terminated.bool()
        if truncated is not None:
            truncated = truncated.bool()
        if done is not None:
            done = done.bool()
            do_break = done.any()
        else:
            # ``done`` is optional: fall back on the other flags instead of
            # failing on ``None.any()``.
            do_break = False
            for flag in (terminated, truncated):
                if flag is not None:
                    do_break = do_break | flag.any()
        return terminated, truncated, done, do_break

    def read_reward(self, total_reward, step_reward):
        """Reads a reward and the total reward so far (in the frame skip loop) and returns a sum of the two.

        Args:
            total_reward (torch.Tensor or TensorDict): total reward so far in the step
            step_reward (reward in the format provided by the inner env): reward of this particular step

        """
        return total_reward + step_reward

    def read_obs(
        self, observations: Union[Dict[str, Any], torch.Tensor, np.ndarray]
    ) -> Dict[str, Any]:
        """Reads an observation from the environment and returns an observation compatible with the output TensorDict.

        Args:
            observations (observation under a format dictated by the inner env): observation to be read.

        Returns:
            The observations as a mapping. A ``dict`` input is returned as the
            same object (its ``"state"`` entry, if any, is renamed in place);
            any input that is neither a ``dict`` nor a ``TensorDictBase`` is
            stored under the first leaf key of the observation spec.

        """
        if isinstance(observations, dict):
            if "state" in observations and "observation" not in observations:
                # we rename "state" in "observation" as "observation" is the conventional name
                # for single observation in torchrl.
                # naming it 'state' will result in envs that have a different name for the state vector
                # when queried with and without pixels
                observations["observation"] = observations.pop("state")
        if not isinstance(observations, (TensorDictBase, dict)):
            (key,) = itertools.islice(self.observation_spec.keys(True, True), 1)
            observations = {key: observations}
        return observations
class IsaacGymEnv(IsaacGymWrapper):
    """TorchRL environment that builds an IsaacGym task by name.

    The environments are created through :meth:`_make_envs` and then handed
    to :class:`~.IsaacGymWrapper`, which documents the wrapper behaviour.

    Examples:
        >>> env = IsaacGymEnv(task="Ant", num_envs=2000, device="cuda:0")
        >>> rollout = env.rollout(3)
        >>> assert env.batch_size == (2000,)

    """

    @_classproperty
    def available_envs(cls):
        """Names in ``isaacgymenvs.tasks.isaacgym_task_map``; empty if IsaacGym is not found."""
        if _has_isaac:
            import isaacgymenvs  # noqa

            task_map = isaacgymenvs.tasks.isaacgym_task_map
            return list(task_map.keys())
        return []

    def __init__(self, task=None, *, env=None, num_envs, device, **kwargs):
        """Create the IsaacGym environments and wrap them.

        Args:
            task: name of the task to create.
            env: alternative spelling of ``task``; the two are mutually
                exclusive.
            num_envs: number of environments, forwarded to ``_make_envs``.
            device: device forwarded to ``_make_envs``.
            **kwargs: forwarded both to ``_make_envs`` and to the parent
                constructor.

        Raises:
            RuntimeError: if both ``task`` and ``env`` are provided.
        """
        if env is not None:
            if task is not None:
                raise RuntimeError("Cannot provide both `task` and `env` arguments.")
            # ``env`` is accepted as another name for ``task``.
            task = env
        isaac_envs = self._make_envs(
            task=task,
            num_envs=num_envs,
            device=device,
            **kwargs,
        )
        # Stored before the parent constructor runs: the parent only assigns
        # ``task`` when the attribute does not exist yet.
        self.task = task
        super().__init__(isaac_envs, **kwargs)


Access comprehensive developer documentation for PyTorch

View Docs


Get in-depth tutorials for beginners and advanced developers

View Tutorials


Find development resources and get your questions answered

View Resources