Source code for torchrl.trainers.helpers.envs
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from copy import copy
from dataclasses import dataclass, field as dataclass_field
from typing import Any, Callable, Optional, Sequence, Tuple, Union
import torch
from torchrl._utils import logger as torchrl_logger, VERBOSE
from torchrl.envs import ParallelEnv
from torchrl.envs.common import EnvBase
from torchrl.envs.env_creator import env_creator, EnvCreator
from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
CatFrames,
CatTensors,
CenterCrop,
Compose,
DoubleToFloat,
GrayScale,
NoopResetEnv,
ObservationNorm,
Resize,
RewardScaling,
ToTensorImage,
TransformedEnv,
VecNorm,
)
from torchrl.envs.transforms.transforms import (
FlattenObservation,
gSDENoise,
InitTracker,
StepCounter,
)
from torchrl.record.loggers import Logger
from torchrl.record.recorder import VideoRecorder
LIBS = {
"gym": GymEnv,
"dm_control": DMControlEnv,
}
[docs]def correct_for_frame_skip(cfg: "DictConfig") -> "DictConfig": # noqa: F821
"""Correct the arguments for the input frame_skip, by dividing all the arguments that reflect a count of frames by the frame_skip.
This is aimed at avoiding unknowingly over-sampling from the environment, i.e. targetting a total number of frames
of 1M but actually collecting frame_skip * 1M frames.
Args:
cfg (DictConfig): DictConfig containing some frame-counting argument, including:
"max_frames_per_traj", "total_frames", "frames_per_batch", "record_frames", "annealing_frames",
"init_random_frames", "init_env_steps"
Returns:
the input DictConfig, modified in-place.
"""
# Adapt all frame counts wrt frame_skip
if cfg.frame_skip != 1:
fields = [
"max_frames_per_traj",
"total_frames",
"frames_per_batch",
"record_frames",
"annealing_frames",
"init_random_frames",
"init_env_steps",
"noops",
]
for field in fields:
if hasattr(cfg, field):
setattr(cfg, field, getattr(cfg, field) // cfg.frame_skip)
return cfg
def make_env_transforms(
env,
cfg,
video_tag,
logger,
env_name,
stats,
norm_obs_only,
env_library,
action_dim_gsde,
state_dim_gsde,
batch_dims=0,
obs_norm_state_dict=None,
):
"""Creates the typical transforms for and env."""
env = TransformedEnv(env)
from_pixels = cfg.from_pixels
vecnorm = cfg.vecnorm
norm_rewards = vecnorm and cfg.norm_rewards
_norm_obs_only = norm_obs_only or not norm_rewards
reward_scaling = cfg.reward_scaling
reward_loc = cfg.reward_loc
if len(video_tag):
center_crop = cfg.center_crop
if center_crop:
center_crop = center_crop[0]
env.append_transform(
VideoRecorder(
logger=logger,
tag=f"{video_tag}_{env_name}_video",
center_crop=center_crop,
),
)
if from_pixels:
if not cfg.catframes:
raise RuntimeError(
"this env builder currently only accepts positive catframes values "
"when pixels are being used."
)
env.append_transform(ToTensorImage())
if cfg.center_crop:
env.append_transform(CenterCrop(*cfg.center_crop))
env.append_transform(Resize(cfg.image_size, cfg.image_size))
if cfg.grayscale:
env.append_transform(GrayScale())
env.append_transform(FlattenObservation(0, -3, allow_positive_dim=True))
env.append_transform(CatFrames(N=cfg.catframes, in_keys=["pixels"], dim=-3))
if stats is None and obs_norm_state_dict is None:
obs_stats = {}
elif stats is None:
obs_stats = copy(obs_norm_state_dict)
else:
obs_stats = copy(stats)
obs_stats["standard_normal"] = True
obs_norm = ObservationNorm(**obs_stats, in_keys=["pixels"])
env.append_transform(obs_norm)
if norm_rewards:
reward_scaling = 1.0
reward_loc = 0.0
if norm_obs_only:
reward_scaling = 1.0
reward_loc = 0.0
if reward_scaling is not None:
env.append_transform(RewardScaling(reward_loc, reward_scaling))
if not from_pixels:
selected_keys = [
key
for key in env.observation_spec.keys(True, True)
if ("pixels" not in key) and (key not in env.state_spec.keys(True, True))
]
# even if there is a single tensor, it'll be renamed in "observation_vector"
out_key = "observation_vector"
env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))
if not vecnorm:
if stats is None and obs_norm_state_dict is None:
_stats = {}
elif stats is None:
_stats = copy(obs_norm_state_dict)
else:
_stats = copy(stats)
_stats.update({"standard_normal": True})
obs_norm = ObservationNorm(
**_stats,
in_keys=[out_key],
)
env.append_transform(obs_norm)
else:
env.append_transform(
VecNorm(
in_keys=[out_key, "reward"] if not _norm_obs_only else [out_key],
decay=0.9999,
)
)
env.append_transform(DoubleToFloat())
if hasattr(cfg, "catframes") and cfg.catframes:
env.append_transform(CatFrames(N=cfg.catframes, in_keys=[out_key], dim=-1))
else:
env.append_transform(DoubleToFloat())
if hasattr(cfg, "gSDE") and cfg.gSDE:
env.append_transform(
gSDENoise(action_dim=action_dim_gsde, state_dim=state_dim_gsde)
)
env.append_transform(StepCounter())
env.append_transform(InitTracker())
return env
def get_norm_state_dict(env):
"""Gets the normalization loc and scale from the env state_dict."""
sd = env.state_dict()
sd = {
key: val
for key, val in sd.items()
if key.endswith("loc") or key.endswith("scale")
}
return sd
[docs]def transformed_env_constructor(
cfg: "DictConfig", # noqa: F821
video_tag: str = "",
logger: Optional[Logger] = None,
stats: Optional[dict] = None,
norm_obs_only: bool = False,
use_env_creator: bool = False,
custom_env_maker: Optional[Callable] = None,
custom_env: Optional[EnvBase] = None,
return_transformed_envs: bool = True,
action_dim_gsde: Optional[int] = None,
state_dim_gsde: Optional[int] = None,
batch_dims: Optional[int] = 0,
obs_norm_state_dict: Optional[dict] = None,
) -> Union[Callable, EnvCreator]:
"""Returns an environment creator from an argparse.Namespace built with the appropriate parser constructor.
Args:
cfg (DictConfig): a DictConfig containing the arguments of the script.
video_tag (str, optional): video tag to be passed to the Logger object
logger (Logger, optional): logger associated with the script
stats (dict, optional): a dictionary containing the :obj:`loc` and :obj:`scale` for the `ObservationNorm` transform
norm_obs_only (bool, optional): If `True` and `VecNorm` is used, the reward won't be normalized online.
Default is `False`.
use_env_creator (bool, optional): wheter the `EnvCreator` class should be used. By using `EnvCreator`,
one can make sure that running statistics will be put in shared memory and accessible for all workers
when using a `VecNorm` transform. Default is `True`.
custom_env_maker (callable, optional): if your env maker is not part
of torchrl env wrappers, a custom callable
can be passed instead. In this case it will override the
constructor retrieved from `args`.
custom_env (EnvBase, optional): if an existing environment needs to be
transformed_in, it can be passed directly to this helper. `custom_env_maker`
and `custom_env` are exclusive features.
return_transformed_envs (bool, optional): if ``True``, a transformed_in environment
is returned.
action_dim_gsde (int, Optional): if gSDE is used, this can present the action dim to initialize the noise.
Make sure this is indicated in environment executed in parallel.
state_dim_gsde: if gSDE is used, this can present the state dim to initialize the noise.
Make sure this is indicated in environment executed in parallel.
batch_dims (int, optional): number of dimensions of a batch of data. If a single env is
used, it should be 0 (default). If multiple envs are being transformed in parallel,
it should be set to 1 (or the number of dims of the batch).
obs_norm_state_dict (dict, optional): the state_dict of the ObservationNorm transform to be loaded into the
environment
"""
def make_transformed_env(**kwargs) -> TransformedEnv:
env_name = cfg.env_name
env_task = cfg.env_task
env_library = LIBS[cfg.env_library]
frame_skip = cfg.frame_skip
from_pixels = cfg.from_pixels
categorical_action_encoding = cfg.categorical_action_encoding
if custom_env is None and custom_env_maker is None:
if isinstance(cfg.collector_device, str):
device = cfg.collector_device
elif isinstance(cfg.collector_device, Sequence):
device = cfg.collector_device[0]
else:
raise ValueError(
"collector_device must be either a string or a sequence of strings"
)
env_kwargs = {
"env_name": env_name,
"device": device,
"frame_skip": frame_skip,
"from_pixels": from_pixels or len(video_tag),
"pixels_only": from_pixels,
}
if env_library is GymEnv:
env_kwargs.update(
{"categorical_action_encoding": categorical_action_encoding}
)
elif categorical_action_encoding:
raise NotImplementedError(
"categorical_action_encoding=True is currently only compatible with GymEnvs."
)
if env_library is DMControlEnv:
env_kwargs.update({"task_name": env_task})
env_kwargs.update(kwargs)
env = env_library(**env_kwargs)
elif custom_env is None and custom_env_maker is not None:
env = custom_env_maker(**kwargs)
elif custom_env_maker is None and custom_env is not None:
env = custom_env
else:
raise RuntimeError("cannot provive both custom_env and custom_env_maker")
if cfg.noops and custom_env is None:
# this is a bit hacky: if custom_env is not None, it is probably a ParallelEnv
# that already has its NoopResetEnv set for the contained envs.
# There is a risk however that we're just skipping the NoopsReset instantiation
env = TransformedEnv(env, NoopResetEnv(cfg.noops))
if not return_transformed_envs:
return env
return make_env_transforms(
env,
cfg,
video_tag,
logger,
env_name,
stats,
norm_obs_only,
env_library,
action_dim_gsde,
state_dim_gsde,
batch_dims=batch_dims,
obs_norm_state_dict=obs_norm_state_dict,
)
if use_env_creator:
return env_creator(make_transformed_env)
return make_transformed_env
[docs]def parallel_env_constructor(
cfg: "DictConfig", **kwargs # noqa: F821
) -> Union[ParallelEnv, EnvCreator]:
"""Returns a parallel environment from an argparse.Namespace built with the appropriate parser constructor.
Args:
cfg (DictConfig): config containing user-defined arguments
kwargs: keyword arguments for the `transformed_env_constructor` method.
"""
batch_transform = cfg.batch_transform
if not batch_transform:
raise NotImplementedError(
"batch_transform must be set to True for the recorder to be synced "
"with the collection envs."
)
if cfg.env_per_collector == 1:
kwargs.update({"cfg": cfg, "use_env_creator": True})
make_transformed_env = transformed_env_constructor(**kwargs)
return make_transformed_env
kwargs.update({"cfg": cfg, "use_env_creator": True})
make_transformed_env = transformed_env_constructor(
return_transformed_envs=not batch_transform, **kwargs
)
parallel_env = ParallelEnv(
num_workers=cfg.env_per_collector,
create_env_fn=make_transformed_env,
create_env_kwargs=None,
pin_memory=cfg.pin_memory,
)
if batch_transform:
kwargs.update(
{
"cfg": cfg,
"use_env_creator": False,
"custom_env": parallel_env,
"batch_dims": 1,
}
)
env = transformed_env_constructor(**kwargs)()
return env
return parallel_env
[docs]@torch.no_grad()
def get_stats_random_rollout(
cfg: "DictConfig", # noqa: F821
proof_environment: EnvBase = None,
key: Optional[str] = None,
):
"""Gathers stas (loc and scale) from an environment using random rollouts.
Args:
cfg (DictConfig): a config object with `init_env_steps` field, indicating
the total number of frames to be collected to compute the stats.
proof_environment (EnvBase instance, optional): if provided, this env will
be used ot execute the rollouts. If not, it will be created using
the cfg object.
key (str, optional): if provided, the stats of this key will be gathered.
If not, it is expected that only one key exists in `env.observation_spec`.
"""
proof_env_is_none = proof_environment is None
if proof_env_is_none:
proof_environment = transformed_env_constructor(
cfg=cfg, use_env_creator=False, stats={"loc": 0.0, "scale": 1.0}
)()
if VERBOSE:
torchrl_logger.info("computing state stats")
if not hasattr(cfg, "init_env_steps"):
raise AttributeError("init_env_steps missing from arguments.")
n = 0
val_stats = []
while n < cfg.init_env_steps:
_td_stats = proof_environment.rollout(max_steps=cfg.init_env_steps)
n += _td_stats.numel()
val = _td_stats.get(key).cpu()
val_stats.append(val)
del _td_stats, val
val_stats = torch.cat(val_stats, 0)
if key is None:
keys = list(proof_environment.observation_spec.keys(True, True))
key = keys.pop()
if len(keys):
raise RuntimeError(
f"More than one key exists in the observation_specs: {[key] + keys} were found, "
"thus get_stats_random_rollout cannot infer which to compute the stats of."
)
if key == "pixels":
m = val_stats.mean()
s = val_stats.std()
else:
m = val_stats.mean(dim=0)
s = val_stats.std(dim=0)
m[s == 0] = 0.0
s[s == 0] = 1.0
if VERBOSE:
torchrl_logger.info(
f"stats computed for {val_stats.numel()} steps. Got: \n"
f"loc = {m}, \n"
f"scale = {s}"
)
if not torch.isfinite(m).all():
raise RuntimeError("non-finite values found in mean")
if not torch.isfinite(s).all():
raise RuntimeError("non-finite values found in sd")
stats = {"loc": m, "scale": s}
if proof_env_is_none:
proof_environment.close()
if (
proof_environment.device != torch.device("cpu")
and torch.cuda.device_count() > 0
):
torch.cuda.empty_cache()
del proof_environment
return stats
def initialize_observation_norm_transforms(
proof_environment: EnvBase,
num_iter: int = 1000,
key: Union[str, Tuple[str, ...]] = None,
):
"""Calls :obj:`ObservationNorm.init_stats` on all uninitialized :obj:`ObservationNorm` instances of a :obj:`TransformedEnv`.
If an :obj:`ObservationNorm` already has non-null :obj:`loc` or :obj:`scale`, a call to :obj:`initialize_observation_norm_transforms` will be a no-op.
Similarly, if the transformed environment does not contain any :obj:`ObservationNorm`, a call to this function will have no effect.
If no key is provided but the observations of the :obj:`EnvBase` contains more than one key, an exception will
be raised.
Args:
proof_environment (EnvBase instance, optional): if provided, this env will
be used ot execute the rollouts. If not, it will be created using
the cfg object.
num_iter (int): Number of iterations used for initializing the :obj:`ObservationNorms`
key (str, optional): if provided, the stats of this key will be gathered.
If not, it is expected that only one key exists in `env.observation_spec`.
"""
if not isinstance(proof_environment.transform, Compose) and not isinstance(
proof_environment.transform, ObservationNorm
):
return
if key is None:
keys = list(proof_environment.base_env.observation_spec.keys(True, True))
key = keys.pop()
if len(keys):
raise RuntimeError(
f"More than one key exists in the observation_specs: {[key] + keys} were found, "
"thus initialize_observation_norm_transforms cannot infer which to compute the stats of."
)
if isinstance(proof_environment.transform, Compose):
for transform in proof_environment.transform:
if isinstance(transform, ObservationNorm) and not transform.initialized:
transform.init_stats(num_iter=num_iter, key=key)
elif not proof_environment.transform.initialized:
proof_environment.transform.init_stats(num_iter=num_iter, key=key)
def retrieve_observation_norms_state_dict(proof_environment: TransformedEnv):
"""Traverses the transforms of the environment and retrieves the :obj:`ObservationNorm` state dicts.
Returns a list of tuple (idx, state_dict) for each :obj:`ObservationNorm` transform in proof_environment
If the environment transforms do not contain any :obj:`ObservationNorm`, returns an empty list
Args:
proof_environment (EnvBase instance, optional): the :obj:``TransformedEnv` to retrieve the :obj:`ObservationNorm`
state dict from
"""
obs_norm_state_dicts = []
if isinstance(proof_environment.transform, Compose):
for idx, transform in enumerate(proof_environment.transform):
if isinstance(transform, ObservationNorm):
obs_norm_state_dicts.append((idx, transform.state_dict()))
if isinstance(proof_environment.transform, ObservationNorm):
obs_norm_state_dicts.append((0, proof_environment.transform.state_dict()))
return obs_norm_state_dicts
@dataclass
class EnvConfig:
"""Environment config struct."""
env_library: str = "gym"
# env_library used for the simulated environment. Default=gym
env_name: str = "Humanoid-v2"
# name of the environment to be created. Default=Humanoid-v2
env_task: str = ""
# task (if any) for the environment. Default=run
from_pixels: bool = False
# whether the environment output should be state vector(s) (default) or the pixels.
frame_skip: int = 1
# frame_skip for the environment. Note that this value does NOT impact the buffer size,
# maximum steps per trajectory, frames per batch or any other factor in the algorithm,
# e.g. if the total number of frames that has to be computed is 50e6 and the frame skip is 4
# the actual number of frames retrieved will be 200e6. Default=1.
reward_scaling: Optional[float] = None
# scale of the reward.
reward_loc: float = 0.0
# location of the reward.
init_env_steps: int = 1000
# number of random steps to compute normalizing constants
vecnorm: bool = False
# Normalizes the environment observation and reward outputs with the running statistics obtained across processes.
norm_rewards: bool = False
# If True, rewards will be normalized on the fly. This may interfere with SAC update rule and should be used cautiously.
norm_stats: bool = True
# Deactivates the normalization based on random collection of data.
noops: int = 0
# number of random steps to do after reset. Default is 0
catframes: int = 0
# Number of frames to concatenate through time. Default is 0 (do not use CatFrames).
center_crop: Any = dataclass_field(default_factory=lambda: [])
# center crop size.
grayscale: bool = True
# Disables grayscale transform.
max_frames_per_traj: int = 1000
# Number of steps before a reset of the environment is called (if it has not been flagged as done before).
batch_transform: bool = False
# if ``True``, the transforms will be applied to the parallel env, and not to each individual env.\
image_size: int = 84
# if True and environment has discrete action space, then it is encoded as categorical values rather than one-hot.
categorical_action_encoding: bool = False