Note
Ray 2.10.0 introduces the alpha stage of RLlib’s “new API stack”. The Ray Team plans to transition algorithms, example scripts, and documentation to the new code base, thereby incrementally replacing the “old API stack” (e.g., ModelV2, Policy, RolloutWorker) throughout the subsequent minor releases leading up to Ray 3.0.
See here for more details on how to use the new API stack.
Learner (Alpha)#
Learner allows you to abstract the training logic of RLModules. It supports both gradient-based and non-gradient-based updates (for example, Polyak averaging). The API enables you to distribute the Learner using data-distributed parallel (DDP) training. The Learner achieves the following:
Facilitates gradient-based updates on an RLModule.
Provides abstractions for non-gradient-based updates, such as Polyak averaging.
Reports training statistics.
Checkpoints the modules and optimizer states for durable training.
The Learner class supports data-distributed-parallel style training using the LearnerGroup API. Under this paradigm, the LearnerGroup maintains multiple copies of the same Learner with identical parameters and hyperparameters. Each of these Learner instances computes the loss and gradients on a shard of a sample batch and then accumulates the gradients across the Learner instances. Learn more about data-distributed parallel learning in this article.
LearnerGroup also allows for asynchronous training and (distributed) checkpointing for durability during training.
Enabling Learner API in RLlib experiments#
Adjust the amount of resources for training using the num_gpus_per_learner, num_cpus_per_learner, and num_learners arguments in the AlgorithmConfig.
from ray.rllib.algorithms.ppo.ppo import PPOConfig

config = (
    PPOConfig()
    .api_stack(
        enable_rl_module_and_learner=True,
        enable_env_runner_and_connector_v2=True,
    )
    .learners(
        num_learners=0,  # Set this to greater than 1 to allow for DDP-style updates.
        num_gpus_per_learner=0,  # Set this to 1 to enable GPU training.
        num_cpus_per_learner=1,
    )
)
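With the Learner API enabled, the rest of the experiment runs as usual. The following is a minimal sketch; the environment choice (CartPole-v1) is an assumption for illustration and not part of the snippet above.

# Assumption for this sketch: specify an environment before building.
config = config.environment("CartPole-v1")

# Build the PPO algorithm from the config and run one training iteration.
algo = config.build()
results = algo.train()
algo.stop()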
Note
This feature is in alpha. If you migrate to this algorithm, enable the feature via AlgorithmConfig.api_stack(enable_rl_module_and_learner=True, enable_env_runner_and_connector_v2=True).
The following algorithms support Learner out of the box. Implement an algorithm with a custom Learner to leverage this API for other algorithms.
Algorithm | Supported Framework
---|---
PPO | torch, tf2
IMPALA | torch, tf2
APPO | torch, tf2
Basic usage#
Use the LearnerGroup utility to interact with multiple Learners.
Construction#
If you enable the RLModule and Learner APIs via the AlgorithmConfig, then calling build() constructs a LearnerGroup for you, but if you’re using these APIs standalone, you can construct the LearnerGroup as follows.
import gymnasium as gym

from ray.rllib.algorithms.ppo.ppo import PPOConfig

env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# LearnerGroup.
config = (
    PPOConfig()
    # Number of Learner workers (Ray actors).
    # Use 0 for no actors, only create a local Learner.
    # Use >=1 to create n DDP-style Learner workers (Ray actors).
    .learners(num_learners=1)
    # Specify the Learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5,
    )
)
# Construct a new LearnerGroup using our config object.
learner_group = config.build_learner_group(env=env)
import gymnasium as gym

from ray.rllib.algorithms.ppo.ppo import PPOConfig

env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# Learner.
config = (
    PPOConfig()
    # Specify the Learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5,
    )
)
# Construct a new Learner using our config object.
learner = config.build_learner(env=env)
Updates#
import time

# `DUMMY_BATCH` stands in for an actual batch of training data.
TIMESTEPS = {"num_env_steps_sampled_lifetime": 250}

# This is a blocking update.
results = learner_group.update_from_batch(batch=DUMMY_BATCH, timesteps=TIMESTEPS)

# This is a non-blocking update. The results are returned in a future
# call to `update_from_batch(..., async_update=True)`.
_ = learner_group.update_from_batch(batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS)

# Artificially wait for the async request to be done to get the results
# in the next call to
# `LearnerGroup.update_from_batch(..., async_update=True)`.
time.sleep(5)
results = learner_group.update_from_batch(
    batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS
)
# `results` is a list of n items (where n is the number of async results collected).
assert isinstance(results, list), results
# Each item in that list is another list of m items (where m is the number of Learner
# workers).
assert isinstance(results[0], list), results
# Each item in the inner list is a result dict from the Learner worker.
assert isinstance(results[0][0], dict), results
When updating a LearnerGroup, you can perform blocking or async updates on batches of data. Async updates are necessary for implementing async algorithms such as APPO/IMPALA.
# This is a blocking update (given a training batch).
result = learner.update_from_batch(batch=DUMMY_BATCH, timesteps=TIMESTEPS)
When updating a Learner, you can only perform blocking updates on batches of data.
You can perform non-gradient-based updates before or after the gradient-based ones by overriding before_gradient_based_update() and after_gradient_based_update().
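For illustration, here is a minimal sketch of a custom TorchLearner that overrides after_gradient_based_update(). The hook name comes from the Learner API; the decayed coefficient and its schedule are hypothetical and only serve to show where such non-gradient-based logic would go.

from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.learner.torch.torch_learner import TorchLearner
from ray.rllib.utils.annotations import override


class DecayCoeffTorchLearner(TorchLearner):
    # Hypothetical coefficient, not part of the RLlib API.
    my_coeff = 0.05

    @override(Learner)
    def after_gradient_based_update(self, *, timesteps):
        # Non-gradient-based update: decay the (hypothetical) coefficient
        # after every gradient-based update.
        self.my_coeff = max(0.0, self.my_coeff * 0.999)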
Getting and setting state#
# Get the LearnerGroup's RLModule weights and optimizer states.
state = learner_group.get_state()
learner_group.set_state(state)
# Only get the RLModule weights.
weights = learner_group.get_weights()
learner_group.set_weights(weights)
Set or get the state dict of all Learners through the LearnerGroup via LearnerGroup.set_state or LearnerGroup.get_state. This includes the neural network weights and the optimizer states on each Learner. For example, an Adam optimizer’s state has momentum information based on recently computed gradients.
If you only want to get or set the weights of the RLModules (neural networks) of all Learners, you can do so through the LearnerGroup APIs LearnerGroup.get_weights and LearnerGroup.set_weights.
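A common use of these weight-only APIs is keeping another LearnerGroup with the same RLModule architecture in sync with the one being trained. A minimal sketch, where eval_learner_group is a hypothetical second group built from the same config:

# Hypothetical second LearnerGroup built from the same config.
eval_learner_group = config.build_learner_group(env=env)

# Sync only the neural network weights (no optimizer states).
eval_learner_group.set_weights(learner_group.get_weights())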
from ray.rllib.core import COMPONENT_RL_MODULE
# Get the Learner's RLModule weights and optimizer states.
state = learner.get_state()
# Note that `state` is now a dict:
# {
# COMPONENT_RL_MODULE: [RLModule's state],
# COMPONENT_OPTIMIZER: [Optimizer states],
# }
learner.set_state(state)
# Only get the RLModule weights (as numpy, not torch/tf).
rl_module_only_state = learner.get_state(components=COMPONENT_RL_MODULE)
# Note that `rl_module_only_state` is now a dict:
# {COMPONENT_RL_MODULE: [RLModule's state]}
learner.module.set_state(rl_module_only_state)
You can set and get the entire state of a Learner using set_state() and get_state().
To get only the RLModule’s weights (without optimizer states), use the components=COMPONENT_RL_MODULE arg in get_state() (see code above).
To set only the RLModule’s weights (without touching the optimizer states), use set_state() and pass in a dict: {COMPONENT_RL_MODULE: [RLModule's state]} (see the sketch below).
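For illustration, a minimal sketch of that set_state() variant, reusing the rl_module_only_state dict from the snippet above:

# Set only the RLModule weights on the Learner, leaving the optimizer
# states untouched, by passing a partial state dict to `set_state()`.
learner.set_state(rl_module_only_state)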
Checkpointing#
learner_group.save_to_path(LEARNER_GROUP_CKPT_DIR)
learner_group.restore_from_path(LEARNER_GROUP_CKPT_DIR)
Checkpoint the state of all Learners in the LearnerGroup through save_to_path() and restore the state of a saved LearnerGroup through restore_from_path(). A LearnerGroup’s state includes the neural network weights and all optimizer states. Note that since the state of all of the Learner instances is identical, only the states from the first Learner are saved.
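For example, to continue training in a new process, you could rebuild the LearnerGroup from the same config and then load the saved state into it; a minimal sketch, assuming the config object and checkpoint directory from above:

# Rebuild a LearnerGroup from the same config, then load the previously
# saved weights and optimizer states into it.
new_learner_group = config.build_learner_group(env=env)
new_learner_group.restore_from_path(LEARNER_GROUP_CKPT_DIR)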
learner.save_to_path(LEARNER_CKPT_DIR)
learner.restore_from_path(LEARNER_CKPT_DIR)
Checkpoint the state of a Learner through save_to_path() and restore the state of a saved Learner through restore_from_path(). A Learner’s state includes the neural network weights and all optimizer states.
Implementation#
Learner has many APIs for flexible implementation; however, the core ones that you need to implement are:
Method | Description
---|---
configure_optimizers_for_module() | set up any optimizers for an RLModule.
compute_loss_for_module() | calculate the loss for a gradient-based update to a module.
before_gradient_based_update() | do any non-gradient-based updates to an RLModule before(!) the gradient-based ones, e.g., add noise to your network.
after_gradient_based_update() | do any non-gradient-based updates to an RLModule after(!) the gradient-based ones, e.g., update a loss coefficient based on some schedule.
Starter Example#
A Learner that implements behavior cloning could look like the following:
from typing import Any, Dict

import torch

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.learner.torch.torch_learner import TorchLearner
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import ModuleID, TensorType


class BCTorchLearner(TorchLearner):
    @override(Learner)
    def compute_loss_for_module(
        self,
        *,
        module_id: ModuleID,
        config: AlgorithmConfig = None,
        batch: Dict[str, Any],
        fwd_out: Dict[str, TensorType],
    ) -> TensorType:
        # Standard behavior cloning loss: negative log-likelihood of the
        # batch's actions under the module's action distribution.
        action_dist_inputs = fwd_out[SampleBatch.ACTION_DIST_INPUTS]
        action_dist_class = self._module[module_id].get_train_action_dist_cls()
        action_dist = action_dist_class.from_logits(action_dist_inputs)
        loss = -torch.mean(action_dist.logp(batch[SampleBatch.ACTIONS]))

        return loss
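To also customize the optimizer setup for the module (the configure_optimizers_for_module() entry in the table above), you could override that hook as well. The following is a minimal sketch that assumes the Learner's register_optimizer() and get_parameters() helpers; the optimizer choice is illustrative only.

class BCTorchLearnerWithOptim(BCTorchLearner):
    # Hypothetical subclass of the starter example above.

    @override(Learner)
    def configure_optimizers_for_module(self, module_id, config=None):
        # Register a single Adam optimizer over all of the module's parameters.
        module = self._module[module_id]
        params = self.get_parameters(module)
        self.register_optimizer(
            module_id=module_id,
            optimizer=torch.optim.Adam(params),
            params=params,
            lr_or_lr_schedule=config.lr,
        )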