Shortcuts

Source code for torchrl.envs.libs.robohive

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import functools
import importlib
import os
import warnings

from copy import copy
from pathlib import Path

import numpy as np
import torch
from tensordict import TensorDict
from torchrl.data.tensor_specs import Unbounded
from torchrl.envs.libs.gym import (
    _AsyncMeta,
    _gym_to_torchrl_spec_transform,
    gym_backend,
    GymEnv,
)
from torchrl.envs.utils import _classproperty, make_composite_from_td

# A gym backend counts as available when either ``gym`` or ``gymnasium`` can be
# located on the import path (nothing is imported here).
_has_gym = any(
    importlib.util.find_spec(backend) is not None
    for backend in ("gym", "gymnasium")
)
# RoboHive is only considered usable when a gym backend is present as well.
_has_robohive = importlib.util.find_spec("robohive") is not None and _has_gym

# Default the ``sim_backend`` environment variable to MUJOCO without overriding
# a value the user already exported (presumably read by RoboHive to pick its
# simulator -- confirm against the RoboHive docs).
if _has_robohive and "sim_backend" not in os.environ:
    os.environ["sim_backend"] = "MUJOCO"


class set_directory:
    """Sets the cwd within the context.

    Can be used either as a context manager (``with set_directory(path): ...``)
    or as a function decorator (``@set_directory(path)``). In both cases the
    working directory that was active when the context was *entered* is
    restored on exit, including when the body raises.

    Args:
        path (Path): The path to the cwd

    Attributes:
        path: the directory to switch to.
        origin: the directory that will be restored by the next ``__exit__``
            (initialised at construction time, refreshed on every entry).
    """

    def __init__(self, path: Path):
        self.path = path
        # Initial value kept for backward compatibility (and as a fallback if
        # ``__exit__`` is ever called without a matching ``__enter__``).
        self.origin = Path().absolute()
        # One saved cwd per active ``with`` level, so that the same instance
        # can be re-used and nested safely.
        self._origins = []

    def __enter__(self):
        # Capture the cwd now rather than at construction time: the instance
        # may have been created long before the context is entered.
        origin = Path().absolute()
        os.chdir(self.path)
        # Only record the origin once chdir succeeded; if it raised,
        # ``__exit__`` will not run and nothing must be left on the stack.
        self._origins.append(origin)
        self.origin = origin
        return self

    def __exit__(self, *args, **kwargs):
        os.chdir(self._origins.pop() if self._origins else self.origin)

    def __call__(self, fun):
        """Decorates ``fun`` so that it runs with ``path`` as the cwd."""

        @functools.wraps(fun)
        def new_fun(*args, **kwargs):
            # A fresh context per call keeps concurrent/nested calls isolated.
            with set_directory(Path(self.path)):
                return fun(*args, **kwargs)

        return new_fun


class _RoboHiveBuild(_AsyncMeta):
    """Metaclass that post-processes every newly constructed environment.

    Once the regular construction chain of the parent metaclass has produced
    the env, its specs are recomputed through ``_refine_specs`` before the
    instance is handed back to the caller.
    """

    def __call__(self, *args, **kwargs):
        env: RoboHiveEnv = super().__call__(*args, **kwargs)
        env._refine_specs()
        return env


class RoboHiveEnv(GymEnv, metaclass=_RoboHiveBuild):
    """A wrapper for RoboHive gym environments.

    RoboHive is a collection of environments/tasks simulated with the MuJoCo
    physics engine exposed using the OpenAI-Gym API.

    Github: https://github.com/vikashplus/robohive/

    Doc: https://github.com/vikashplus/robohive/wiki

    Paper: https://arxiv.org/abs/2310.06828

    .. warning::
        RoboHive requires gym 0.13.

    Args:
        env_name (str): the environment name to build. Must be one of :attr:`.available_envs`
        categorical_action_encoding (bool, optional): if ``True``, categorical
            specs will be converted to the TorchRL equivalent (:class:`torchrl.data.Categorical`),
            otherwise a one-hot encoding will be used (:class:`torchrl.data.OneHot`).
            Defaults to ``False``.

    Keyword Args:
        from_pixels (bool, optional): if ``True``, an attempt to return the pixel
            observations from the env will be performed. By default, these observations
            will be written under the ``"pixels"`` entry.
            The method being used varies
            depending on the gym version and may involve a ``wrappers.pixel_observation.PixelObservationWrapper``.
            Defaults to ``False``.
        pixels_only (bool, optional): if ``True``, only the pixel observations will
            be returned (by default under the ``"pixels"`` entry in the output tensordict).
            If ``False``, observations (eg, states) and pixels will be returned
            whenever ``from_pixels=True``. Defaults to ``False``.
        from_depths (bool, optional): if ``True``, an attempt to return the depth
            observations from the env will be performed. By default, these observations
            will be written under the ``"depths"`` entry. Requires ``from_pixels`` to be ``True``.
            Defaults to ``False``.
        cameras (list of str, optional): the cameras to register when
            ``from_pixels=True``. If omitted, every camera returned by
            :meth:`~.get_available_cams` is registered and a warning is issued.
            Passing a non-empty list while ``from_pixels=False`` raises a
            ``RuntimeError``.
        frame_skip (int, optional): if provided, indicates for how many steps the
            same action is to be repeated. The observation returned will be the
            last observation of the sequence, whereas the reward will be the sum
            of rewards across steps.
        device (torch.device, optional): if provided, the device on which the data
            is to be cast. Defaults to ``torch.device("cpu")``.
        batch_size (torch.Size, optional): Only ``torch.Size([])`` will work with
            ``RoboHiveEnv`` since vectorized environments are not supported within the
            class. To execute more than one environment at a time, see :class:`~torchrl.envs.ParallelEnv`.
        allow_done_after_reset (bool, optional): if ``True``, it is tolerated
            for envs to be ``done`` just after :meth:`~.reset` is called.
            Defaults to ``False``.

    Attributes:
        available_envs (list): a list of available envs to build.

    Examples:
        >>> from torchrl.envs import RoboHiveEnv
        >>> env = RoboHiveEnv(RoboHiveEnv.available_envs[0])
        >>> env.rollout(3)

    """

    # Names of the registered environments. Shared at class level and filled
    # lazily by ``register_envs`` / ``register_visual_env``.
    env_list = []

    @_classproperty
    def CURR_DIR(cls):
        """Directory exposed by ``robohive.envs.multi_task.substeps1`` (``None`` without robohive)."""
        if _has_robohive:
            import robohive.envs.multi_task.substeps1

            return robohive.envs.multi_task.substeps1.CURR_DIR
        else:
            return None

    @_classproperty
    def available_envs(cls):
        """List of the env names that can be built (empty without robohive)."""
        if not _has_robohive:
            return []
        cls.register_envs()
        return cls.env_list

    @classmethod
    def register_envs(cls):
        """Adds the RoboHive env suite to :attr:`env_list`.

        Safe to call repeatedly: names already present are not added again.

        Raises:
            ImportError: if robohive (or a gym backend) cannot be found.
            RuntimeError: if the RoboHive suite is empty.
        """
        if not _has_robohive:
            raise ImportError(
                "Cannot load robohive from the current virtual environment."
            )
        from robohive import robohive_env_suite as robohive_envs
        from robohive.utils.prompt_utils import Prompt, set_prompt_verbosity

        set_prompt_verbosity(Prompt.WARN)
        if not len(robohive_envs):
            raise RuntimeError("did not load any environment.")
        # ``available_envs`` calls this method on every access: only append
        # unseen names so that the shared list does not grow with duplicates.
        known = set(cls.env_list)
        for name in robohive_envs:
            if name not in known:
                known.add(name)
                cls.env_list.append(name)

    @staticmethod
    def _render_device_index(device) -> int:
        """Returns the device ordinal to render on (``0`` when there is none).

        Uses ``torch.device(...).index`` so that multi-digit ordinals such as
        ``"cuda:12"`` are parsed in full. Devices without an index (``"cpu"``,
        ``"cuda"``), ``None`` and unparsable values all map to ``0``.
        """
        if device is None:
            return 0
        try:
            index = torch.device(device).index
        except (TypeError, RuntimeError):
            return 0
        return 0 if index is None else index

    def _build_env(  # noqa: F811
        self,
        env_name: str,
        from_pixels: bool = False,
        pixels_only: bool = False,
        from_depths: bool = False,
        **kwargs,
    ) -> "gym.core.Env":  # noqa: F821
        """Creates the underlying RoboHive env through the gym backend.

        Args:
            env_name (str): name of the env to build.
            from_pixels (bool): if ``True``, a visual variant of the env is
                registered (see :meth:`register_visual_env`) and built instead.
            pixels_only (bool): stored on the instance as ``pixels_only``.
            from_depths (bool): whether depth keys are registered too.
            **kwargs: forwarded to ``self.lib.make``. ``cameras`` is consumed
                here and never forwarded.

        Returns:
            the env returned by ``self.lib.make``.

        Raises:
            ImportError: if robohive (or a gym backend) cannot be found.
            RuntimeError: if cameras are given while ``from_pixels=False``.
        """
        # Fail early with an actionable message instead of an obscure error
        # from the camera/registration code below.
        if not _has_robohive:
            raise ImportError(
                f"gym/robohive not found, unable to create {env_name}. "
                f"Consider downloading and installing robohive from"
                f" https://github.com/vikashplus/robohive/"
            )

        if from_pixels:
            if kwargs.get("cameras") is None:
                warnings.warn(
                    "from_pixels=True will lead to a registration of ALL available cameras, "
                    "which may lead to performance issue. "
                    "Consider passing only the needed cameras through cameras=list_of_cameras. "
                    "The list of available cameras for a specific environment can be obtained via "
                    "RoboHiveEnv.get_available_cams(env_name)."
                )
                kwargs["cameras"] = self.get_available_cams(env_name)
            cams = list(kwargs.pop("cameras"))
            env_name = self.register_visual_env(
                cams=cams, env_name=env_name, from_depths=from_depths
            )
        # ``cameras`` is a wrapper-level option: always remove it so that an
        # empty value is not forwarded to ``lib.make``.
        elif kwargs.pop("cameras", None):
            raise RuntimeError("Got a list of cameras but from_pixels is set to False.")

        self.pixels_only = pixels_only
        render_device = self._render_device_index(self.device)

        try:
            env = self.lib.make(
                env_name,
                frameskip=self.frame_skip,
                device_id=render_device,
                return_dict=True,
                **kwargs,
            )
            # The env handles frame skipping itself.
            self.wrapper_frame_skip = 1
        except TypeError as err:
            if "unexpected keyword argument 'frameskip" not in str(err):
                raise
            # The env does not support ``frameskip``: rebuild without it and
            # record the frame skip on the wrapper instead. ``frameskip`` was
            # passed explicitly above, so there is nothing to strip from kwargs.
            env = self.lib.make(
                env_name, return_dict=True, device_id=render_device, **kwargs
            )
            self.wrapper_frame_skip = self.frame_skip
        self.from_pixels = from_pixels
        self.from_depths = from_depths
        self.render_device = render_device
        if kwargs.get("read_info", True):
            self.set_info_dict_reader(self.read_info)
        return env

    def _make_specs(self, env: "gym.Env", batch_size=None) -> None:  # noqa: F821
        """Builds the specs and adds an ``"info"`` entry to the observation spec.

        The info spec is inferred from the info returned by one random step.
        """
        out = super()._make_specs(env=env, batch_size=batch_size)
        self.env.reset()
        *_, info = self.env.step(self.env.action_space.sample())
        info = self.read_info(info, TensorDict({}, []))
        info = info.get("info")
        self.observation_spec["info"] = make_composite_from_td(info)
        return out

    @classmethod
    def register_visual_env(cls, env_name, cams, from_depths):
        """Registers a variant of ``env_name`` that exposes camera observations.

        Args:
            env_name (str): name of the base env.
            cams (sequence of str): camera names; must not be empty.
            from_depths (bool): if ``True``, depth keys are registered in
                addition to the rgb keys.

        Returns:
            the name of the registered variant, built from the (sorted) camera
            names and ``env_name``.

        Raises:
            RuntimeError: if ``cams`` is empty.
        """
        with set_directory(cls.CURR_DIR):
            from robohive.envs.env_variants import register_env_variant

            if not len(cams):
                raise RuntimeError("Cannot create a visual envs without cameras.")
            cams = sorted(cams)
            cams_rep = [i.replace("A:", "A_") for i in cams]
            # Each camera name contributes all but its last three characters.
            new_env_name = "-".join([cam[:-3] for cam in cams_rep] + [env_name])
            visual_keys = [f"rgb:{c}:224x224:2d" for c in cams]
            if from_depths:
                visual_keys.extend([f"d:{c}:224x224:2d" for c in cams])
            register_env_variant(
                env_name,
                variants={
                    "visual_keys": visual_keys,
                },
                variant_id=new_env_name,
            )
            env_name = new_env_name
            # Building the same visual env twice must not duplicate the entry.
            if env_name not in cls.env_list:
                cls.env_list.append(env_name)
            return env_name

    def _refine_specs(self) -> None:  # noqa: F821
        """Recomputes the action, observation and reward specs from real data.

        Called by the metaclass right after construction and by
        :meth:`set_from_pixels`. The env is reset and stepped with random
        actions in the process.
        """
        env = self._env
        self.action_spec = _gym_to_torchrl_spec_transform(
            env.action_space, device=self.device
        )

        # get a np rollout
        rollout = TensorDict({"done": torch.zeros(3, 1)}, [3])
        env.reset()

        def get_obs():
            # Mirrors ``read_obs`` but works on a copy of the env's obs dict
            # and only emits "pixels"/"depths" when matching keys were found.
            _dict = {}
            obs_dict = copy(env.obs_dict)
            if self.from_pixels:
                visual = self.env.get_exteroception()
                obs_dict.update(visual)
            pixel_list, depth_list = [], []
            for obs_key in obs_dict:
                if obs_key.startswith("rgb"):
                    pix = obs_dict[obs_key]
                    if not pix.shape[0] == 1:
                        pix = pix[None]
                    pixel_list.append(pix)
                elif obs_key.startswith("d:"):
                    dep = obs_dict[obs_key]
                    dep = dep[None]
                    depth_list.append(dep)
                elif obs_key in env.obs_keys:
                    value = env.obs_dict[obs_key]
                    if not value.shape:
                        # promote 0-d values to 1-d
                        value = value[None]
                    _dict[obs_key] = value
            if pixel_list:
                _dict["pixels"] = np.concatenate(pixel_list, 0)
            if depth_list:
                _dict["depths"] = np.concatenate(depth_list, 0)
            return _dict

        for i in range(3):
            _dict = {}
            _dict.update(get_obs())
            _dict["action"] = action = env.action_space.sample()
            _, r, trunc, term, done, _ = self._output_transform(env.step(action))
            _dict[("next", "reward")] = r.reshape(1)
            _dict[("next", "done")] = [1]
            _dict[("next", "terminated")] = [1]
            _dict[("next", "truncated")] = [1]
            # NOTE(review): the plain "next" key is set after the nested
            # ("next", ...) keys; how the two merge is decided by TensorDict.
            # Only the observation entries of "next" are used below.
            _dict["next"] = get_obs()
            rollout[i] = TensorDict(_dict, [])

        # The observation spec is inferred from the first "next" entry.
        observation_spec = make_composite_from_td(
            rollout.get("next").exclude("done", "reward", "terminated", "truncated")[0]
        )
        self.observation_spec = observation_spec

        self.reward_spec = Unbounded(
            shape=(1,),
            device=self.device,
        )

        # default
        # A short regular rollout provides the entries that are not covered
        # by the spec yet (reward, done keys and known observations excluded).
        rollout = self.rollout(2, return_contiguous=False).get("next")
        rollout = rollout.exclude(
            self.reward_key, *self.done_keys, *self.observation_spec.keys(True, True)
        )
        rollout = rollout[..., 0]
        spec = make_composite_from_td(rollout)
        self.observation_spec.update(spec)
        self.empty_cache()

    def _reset_output_transform(self, reset_data):
        """Normalizes the reset output to an ``(observation, info)`` pair."""
        if not (isinstance(reset_data, tuple) and len(reset_data) == 2):
            return reset_data, {}
        return reset_data

    def set_from_pixels(self, from_pixels: bool) -> None:
        """Sets the from_pixels attribute to an existing environment.

        The specs are recomputed when the value changes.

        Args:
            from_pixels (bool): new value for the from_pixels attribute

        """
        if from_pixels is self.from_pixels:
            return
        self.from_pixels = from_pixels
        self._refine_specs()

    def read_obs(self, observation):
        """Builds the observation dict from the env's current ``obs_dict``.

        Args:
            observation: ignored; the data is read from ``self.env.obs_dict``
                (and ``self.env.get_exteroception()`` when ``from_pixels`` is
                set) instead.

        Returns:
            the result of the parent ``read_obs`` applied to the entries listed
            in ``obs_keys`` plus, when requested, the stacked ``"pixels"`` and
            ``"depths"`` arrays.
        """
        # the info is missing from the reset
        # Work on a shallow copy (as ``_refine_specs`` does) so that dropping
        # "t" and merging the visual entries does not alter the env's own dict.
        observations = copy(self.env.obs_dict)
        try:
            del observations["t"]
        except KeyError:
            pass
        # recover vec
        obsdict = {}
        pixel_list, depth_list = [], []
        if self.from_pixels:
            visual = self.env.get_exteroception()
            observations.update(visual)
        for key in observations:
            if key.startswith("rgb"):
                pix = observations[key]
                if not pix.shape[0] == 1:
                    pix = pix[None]
                pixel_list.append(pix)
            elif key.startswith("d:"):
                dep = observations[key]
                dep = dep[None]
                depth_list.append(dep)
            elif key in self._env.obs_keys:
                value = observations[key]
                if not value.shape:
                    # promote 0-d values to 1-d
                    value = value[None]
                obsdict[key] = value
        if self.from_pixels:
            obsdict.update({"pixels": np.concatenate(pixel_list, 0)})
        if self.from_pixels and self.from_depths:
            obsdict.update({"depths": np.concatenate(depth_list, 0)})
        out = obsdict
        return super().read_obs(out)

    def read_info(self, info, tensordict_out):
        """Writes the tensor entries of ``info`` under ``"info"`` in ``tensordict_out``.

        Args:
            info (dict): info returned by the env. When empty, zeros drawn from
                the ``"info"`` spec are written instead (nothing is written if
                there is no such spec).
            tensordict_out: the tensordict to update.

        Returns:
            ``tensordict_out``.
        """
        if not info:
            info_spec = self.observation_spec.get("info", None)
            if info_spec is None:
                return tensordict_out
            tensordict_out.set("info", info_spec.zero())
            return tensordict_out
        out = (
            TensorDict(info, [])
            .filter_non_tensor_data()
            .exclude("obs_dict", "done", "reward", *self._env.obs_keys, "act")
            .apply(lambda x: x, filter_empty=True)
        )
        if "info" in self.observation_spec.keys():
            info_spec = self.observation_spec["info"]

            def func(name, x):
                # Entries without a spec are dropped, the others are reshaped
                # to the shape recorded in the spec.
                spec = info_spec.get(name, None)
                if spec is None:
                    return None
                return x.reshape(info_spec[name].shape)

            out.update(out.named_apply(func, nested_keys=True, filter_empty=True))
        else:
            # No spec yet: only make sure that 0-d entries become 1-d.
            out.update(
                out.apply(
                    lambda x: x.reshape((1,)) if not x.shape else x, filter_empty=True
                )
            )
        tensordict_out.set("info", out)
        return tensordict_out

    def _init_env(self):
        """No-op."""
        pass

    def to(self, *args, **kwargs):
        """Moves the env and rebuilds it when the render device changes."""
        out = super().to(*args, **kwargs)
        render_device = self._render_device_index(out.device)
        if render_device != self.render_device:
            # NOTE(review): the env returned by ``_build_env`` is discarded
            # here; only the attributes it sets (``render_device``, ...) are
            # updated. Confirm whether the new env should replace ``out._env``.
            out._build_env(**self._constructor_kwargs)
        return out

    @classmethod
    def get_available_cams(cls, env_name):
        """Returns the names of the cameras defined in the model of ``env_name``.

        A temporary env is created through the gym backend to inspect its model.
        """
        # NOTE(review): the temporary env is never closed explicitly.
        env = gym_backend().make(env_name)
        # 7 is MuJoCo's object-type id for cameras (``mjOBJ_CAMERA``).
        cams = [env.sim.model.id2name(ic, 7) for ic in range(env.sim.model.ncam)]
        return cams

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources