BatchSizeTransform
- class torchrl.envs.transforms.BatchSizeTransform(*, batch_size: torch.Size | None = None, reshape_fn: Callable[[TensorDictBase], TensorDictBase] | None = None, reset_func: Callable[[TensorDictBase, TensorDictBase], TensorDictBase] | None = None, env_kwarg: bool = False)
A transform to modify the batch-size of an environment.
This transform has two distinct usages: it can be used to set the batch-size for non-batch-locked (e.g. stateless) environments to enable data collection using data collectors. It can also be used to modify the batch-size of an environment (e.g. squeeze, unsqueeze or reshape).
This transform modifies the environment batch-size to match the one provided. It expects the parent environment batch-size to be expandable to the provided one.
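To make "expandable" concrete, here is a minimal pure-Python sketch. It assumes the term follows the same rules as torch.Tensor.expand: new leading dimensions may be added, and only dimensions of size 1 may grow. The helper name is_expandable is hypothetical and is not part of TorchRL; the library's own check may differ in detail.

```python
def is_expandable(src, dst):
    """Hypothetical helper (not a TorchRL API).

    Returns True if batch size `src` can be expanded to `dst`,
    assuming torch.Tensor.expand-style rules.
    """
    src, dst = tuple(src), tuple(dst)
    # Expansion can only add leading dimensions, never drop any.
    if len(src) > len(dst):
        return False
    # Compare trailing dimensions: each must match or be a singleton.
    for s, d in zip(reversed(src), reversed(dst)):
        if s != 1 and s != d:
            return False
    return True


# An empty batch size (typical of a stateless env) expands to anything.
empty_to_five = is_expandable((), (5,))
# A singleton dimension can grow.
one_to_five = is_expandable((1,), (5,))
# A non-singleton dimension cannot change size.
three_to_five = is_expandable((3,), (5,))
```

Under these assumptions, the first two checks evaluate to True and the last to False, which is why a stateless environment with an empty batch size can take any batch_size.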
- Keyword Arguments:
batch_size (torch.Size or equivalent, optional) – the new batch-size of the environment. Exclusive with reshape_fn.
reshape_fn (callable, optional) – a callable to modify the environment batch-size. Exclusive with batch_size.
Note
Currently, transformations involving reshape, flatten, unflatten, squeeze and unsqueeze are supported. If another reshape operation is required, please submit a feature request on TorchRL github.
reset_func (callable, optional) – a function that produces a reset tensordict. The signature must match Callable[[TensorDictBase, TensorDictBase], TensorDictBase], where the first input argument is the optional tensordict passed to the environment during the call to reset() and the second is the output of TransformedEnv.base_env.reset. It can also support an optional env keyword argument if env_kwarg=True.
env_kwarg (bool, optional) – if True, reset_func must support an env keyword argument. Defaults to False. The env passed will be the env accompanied by its transform.
Example
>>> import torch
>>> from typing import Optional
>>> from tensordict import TensorDictBase
>>> from torchrl.data import CompositeSpec, UnboundedContinuousTensorSpec
>>> from torchrl.envs import EnvBase, GymEnv, TransformedEnv
>>> from torchrl.envs.transforms import BatchSizeTransform
>>> # Changing the batch-size with a function
>>> base_env = GymEnv("CartPole-v1")
>>> env = TransformedEnv(base_env, BatchSizeTransform(reshape_fn=lambda data: data.reshape(1, 1)))
>>> env.rollout(4)
>>> # Setting the shape of a stateless environment
>>> class MyEnv(EnvBase):
...     batch_locked = False
...     def __init__(self):
...         super().__init__()
...         self.observation_spec = CompositeSpec(observation=UnboundedContinuousTensorSpec(3))
...         self.reward_spec = UnboundedContinuousTensorSpec(1)
...         self.action_spec = UnboundedContinuousTensorSpec(1)
...
...     def _reset(self, tensordict: TensorDictBase, **kwargs) -> TensorDictBase:
...         tensordict_batch_size = tensordict.batch_size if tensordict is not None else torch.Size([])
...         result = self.observation_spec.rand(tensordict_batch_size)
...         result.update(self.full_done_spec.zero(tensordict_batch_size))
...         return result
...
...     def _step(
...         self,
...         tensordict: TensorDictBase,
...     ) -> TensorDictBase:
...         result = self.observation_spec.rand(tensordict.batch_size)
...         result.update(self.full_done_spec.zero(tensordict.batch_size))
...         result.update(self.full_reward_spec.zero(tensordict.batch_size))
...         return result
...
...     def _set_seed(self, seed: Optional[int]):
...         pass
...
>>> # batch_size is keyword-only in the signature above
>>> env = TransformedEnv(MyEnv(), BatchSizeTransform(batch_size=[5]))
>>> assert env.batch_size == torch.Size([5])
>>> assert env.rollout(10).shape == torch.Size([5, 10])
The reset_func can create a tensordict with the desired batch-size, allowing for a fine-grained reset call:
>>> def reset_func(tensordict, tensordict_reset, env):
...     result = env.observation_spec.rand()
...     result.update(env.full_done_spec.zero())
...     assert result.batch_size != torch.Size([])
...     return result
>>> env = TransformedEnv(MyEnv(), BatchSizeTransform(batch_size=[5], reset_func=reset_func, env_kwarg=True))
>>> print(env.rollout(2))
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([5, 2]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([5, 2]),
    device=None,
    is_shared=False)
This transform can be used to deploy non-batch-locked environments within data collectors:
>>> from torchrl.collectors import SyncDataCollector
>>> collector = SyncDataCollector(env, lambda td: env.rand_action(td), frames_per_batch=10, total_frames=-1)
>>> for data in collector:
...     print(data)
...     break
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([5, 2]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([5, 2]),
            device=None,
            is_shared=False),
        done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([5, 2]),
            device=None,
            is_shared=False),
        observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([5, 2]),
    device=None,
    is_shared=False)
>>> collector.shutdown()
- forward(tensordict: TensorDictBase) → TensorDictBase
Reads the input tensordict, and for the selected keys, applies the transform.
- transform_input_spec(input_spec: CompositeSpec) → CompositeSpec
Transforms the input spec such that the resulting spec matches transform mapping.
- Parameters:
input_spec (TensorSpec) – spec before the transform
- Returns:
expected spec after the transform
- transform_output_spec(output_spec: CompositeSpec) → CompositeSpec
Transforms the output spec such that the resulting spec matches transform mapping.
This method should generally be left untouched. Changes should be implemented using transform_observation_spec(), transform_reward_spec() and transformfull_done_spec().
- Parameters:
output_spec (TensorSpec) – spec before the transform
- Returns:
expected spec after the transform