RolloutWorker

RolloutWorkers are used as @ray.remote actors to collect and return samples from environments or offline files in parallel. An RLlib Trainer usually holds num_workers remote RolloutWorker actors plus a single local RolloutWorker (not @ray.remote) in its WorkerSet under self.workers.

Depending on its evaluation config settings, the Trainer may hold an additional WorkerSet of evaluation-only RolloutWorkers under self.evaluation_workers.
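
For illustration, the following sketch shows how these worker sets are typically reached from a Trainer; the PGTrainer import path and the config values are assumptions that may differ for your Ray version and setup.

>>> import ray
>>> from ray.rllib.agents.pg import PGTrainer
>>> ray.init()
>>> trainer = PGTrainer(env="CartPole-v0", config={"num_workers": 2})
>>> # The single local (non-@ray.remote) RolloutWorker:
>>> local_worker = trainer.workers.local_worker()
>>> # The @ray.remote RolloutWorker actors:
>>> remote_workers = trainer.workers.remote_workers()
>>> # Collect one batch from each remote worker in parallel.
>>> batches = ray.get([w.sample.remote() for w in remote_workers])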

class ray.rllib.evaluation.rollout_worker.RolloutWorker(*, env_creator: Callable[[ray.rllib.env.env_context.EnvContext], Any], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, policy_spec: Optional[Union[type, Dict[str, ray.rllib.policy.policy.PolicySpec]]] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Optional[List[str]] = None, tf_session_creator: Optional[Callable[[], tf1.Session]] = None, rollout_fragment_length: int = 100, count_steps_by: str = 'env_steps', batch_mode: str = 'truncate_episodes', episode_horizon: Optional[int] = None, preprocessor_pref: str = 'deepmind', sample_async: bool = False, compress_observations: bool = False, num_envs: int = 1, observation_fn: Optional[ObservationFunction] = None, observation_filter: str = 'NoFilter', clip_rewards: Optional[Union[bool, float]] = None, normalize_actions: bool = True, clip_actions: bool = False, env_config: Optional[dict] = None, model_config: Optional[dict] = None, policy_config: Optional[dict] = None, worker_index: int = 0, num_workers: int = 0, record_env: Union[bool, str] = False, log_dir: Optional[str] = None, log_level: Optional[str] = None, callbacks: Type[DefaultCallbacks] = None, input_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.input_reader.InputReader] = <function RolloutWorker.<lambda>>, input_evaluation: List[str] = frozenset({}), output_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.output_writer.OutputWriter] = <function RolloutWorker.<lambda>>, remote_worker_envs: bool = False, remote_env_batch_wait_ms: int = 0, soft_horizon: bool = False, no_done_at_end: bool = False, seed: int = None, extra_python_environs: Optional[dict] = None, fake_sampler: bool = False, spaces: Optional[Dict[str, Tuple[gym.spaces.Space, gym.spaces.Space]]] = None, policy=None, monitor_path=None)[source]

Common experience collection class.

This class wraps a policy instance and an environment class to collect experiences from the environment. You can create many replicas of this class as Ray actors to scale RL training.

This class supports vectorized and multi-agent policy evaluation (e.g., VectorEnv, MultiAgentEnv).

Examples

>>> # Create a rollout worker and use it to collect experiences.
>>> worker = RolloutWorker(
...   env_creator=lambda _: gym.make("CartPole-v0"),
...   policy_spec=PGTFPolicy)
>>> print(worker.sample())
SampleBatch({
    "obs": [[...]], "actions": [[...]], "rewards": [[...]],
    "dones": [[...]], "new_obs": [[...]]})
>>> # Creating a multi-agent rollout worker
>>> worker = RolloutWorker(
...   env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
...   policy_spec={
...       # Use an ensemble of two policies for car agents
...       "car_policy1":
...         (PGTFPolicy, Box(...), Discrete(...), {"gamma": 0.99}),
...       "car_policy2":
...         (PGTFPolicy, Box(...), Discrete(...), {"gamma": 0.95}),
...       # Use a single shared policy for all traffic lights
...       "traffic_light_policy":
...         (PGTFPolicy, Box(...), Discrete(...), {}),
...   },
...   policy_mapping_fn=lambda agent_id, episode, **kwargs:
...     random.choice(["car_policy1", "car_policy2"])
...     if agent_id.startswith("car_") else "traffic_light_policy")
>>> print(worker.sample())
MultiAgentBatch({
    "car_policy1": SampleBatch(...),
    "car_policy2": SampleBatch(...),
    "traffic_light_policy": SampleBatch(...)})
classmethod as_remote(num_cpus: Optional[int] = None, num_gpus: Optional[Union[int, float]] = None, memory: Optional[int] = None, object_store_memory: Optional[int] = None, resources: Optional[dict] = None) type[source]

Returns RolloutWorker class as a @ray.remote using given options.

The returned class can then be used to instantiate ray actors.

Parameters
  • num_cpus – The number of CPUs to allocate for the remote actor.

  • num_gpus – The number of GPUs to allocate for the remote actor. This could be a fraction as well.

  • memory – The heap memory request for the remote actor.

  • object_store_memory – The object store memory for the remote actor.

  • resources – The default custom resources to allocate for the remote actor.

Returns

The @ray.remote decorated RolloutWorker class.
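
A minimal usage sketch (the env and policy choices here mirror the class-level example above and are illustrative assumptions):

>>> import gym, ray
>>> RemoteRolloutWorker = RolloutWorker.as_remote(num_cpus=1, num_gpus=0)
>>> worker = RemoteRolloutWorker.remote(
...     env_creator=lambda _: gym.make("CartPole-v0"),
...     policy_spec=PGTFPolicy)
>>> batch = ray.get(worker.sample.remote())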

__init__(*, env_creator: Callable[[ray.rllib.env.env_context.EnvContext], Any], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, policy_spec: Optional[Union[type, Dict[str, ray.rllib.policy.policy.PolicySpec]]] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Optional[List[str]] = None, tf_session_creator: Optional[Callable[[], tf1.Session]] = None, rollout_fragment_length: int = 100, count_steps_by: str = 'env_steps', batch_mode: str = 'truncate_episodes', episode_horizon: Optional[int] = None, preprocessor_pref: str = 'deepmind', sample_async: bool = False, compress_observations: bool = False, num_envs: int = 1, observation_fn: Optional[ObservationFunction] = None, observation_filter: str = 'NoFilter', clip_rewards: Optional[Union[bool, float]] = None, normalize_actions: bool = True, clip_actions: bool = False, env_config: Optional[dict] = None, model_config: Optional[dict] = None, policy_config: Optional[dict] = None, worker_index: int = 0, num_workers: int = 0, record_env: Union[bool, str] = False, log_dir: Optional[str] = None, log_level: Optional[str] = None, callbacks: Type[DefaultCallbacks] = None, input_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.input_reader.InputReader] = <function RolloutWorker.<lambda>>, input_evaluation: List[str] = frozenset({}), output_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.output_writer.OutputWriter] = <function RolloutWorker.<lambda>>, remote_worker_envs: bool = False, remote_env_batch_wait_ms: int = 0, soft_horizon: bool = False, no_done_at_end: bool = False, seed: int = None, extra_python_environs: Optional[dict] = None, fake_sampler: bool = False, spaces: Optional[Dict[str, Tuple[gym.spaces.Space, gym.spaces.Space]]] = None, policy=None, monitor_path=None)[source]

Initializes a RolloutWorker instance.

Parameters
  • env_creator – Function that returns a gym.Env given an EnvContext wrapped configuration.

  • validate_env – Optional callable to validate the generated environment (only on worker=0).

  • policy_spec – The MultiAgentPolicyConfigDict mapping policy IDs (str) to PolicySpec’s or a single policy class to use. If a dict is specified, then we are in multi-agent mode and a policy_mapping_fn can also be set (if not, will map all agents to DEFAULT_POLICY_ID).

  • policy_mapping_fn – A callable that maps agent ids to policy ids in multi-agent mode. This function will be called each time a new agent appears in an episode, to bind that agent to a policy for the duration of the episode. If not provided, will map all agents to DEFAULT_POLICY_ID.

  • policies_to_train – Optional list of policies to train, or None for all policies.

  • tf_session_creator – A function that returns a TF session. This is optional and only useful with TFPolicy.

  • rollout_fragment_length – The target number of steps (measured in count_steps_by) to include in each sample batch returned from this worker.

  • count_steps_by – The unit in which to count fragment lengths. One of env_steps or agent_steps.

  • batch_mode – One of the following batch modes:
    “truncate_episodes”: Each call to sample() will return a batch of at most rollout_fragment_length * num_envs in size. The batch will be exactly rollout_fragment_length * num_envs in size if postprocessing does not change batch sizes. Episodes may be truncated in order to meet this size requirement.
    “complete_episodes”: Each call to sample() will return a batch of at least rollout_fragment_length * num_envs in size. Episodes will not be truncated, but multiple episodes may be packed within one batch to meet the batch size. Note that when num_envs > 1, episode steps will be buffered until the episode completes, and hence batches may contain significant amounts of off-policy data.

  • episode_horizon – Horizon at which to stop episodes (even if the environment itself has not returned a “done” signal).

  • preprocessor_pref – Whether to use RLlib preprocessors (“rllib”) or deepmind (“deepmind”), when applicable.

  • sample_async – Whether to compute samples asynchronously in the background, which improves throughput but can cause samples to be slightly off-policy.

  • compress_observations – If true, compress the observations. They can be decompressed with rllib/utils/compression.

  • num_envs – If more than one, will create multiple envs and vectorize the computation of actions. This has no effect if the env already implements VectorEnv.

  • observation_fn – Optional multi-agent observation function.

  • observation_filter – Name of observation filter to use.

  • clip_rewards – True for clipping rewards to [-1.0, 1.0] prior to experience postprocessing. None: Clip for Atari only. float: Clip to [-clip_rewards, +clip_rewards].

  • normalize_actions – Whether to normalize actions to the action space’s bounds.

  • clip_actions – Whether to clip action values to the range specified by the policy action space.

  • env_config – Config to pass to the env creator.

  • model_config – Config to use when creating the policy model.

  • policy_config – Config to pass to the policy. In the multi-agent case, this config will be merged with the per-policy configs specified by policy_spec.

  • worker_index – For remote workers, this should be set to a non-zero and unique value. This index is passed to created envs through EnvContext so that envs can be configured per worker.

  • num_workers – The total number of (remote) workers that have been created.

  • record_env – Write out episode stats and videos using gym.wrappers.Monitor to this directory if specified. If True, use the default output dir in ~/ray_results/…. If False, do not record anything.

  • log_dir – Directory where logs can be placed.

  • log_level – Set the root log level on creation.

  • callbacks – Custom sub-class of DefaultCallbacks for training/policy/rollout-worker callbacks.

  • input_creator – Function that returns an InputReader object for loading previous generated experiences.

  • input_evaluation – How to evaluate the policy performance. This only makes sense to set when the input is reading offline data. Possible values: “is” (the step-wise importance sampling estimator), “wis” (the weighted step-wise importance sampling estimator), “simulation” (run the environment in the background, but use this data for evaluation only and never for learning).

  • output_creator – Function that returns an OutputWriter object for saving generated experiences.

  • remote_worker_envs – If using num_envs_per_worker > 1, whether to create those new envs in remote processes instead of in the current process. This adds overhead, but can make sense if your envs are expensive to step/reset (e.g., for StarCraft). Use this cautiously; the overhead is significant.

  • remote_env_batch_wait_ms – Timeout that remote workers wait for when polling environments. 0 (continue as soon as at least one env is ready) is a reasonable default, but the optimal value can be found by measuring your environment’s step/reset and model inference performance.

  • soft_horizon – Calculate rewards but don’t reset the environment when the horizon is hit.

  • no_done_at_end – Ignore the done=True at the end of the episode and instead record done=False.

  • seed – Set the seed of both np and tf to this value to ensure each remote worker has unique exploration behavior.

  • extra_python_environs – Extra Python environment variables to set for this worker (dict mapping variable names to values).

  • fake_sampler – Use a fake (inf speed) sampler for testing.

  • spaces – An optional space dict mapping policy IDs to (obs_space, action_space)-tuples. This is used in case no Env is created on this RolloutWorker.

  • policy – Obsolete; use policy_spec instead.

  • monitor_path – Obsolete; use record_env instead.
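
The sketch below combines several of the arguments documented above into a single-agent worker; the particular values are illustrative assumptions, not recommended settings.

>>> import gym
>>> worker = RolloutWorker(
...     env_creator=lambda _: gym.make("CartPole-v0"),
...     policy_spec=PGTFPolicy,
...     rollout_fragment_length=200,
...     count_steps_by="env_steps",
...     batch_mode="complete_episodes",
...     num_envs=4,
...     observation_filter="MeanStdFilter",
...     seed=42)
>>> batch = worker.sample()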

sample() Union[SampleBatch, MultiAgentBatch][source]

Returns a batch of experience sampled from this worker.

This method must be implemented by subclasses.

Returns

A columnar batch of experiences (e.g., tensors).

Examples

>>> print(worker.sample())
SampleBatch({"obs": [1, 2, 3], "action": [0, 1, 0], ...})
sample_with_count() Tuple[Union[SampleBatch, MultiAgentBatch], int][source]

Same as sample() but returns the count as a separate value.

Returns

A columnar batch of experiences (e.g., tensors) and the size of the collected batch.

Examples

>>> print(worker.sample_with_count())
(SampleBatch({"obs": [1, 2, 3], "action": [0, 1, 0], ...}), 3)
learn_on_batch(samples: Union[SampleBatch, MultiAgentBatch]) Dict[source]

Update policies based on the given batch.

This is the equivalent to apply_gradients(compute_gradients(samples)), but can be optimized to avoid pulling gradients into CPU memory.

Parameters

samples – The SampleBatch or MultiAgentBatch to learn on.

Returns

Dictionary of extra metadata from compute_gradients().

Examples

>>> batch = worker.sample()
>>> info = worker.learn_on_batch(batch)
sample_and_learn(expected_batch_size: int, num_sgd_iter: int, sgd_minibatch_size: str, standardize_fields: List[str]) Tuple[dict, int][source]

Sample a batch and learn on it.

This is typically used in combination with distributed allreduce.

Parameters
  • expected_batch_size – Expected number of samples to learn on.

  • num_sgd_iter – Number of SGD iterations.

  • sgd_minibatch_size – SGD minibatch size.

  • standardize_fields – List of sample fields to normalize.

Returns

A tuple consisting of a dictionary of extra metadata returned from the policies’ learn_on_batch() and the number of samples learned on.
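
A minimal call sketch (the concrete sizes and the "advantages" field are assumptions that depend on the algorithm in use):

>>> info, num_samples = worker.sample_and_learn(
...     expected_batch_size=4000,
...     num_sgd_iter=10,
...     sgd_minibatch_size=128,
...     standardize_fields=["advantages"])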

compute_gradients(samples: Union[SampleBatch, MultiAgentBatch]) Tuple[Union[List[Tuple[Any, Any]], List[Any]], dict][source]

Returns a gradient computed w.r.t. the specified samples.

Uses the Policy’s/ies’ compute_gradients method(s) to perform the calculations.

Parameters

samples – The SampleBatch or MultiAgentBatch to compute gradients for using this worker’s policies.

Returns

In the single-agent case, a tuple consisting of ModelGradients and info dict of the worker’s policy. In the multi-agent case, a tuple consisting of a dict mapping PolicyID to ModelGradients and a dict mapping PolicyID to extra metadata info. Note that the first return value (grads) can be applied as is to a compatible worker using the worker’s apply_gradients() method.

Examples

>>> batch = worker.sample()
>>> grads, info = worker.compute_gradients(batch)
apply_gradients(grads: Union[List[Tuple[Any, Any]], List[Any], Dict[str, Union[List[Tuple[Any, Any]], List[Any]]]]) None[source]

Applies the given gradients to this worker’s models.

Uses the Policy’s/ies’ apply_gradients method(s) to perform the operations.

Parameters

grads – Single ModelGradients (single-agent case) or a dict mapping PolicyIDs to the respective model gradients structs.

Examples

>>> samples = worker.sample()
>>> grads, info = worker.compute_gradients(samples)
>>> worker.apply_gradients(grads)
get_metrics() List[Union[ray.rllib.evaluation.metrics.RolloutMetrics, ray.rllib.offline.off_policy_estimator.OffPolicyEstimate]][source]

Returns the thus-far collected metrics from this worker’s rollouts.

Returns

List of RolloutMetrics and/or OffPolicyEstimate objects collected thus-far.
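
For example (a minimal sketch; metrics accumulate as episodes complete during sampling):

>>> _ = worker.sample()  # Collect some rollouts first.
>>> metrics = worker.get_metrics()
>>> print(len(metrics))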

foreach_env(func: Callable[[Any], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T][source]

Calls the given function with each sub-environment as arg.

Parameters

func – The function to call for each underlying sub-environment (as only arg).

Returns

The list of return values of all calls to func([env]).
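
A minimal sketch, assuming the sub-environments are regular gym.Envs; set_difficulty is a hypothetical custom env method used only for illustration:

>>> # Query each sub-environment's action space.
>>> spaces = worker.foreach_env(lambda env: env.action_space)
>>> # Call a custom (hypothetical) method on each sub-env.
>>> worker.foreach_env(lambda env: env.set_difficulty(2))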

foreach_env_with_context(func: Callable[[Any, ray.rllib.env.env_context.EnvContext], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T][source]

Calls given function with each sub-env plus env_ctx as args.

Parameters

func – The function to call for each underlying sub-environment and its EnvContext (as the args).

Returns

The list of return values of all calls to func([env, ctx]).
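
For example (a minimal sketch using two of the EnvContext fields):

>>> results = worker.foreach_env_with_context(
...     lambda env, ctx: (ctx.worker_index, ctx.vector_index))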

get_policy(policy_id: str = 'default_policy') Optional[ray.rllib.policy.policy.Policy][source]

Return policy for the specified id, or None.

Parameters

policy_id – ID of the policy to return. None for DEFAULT_POLICY_ID (in the single agent case).

Returns

The policy under the given ID (or None if not found).
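
For example (the multi-agent policy ID is taken from the class-level example above):

>>> # Single-agent case: the default policy.
>>> policy = worker.get_policy()
>>> # Multi-agent case: look up a specific policy by its ID.
>>> car_policy = worker.get_policy("car_policy1")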

add_policy(*, policy_id: str, policy_cls: Type[ray.rllib.policy.policy.Policy], observation_space: Optional[gym.spaces.Space] = None, action_space: Optional[gym.spaces.Space] = None, config: Optional[dict] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Optional[List[str]] = None) ray.rllib.policy.policy.Policy[source]

Adds a new policy to this RolloutWorker.

Parameters
  • policy_id – ID of the policy to add.

  • policy_cls – The Policy class to use for constructing the new Policy.

  • observation_space – The observation space of the policy to add.

  • action_space – The action space of the policy to add.

  • config – The config overrides for the policy to add.

  • policy_config – The base config of the Trainer object owning this RolloutWorker.

  • policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.

  • policies_to_train – An optional list of policy IDs to be trained. If None, will keep the existing list in place. Policies whose IDs are not in the list will not be updated.

Returns

The newly added policy.
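
A minimal sketch building on the multi-agent example above; the spaces and config values are illustrative assumptions:

>>> new_policy = worker.add_policy(
...     policy_id="car_policy3",
...     policy_cls=PGTFPolicy,
...     observation_space=Box(...),
...     action_space=Discrete(...),
...     config={"gamma": 0.9},
...     policy_mapping_fn=lambda agent_id, episode, **kwargs:
...         "car_policy3" if agent_id.startswith("car_")
...         else "traffic_light_policy")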

remove_policy(*, policy_id: str = 'default_policy', policy_mapping_fn: Optional[Callable[[Any], str]] = None, policies_to_train: Optional[List[str]] = None) None[source]

Removes a policy from this RolloutWorker.

Parameters
  • policy_id – ID of the policy to be removed. None for DEFAULT_POLICY_ID.

  • policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.

  • policies_to_train – An optional list of policy IDs to be trained. If None, will keep the existing list in place. Policies whose IDs are not in the list will not be updated.
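
For example (a minimal sketch; the policy IDs are taken from the multi-agent example above):

>>> worker.remove_policy(
...     policy_id="car_policy2",
...     policy_mapping_fn=lambda agent_id, episode, **kwargs: "car_policy1")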

set_policy_mapping_fn(policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None) None[source]

Sets self.policy_mapping_fn to a new callable (if provided).

Parameters

policy_mapping_fn – The new mapping function to use. If None, will keep the existing mapping function in place.

set_policies_to_train(policies_to_train: Optional[List[str]] = None) None[source]

Sets self.policies_to_train to a new list of PolicyIDs.

Parameters

policies_to_train – The new list of policy IDs to train with. If None, will keep the existing list in place.

for_policy(func: Callable[[ray.rllib.policy.policy.Policy, Optional[Any]], ray.rllib.utils.typing.T], policy_id: Optional[str] = 'default_policy', **kwargs) ray.rllib.utils.typing.T[source]

Calls the given function with the specified policy as first arg.

Parameters
  • func – The function to call with the policy as first arg.

  • policy_id – The PolicyID of the policy to call the function with.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The return value of the function call.
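
For example (a minimal sketch; the multi-agent policy ID is an assumption from the example above):

>>> # Fetch the default policy's weights via `for_policy`.
>>> weights = worker.for_policy(lambda p: p.get_weights())
>>> # Target a specific policy in the multi-agent case.
>>> car_weights = worker.for_policy(
...     lambda p: p.get_weights(), policy_id="car_policy1")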

foreach_policy(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T][source]

Calls the given function with each (policy, policy_id) tuple.

Parameters

func – The function to call with each (policy, policy ID) tuple.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The list of return values of all calls to func([policy, pid, **kwargs]).
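
For example (a minimal sketch):

>>> # Collect (policy ID, observation space) pairs for all policies.
>>> infos = worker.foreach_policy(
...     lambda policy, pid: (pid, policy.observation_space))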

foreach_trainable_policy(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T][source]

Calls the given function with each (policy, policy_id) tuple.

Only those policies/IDs will be called on, which can be found in self.policies_to_train.

Parameters

func – The function to call with each (policy, policy ID) tuple, for only those policies that are in self.policies_to_train.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The list of return values of all calls to func([policy, pid, **kwargs]).

sync_filters(new_filters: dict) None[source]

Replaces this worker’s filters with the given ones and rebases any accumulated delta.

Parameters

new_filters – Filters with new state to update local copy.

get_filters(flush_after: bool = False) Dict[source]

Returns a snapshot of filters.

Parameters

flush_after – If True, clear the filter buffer state after the snapshot is taken.

Returns

A dict of serializable filters.
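
A minimal sketch of a typical round-trip; remote_worker and local_worker are assumed to be an as_remote()-created RolloutWorker actor and a local RolloutWorker, respectively:

>>> # Snapshot the remote worker's filters, clearing its buffers ...
>>> remote_filters = ray.get(
...     remote_worker.get_filters.remote(flush_after=True))
>>> # ... then fold the new state into the local worker's filters.
>>> local_worker.sync_filters(remote_filters)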

save() bytes[source]

Serializes this RolloutWorker’s current state and returns it.

Returns

The current state of this RolloutWorker as a serialized, pickled byte sequence.

restore(objs: bytes) None[source]

Restores this RolloutWorker’s state from a sequence of bytes.

Parameters

objs – The byte sequence to restore this worker’s state from.

Examples

>>> state = worker.save()
>>> new_worker = RolloutWorker(...)
>>> new_worker.restore(state)
get_weights(policies: Optional[List[str]] = None) Dict[str, dict][source]

Returns the model weights of this worker’s policies.

Parameters

policies – List of PolicyIDs to get the weights from. Use None for all policies.

Returns

Dict mapping PolicyIDs to ModelWeights.

Examples

>>> weights = worker.get_weights()
>>> print(weights)
{"default_policy": {"layer1": array(...), "layer2": ...}}
set_weights(weights: Dict[str, dict], global_vars: Optional[Dict] = None) None[source]

Sets the model weights of this worker’s policies.

Parameters
  • weights – Dict mapping PolicyIDs to the new weights to be used.

  • global_vars – An optional global vars dict to set this worker to. If None, do not update the global_vars.

Examples

>>> weights = worker.get_weights()
>>> # Set `global_vars` (timestep) as well.
>>> worker.set_weights(weights, {"timestep": 42})
get_global_vars() dict[source]

Returns the current global_vars dict of this worker.

Returns

The current global_vars dict of this worker.

Examples

>>> global_vars = worker.get_global_vars()
>>> print(global_vars)
{"timestep": 424242}
set_global_vars(global_vars: dict) None[source]

Updates this worker’s and all its policies’ global vars.

Parameters

global_vars – The new global_vars dict.

Examples

>>> worker.set_global_vars({"timestep": 4242})
stop() None[source]

Releases all resources used by this RolloutWorker.

apply(func: Callable[[ray.rllib.evaluation.rollout_worker.RolloutWorker, Optional[Any], Optional[Any]], ray.rllib.utils.typing.T], *args, **kwargs) ray.rllib.utils.typing.T[source]

Calls the given function with this rollout worker instance.

Useful for when the RolloutWorker class has been converted into an ActorHandle and the user needs to execute some functionality (e.g., add a property) on the underlying policy object.

Parameters
  • func – The function to call, with this RolloutWorker as first argument, followed by args, and kwargs.

  • args – Optional additional args to pass to the function call.

  • kwargs – Optional additional kwargs to pass to the function call.

Returns

The return value of the function call.
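
For example (a minimal sketch; remote_worker is assumed to be an ActorHandle of this class):

>>> # Locally: count the steps in one sampled batch.
>>> count = worker.apply(lambda w: w.sample().count)
>>> # Remotely, via the actor handle:
>>> count = ray.get(remote_worker.apply.remote(lambda w: w.sample().count))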

setup_torch_data_parallel(url: str, world_rank: int, world_size: int, backend: str) None[source]

Joins a torch process group for distributed SGD.

creation_args() dict[source]

Returns the kwargs dict used to create this worker.

get_host() str[source]

Returns the hostname of the process running this evaluator.

get_node_ip() str[source]

Returns the IP address of the node that this worker runs on.

find_free_port() int[source]

Finds a free port on the node that this worker runs on.