Source code for torchrl.envs.libs.jumanji

# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import importlib.util

from typing import Dict, Optional, Tuple, Union

import numpy as np
import torch
from tensordict.tensordict import TensorDict, TensorDictBase
from torchrl.envs.utils import _classproperty

_has_jumanji = importlib.util.find_spec("jumanji") is not None

from import (
from import numpy_to_torch_dtype_dict
from torchrl.envs.gym_like import GymLikeEnv

from torchrl.envs.libs.jax_utils import (

def _get_envs():
    if not _has_jumanji:
        raise ImportError("Jumanji is not installed in your virtual environment.")
    import jumanji

    return jumanji.registered_environments()

def _jumanji_to_torchrl_spec_transform(
    dtype: Optional[torch.dtype] = None,
    device: DEVICE_TYPING = None,
    categorical_action_encoding: bool = True,
) -> TensorSpec:
    import jumanji

    if isinstance(spec, jumanji.specs.DiscreteArray):
        action_space_cls = (
            if categorical_action_encoding
            else OneHotDiscreteTensorSpec
        if dtype is None:
            dtype = numpy_to_torch_dtype_dict[spec.dtype]
        return action_space_cls(spec.num_values, dtype=dtype, device=device)
    elif isinstance(spec, jumanji.specs.BoundedArray):
        shape = spec.shape
        if dtype is None:
            dtype = numpy_to_torch_dtype_dict[spec.dtype]
        return BoundedTensorSpec(
    elif isinstance(spec, jumanji.specs.Array):
        shape = spec.shape
        if dtype is None:
            dtype = numpy_to_torch_dtype_dict[spec.dtype]
        if dtype in (torch.float, torch.double, torch.half):
            return UnboundedContinuousTensorSpec(
                shape=shape, dtype=dtype, device=device
            return UnboundedDiscreteTensorSpec(shape=shape, dtype=dtype, device=device)
    elif isinstance(spec, jumanji.specs.Spec) and hasattr(spec, "__dict__"):
        new_spec = {}
        for key, value in spec.__dict__.items():
            if isinstance(value, jumanji.specs.Spec):
                if key.endswith("_obs"):
                    key = key[:-4]
                if key.endswith("_spec"):
                    key = key[:-5]
                new_spec[key] = _jumanji_to_torchrl_spec_transform(
                    value, dtype, device, categorical_action_encoding
        return CompositeSpec(**new_spec)
        raise TypeError(f"Unsupported spec type {type(spec)}")

[docs]class JumanjiWrapper(GymLikeEnv): """Jumanji environment wrapper. Examples: >>> env = jumanji.make("Snake-6x6-v0") >>> env = JumanjiWrapper(env) >>> env.set_seed(0) >>> td = env.reset() >>> td["action"] = env.action_spec.rand() >>> td = env.step(td) >>> print(td1) TensorDict( fields={ action: Tensor(torch.Size([1]), dtype=torch.int32), done: Tensor(torch.Size([1]), dtype=torch.bool), next: TensorDict( fields={ observation: Tensor(torch.Size([6, 6, 5]), dtype=torch.float32)}, batch_size=torch.Size([]), device=cpu, is_shared=False), observation: Tensor(torch.Size([6, 6, 5]), dtype=torch.float32), reward: Tensor(torch.Size([1]), dtype=torch.float32), state: TensorDict(...)}, batch_size=torch.Size([]), device=cpu, is_shared=False) >>> print(env.available_envs) ['Snake-6x6-v0', 'Snake-12x12-v0', 'TSP50-v0', 'TSP100-v0', ...] """ git_url = "" libname = "jumanji" @_classproperty def available_envs(cls): if not _has_jumanji: return yield from _get_envs() @property def lib(self): import jumanji return jumanji def __init__(self, env: "jumanji.env.Environment" = None, **kwargs): # noqa: F821 if not _has_jumanji: raise ImportError( "jumanji is not installed or importing it failed. Consider checking your installation." ) if env is not None: kwargs["env"] = env super().__init__(**kwargs) def _build_env( self, env, _seed: Optional[int] = None, from_pixels: bool = False, render_kwargs: Optional[dict] = None, pixels_only: bool = False, camera_id: Union[int, str] = 0, **kwargs, ): self.from_pixels = from_pixels self.pixels_only = pixels_only if from_pixels: raise NotImplementedError("TODO") return env def _make_state_example(self, env): import jax from jax import numpy as jnp key = jax.random.PRNGKey(0) keys = jax.random.split(key, self.batch_size.numel()) state, _ = jax.vmap(env.reset)(jnp.stack(keys)) state = _tree_reshape(state, self.batch_size) return state def _make_state_spec(self, env) -> TensorSpec: import jax key = jax.random.PRNGKey(0) state, _ = env.reset(key) state_dict = _object_to_tensordict(state, self.device, batch_size=()) state_spec = _extract_spec(state_dict) return state_spec def _make_action_spec(self, env) -> TensorSpec: action_spec = _jumanji_to_torchrl_spec_transform( env.action_spec(), device=self.device ) action_spec = action_spec.expand(*self.batch_size, *action_spec.shape) return action_spec def _make_observation_spec(self, env) -> TensorSpec: jumanji = self.lib spec = env.observation_spec() new_spec = _jumanji_to_torchrl_spec_transform(spec, device=self.device) if isinstance(spec, jumanji.specs.Array): return CompositeSpec(observation=new_spec).expand(self.batch_size) elif isinstance(spec, jumanji.specs.Spec): return CompositeSpec(**{k: v for k, v in new_spec.items()}).expand( self.batch_size ) else: raise TypeError(f"Unsupported spec type {type(spec)}") def _make_reward_spec(self, env) -> TensorSpec: reward_spec = _jumanji_to_torchrl_spec_transform( env.reward_spec(), device=self.device ) if not len(reward_spec.shape): reward_spec.shape = torch.Size([1]) return reward_spec.expand([*self.batch_size, *reward_spec.shape]) def _make_specs(self, env: "jumanji.env.Environment") -> None: # noqa: F821 # extract spec from jumanji definition self.action_spec = self._make_action_spec(env) self.observation_spec = self._make_observation_spec(env) self.reward_spec = self._make_reward_spec(env) # extract state spec from instance state_spec = self._make_state_spec(env).expand(self.batch_size) self.state_spec["state"] = state_spec self.observation_spec["state"] = state_spec.clone() # build state example for data conversion self._state_example = self._make_state_example(env) def _check_kwargs(self, kwargs: Dict): jumanji = self.lib if "env" not in kwargs: raise TypeError("Could not find environment key 'env' in kwargs.") env = kwargs["env"] if not isinstance(env, (jumanji.env.Environment,)): raise TypeError("env is not of type 'jumanji.env.Environment'.") def _init_env(self): pass @property def key(self): key = getattr(self, "_key", None) if key is None: raise RuntimeError( "the env.key attribute wasn't found. Make sure to call `env.set_seed(seed)` before any interaction." ) return key @key.setter def key(self, value): self._key = value def _set_seed(self, seed): import jax if seed is None: raise Exception("Jumanji requires an integer seed.") self.key = jax.random.PRNGKey(seed) def read_state(self, state): state_dict = _object_to_tensordict(state, self.device, self.batch_size) return self.state_spec["state"].encode(state_dict) def read_obs(self, obs): from jax import numpy as jnp if isinstance(obs, (list, jnp.ndarray, np.ndarray)): obs_dict = _ndarray_to_tensor(obs).to(self.device) else: obs_dict = _object_to_tensordict(obs, self.device, self.batch_size) return super().read_obs(obs_dict) def _step(self, tensordict: TensorDictBase) -> TensorDictBase: import jax # prepare inputs state = _tensordict_to_object(tensordict.get("state"), self._state_example) action = self.read_action(tensordict.get("action")) # flatten batch size into vector state = _tree_flatten(state, self.batch_size) action = _tree_flatten(action, self.batch_size) # jax vectorizing map on env.step state, timestep = jax.vmap(self._env.step)(state, action) # reshape batch size from vector state = _tree_reshape(state, self.batch_size) timestep = _tree_reshape(timestep, self.batch_size) # collect outputs state_dict = self.read_state(state) obs_dict = self.read_obs(timestep.observation) reward = self.read_reward(np.asarray(timestep.reward)) done = timestep.step_type == self.lib.types.StepType.LAST done = _ndarray_to_tensor(done).view(torch.bool).to(self.device) # build results tensordict_out = TensorDict( source=obs_dict, batch_size=tensordict.batch_size, device=self.device, ) tensordict_out.set("reward", reward) tensordict_out.set("done", done) tensordict_out.set("terminated", done) # tensordict_out.set("terminated", done) tensordict_out["state"] = state_dict return tensordict_out def _reset( self, tensordict: Optional[TensorDictBase] = None, **kwargs ) -> TensorDictBase: import jax from jax import numpy as jnp # generate random keys self.key, *keys = jax.random.split(self.key, self.numel() + 1) # jax vectorizing map on env.reset state, timestep = jax.vmap(self._env.reset)(jnp.stack(keys)) # reshape batch size from vector state = _tree_reshape(state, self.batch_size) timestep = _tree_reshape(timestep, self.batch_size) # collect outputs state_dict = self.read_state(state) obs_dict = self.read_obs(timestep.observation) done_td = # build results tensordict_out = TensorDict( source=obs_dict, batch_size=self.batch_size, device=self.device, ) tensordict_out.update(done_td) tensordict_out["state"] = state_dict return tensordict_out def _output_transform(self, step_outputs_tuple: Tuple) -> Tuple: ... def _reset_output_transform(self, reset_outputs_tuple: Tuple) -> Tuple: ...
[docs]class JumanjiEnv(JumanjiWrapper): """Jumanji environment wrapper. Examples: >>> env = JumanjiEnv(env_name="Snake-6x6-v0", frame_skip=4) >>> td = env.rand_step() >>> print(td) >>> print(env.available_envs) """ def __init__(self, env_name, **kwargs): kwargs["env_name"] = env_name super().__init__(**kwargs) def _build_env( self, env_name: str, **kwargs, ) -> "jumanji.env.Environment": # noqa: F821 if not _has_jumanji: raise ImportError( f"jumanji not found, unable to create {env_name}. " f"Consider installing jumanji. More info:" f" {self.git_url}." ) from_pixels = kwargs.pop("from_pixels", False) pixels_only = kwargs.pop("pixels_only", True) if kwargs: raise ValueError(f"Extra kwargs are not supported by {type(self)}.") self.wrapper_frame_skip = 1 env = self.lib.make(env_name, **kwargs) return super()._build_env(env, pixels_only=pixels_only, from_pixels=from_pixels) @property def env_name(self): return self._constructor_kwargs["env_name"] def _check_kwargs(self, kwargs: Dict): if "env_name" not in kwargs: raise TypeError("Expected 'env_name' to be part of kwargs") def __repr__(self) -> str: return f"{self.__class__.__name__}(env={self.env_name}, batch_size={self.batch_size}, device={self.device})"


