RolloutWorker#

RolloutWorkers are used as @ray.remote actors to collect and return samples from environments or offline files in parallel. An RLlib Algorithm usually has num_workers remote RolloutWorkers plus a single "local" RolloutWorker (not @ray.remote) in its WorkerSet under self.workers.

Depending on its evaluation config settings, an additional WorkerSet of RolloutWorkers for evaluation may be created in the Algorithm under self.evaluation_workers.
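
A minimal sketch of how these workers are typically reached from an already built Algorithm (the WorkerSet accessors local_worker() and remote_workers() are assumed here; treat them as illustrative for your RLlib version):

>>> # Hypothetical: `algo` is an already built RLlib Algorithm.
>>> algo = ... 
>>> local_worker = algo.workers.local_worker() 
>>> remote_worker_handles = algo.workers.remote_workers() 
>>> print(len(remote_worker_handles))  # usually == num_workers 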

class ray.rllib.evaluation.rollout_worker.RolloutWorker(*, env_creator: Callable[[EnvContext], Optional[Any]], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, config: Optional[AlgorithmConfig] = None, worker_index: int = 0, num_workers: Optional[int] = None, recreated_worker: bool = False, log_dir: Optional[str] = None, spaces: Optional[Dict[str, Tuple[gymnasium.spaces.Space, gymnasium.spaces.Space]]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, dataset_shards: Optional[List[ray.data.dataset.Dataset]] = None, policy_config=-1, input_creator=-1, output_creator=-1, rollout_fragment_length=-1, count_steps_by=-1, batch_mode=-1, episode_horizon=-1, preprocessor_pref=-1, sample_async=-1, compress_observations=-1, num_envs=-1, observation_fn=-1, clip_rewards=-1, normalize_actions=-1, clip_actions=-1, env_config=-1, model_config=-1, remote_worker_envs=-1, remote_env_batch_wait_ms=-1, soft_horizon=-1, no_done_at_end=-1, fake_sampler=-1, seed=-1, log_level=-1, callbacks=-1, disable_env_checking=-1, policy_spec=-1, policy_mapping_fn=-1, policies_to_train=-1, extra_python_environs=-1, policy=-1, tf_session_creator=-1)[source]#

Common experience collection class.

This class wraps a policy instance and an environment class to collect experiences from the environment. You can create many replicas of this class as Ray actors to scale RL training.

This class supports vectorized and multi-agent policy evaluation (e.g., VectorEnv, MultiAgentEnv).

Examples

>>> # Create a rollout worker and use it to collect experiences.
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v1"), 
...   default_policy_class=PGTF1Policy) 
>>> print(worker.sample()) 
SampleBatch({
    "obs": [[...]], "actions": [[...]], "rewards": [[...]],
    "terminateds": [[...]], "truncateds": [[...]], "new_obs": [[...]]})
>>> # Creating a multi-agent rollout worker
>>> from gymnasium.spaces import Discrete, Box
>>> import random
>>> from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
>>> MultiAgentTrafficGrid = ... 
>>> worker = RolloutWorker( 
...   env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
...   config=AlgorithmConfig().multi_agent(
...     policies={ 
...       # Use an ensemble of two policies for car agents
...       "car_policy1": 
...         (PGTF1Policy, Box(...), Discrete(...),
...          AlgorithmConfig.overrides(gamma=0.99)),
...       "car_policy2": 
...         (PGTF1Policy, Box(...), Discrete(...),
...          AlgorithmConfig.overrides(gamma=0.95)),
...       # Use a single shared policy for all traffic lights
...       "traffic_light_policy":
...         (PGTF1Policy, Box(...), Discrete(...), {}),
...     },
...     policy_mapping_fn=(
...       lambda agent_id, episode, **kwargs:
...       random.choice(["car_policy1", "car_policy2"])
...       if agent_id.startswith("car_") else "traffic_light_policy"),
...     ),
... )
>>> print(worker.sample()) 
MultiAgentBatch({
    "car_policy1": SampleBatch(...),
    "car_policy2": SampleBatch(...),
    "traffic_light_policy": SampleBatch(...)})
classmethod as_remote(num_cpus: Optional[int] = None, num_gpus: Optional[Union[int, float]] = None, memory: Optional[int] = None, resources: Optional[dict] = None) type[source]#

Returns the RolloutWorker class as a @ray.remote actor class, using the given options.

The returned class can then be used to instantiate ray actors.

Parameters
  • num_cpus – The number of CPUs to allocate for the remote actor.

  • num_gpus – The number of GPUs to allocate for the remote actor. This could be a fraction as well.

  • memory – The heap memory request in bytes for this task/actor, rounded down to the nearest integer.

  • resources – The default custom resources to allocate for the remote actor.

Returns

The @ray.remote decorated RolloutWorker class.
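
Example (a minimal sketch; the constructor arguments mirror the class example above):

>>> import ray
>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> # Turn RolloutWorker into a remote actor class and instantiate it.
>>> RemoteRolloutWorker = RolloutWorker.as_remote(num_cpus=1) 
>>> remote_worker = RemoteRolloutWorker.remote( 
...     env_creator=lambda _: gym.make("CartPole-v1"), 
...     default_policy_class=PGTF1Policy) 
>>> batch = ray.get(remote_worker.sample.remote()) 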

__init__(*, env_creator: Callable[[EnvContext], Optional[Any]], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, config: Optional[AlgorithmConfig] = None, worker_index: int = 0, num_workers: Optional[int] = None, recreated_worker: bool = False, log_dir: Optional[str] = None, spaces: Optional[Dict[str, Tuple[gymnasium.spaces.Space, gymnasium.spaces.Space]]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, dataset_shards: Optional[List[ray.data.dataset.Dataset]] = None, policy_config=-1, input_creator=-1, output_creator=-1, rollout_fragment_length=-1, count_steps_by=-1, batch_mode=-1, episode_horizon=-1, preprocessor_pref=-1, sample_async=-1, compress_observations=-1, num_envs=-1, observation_fn=-1, clip_rewards=-1, normalize_actions=-1, clip_actions=-1, env_config=-1, model_config=-1, remote_worker_envs=-1, remote_env_batch_wait_ms=-1, soft_horizon=-1, no_done_at_end=-1, fake_sampler=-1, seed=-1, log_level=-1, callbacks=-1, disable_env_checking=-1, policy_spec=-1, policy_mapping_fn=-1, policies_to_train=-1, extra_python_environs=-1, policy=-1, tf_session_creator=-1)[source]#

Initializes a RolloutWorker instance.

Parameters
  • env_creator – Function that returns a gym.Env given an EnvContext wrapped configuration.

  • validate_env – Optional callable to validate the generated environment (only on worker=0).

  • worker_index – For remote workers, this should be set to a non-zero and unique value. This index is passed to created envs through EnvContext so that envs can be configured per worker.

  • recreated_worker – Whether this worker is a recreated one. Workers are recreated by an Algorithm (via WorkerSet) in case recreate_failed_workers=True and one of the original workers (or an already recreated one) has failed. They don’t differ from original workers other than the value of this flag (self.recreated_worker).

  • log_dir – Directory where logs can be placed.

  • spaces – An optional space dict mapping policy IDs to (obs_space, action_space)-tuples. This is used in case no Env is created on this RolloutWorker.

assert_healthy()[source]#

Checks that __init__ has been completed properly.

Useful in case a RolloutWorker is run as @ray.remote (Actor) and the owner would like to make sure the worker has been properly initialized.

Returns

True if the worker is properly initialized
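
Example (a minimal sketch; `remote_worker` as in the as_remote() example above):

>>> # Verify the remote worker finished its __init__ properly.
>>> ray.get(remote_worker.assert_healthy.remote()) 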

sample() Union[SampleBatch, MultiAgentBatch][source]#

Returns a batch of experience sampled from this worker.

This method must be implemented by subclasses.

Returns

A columnar batch of experiences (e.g., tensors).

Examples

>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v1"), 
...   default_policy_class=PGTF1Policy, 
...   config=AlgorithmConfig(), 
... )
>>> print(worker.sample()) 
SampleBatch({"obs": [...], "action": [...], ...})
sample_with_count() Tuple[Union[SampleBatch, MultiAgentBatch], int][source]#

Same as sample() but returns the count as a separate value.

Returns

A columnar batch of experiences (e.g., tensors) and the size of the collected batch.

Examples

>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v1"), 
...   default_policy_class=PGTF1Policy) 
>>> print(worker.sample_with_count()) 
(SampleBatch({"obs": [...], "action": [...], ...}), 3)
learn_on_batch(samples: Union[SampleBatch, MultiAgentBatch]) Dict[source]#

Update policies based on the given batch.

This is equivalent to apply_gradients(compute_gradients(samples)), but can be optimized to avoid pulling gradients into CPU memory.

Parameters

samples – The SampleBatch or MultiAgentBatch to learn on.

Returns

Dictionary of extra metadata from compute_gradients().

Examples

>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v1"), 
...   default_policy_class=PGTF1Policy) 
>>> batch = worker.sample() 
>>> info = worker.learn_on_batch(batch) 
sample_and_learn(expected_batch_size: int, num_sgd_iter: int, sgd_minibatch_size: str, standardize_fields: List[str]) Tuple[dict, int][source]#

Sample a batch and learn on it.

This is typically used in combination with distributed allreduce.

Parameters
  • expected_batch_size – Expected number of samples to learn on.

  • num_sgd_iter – Number of SGD iterations.

  • sgd_minibatch_size – SGD minibatch size.

  • standardize_fields – List of sample fields to normalize.

Returns

A tuple consisting of a dictionary of extra metadata returned from the policies’ learn_on_batch() and the number of samples learned on.
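
Example (a minimal sketch, assuming a local worker built as in the examples above; the argument values are illustrative only):

>>> info, num_samples = worker.sample_and_learn( 
...     expected_batch_size=200, 
...     num_sgd_iter=2, 
...     sgd_minibatch_size=64, 
...     standardize_fields=["advantages"]) 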

compute_gradients(samples: Union[SampleBatch, MultiAgentBatch], single_agent: bool = None) Tuple[Union[List[Tuple[Union[numpy.array, tf.Tensor, torch.Tensor], Union[numpy.array, tf.Tensor, torch.Tensor]]], List[Union[numpy.array, tf.Tensor, torch.Tensor]]], dict][source]#

Returns a gradient computed w.r.t the specified samples.

Uses the Policy’s/ies’ compute_gradients method(s) to perform the calculations. Skips policies that are not trainable as per self.is_policy_to_train().

Parameters

samples – The SampleBatch or MultiAgentBatch to compute gradients for using this worker’s trainable policies.

Returns

In the single-agent case, a tuple consisting of ModelGradients and info dict of the worker’s policy. In the multi-agent case, a tuple consisting of a dict mapping PolicyID to ModelGradients and a dict mapping PolicyID to extra metadata info. Note that the first return value (grads) can be applied as is to a compatible worker using the worker’s apply_gradients() method.

Examples

>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v1"), 
...   default_policy_class=PGTF1Policy) 
>>> batch = worker.sample() 
>>> grads, info = worker.compute_gradients(batch) 
apply_gradients(grads: Union[List[Tuple[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor], Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]]], List[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]], Dict[str, Union[List[Tuple[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor], Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]]], List[Union[numpy.array, tensorflow.python.framework.ops.Tensor, torch.Tensor]]]]]) None[source]#

Applies the given gradients to this worker’s models.

Uses the Policy’s/ies’ apply_gradients method(s) to perform the operations.

Parameters

grads – Single ModelGradients (single-agent case) or a dict mapping PolicyIDs to the respective model gradients structs.

Examples

>>> import gymnasium as gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v1"), 
...   default_policy_class=PGTF1Policy) 
>>> samples = worker.sample() 
>>> grads, info = worker.compute_gradients(samples) 
>>> worker.apply_gradients(grads) 
get_metrics() List[ray.rllib.evaluation.metrics.RolloutMetrics][source]#

Returns the thus-far collected metrics from this worker’s rollouts.

Returns

List of RolloutMetrics collected thus-far.
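
Example (a minimal sketch, assuming a local worker built as in the examples above):

>>> # Collect a batch first, then inspect the rollout metrics gathered so far.
>>> _ = worker.sample() 
>>> metrics = worker.get_metrics() 
>>> print(len(metrics)) 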

foreach_env(func: Callable[[Any], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T][source]#

Calls the given function with each sub-environment as arg.

Parameters

func – The function to call for each underlying sub-environment (as only arg).

Returns

The list of return values of all calls to func([env]).
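
Example (a minimal sketch, assuming a local worker built as in the examples above):

>>> # Query every sub-environment for its action space.
>>> action_spaces = worker.foreach_env(lambda env: env.action_space) 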

foreach_env_with_context(func: Callable[[Any, ray.rllib.env.env_context.EnvContext], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T][source]#

Calls given function with each sub-env plus env_ctx as args.

Parameters

func – The function to call for each underlying sub-environment and its EnvContext (as the args).

Returns

The list of return values of all calls to func([env, ctx]).
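
Example (a minimal sketch, assuming a local worker built as in the examples above):

>>> # Pair each sub-env with the worker/vector indices from its EnvContext.
>>> indices = worker.foreach_env_with_context( 
...     lambda env, ctx: (ctx.worker_index, ctx.vector_index)) 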

get_policy(policy_id: str = 'default_policy') Optional[ray.rllib.policy.policy.Policy][source]#

Returns the policy for the specified ID, or None.

Parameters

policy_id – ID of the policy to return. None for DEFAULT_POLICY_ID (in the single agent case).

Returns

The policy under the given ID (or None if not found).
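
Example (a minimal sketch, assuming a single-agent worker built as in the examples above):

>>> # With no policy_id given, the default policy is returned.
>>> policy = worker.get_policy() 
>>> print(policy) 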

add_policy(policy_id: str, policy_cls: Optional[Type[ray.rllib.policy.policy.Policy]] = None, policy: Optional[ray.rllib.policy.policy.Policy] = None, *, observation_space: Optional[gymnasium.spaces.Space] = None, action_space: Optional[gymnasium.spaces.Space] = None, config: Optional[dict] = None, policy_state: Optional[Dict[str, Union[numpy.array, tf.Tensor, torch.Tensor, dict, tuple]]] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]]] = None) ray.rllib.policy.policy.Policy[source]#

Adds a new policy to this RolloutWorker.

Parameters
  • policy_id – ID of the policy to add.

  • policy_cls – The Policy class to use for constructing the new Policy. Note: Only one of policy_cls or policy must be provided.

  • policy – The Policy instance to add to this algorithm. Note: Only one of policy_cls or policy must be provided.

  • observation_space – The observation space of the policy to add.

  • action_space – The action space of the policy to add.

  • config – The config overrides for the policy to add.

  • policy_state – Optional state dict to apply to the new policy instance, right after its construction.

  • policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.

  • policies_to_train – An optional container of policy IDs to be trained or a callable taking PolicyID and - optionally - SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.

Returns

The newly added policy.

Raises
  • ValueError – If both policy_cls AND policy are provided.

  • KeyError – If the given policy_id already exists in this worker’s PolicyMap.
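
Example (a minimal sketch, assuming a worker built as in the examples above; the policy ID, spaces, and config override are illustrative only):

>>> from gymnasium.spaces import Box, Discrete
>>> from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
>>> from ray.rllib.algorithms.pg.pg_tf_policy import PGTF1Policy
>>> worker.add_policy( 
...     policy_id="extra_policy", 
...     policy_cls=PGTF1Policy, 
...     observation_space=Box(-1.0, 1.0, (4,)), 
...     action_space=Discrete(2), 
...     config=AlgorithmConfig.overrides(lr=0.0001)) 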

remove_policy(*, policy_id: str = 'default_policy', policy_mapping_fn: Optional[Callable[[Any], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]]] = None) None[source]#

Removes a policy from this RolloutWorker.

Parameters
  • policy_id – ID of the policy to be removed. None for DEFAULT_POLICY_ID.

  • policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.

  • policies_to_train – An optional container of policy IDs to be trained or a callable taking PolicyID and - optionally - SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.
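
Example (a minimal sketch; removes the illustrative policy added in the add_policy() example above):

>>> worker.remove_policy(policy_id="extra_policy") 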

set_policy_mapping_fn(policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None) None[source]#

Sets self.policy_mapping_fn to a new callable (if provided).

Parameters

policy_mapping_fn – The new mapping function to use. If None, will keep the existing mapping function in place.
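
Example (a minimal sketch; the mapping shown is illustrative only):

>>> # From here on, map every agent to the default policy.
>>> worker.set_policy_mapping_fn( 
...     lambda agent_id, episode, **kwargs: "default_policy") 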

set_is_policy_to_train(is_policy_to_train: Union[Container[str], Callable[[str, Optional[Union[SampleBatch, MultiAgentBatch]]], bool]]) None[source]#

Sets self.is_policy_to_train() to a new callable.

Parameters

is_policy_to_train – A container of policy IDs to be trained or a callable taking PolicyID and - optionally - SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.
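
Example (a minimal sketch; the policy ID is illustrative only):

>>> # Only the policy with ID "default_policy" will be trained from here on.
>>> worker.set_is_policy_to_train(["default_policy"]) 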

get_policies_to_train(batch: Optional[Union[SampleBatch, MultiAgentBatch]] = None) Set[str][source]#

Returns all policies-to-train, given an optional batch.

Loops through all policies currently in self.policy_map and checks the return value of self.is_policy_to_train(pid, batch).

Parameters

batch – An optional SampleBatchType for the self.is_policy_to_train(pid, [batch]?) check.

Returns

The set of currently trainable policy IDs, given the optional batch.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
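
Example (a minimal sketch, assuming a worker built as in the examples above):

>>> batch = worker.sample() 
>>> # Which policies would currently be trained on this batch?
>>> print(worker.get_policies_to_train(batch)) 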

for_policy(func: Callable[[ray.rllib.policy.policy.Policy, Optional[Any]], ray.rllib.utils.typing.T], policy_id: Optional[str] = 'default_policy', **kwargs) ray.rllib.utils.typing.T[source]#

Calls the given function with the specified policy as first arg.

Parameters
  • func – The function to call with the policy as first arg.

  • policy_id – The PolicyID of the policy to call the function with.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The return value of the function call.
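
Example (a minimal sketch, assuming a single-agent worker built as in the examples above):

>>> # Read the observation space of the default policy.
>>> obs_space = worker.for_policy(lambda p: p.observation_space) 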

foreach_policy(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T][source]#

Calls the given function with each (policy, policy_id) tuple.

Parameters

func – The function to call with each (policy, policy ID) tuple.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The list of return values of all calls to func([policy, pid, **kwargs]).
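
Example (a minimal sketch, assuming a worker built as in the examples above):

>>> # List (policy ID, policy class name) pairs for all policies on this worker.
>>> infos = worker.foreach_policy(lambda p, pid: (pid, type(p).__name__)) 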

foreach_policy_to_train(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T][source]#

Calls the given function with each (policy, policy_id) tuple.

Only those policies/IDs will be called on, for which self.is_policy_to_train() returns True.

Parameters

func – The function to call with each (policy, policy ID) tuple, for only those policies that self.is_policy_to_train returns True.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The list of return values of all calls to func([policy, pid, **kwargs]).
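
Example (a minimal sketch, assuming a worker built as in the examples above):

>>> # Same as foreach_policy(), but restricted to trainable policies.
>>> trainable_ids = worker.foreach_policy_to_train(lambda p, pid: pid) 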

sync_filters(new_filters: dict) None[source]#

Changes this worker’s filters to the given ones and rebases any accumulated delta.

Parameters

new_filters – Filters with new state to update local copy.

get_filters(flush_after: bool = False) Dict[source]#

Returns a snapshot of filters.

Parameters

flush_after – Clears the filter buffer state.

Returns

Dict of serializable filters.
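
Example (a minimal sketch; `other_worker` is a hypothetical second RolloutWorker):

>>> # Snapshot this worker's filters (clearing their buffers) and push them
>>> # into another worker via sync_filters().
>>> filters = worker.get_filters(flush_after=True) 
>>> other_worker.sync_filters(filters) 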

get_state() dict[source]#

Serializes this RolloutWorker’s current state and returns it.

Returns

The current state of this RolloutWorker as a serializable state dict.

set_state(state: dict) None[source]#

Restores this RolloutWorker’s state from a state dict.

Parameters

state – The state dict to restore this worker’s state from.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> state = worker.get_state() 
>>> new_worker = RolloutWorker(...) 
>>> new_worker.set_state(state) 
get_weights(policies: Optional[Container[str]] = None) Dict[str, dict][source]#

Returns the model weights of each of this worker’s policies.

Parameters

policies – List of PolicyIDs to get the weights from. Use None for all policies.

Returns

Dict mapping PolicyIDs to ModelWeights.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> weights = worker.get_weights() 
>>> print(weights) 
{"default_policy": {"layer1": array(...), "layer2": ...}}
set_weights(weights: Dict[str, dict], global_vars: Optional[Dict] = None, weights_seq_no: Optional[int] = None) None[source]#

Sets the model weights of each of this worker’s policies.

Parameters
  • weights – Dict mapping PolicyIDs to the new weights to be used.

  • global_vars – An optional global vars dict to set this worker to. If None, do not update the global_vars.

  • weights_seq_no – If needed, a sequence number for the weights version can be passed into this method. If not None, this seq no is stored (in self.weights_seq_no), and subsequent calls whose seq no has not changed with respect to the last call are ignored, to save on performance.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> weights = worker.get_weights() 
>>> # Set `global_vars` (timestep) as well.
>>> worker.set_weights(weights, {"timestep": 42}) 
get_global_vars() dict[source]#

Returns the current self.global_vars dict of this RolloutWorker.

Returns

The current self.global_vars dict of this RolloutWorker.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> global_vars = worker.get_global_vars() 
>>> print(global_vars) 
{"timestep": 424242}
set_global_vars(global_vars: dict, policy_ids: Optional[List[str]] = None) None[source]#

Updates this worker’s and all its policies’ global vars.

Updates are done using the dict’s update method.

Parameters
  • global_vars – The global_vars dict to update the self.global_vars dict from.

  • policy_ids – Optional list of Policy IDs to update. If None, will update all policies on the to-be-updated workers.

Examples

>>> worker = ... 
>>> worker.set_global_vars( 
...     {"timestep": 4242})
stop() None[source]#

Releases all resources used by this RolloutWorker.

lock() None[source]#

Locks this RolloutWorker via its own threading.Lock.

unlock() None[source]#

Unlocks this RolloutWorker via its own threading.Lock.
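
Example (a minimal sketch, assuming a local worker built as in the examples above):

>>> # Guard a critical section on this worker.
>>> worker.lock() 
>>> try: 
...     batch = worker.sample() 
... finally: 
...     worker.unlock() 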

setup_torch_data_parallel(url: str, world_rank: int, world_size: int, backend: str) None[source]#

Joins a torch process group for distributed SGD.

creation_args() dict[source]#

Returns the kwargs dict used to create this worker.

get_host() str[source]#

Returns the hostname of the process running this evaluator.

get_node_ip() str[source]#

Returns the IP address of the node that this worker runs on.

find_free_port() int[source]#

Finds a free port on the node that this worker runs on.
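
Example (a minimal sketch; the URL format is an assumption, e.g. for use with setup_torch_data_parallel()):

>>> ip = worker.get_node_ip() 
>>> port = worker.find_free_port() 
>>> url = f"tcp://{ip}:{port}" 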