RolloutWorker#
RolloutWorkers are used as @ray.remote actors to collect and return samples from environments or offline files in parallel. An RLlib Algorithm usually has num_workers RolloutWorkers (as @ray.remote actors) plus exactly one "local" RolloutWorker (not @ray.remote) in its WorkerSet under self.workers.
Depending on its evaluation config settings, an additional WorkerSet with evaluation-only RolloutWorkers may exist in the Algorithm under self.evaluation_workers.
- class ray.rllib.evaluation.rollout_worker.RolloutWorker(*, env_creator: Callable[[EnvContext], Optional[Any]], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, config: Optional[AlgorithmConfig] = None, worker_index: int = 0, num_workers: Optional[int] = None, recreated_worker: bool = False, log_dir: Optional[str] = None, spaces: Optional[Dict[str, Tuple[gymnasium.spaces.Space, gymnasium.spaces.Space]]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, dataset_shards: Optional[List[ray.data.dataset.Dataset]] = None, policy_config=-1, input_creator=-1, output_creator=-1, rollout_fragment_length=-1, count_steps_by=-1, batch_mode=-1, episode_horizon=-1, preprocessor_pref=-1, sample_async=-1, compress_observations=-1, num_envs=-1, observation_fn=-1, clip_rewards=-1, normalize_actions=-1, clip_actions=-1, env_config=-1, model_config=-1, remote_worker_envs=-1, remote_env_batch_wait_ms=-1, soft_horizon=-1, no_done_at_end=-1, fake_sampler=-1, seed=-1, log_level=-1, callbacks=-1, disable_env_checking=-1, policy_spec=-1, policy_mapping_fn=-1, policies_to_train=-1, extra_python_environs=-1, policy=-1, tf_session_creator=-1)[source]#
Common experience collection class.
This class wraps a policy instance and an environment class to collect experiences from the environment. You can create many replicas of this class as Ray actors to scale RL training.
This class supports vectorized and multi-agent policy evaluation (e.g., VectorEnv, MultiAgentEnv, etc.)
Examples
>>> # Create a rollout worker and use it to collect experiences.
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker(
...   env_creator=lambda _: gym.make("CartPole-v1"),
...   default_policy_class=PGTF1Policy)
>>> print(worker.sample())
SampleBatch({
    "obs": [[...]], "actions": [[...]], "rewards": [[...]],
    "terminateds": [[...]], "truncateds": [[...]],
    "new_obs": [[...]]})

>>> # Creating a multi-agent rollout worker
>>> from gymnasium.spaces import Discrete, Box
>>> import random
>>> MultiAgentTrafficGrid = ...
>>> worker = RolloutWorker(
...   env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
...   config=AlgorithmConfig().multi_agent(
...     policies={
...       # Use an ensemble of two policies for car agents
...       "car_policy1":
...         (PGTFPolicy, Box(...), Discrete(...),
...          AlgorithmConfig.overrides(gamma=0.99)),
...       "car_policy2":
...         (PGTFPolicy, Box(...), Discrete(...),
...          AlgorithmConfig.overrides(gamma=0.95)),
...       # Use a single shared policy for all traffic lights
...       "traffic_light_policy":
...         (PGTFPolicy, Box(...), Discrete(...), {}),
...     },
...     policy_mapping_fn=(
...       lambda agent_id, episode, **kwargs:
...       random.choice(["car_policy1", "car_policy2"])
...       if agent_id.startswith("car_") else "traffic_light_policy"),
...   ),
... )
>>> print(worker.sample())
MultiAgentBatch({
    "car_policy1": SampleBatch(...),
    "car_policy2": SampleBatch(...),
    "traffic_light_policy": SampleBatch(...)})
- classmethod as_remote(num_cpus: Optional[int] = None, num_gpus: Optional[Union[int, float]] = None, memory: Optional[int] = None, resources: Optional[dict] = None) type [source]#
Returns the RolloutWorker class as a @ray.remote class (using the given options).
The returned class can then be used to instantiate Ray actors.
- Parameters
num_cpus – The number of CPUs to allocate for the remote actor.
num_gpus – The number of GPUs to allocate for the remote actor. This could be a fraction as well.
memory – The heap memory request in bytes for this task/actor, rounded down to the nearest integer.
resources – The default custom resources to allocate for the remote actor.
- Returns
The @ray.remote decorated RolloutWorker class.
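For illustration, a minimal sketch of how the returned class could be used (the resource value and config are assumptions, not recommended settings):
>>> # Sketch: build a remote actor class from RolloutWorker and sample
>>> # through Ray; num_cpus=1 is an illustrative assumption.
>>> import gymnasium as gym
>>> import ray
>>> from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> RemoteRolloutWorker = RolloutWorker.as_remote(num_cpus=1)
>>> remote_worker = RemoteRolloutWorker.remote(
...     env_creator=lambda _: gym.make("CartPole-v1"),
...     default_policy_class=PGTF1Policy,
...     config=AlgorithmConfig(),
... )
>>> batch = ray.get(remote_worker.sample.remote())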
- __init__(*, env_creator: Callable[[EnvContext], Optional[Any]], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, config: Optional[AlgorithmConfig] = None, worker_index: int = 0, num_workers: Optional[int] = None, recreated_worker: bool = False, log_dir: Optional[str] = None, spaces: Optional[Dict[str, Tuple[gymnasium.spaces.Space, gymnasium.spaces.Space]]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, dataset_shards: Optional[List[ray.data.dataset.Dataset]] = None, policy_config=-1, input_creator=-1, output_creator=-1, rollout_fragment_length=-1, count_steps_by=-1, batch_mode=-1, episode_horizon=-1, preprocessor_pref=-1, sample_async=-1, compress_observations=-1, num_envs=-1, observation_fn=-1, clip_rewards=-1, normalize_actions=-1, clip_actions=-1, env_config=-1, model_config=-1, remote_worker_envs=-1, remote_env_batch_wait_ms=-1, soft_horizon=-1, no_done_at_end=-1, fake_sampler=-1, seed=-1, log_level=-1, callbacks=-1, disable_env_checking=-1, policy_spec=-1, policy_mapping_fn=-1, policies_to_train=-1, extra_python_environs=-1, policy=-1, tf_session_creator=-1)[source]#
Initializes a RolloutWorker instance.
- Parameters
env_creator – Function that returns a gym.Env given an EnvContext wrapped configuration.
validate_env – Optional callable to validate the generated environment (only on worker=0).
worker_index – For remote workers, this should be set to a non-zero and unique value. This index is passed to created envs through EnvContext so that envs can be configured per worker.
recreated_worker – Whether this worker is a recreated one. Workers are recreated by an Algorithm (via WorkerSet) in case recreate_failed_workers=True and one of the original workers (or an already recreated one) has failed. They don't differ from original workers other than the value of this flag (self.recreated_worker).
log_dir – Directory where logs can be placed.
spaces – An optional space dict mapping policy IDs to (obs_space, action_space)-tuples. This is used in case no Env is created on this RolloutWorker.
- assert_healthy()[source]#
Checks that __init__ has been completed properly.
Useful in case a RolloutWorker is run as @ray.remote (Actor) and the owner would like to make sure the worker has been properly initialized.
- Returns
True if the worker is properly initialized
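A minimal sketch, assuming remote_worker is a RolloutWorker actor handle such as the one created in the as_remote() sketch above:
>>> # Sketch: raises an error if the remote worker's __init__ did not
>>> # complete properly; otherwise the call simply returns.
>>> ray.get(remote_worker.assert_healthy.remote())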
- sample() Union[SampleBatch, MultiAgentBatch] [source]#
Returns a batch of experience sampled from this worker.
This method must be implemented by subclasses.
- Returns
A columnar batch of experiences (e.g., tensors).
Examples
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker(
...   env_creator=lambda _: gym.make("CartPole-v1"),
...   default_policy_class=PGTF1Policy,
...   config=AlgorithmConfig(),
... )
>>> print(worker.sample())
SampleBatch({"obs": [...], "action": [...], ...})
- sample_with_count() Tuple[Union[SampleBatch, MultiAgentBatch], int] [source]#
Same as sample() but returns the count as a separate value.
- Returns
A columnar batch of experiences (e.g., tensors) and the size of the collected batch.
Examples
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker(
...   env_creator=lambda _: gym.make("CartPole-v1"),
...   default_policy_class=PGTF1Policy)
>>> print(worker.sample_with_count())
(SampleBatch({"obs": [...], "action": [...], ...}), 3)
- learn_on_batch(samples: Union[SampleBatch, MultiAgentBatch]) Dict [source]#
Update policies based on the given batch.
This is equivalent to apply_gradients(compute_gradients(samples)), but can be optimized to avoid pulling gradients into CPU memory.
- Parameters
samples – The SampleBatch or MultiAgentBatch to learn on.
- Returns
Dictionary of extra metadata from compute_gradients().
Examples
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker(
...   env_creator=lambda _: gym.make("CartPole-v1"),
...   default_policy_class=PGTF1Policy)
>>> batch = worker.sample()
>>> info = worker.learn_on_batch(batch)
- sample_and_learn(expected_batch_size: int, num_sgd_iter: int, sgd_minibatch_size: str, standardize_fields: List[str]) Tuple[dict, int] [source]#
Sample a batch and learn on it.
This is typically used in combination with distributed allreduce.
- Parameters
expected_batch_size – Expected number of samples to learn on.
num_sgd_iter – Number of SGD iterations.
sgd_minibatch_size – SGD minibatch size.
standardize_fields – List of sample fields to normalize.
- Returns
A tuple consisting of a dictionary of extra metadata returned from the policies' learn_on_batch() and the number of samples learned on.
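A minimal sketch of one sample-and-update step on a local worker (the numeric arguments are illustrative assumptions, not tuned values):
>>> # Sketch: sample roughly one rollout fragment and run a single SGD
>>> # pass over it; values below are arbitrary for illustration.
>>> info, num_samples_learned_on = worker.sample_and_learn(
...     expected_batch_size=200,
...     num_sgd_iter=1,
...     sgd_minibatch_size=32,
...     standardize_fields=[],
... )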
- compute_gradients(samples: Union[SampleBatch, MultiAgentBatch], single_agent: bool = None) Tuple[Union[List[Tuple[Union[numpy.array, tf.Tensor, torch.Tensor], Union[numpy.array, tf.Tensor, torch.Tensor]]], List[Union[numpy.array, tf.Tensor, torch.Tensor]]], dict] [source]#
Returns a gradient computed w.r.t. the specified samples.
Uses the Policy's/ies' compute_gradients method(s) to perform the calculations. Skips policies that are not trainable as per self.is_policy_to_train().
- Parameters
samples – The SampleBatch or MultiAgentBatch to compute gradients for using this worker’s trainable policies.
- Returns
In the single-agent case, a tuple consisting of ModelGradients and info dict of the worker's policy. In the multi-agent case, a tuple consisting of a dict mapping PolicyID to ModelGradients and a dict mapping PolicyID to extra metadata info. Note that the first return value (grads) can be applied as is to a compatible worker using the worker's apply_gradients() method.
Examples
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker(
...   env_creator=lambda _: gym.make("CartPole-v1"),
...   default_policy_class=PGTF1Policy)
>>> batch = worker.sample()
>>> grads, info = worker.compute_gradients(batch)
- apply_gradients(grads: Union[List[Tuple[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor], Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]]], List[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]], Dict[str, Union[List[Tuple[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor], Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]]], List[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]]]]]) None [source]#
Applies the given gradients to this worker’s models.
Uses the Policy’s/ies’ apply_gradients method(s) to perform the operations.
- Parameters
grads – Single ModelGradients (single-agent case) or a dict mapping PolicyIDs to the respective model gradients structs.
Examples
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker(
...   env_creator=lambda _: gym.make("CartPole-v1"),
...   default_policy_class=PGTF1Policy)
>>> samples = worker.sample()
>>> grads, info = worker.compute_gradients(samples)
>>> worker.apply_gradients(grads)
- get_metrics() List[ray.rllib.evaluation.metrics.RolloutMetrics] [source]#
Returns the thus-far collected metrics from this worker’s rollouts.
- Returns
List of RolloutMetrics collected thus-far.
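A minimal sketch (assumes worker was constructed as in the examples above):
>>> # Sketch: metrics accumulate while sampling and are returned here,
>>> # roughly one RolloutMetrics entry per completed episode.
>>> worker.sample()
>>> metrics = worker.get_metrics()
>>> print(len(metrics))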
- foreach_env(func: Callable[[Any], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T] [source]#
Calls the given function with each sub-environment as arg.
- Parameters
func – The function to call for each underlying sub-environment (as only arg).
- Returns
The list of return values of all calls to func([env]).
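For example (a sketch assuming an already constructed worker):
>>> # Sketch: ask every sub-environment for its action space.
>>> action_spaces = worker.foreach_env(lambda env: env.action_space)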
- foreach_env_with_context(func: Callable[[Any, ray.rllib.env.env_context.EnvContext], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T] [source]#
Calls the given function with each sub-env plus its EnvContext as args.
- Parameters
func – The function to call for each underlying sub-environment and its EnvContext (as the args).
- Returns
The list of return values of all calls to func([env, ctx]).
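For example (a sketch assuming an already constructed worker):
>>> # Sketch: pair each sub-env with the worker/vector index taken from
>>> # its EnvContext.
>>> indices = worker.foreach_env_with_context(
...     lambda env, ctx: (ctx.worker_index, ctx.vector_index))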
- get_policy(policy_id: str = 'default_policy') Optional[ray.rllib.policy.policy.Policy] [source]#
Return policy for the specified id, or None.
- Parameters
policy_id – ID of the policy to return. None for DEFAULT_POLICY_ID (in the single agent case).
- Returns
The policy under the given ID (or None if not found).
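A minimal sketch for the single-agent case:
>>> # Sketch: fetch the default policy and inspect its class.
>>> policy = worker.get_policy()
>>> print(type(policy).__name__)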
- add_policy(policy_id: str, policy_cls: Optional[Type[ray.rllib.policy.policy.Policy]] = None, policy: Optional[ray.rllib.policy.policy.Policy] = None, *, observation_space: Optional[gymnasium.spaces.Space] = None, action_space: Optional[gymnasium.spaces.Space] = None, config: Optional[dict] = None, policy_state: Optional[Dict[str, Union[numpy.array, tf.Tensor, torch.Tensor, dict, tuple]]] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]]] = None) ray.rllib.policy.policy.Policy [source]#
Adds a new policy to this RolloutWorker.
- Parameters
policy_id – ID of the policy to add.
policy_cls – The Policy class to use for constructing the new Policy. Note: Only one of policy_cls or policy must be provided.
policy – The Policy instance to add to this algorithm. Note: Only one of policy_cls or policy must be provided.
observation_space – The observation space of the policy to add.
action_space – The action space of the policy to add.
config – The config overrides for the policy to add.
policy_state – Optional state dict to apply to the new policy instance, right after its construction.
policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.
policies_to_train – An optional container of policy IDs to be trained or a callable taking PolicyID and (optionally) SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.
- Returns
The newly added policy.
- Raises
ValueError – If both policy_cls AND policy are provided.
KeyError – If the given policy_id already exists in this worker's PolicyMap.
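A minimal sketch (the policy ID, mapping function, and reuse of PGTF1Policy are assumptions for illustration; if spaces cannot be inferred from the worker's env or existing policies, pass observation_space and action_space explicitly):
>>> # Sketch: add a second policy at runtime and route all agents to it.
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> new_policy = worker.add_policy(
...     policy_id="new_policy",
...     policy_cls=PGTF1Policy,
...     policy_mapping_fn=lambda agent_id, episode, **kwargs: "new_policy",
... )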
- remove_policy(*, policy_id: str = 'default_policy', policy_mapping_fn: Optional[Callable[[Any], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]]] = None) None [source]#
Removes a policy from this RolloutWorker.
- Parameters
policy_id – ID of the policy to be removed. None for DEFAULT_POLICY_ID.
policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.
policies_to_train – An optional container of policy IDs to be trained or a callable taking PolicyID and (optionally) SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.
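A minimal sketch, continuing the add_policy() sketch above (the ID and mapping function are illustrative assumptions):
>>> # Sketch: remove the previously added policy and map all agents back
>>> # to the default policy.
>>> worker.remove_policy(
...     policy_id="new_policy",
...     policy_mapping_fn=lambda agent_id, episode, **kwargs: "default_policy",
... )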
- set_policy_mapping_fn(policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None) None [source]#
Sets self.policy_mapping_fn to a new callable (if provided).
- Parameters
policy_mapping_fn – The new mapping function to use. If None, will keep the existing mapping function in place.
- set_is_policy_to_train(is_policy_to_train: Union[Container[str], Callable[[str, Optional[Union[SampleBatch, MultiAgentBatch]]], bool]]) None [source]#
Sets self.is_policy_to_train() to a new callable.
- Parameters
is_policy_to_train – A container of policy IDs to be trained or a callable taking PolicyID and (optionally) SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.
- get_policies_to_train(batch: Optional[Union[SampleBatch, MultiAgentBatch]] = None) Set[str] [source]#
Returns all policies-to-train, given an optional batch.
Loops through all policies currently in self.policy_map and checks the return value of self.is_policy_to_train(pid, batch).
- Parameters
batch – An optional SampleBatchType for the self.is_policy_to_train(pid, [batch]?) check.
- Returns
The set of currently trainable policy IDs, given the optional batch.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
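A minimal sketch:
>>> # Sketch: which policy IDs would currently be trained on a fresh batch?
>>> batch = worker.sample()
>>> trainable_ids = worker.get_policies_to_train(batch)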
- for_policy(func: Callable[[ray.rllib.policy.policy.Policy, Optional[Any]], ray.rllib.utils.typing.T], policy_id: Optional[str] = 'default_policy', **kwargs) ray.rllib.utils.typing.T [source]#
Calls the given function with the specified policy as first arg.
- Parameters
func – The function to call with the policy as first arg.
policy_id – The PolicyID of the policy to call the function with.
- Keyword Arguments
kwargs – Additional kwargs to be passed to the call.
- Returns
The return value of the function call.
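A minimal sketch:
>>> # Sketch: read the default policy's observation space via for_policy().
>>> obs_space = worker.for_policy(lambda policy: policy.observation_space)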
- foreach_policy(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T] [source]#
Calls the given function with each (policy, policy_id) tuple.
- Parameters
func – The function to call with each (policy, policy ID) tuple.
- Keyword Arguments
kwargs – Additional kwargs to be passed to the call.
- Returns
The list of return values of all calls to func([policy, pid, **kwargs]).
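A minimal sketch:
>>> # Sketch: list all policy IDs held by this worker.
>>> policy_ids = worker.foreach_policy(lambda policy, pid: pid)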
- foreach_policy_to_train(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T] [source]#
Calls the given function with each (policy, policy_id) tuple.
Only those policies/IDs will be called on for which self.is_policy_to_train() returns True.
- Parameters
func – The function to call with each (policy, policy ID) tuple, for only those policies for which self.is_policy_to_train returns True.
- Keyword Arguments
kwargs – Additional kwargs to be passed to the call.
- Returns
The list of return values of all calls to func([policy, pid, **kwargs]).
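A minimal sketch:
>>> # Sketch: count how many policies are currently trainable on this worker.
>>> num_trainable = len(
...     worker.foreach_policy_to_train(lambda policy, pid: pid))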
- sync_filters(new_filters: dict) None [source]#
Changes this worker's filters to the given ones and rebases any accumulated delta.
- Parameters
new_filters – Filters with new state to update local copy.
- get_filters(flush_after: bool = False) Dict [source]#
Returns a snapshot of filters.
- Parameters
flush_after – Clears the filter buffer state.
- Returns
Dict of serializable filters.
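A minimal sketch covering both get_filters() and sync_filters() (self-syncing a single worker is only for illustration; normally the filter state comes from another worker):
>>> # Sketch: snapshot this worker's filters (clearing their buffers) and
>>> # push the same state back in.
>>> filters = worker.get_filters(flush_after=True)
>>> worker.sync_filters(filters)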
- get_state() dict [source]#
Serializes this RolloutWorker’s current state and returns it.
- Returns
The current state of this RolloutWorker as a serialized, pickled byte sequence.
- set_state(state: dict) None [source]#
Restores this RolloutWorker’s state from a state dict.
- Parameters
state – The state dict to restore this worker’s state from.
Examples
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ...
>>> state = worker.get_state()
>>> new_worker = RolloutWorker(...)
>>> new_worker.set_state(state)
- get_weights(policies: Optional[Container[str]] = None) Dict[str, dict] [source]#
Returns the model weights of each of this worker's policies.
- Parameters
policies – List of PolicyIDs to get the weights from. Use None for all policies.
- Returns
Dict mapping PolicyIDs to ModelWeights.
Examples
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ...
>>> weights = worker.get_weights()
>>> print(weights)
{"default_policy": {"layer1": array(...), "layer2": ...}}
- set_weights(weights: Dict[str, dict], global_vars: Optional[Dict] = None, weights_seq_no: Optional[int] = None) None [source]#
Sets the model weights of each of this worker's policies.
- Parameters
weights – Dict mapping PolicyIDs to the new weights to be used.
global_vars – An optional global vars dict to set this worker to. If None, do not update the global_vars.
weights_seq_no – An optional sequence number for this weights version. If not None, the worker stores it (in self.weights_seq_no) and ignores future calls whose sequence number has not changed since the last call, to save on performance.
Examples
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ...
>>> weights = worker.get_weights()
>>> # Set `global_vars` (timestep) as well.
>>> worker.set_weights(weights, {"timestep": 42})
- get_global_vars() dict [source]#
Returns the current self.global_vars dict of this RolloutWorker.
- Returns
The current self.global_vars dict of this RolloutWorker.
Examples
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ...
>>> global_vars = worker.get_global_vars()
>>> print(global_vars)
{"timestep": 424242}
- set_global_vars(global_vars: dict, policy_ids: Optional[List[str]] = None) None [source]#
Updates this worker’s and all its policies’ global vars.
Updates are done using the dict’s update method.
- Parameters
global_vars – The global_vars dict to update the self.global_vars dict from.
policy_ids – Optional list of Policy IDs to update. If None, will update all policies on the to-be-updated workers.
Examples
>>> worker = ...
>>> worker.set_global_vars({"timestep": 4242})