Source code for ray.rllib.utils.exploration.gaussian_noise

from gymnasium.spaces import Space
import numpy as np
from typing import Union, Optional

from ray.rllib.models.action_dist import ActionDistribution
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils.annotations import OldAPIStack, override
from ray.rllib.utils.exploration.exploration import Exploration
from ray.rllib.utils.exploration.random import Random
from ray.rllib.utils.framework import (
    try_import_tf,
    try_import_torch,
    get_variable,
    TensorType,
)
from ray.rllib.utils.numpy import convert_to_numpy
from ray.rllib.utils.schedules import Schedule
from ray.rllib.utils.schedules.piecewise_schedule import PiecewiseSchedule
from ray.rllib.utils.tf_utils import zero_logps_from_actions

# Resolve the optional deep-learning frameworks through RLlib's import helpers
# instead of importing them directly at module level.
# NOTE(review): the bound names are presumably None when the respective
# framework is not installed -- confirm against `try_import_tf` /
# `try_import_torch`.
tf1, tf, tfv = try_import_tf()
torch, _ = try_import_torch()


@OldAPIStack
class GaussianNoise(Exploration):
    """An exploration that adds white noise to continuous actions.

    If explore=True, returns actions plus scale (annealed over time) x
    Gaussian noise. Also, some completely random period is possible at the
    beginning.

    If explore=False, returns the deterministic action.
    """

    def __init__(
        self,
        action_space: Space,
        *,
        framework: str,
        model: ModelV2,
        random_timesteps: int = 1000,
        stddev: float = 0.1,
        initial_scale: float = 1.0,
        final_scale: float = 0.02,
        scale_timesteps: int = 10000,
        scale_schedule: Optional[Schedule] = None,
        **kwargs
    ):
        """Initializes a GaussianNoise instance.

        Args:
            action_space: The action space; its `low`/`high` bounds are used
                to clip the noisy actions.
            framework: The framework identifier. Must not be None; the
                values "tf", "tf2" and "torch" are treated specially by this
                class.
            model: The model, forwarded to the parent class and to the
                internal `Random` exploration.
            random_timesteps: The number of timesteps for which to act
                completely randomly. Only after this number of timesteps, the
                `self.scale` annealing process will start (see below).
            stddev: The stddev (sigma) to use for the Gaussian noise to be
                added to the actions.
            initial_scale: The initial scaling weight to multiply the noise
                with.
            final_scale: The final scaling weight to multiply the noise with.
            scale_timesteps: The timesteps over which to linearly anneal the
                scaling factor (after(!) having used random actions for
                `random_timesteps` steps).
            scale_schedule: An optional Schedule object to use (instead of
                constructing one from the given parameters).
            **kwargs: Forwarded unchanged to both the parent constructor and
                the internal `Random` exploration.
        """
        assert framework is not None
        super().__init__(action_space, model=model, framework=framework, **kwargs)

        # Create the Random exploration module (used for the first n
        # timesteps).
        self.random_timesteps = random_timesteps
        self.random_exploration = Random(
            action_space, model=self.model, framework=self.framework, **kwargs
        )

        self.stddev = stddev
        # The `scale` annealing schedule: starts at `initial_scale` once the
        # random phase is over and reaches `final_scale` after
        # `scale_timesteps` further steps. A user-supplied schedule wins.
        self.scale_schedule = scale_schedule or PiecewiseSchedule(
            endpoints=[
                (random_timesteps, initial_scale),
                (random_timesteps + scale_timesteps, final_scale),
            ],
            outside_value=final_scale,
            framework=self.framework,
        )

        # The current timestep value. Its concrete type is whatever
        # `get_variable` returns for the framework in use, and the torch
        # action path later rebinds it to the caller-provided `timestep`.
        self.last_timestep = get_variable(
            np.array(0, np.int64),
            framework=self.framework,
            tf_name="timestep",
            dtype=np.int64,
        )

        # Build the tf-info-op (only needed for static-graph tf, where the
        # state has to be fetched through a session).
        if self.framework == "tf":
            self._tf_state_op = self.get_state()

    @override(Exploration)
    def get_exploration_action(
        self,
        *,
        action_distribution: ActionDistribution,
        timestep: Union[int, TensorType],
        explore: bool = True
    ):
        """Returns an (action, logp) pair, dispatching on the framework.

        Args:
            action_distribution: The distribution whose deterministic sample
                serves as the noise-free action.
            timestep: The current timestep; if None, the internally tracked
                `last_timestep` is advanced instead.
            explore: Whether to apply exploration (random actions or added
                noise) or to return the deterministic action.

        Returns:
            A tuple of the (possibly noisy) action and an all-zero logp.
        """
        # Adds IID Gaussian noise for exploration, TD3-style.
        if self.framework == "torch":
            return self._get_torch_exploration_action(
                action_distribution, explore, timestep
            )
        else:
            return self._get_tf_exploration_action_op(
                action_distribution, explore, timestep
            )

    def _get_tf_exploration_action_op(
        self,
        action_dist: ActionDistribution,
        explore: bool,
        timestep: Union[int, TensorType],
    ):
        """Builds the tf action/logp computation (used for "tf" and "tf2").

        Args:
            action_dist: The distribution providing the deterministic action.
            explore: Python bool or boolean tensor selecting between the
                stochastic and the deterministic action.
            timestep: The current timestep, or None to use (and increment)
                `self.last_timestep`.

        Returns:
            A tuple (action, logp) where logp is always zero.
        """
        ts = timestep if timestep is not None else self.last_timestep

        # The deterministic actions (if explore=False).
        deterministic_actions = action_dist.deterministic_sample()

        # Take a Gaussian sample with our stddev (mean=0.0) and scale it.
        gaussian_sample = self.scale_schedule(ts) * tf.random.normal(
            tf.shape(deterministic_actions), stddev=self.stddev
        )

        # Stochastic actions could either be: random OR action + noise.
        random_actions, _ = self.random_exploration.get_tf_exploration_action_op(
            action_dist, explore
        )
        stochastic_actions = tf.cond(
            pred=tf.convert_to_tensor(ts < self.random_timesteps),
            true_fn=lambda: random_actions,
            # Noisy actions are clipped back into the action-space bounds.
            false_fn=lambda: tf.clip_by_value(
                deterministic_actions + gaussian_sample,
                self.action_space.low * tf.ones_like(deterministic_actions),
                self.action_space.high * tf.ones_like(deterministic_actions),
            ),
        )

        # Chose by `explore` (main exploration switch).
        action = tf.cond(
            pred=tf.constant(explore, dtype=tf.bool)
            if isinstance(explore, bool)
            else explore,
            true_fn=lambda: stochastic_actions,
            false_fn=lambda: deterministic_actions,
        )
        # Logp=always zero.
        logp = zero_logps_from_actions(deterministic_actions)

        # Increment `last_timestep` by 1 (or set to `timestep`).
        if self.framework == "tf2":
            if timestep is None:
                self.last_timestep.assign_add(1)
            else:
                self.last_timestep.assign(tf.cast(timestep, tf.int64))
            return action, logp
        else:
            assign_op = (
                tf1.assign_add(self.last_timestep, 1)
                if timestep is None
                else tf1.assign(self.last_timestep, timestep)
            )
            # Tie the timestep update to the returned tensors so it runs
            # whenever they are evaluated.
            with tf1.control_dependencies([assign_op]):
                return action, logp

    def _get_torch_exploration_action(
        self,
        action_dist: ActionDistribution,
        explore: bool,
        timestep: Union[int, TensorType],
    ):
        """Computes the torch action/logp pair.

        Args:
            action_dist: The distribution providing the deterministic action.
            explore: Whether to explore (random phase or added noise).
            timestep: The current timestep, or None to increment the
                internally tracked one.

        Returns:
            A tuple (action, logp) where logp is a float32 zero vector with
            one entry per element along the action's first dimension.
        """
        # Set last timestep or (if not given) increase by one.
        self.last_timestep = (
            timestep if timestep is not None else self.last_timestep + 1
        )

        # Apply exploration.
        if explore:
            # Random exploration phase.
            if self.last_timestep < self.random_timesteps:
                action, _ = self.random_exploration.get_torch_exploration_action(
                    action_dist, explore=True
                )
            # Take a Gaussian sample with our stddev (mean=0.0) and scale it.
            else:
                det_actions = action_dist.deterministic_sample()
                scale = self.scale_schedule(self.last_timestep)
                gaussian_sample = scale * torch.normal(
                    mean=torch.zeros(det_actions.size()), std=self.stddev
                ).to(self.device)
                # Clip into [low, high] via max-with-low, then min-with-high.
                action = torch.min(
                    torch.max(
                        det_actions + gaussian_sample,
                        torch.tensor(
                            self.action_space.low,
                            dtype=torch.float32,
                            device=self.device,
                        ),
                    ),
                    torch.tensor(
                        self.action_space.high,
                        dtype=torch.float32,
                        device=self.device,
                    ),
                )
        # No exploration -> Return deterministic actions.
        else:
            action = action_dist.deterministic_sample()

        # Logp=always zero.
        logp = torch.zeros(
            (action.size()[0],), dtype=torch.float32, device=self.device
        )

        return action, logp

    @override(Exploration)
    def get_state(self, sess: Optional["tf.Session"] = None):
        """Returns the current exploration state.

        Args:
            sess: Optional tf session. If given, the pre-built state op is
                evaluated in it and that result is returned.

        Returns:
            A dict with the keys "cur_scale" (the schedule's value at
            `last_timestep`) and "last_timestep". For framework "tf" the
            values are left as-is (unless `sess` is given); for every other
            framework they are passed through `convert_to_numpy`.
        """
        if sess:
            return sess.run(self._tf_state_op)
        scale = self.scale_schedule(self.last_timestep)
        return {
            "cur_scale": convert_to_numpy(scale)
            if self.framework != "tf"
            else scale,
            "last_timestep": convert_to_numpy(self.last_timestep)
            if self.framework != "tf"
            else self.last_timestep,
        }

    @override(Exploration)
    def set_state(self, state: dict, sess: Optional["tf.Session"] = None) -> None:
        """Restores `last_timestep` from a state dict.

        Args:
            state: A dict containing the key "last_timestep" (e.g. as
                produced by `get_state`).
            sess: Optional tf session, only used for framework "tf".

        Raises:
            KeyError: If `state` has no "last_timestep" entry.
        """
        new_timestep = state["last_timestep"]
        if self.framework == "tf":
            self.last_timestep.load(new_timestep, session=sess)
        elif hasattr(self.last_timestep, "assign"):
            # Variable-like object: update it in place.
            self.last_timestep.assign(new_timestep)
        else:
            # Python int, numpy scalar/array, torch tensor, ...: none of
            # these has an `assign` method, so simply rebind the attribute.
            # (An `isinstance(..., int)` test is too narrow here: the value
            # is seeded from a numpy int64 array in `__init__` and the torch
            # path stores whatever `timestep` it is handed.)
            self.last_timestep = new_timestep