"""
Proximal Policy Optimization (PPO)
==================================
This file defines the distributed Algorithm class for proximal policy
optimization.
See `ppo_[tf|torch]_policy.py` for the definition of the policy loss.
Detailed documentation: https://docs.ray.io/en/master/rllib-algorithms.html#ppo
"""
import logging
from typing import Any, Dict, List, Optional, Type, Union, TYPE_CHECKING
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig, NotProvided
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
from ray.rllib.execution.rollout_ops import (
standardize_fields,
synchronous_parallel_sample,
)
from ray.rllib.execution.train_ops import (
train_one_step,
multi_gpu_train_one_step,
)
from ray.rllib.policy.policy import Policy
from ray.rllib.utils.annotations import override
from ray.rllib.utils.deprecation import DEPRECATED_VALUE
from ray.rllib.utils.metrics import (
ENV_RUNNER_RESULTS,
ENV_RUNNER_SAMPLING_TIMER,
LEARNER_RESULTS,
LEARNER_UPDATE_TIMER,
NUM_AGENT_STEPS_SAMPLED,
NUM_AGENT_STEPS_SAMPLED_LIFETIME,
NUM_ENV_STEPS_SAMPLED,
NUM_ENV_STEPS_SAMPLED_LIFETIME,
NUM_ENV_STEPS_TRAINED,
NUM_ENV_STEPS_TRAINED_LIFETIME,
NUM_EPISODES,
NUM_EPISODES_LIFETIME,
SYNCH_WORKER_WEIGHTS_TIMER,
SAMPLE_TIMER,
TIMERS,
ALL_MODULES,
)
from ray.rllib.utils.metrics.learner_info import LEARNER_STATS_KEY
from ray.rllib.utils.schedules.scheduler import Scheduler
from ray.rllib.utils.typing import ResultDict
from ray.util.debug import log_once
if TYPE_CHECKING:
from ray.rllib.core.learner.learner import Learner
logger = logging.getLogger(__name__)
LEARNER_RESULTS_VF_LOSS_UNCLIPPED_KEY = "vf_loss_unclipped"
LEARNER_RESULTS_VF_EXPLAINED_VAR_KEY = "vf_explained_var"
LEARNER_RESULTS_KL_KEY = "mean_kl_loss"
LEARNER_RESULTS_CURR_KL_COEFF_KEY = "curr_kl_coeff"
LEARNER_RESULTS_CURR_ENTROPY_COEFF_KEY = "curr_entropy_coeff"
[docs]
class PPOConfig(AlgorithmConfig):
"""Defines a configuration class from which a PPO Algorithm can be built.
.. testcode::
from ray.rllib.algorithms.ppo import PPOConfig
config = PPOConfig()
config.environment("CartPole-v1")
config.env_runners(num_env_runners=1)
config.training(
gamma=0.9, lr=0.01, kl_coeff=0.3, train_batch_size_per_learner=256
)
# Build a Algorithm object from the config and run 1 training iteration.
algo = config.build()
algo.train()
.. testcode::
from ray.rllib.algorithms.ppo import PPOConfig
from ray import air
from ray import tune
config = (
PPOConfig()
# Set the config object's env.
.environment(env="CartPole-v1")
# Update the config object's training parameters.
.training(
lr=0.001, clip_param=0.2
)
)
tune.Tuner(
"PPO",
run_config=air.RunConfig(stop={"training_iteration": 1}),
param_space=config,
).fit()
.. testoutput::
:hide:
...
"""
def __init__(self, algo_class=None):
"""Initializes a PPOConfig instance."""
super().__init__(algo_class=algo_class or PPO)
self.exploration_config = {
# The Exploration class to use. In the simplest case, this is the name
# (str) of any class present in the `rllib.utils.exploration` package.
# You can also provide the python class directly or the full location
# of your class (e.g. "ray.rllib.utils.exploration.epsilon_greedy.
# EpsilonGreedy").
"type": "StochasticSampling",
# Add constructor kwargs here (if any).
}
# fmt: off
# __sphinx_doc_begin__
self.lr_schedule = None
self.lr = 5e-5
self.rollout_fragment_length = "auto"
self.train_batch_size = 4000
# PPO specific settings:
self.use_critic = True
self.use_gae = True
self.num_epochs = 30
self.minibatch_size = 128
self.shuffle_batch_per_epoch = True
self.lambda_ = 1.0
self.use_kl_loss = True
self.kl_coeff = 0.2
self.kl_target = 0.01
self.vf_loss_coeff = 1.0
self.entropy_coeff = 0.0
self.entropy_coeff_schedule = None
self.clip_param = 0.3
self.vf_clip_param = 10.0
self.grad_clip = None
# Override some of AlgorithmConfig's default values with PPO-specific values.
self.num_env_runners = 2
self.model["vf_share_layers"] = False
# `.api_stack()`
self.api_stack(
enable_rl_module_and_learner=True,
enable_env_runner_and_connector_v2=True,
)
# __sphinx_doc_end__
# fmt: on
# Deprecated keys.
self.sgd_minibatch_size = DEPRECATED_VALUE
self.vf_share_layers = DEPRECATED_VALUE
@override(AlgorithmConfig)
def get_default_rl_module_spec(self) -> RLModuleSpec:
from ray.rllib.algorithms.ppo.ppo_catalog import PPOCatalog
if self.framework_str == "torch":
from ray.rllib.algorithms.ppo.torch.ppo_torch_rl_module import (
PPOTorchRLModule,
)
return RLModuleSpec(module_class=PPOTorchRLModule, catalog_class=PPOCatalog)
elif self.framework_str == "tf2":
from ray.rllib.algorithms.ppo.tf.ppo_tf_rl_module import PPOTfRLModule
return RLModuleSpec(module_class=PPOTfRLModule, catalog_class=PPOCatalog)
else:
raise ValueError(
f"The framework {self.framework_str} is not supported. "
"Use either 'torch' or 'tf2'."
)
@override(AlgorithmConfig)
def get_default_learner_class(self) -> Union[Type["Learner"], str]:
if self.framework_str == "torch":
from ray.rllib.algorithms.ppo.torch.ppo_torch_learner import (
PPOTorchLearner,
)
return PPOTorchLearner
elif self.framework_str in ["tf2", "tf"]:
raise ValueError(
"TensorFlow is no longer supported on the new API stack! "
"Use `framework='torch'`."
)
else:
raise ValueError(
f"The framework {self.framework_str} is not supported. "
"Use `framework='torch'`."
)
[docs]
@override(AlgorithmConfig)
def training(
self,
*,
use_critic: Optional[bool] = NotProvided,
use_gae: Optional[bool] = NotProvided,
lambda_: Optional[float] = NotProvided,
use_kl_loss: Optional[bool] = NotProvided,
kl_coeff: Optional[float] = NotProvided,
kl_target: Optional[float] = NotProvided,
vf_loss_coeff: Optional[float] = NotProvided,
entropy_coeff: Optional[float] = NotProvided,
entropy_coeff_schedule: Optional[List[List[Union[int, float]]]] = NotProvided,
clip_param: Optional[float] = NotProvided,
vf_clip_param: Optional[float] = NotProvided,
grad_clip: Optional[float] = NotProvided,
# @OldAPIStack
lr_schedule: Optional[List[List[Union[int, float]]]] = NotProvided,
# Deprecated.
vf_share_layers=DEPRECATED_VALUE,
**kwargs,
) -> "PPOConfig":
"""Sets the training related configuration.
Args:
use_critic: Should use a critic as a baseline (otherwise don't use value
baseline; required for using GAE).
use_gae: If true, use the Generalized Advantage Estimator (GAE)
with a value function, see https://arxiv.org/pdf/1506.02438.pdf.
lambda_: The lambda parameter for General Advantage Estimation (GAE).
Defines the exponential weight used between actually measured rewards
vs value function estimates over multiple time steps. Specifically,
`lambda_` balances short-term, low-variance estimates against long-term,
high-variance returns. A `lambda_` of 0.0 makes the GAE rely only on
immediate rewards (and vf predictions from there on, reducing variance,
but increasing bias), while a `lambda_` of 1.0 only incorporates vf
predictions at the truncation points of the given episodes or episode
chunks (reducing bias but increasing variance).
use_kl_loss: Whether to use the KL-term in the loss function.
kl_coeff: Initial coefficient for KL divergence.
kl_target: Target value for KL divergence.
vf_loss_coeff: Coefficient of the value function loss. IMPORTANT: you must
tune this if you set vf_share_layers=True inside your model's config.
entropy_coeff: The entropy coefficient (float) or entropy coefficient
schedule in the format of
[[timestep, coeff-value], [timestep, coeff-value], ...]
In case of a schedule, intermediary timesteps will be assigned to
linearly interpolated coefficient values. A schedule config's first
entry must start with timestep 0, i.e.: [[0, initial_value], [...]].
clip_param: The PPO clip parameter.
vf_clip_param: Clip param for the value function. Note that this is
sensitive to the scale of the rewards. If your expected V is large,
increase this.
grad_clip: If specified, clip the global norm of gradients by this amount.
Returns:
This updated AlgorithmConfig object.
"""
# Pass kwargs onto super's `training()` method.
super().training(**kwargs)
if use_critic is not NotProvided:
self.use_critic = use_critic
# TODO (Kourosh) This is experimental.
# Don't forget to remove .use_critic from algorithm config.
if use_gae is not NotProvided:
self.use_gae = use_gae
if lambda_ is not NotProvided:
self.lambda_ = lambda_
if use_kl_loss is not NotProvided:
self.use_kl_loss = use_kl_loss
if kl_coeff is not NotProvided:
self.kl_coeff = kl_coeff
if kl_target is not NotProvided:
self.kl_target = kl_target
if vf_loss_coeff is not NotProvided:
self.vf_loss_coeff = vf_loss_coeff
if entropy_coeff is not NotProvided:
self.entropy_coeff = entropy_coeff
if clip_param is not NotProvided:
self.clip_param = clip_param
if vf_clip_param is not NotProvided:
self.vf_clip_param = vf_clip_param
if grad_clip is not NotProvided:
self.grad_clip = grad_clip
# TODO (sven): Remove these once new API stack is only option for PPO.
if lr_schedule is not NotProvided:
self.lr_schedule = lr_schedule
if entropy_coeff_schedule is not NotProvided:
self.entropy_coeff_schedule = entropy_coeff_schedule
return self
@override(AlgorithmConfig)
def validate(self) -> None:
# Call super's validation method.
super().validate()
# Warn about new API stack on by default.
if self.enable_rl_module_and_learner:
logger.warning(
f"You are running {self.algo_class.__name__} on the new API stack! "
"This is the new default behavior for this algorithm. If you don't "
"want to use the new API stack, set `config.api_stack("
"enable_rl_module_and_learner=False,"
"enable_env_runner_and_connector_v2=False)`. For a detailed migration "
"guide, see here: https://docs.ray.io/en/master/rllib/new-api-stack-migration-guide.html" # noqa
)
# Synchronous sampling, on-policy/PPO algos -> Check mismatches between
# `rollout_fragment_length` and `train_batch_size_per_learner` to avoid user
# confusion.
# TODO (sven): Make rollout_fragment_length a property and create a private
# attribute to store (possibly) user provided value (or "auto") in. Deprecate
# `self.get_rollout_fragment_length()`.
self.validate_train_batch_size_vs_rollout_fragment_length()
# SGD minibatch size must be smaller than train_batch_size (b/c
# we subsample a batch of `minibatch_size` from the train-batch for
# each `num_epochs`).
if (
not self.enable_rl_module_and_learner
and self.minibatch_size > self.train_batch_size
):
raise ValueError(
f"`minibatch_size` ({self.minibatch_size}) must be <= "
f"`train_batch_size` ({self.train_batch_size}). In PPO, the train batch"
f" will be split into {self.minibatch_size} chunks, each of which "
f"is iterated over (used for updating the policy) {self.num_epochs} "
"times."
)
elif self.enable_rl_module_and_learner:
mbs = self.minibatch_size
tbs = self.train_batch_size_per_learner or self.train_batch_size
if isinstance(mbs, int) and isinstance(tbs, int) and mbs > tbs:
raise ValueError(
f"`minibatch_size` ({mbs}) must be <= "
f"`train_batch_size_per_learner` ({tbs}). In PPO, the train batch"
f" will be split into {mbs} chunks, each of which is iterated over "
f"(used for updating the policy) {self.num_epochs} times."
)
# Episodes may only be truncated (and passed into PPO's
# `postprocessing_fn`), iff generalized advantage estimation is used
# (value function estimate at end of truncated episode to estimate
# remaining value).
if (
not self.in_evaluation
and self.batch_mode == "truncate_episodes"
and not self.use_gae
):
raise ValueError(
"Episode truncation is not supported without a value "
"function (to estimate the return at the end of the truncated"
" trajectory). Consider setting "
"batch_mode=complete_episodes."
)
# Entropy coeff schedule checking.
if self.enable_rl_module_and_learner:
if self.entropy_coeff_schedule is not None:
raise ValueError(
"`entropy_coeff_schedule` is deprecated and must be None! Use the "
"`entropy_coeff` setting to setup a schedule."
)
Scheduler.validate(
fixed_value_or_schedule=self.entropy_coeff,
setting_name="entropy_coeff",
description="entropy coefficient",
)
if isinstance(self.entropy_coeff, float) and self.entropy_coeff < 0.0:
raise ValueError("`entropy_coeff` must be >= 0.0")
@property
@override(AlgorithmConfig)
def _model_config_auto_includes(self) -> Dict[str, Any]:
return super()._model_config_auto_includes | {"vf_share_layers": False}
class PPO(Algorithm):
@classmethod
@override(Algorithm)
def get_default_config(cls) -> AlgorithmConfig:
return PPOConfig()
@classmethod
@override(Algorithm)
def get_default_policy_class(
cls, config: AlgorithmConfig
) -> Optional[Type[Policy]]:
if config["framework"] == "torch":
from ray.rllib.algorithms.ppo.ppo_torch_policy import PPOTorchPolicy
return PPOTorchPolicy
elif config["framework"] == "tf":
from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
return PPOTF1Policy
else:
from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF2Policy
return PPOTF2Policy
@override(Algorithm)
def training_step(self):
# New API stack (RLModule, Learner, EnvRunner, ConnectorV2).
if self.config.enable_env_runner_and_connector_v2:
return self._training_step_new_api_stack()
# Old API stack (Policy, RolloutWorker, Connector, maybe RLModule,
# maybe Learner).
else:
return self._training_step_old_api_stack()
def _training_step_new_api_stack(self) -> ResultDict:
# Collect batches from sample workers until we have a full batch.
with self.metrics.log_time((TIMERS, ENV_RUNNER_SAMPLING_TIMER)):
# Sample in parallel from the workers.
if self.config.count_steps_by == "agent_steps":
episodes, env_runner_results = synchronous_parallel_sample(
worker_set=self.env_runner_group,
max_agent_steps=self.config.total_train_batch_size,
sample_timeout_s=self.config.sample_timeout_s,
_uses_new_env_runners=(
self.config.enable_env_runner_and_connector_v2
),
_return_metrics=True,
)
else:
episodes, env_runner_results = synchronous_parallel_sample(
worker_set=self.env_runner_group,
max_env_steps=self.config.total_train_batch_size,
sample_timeout_s=self.config.sample_timeout_s,
_uses_new_env_runners=(
self.config.enable_env_runner_and_connector_v2
),
_return_metrics=True,
)
# Return early if all our workers failed.
if not episodes:
return {}
# Reduce EnvRunner metrics over the n EnvRunners.
self.metrics.merge_and_log_n_dicts(
env_runner_results, key=ENV_RUNNER_RESULTS
)
# Log lifetime counts for env- and agent steps.
self.metrics.log_dict(
{
NUM_AGENT_STEPS_SAMPLED_LIFETIME: self.metrics.peek(
(ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED)
),
NUM_ENV_STEPS_SAMPLED_LIFETIME: self.metrics.peek(
(ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED)
),
NUM_EPISODES_LIFETIME: self.metrics.peek(
(ENV_RUNNER_RESULTS, NUM_EPISODES)
),
},
reduce="sum",
)
# Perform a learner update step on the collected episodes.
with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)):
learner_results = self.learner_group.update_from_episodes(
episodes=episodes,
timesteps={
NUM_ENV_STEPS_SAMPLED_LIFETIME: (
self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME)
),
},
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
self.metrics.merge_and_log_n_dicts(learner_results, key=LEARNER_RESULTS)
self.metrics.log_dict(
{
NUM_ENV_STEPS_TRAINED_LIFETIME: self.metrics.peek(
(LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED)
),
# NUM_MODULE_STEPS_TRAINED_LIFETIME: self.metrics.peek(
# (LEARNER_RESULTS, NUM_MODULE_STEPS_TRAINED)
# ),
},
reduce="sum",
)
# Update weights - after learning on the local worker - on all remote
# workers.
with self.metrics.log_time((TIMERS, SYNCH_WORKER_WEIGHTS_TIMER)):
# The train results's loss keys are ModuleIDs to their loss values.
# But we also return a total_loss key at the same level as the ModuleID
# keys. So we need to subtract that to get the correct set of ModuleIDs to
# update.
# TODO (sven): We should also not be using `learner_results` as a messenger
# to infer which modules to update. `policies_to_train` might also NOT work
# as it might be a very large set (100s of Modules) vs a smaller Modules
# set that's present in the current train batch.
modules_to_update = set(learner_results[0].keys()) - {ALL_MODULES}
# if self.env_runner_group.num_remote_workers() > 0:
self.env_runner_group.sync_weights(
# Sync weights from learner_group to all EnvRunners.
from_worker_or_learner_group=self.learner_group,
policies=modules_to_update,
inference_only=True,
)
# else:
# weights = self.learner_group.get_weights(inference_only=True)
# self.env_runner.set_weights(weights)
return self.metrics.reduce()
def _training_step_old_api_stack(self) -> ResultDict:
# Collect batches from sample workers until we have a full batch.
with self._timers[SAMPLE_TIMER]:
if self.config.count_steps_by == "agent_steps":
train_batch = synchronous_parallel_sample(
worker_set=self.env_runner_group,
max_agent_steps=self.config.total_train_batch_size,
sample_timeout_s=self.config.sample_timeout_s,
)
else:
train_batch = synchronous_parallel_sample(
worker_set=self.env_runner_group,
max_env_steps=self.config.total_train_batch_size,
sample_timeout_s=self.config.sample_timeout_s,
)
# Return early if all our workers failed.
if not train_batch:
return {}
train_batch = train_batch.as_multi_agent()
self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
self._counters[NUM_ENV_STEPS_SAMPLED] += train_batch.env_steps()
# Standardize advantages.
train_batch = standardize_fields(train_batch, ["advantages"])
if self.config.simple_optimizer:
train_results = train_one_step(self, train_batch)
else:
train_results = multi_gpu_train_one_step(self, train_batch)
policies_to_update = list(train_results.keys())
global_vars = {
"timestep": self._counters[NUM_AGENT_STEPS_SAMPLED],
# TODO (sven): num_grad_updates per each policy should be
# accessible via `train_results` (and get rid of global_vars).
"num_grad_updates_per_policy": {
pid: self.env_runner.policy_map[pid].num_grad_updates
for pid in policies_to_update
},
}
# Update weights - after learning on the local worker - on all remote
# workers.
with self._timers[SYNCH_WORKER_WEIGHTS_TIMER]:
if self.env_runner_group.num_remote_workers() > 0:
from_worker_or_learner_group = None
self.env_runner_group.sync_weights(
from_worker_or_learner_group=from_worker_or_learner_group,
policies=policies_to_update,
global_vars=global_vars,
)
# For each policy: Update KL scale and warn about possible issues
for policy_id, policy_info in train_results.items():
# Update KL loss with dynamic scaling
# for each (possibly multiagent) policy we are training
kl_divergence = policy_info[LEARNER_STATS_KEY].get("kl")
self.get_policy(policy_id).update_kl(kl_divergence)
# Warn about excessively high value function loss
scaled_vf_loss = (
self.config.vf_loss_coeff * policy_info[LEARNER_STATS_KEY]["vf_loss"]
)
policy_loss = policy_info[LEARNER_STATS_KEY]["policy_loss"]
if (
log_once("ppo_warned_lr_ratio")
and self.config.get("model", {}).get("vf_share_layers")
and scaled_vf_loss > 100
):
logger.warning(
"The magnitude of your value function loss for policy: {} is "
"extremely large ({}) compared to the policy loss ({}). This "
"can prevent the policy from learning. Consider scaling down "
"the VF loss by reducing vf_loss_coeff, or disabling "
"vf_share_layers.".format(policy_id, scaled_vf_loss, policy_loss)
)
# Warn about bad clipping configs.
train_batch.policy_batches[policy_id].set_get_interceptor(None)
mean_reward = train_batch.policy_batches[policy_id]["rewards"].mean()
if (
log_once("ppo_warned_vf_clip")
and mean_reward > self.config.vf_clip_param
):
self.warned_vf_clip = True
logger.warning(
f"The mean reward returned from the environment is {mean_reward}"
f" but the vf_clip_param is set to {self.config['vf_clip_param']}."
f" Consider increasing it for policy: {policy_id} to improve"
" value function convergence."
)
# Update global vars on local worker as well.
# TODO (simon): At least in RolloutWorker obsolete I guess as called in
# `sync_weights()` called above if remote workers. Can we call this
# where `set_weights()` is called on the local_worker?
self.env_runner.set_global_vars(global_vars)
return train_results