RolloutWorker

RolloutWorkers are used as @ray.remote actors to collect and return samples from environments or offline files in parallel. An RLlib Trainer usually has num_workers such @ray.remote RolloutWorkers plus a single "local" RolloutWorker (not @ray.remote) in its WorkerSet under self.workers.

Depending on its evaluation config settings, a Trainer may hold an additional WorkerSet for evaluation under self.evaluation_workers.
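A minimal sketch of how these worker sets can be accessed, assuming a PGTrainer has been built with num_workers=2 (the config values here are illustrative only):

>>> from ray.rllib.agents.pg import PGTrainer
>>> trainer = PGTrainer(env="CartPole-v0", config={"num_workers": 2})
>>> # The single "local" RolloutWorker (not @ray.remote).
>>> local_worker = trainer.workers.local_worker()
>>> # The list of @ray.remote RolloutWorker actor handles.
>>> remote_workers = trainer.workers.remote_workers()
>>> print(len(remote_workers))
2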

class ray.rllib.evaluation.rollout_worker.RolloutWorker(*, env_creator: Callable[[EnvContext], Optional[Any]], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, policy_spec: Optional[Union[type, Dict[str, ray.rllib.policy.policy.PolicySpec]]] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]] = None, tf_session_creator: Optional[Callable[[], tf1.Session]] = None, rollout_fragment_length: int = 100, count_steps_by: str = 'env_steps', batch_mode: str = 'truncate_episodes', episode_horizon: Optional[int] = None, preprocessor_pref: str = 'deepmind', sample_async: bool = False, compress_observations: bool = False, num_envs: int = 1, observation_fn: Optional[ObservationFunction] = None, observation_filter: str = 'NoFilter', clip_rewards: Optional[Union[bool, float]] = None, normalize_actions: bool = True, clip_actions: bool = False, env_config: Optional[dict] = None, model_config: Optional[dict] = None, policy_config: Optional[dict] = None, worker_index: int = 0, num_workers: int = 0, recreated_worker: bool = False, record_env: Union[bool, str] = False, log_dir: Optional[str] = None, log_level: Optional[str] = None, callbacks: Type[DefaultCallbacks] = None, input_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.input_reader.InputReader] = <function RolloutWorker.<lambda>>, input_evaluation: List[str] = frozenset({}), output_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.output_writer.OutputWriter] = <function RolloutWorker.<lambda>>, remote_worker_envs: bool = False, remote_env_batch_wait_ms: int = 0, soft_horizon: bool = False, no_done_at_end: bool = False, seed: int = None, extra_python_environs: Optional[dict] = None, fake_sampler: bool = False, spaces: Optional[Dict[str, Tuple[gym.Space, gym.Space]]] = None, policy=None, monitor_path=None, disable_env_checking=False)[source]

Common experience collection class.

This class wraps a policy instance and an environment class to collect experiences from the environment. You can create many replicas of this class as Ray actors to scale RL training.

This class supports vectorized and multi-agent policy evaluation (e.g., VectorEnv, MultiAgentEnv, etc.)

Examples

>>> # Create a rollout worker and use it to collect experiences.
>>> import gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v0"), 
...   policy_spec=PGTFPolicy) 
>>> print(worker.sample()) 
SampleBatch({
    "obs": [[...]], "actions": [[...]], "rewards": [[...]],
    "dones": [[...]], "new_obs": [[...]]})
>>> # Creating a multi-agent rollout worker
>>> from gym.spaces import Discrete, Box
>>> import random
>>> MultiAgentTrafficGrid = ... 
>>> worker = RolloutWorker( 
...   env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
...   policy_spec={ 
...       # Use an ensemble of two policies for car agents
...       "car_policy1": 
...         (PGTFPolicy, Box(...), Discrete(...), {"gamma": 0.99}),
...       "car_policy2": 
...         (PGTFPolicy, Box(...), Discrete(...), {"gamma": 0.95}),
...       # Use a single shared policy for all traffic lights
...       "traffic_light_policy":
...         (PGTFPolicy, Box(...), Discrete(...), {}),
...   },
...   policy_mapping_fn=lambda agent_id, episode, **kwargs:
...     random.choice(["car_policy1", "car_policy2"])
...     if agent_id.startswith("car_") else "traffic_light_policy")
>>> print(worker.sample()) 
MultiAgentBatch({
    "car_policy1": SampleBatch(...),
    "car_policy2": SampleBatch(...),
    "traffic_light_policy": SampleBatch(...)})
classmethod as_remote(num_cpus: Optional[int] = None, num_gpus: Optional[Union[int, float]] = None, memory: Optional[int] = None, object_store_memory: Optional[int] = None, resources: Optional[dict] = None) type[source]

Returns the RolloutWorker class as an @ray.remote actor class, using the given options.

The returned class can then be used to instantiate ray actors.

Parameters
  • num_cpus – The number of CPUs to allocate for the remote actor.

  • num_gpus – The number of GPUs to allocate for the remote actor. This could be a fraction as well.

  • memory – The heap memory request for the remote actor.

  • object_store_memory – The object store memory for the remote actor.

  • resources – The default custom resources to allocate for the remote actor.

Returns

The @ray.remote decorated RolloutWorker class.
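For illustration, a sketch of creating and using a remote worker via as_remote(), assuming Ray has been initialized and re-using the CartPole/PGTFPolicy setup from the examples above:

>>> import ray
>>> import gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
>>> ray.init()
>>> RemoteRolloutWorker = RolloutWorker.as_remote(num_cpus=1)
>>> remote_worker = RemoteRolloutWorker.remote(
...   env_creator=lambda _: gym.make("CartPole-v0"),
...   policy_spec=PGTFPolicy)
>>> batch = ray.get(remote_worker.sample.remote())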

__init__(*, env_creator: Callable[[EnvContext], Optional[Any]], validate_env: Optional[Callable[[Any, ray.rllib.env.env_context.EnvContext], None]] = None, policy_spec: Optional[Union[type, Dict[str, ray.rllib.policy.policy.PolicySpec]]] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]] = None, tf_session_creator: Optional[Callable[[], tf1.Session]] = None, rollout_fragment_length: int = 100, count_steps_by: str = 'env_steps', batch_mode: str = 'truncate_episodes', episode_horizon: Optional[int] = None, preprocessor_pref: str = 'deepmind', sample_async: bool = False, compress_observations: bool = False, num_envs: int = 1, observation_fn: Optional[ObservationFunction] = None, observation_filter: str = 'NoFilter', clip_rewards: Optional[Union[bool, float]] = None, normalize_actions: bool = True, clip_actions: bool = False, env_config: Optional[dict] = None, model_config: Optional[dict] = None, policy_config: Optional[dict] = None, worker_index: int = 0, num_workers: int = 0, recreated_worker: bool = False, record_env: Union[bool, str] = False, log_dir: Optional[str] = None, log_level: Optional[str] = None, callbacks: Type[DefaultCallbacks] = None, input_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.input_reader.InputReader] = <function RolloutWorker.<lambda>>, input_evaluation: List[str] = frozenset({}), output_creator: Callable[[ray.rllib.offline.io_context.IOContext], ray.rllib.offline.output_writer.OutputWriter] = <function RolloutWorker.<lambda>>, remote_worker_envs: bool = False, remote_env_batch_wait_ms: int = 0, soft_horizon: bool = False, no_done_at_end: bool = False, seed: int = None, extra_python_environs: Optional[dict] = None, fake_sampler: bool = False, spaces: Optional[Dict[str, Tuple[gym.Space, gym.Space]]] = None, policy=None, monitor_path=None, disable_env_checking=False)[source]

Initializes a RolloutWorker instance.

Parameters
  • env_creator – Function that returns a gym.Env given an EnvContext wrapped configuration.

  • validate_env – Optional callable to validate the generated environment (only on worker=0).

  • policy_spec – The MultiAgentPolicyConfigDict mapping policy IDs (str) to PolicySpec’s or a single policy class to use. If a dict is specified, then we are in multi-agent mode and a policy_mapping_fn can also be set (if not, will map all agents to DEFAULT_POLICY_ID).

  • policy_mapping_fn – A callable that maps agent ids to policy ids in multi-agent mode. This function will be called each time a new agent appears in an episode, to bind that agent to a policy for the duration of the episode. If not provided, will map all agents to DEFAULT_POLICY_ID.

  • policies_to_train – Optional container of policies to train (None for all policies), or a callable taking PolicyID and SampleBatchType and returning a bool (trainable or not?).

  • tf_session_creator – A function that returns a TF session. This is optional and only useful with TFPolicy.

  • rollout_fragment_length – The target number of steps (measured in count_steps_by) to include in each sample batch returned from this worker.

  • count_steps_by – The unit in which to count fragment lengths. One of env_steps or agent_steps.

  • batch_mode – One of the following batch modes:
    - “truncate_episodes”: Each call to sample() will return a batch of at most rollout_fragment_length * num_envs in size. The batch will be exactly rollout_fragment_length * num_envs in size if postprocessing does not change batch sizes. Episodes may be truncated in order to meet this size requirement.
    - “complete_episodes”: Each call to sample() will return a batch of at least rollout_fragment_length * num_envs in size. Episodes will not be truncated, but multiple episodes may be packed within one batch to meet the batch size. Note that when num_envs > 1, episode steps will be buffered until the episode completes, and hence batches may contain significant amounts of off-policy data.

  • episode_horizon – Horizon at which to stop episodes (even if the environment itself has not returned a “done” signal).

  • preprocessor_pref – Whether to use RLlib preprocessors (“rllib”) or deepmind (“deepmind”), when applicable.

  • sample_async – Whether to compute samples asynchronously in the background, which improves throughput but can cause samples to be slightly off-policy.

  • compress_observations – If true, compress the observations. They can be decompressed with rllib/utils/compression.

  • num_envs – If more than one, will create multiple envs and vectorize the computation of actions. This has no effect if the env already implements VectorEnv.

  • observation_fn – Optional multi-agent observation function.

  • observation_filter – Name of observation filter to use.

  • clip_rewards – True for clipping rewards to [-1.0, 1.0] prior to experience postprocessing. None: Clip for Atari only. float: Clip to [-clip_rewards; +clip_rewards].

  • normalize_actions – Whether to normalize actions to the action space’s bounds.

  • clip_actions – Whether to clip action values to the range specified by the policy action space.

  • env_config – Config to pass to the env creator.

  • model_config – Config to use when creating the policy model.

  • policy_config – Config to pass to the policy. In the multi-agent case, this config will be merged with the per-policy configs specified by policy_spec.

  • worker_index – For remote workers, this should be set to a non-zero and unique value. This index is passed to created envs through EnvContext so that envs can be configured per worker.

  • num_workers – For remote workers, how many workers altogether have been created?

  • recreated_worker – Whether this worker is a recreated one. Workers are recreated by a Trainer (via WorkerSet) in case recreate_failed_workers=True and one of the original workers (or an already recreated one) has failed. They don’t differ from original workers other than the value of this flag (self.recreated_worker).

  • record_env – Write out episode stats and videos using gym.wrappers.Monitor to this directory if specified. If True, use the default output dir in ~/ray_results/…. If False, do not record anything.

  • log_dir – Directory where logs can be placed.

  • log_level – Set the root log level on creation.

  • callbacks – Custom sub-class of DefaultCallbacks for training/policy/rollout-worker callbacks.

  • input_creator – Function that returns an InputReader object for loading previous generated experiences.

  • input_evaluation – How to evaluate the policy performance. This only makes sense to set when the input is reading offline data. Possible values:
    - “is”: the step-wise importance sampling estimator.
    - “wis”: the weighted step-wise importance sampling estimator.
    - “simulation”: run the environment in the background, but use this data for evaluation only and never for learning.

  • output_creator – Function that returns an OutputWriter object for saving generated experiences.

  • remote_worker_envs – If using num_envs_per_worker > 1, whether to create those new envs in remote processes instead of in the current process. This adds overheads, but can make sense if your envs are expensive to step/reset (e.g., for StarCraft). Use this cautiously, overheads are significant!

  • remote_env_batch_wait_ms – Timeout that remote workers are waiting when polling environments. 0 (continue when at least one env is ready) is a reasonable default, but optimal value could be obtained by measuring your environment step / reset and model inference perf.

  • soft_horizon – Calculate rewards but don’t reset the environment when the horizon is hit.

  • no_done_at_end – Ignore the done=True at the end of the episode and instead record done=False.

  • seed – Set the seed of both np and tf to this value to ensure each remote worker has unique exploration behavior.

  • extra_python_environs – Extra Python environment variables to set for this worker.

  • fake_sampler – Use a fake (inf speed) sampler for testing.

  • spaces – An optional space dict mapping policy IDs to (obs_space, action_space)-tuples. This is used in case no Env is created on this RolloutWorker.

  • policy – Obsoleted arg. Use policy_spec instead.

  • monitor_path – Obsoleted arg. Use record_env instead.

  • disable_env_checking – If True, disables the env checking module that validates the properties of the passed environment.

sample() Union[SampleBatch, MultiAgentBatch][source]

Returns a batch of experience sampled from this worker.

This method must be implemented by subclasses.

Returns

A columnar batch of experiences (e.g., tensors).

Examples

>>> import gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v0"), 
...   policy_spec=PGTFPolicy) 
>>> print(worker.sample()) 
SampleBatch({"obs": [...], "action": [...], ...})
sample_with_count() Tuple[Union[SampleBatch, MultiAgentBatch], int][source]

Same as sample() but returns the count as a separate value.

Returns

A columnar batch of experiences (e.g., tensors) and the size of the collected batch.

Examples

>>> import gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v0"), 
...   policy_spec=PGTFPolicy) 
>>> print(worker.sample_with_count()) 
(SampleBatch({"obs": [...], "action": [...], ...}), 3)
learn_on_batch(samples: Union[SampleBatch, MultiAgentBatch]) Dict[source]

Update policies based on the given batch.

This is the equivalent to apply_gradients(compute_gradients(samples)), but can be optimized to avoid pulling gradients into CPU memory.

Parameters

samples – The SampleBatch or MultiAgentBatch to learn on.

Returns

Dictionary of extra metadata from compute_gradients().

Examples

>>> import gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v0"), 
...   policy_spec=PGTFPolicy) 
>>> batch = worker.sample() 
>>> info = worker.learn_on_batch(batch) 
sample_and_learn(expected_batch_size: int, num_sgd_iter: int, sgd_minibatch_size: str, standardize_fields: List[str]) Tuple[dict, int][source]

Samples a batch and learns on it.

This is typically used in combination with distributed allreduce.

Parameters
  • expected_batch_size – Expected number of samples to learn on.

  • num_sgd_iter – Number of SGD iterations.

  • sgd_minibatch_size – SGD minibatch size.

  • standardize_fields – List of sample fields to normalize.

Returns

A tuple consisting of a dictionary of extra metadata returned from the policies’ learn_on_batch() and the number of samples learned on.
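A hedged sketch of calling this on a local worker set up as in the sample() example above (the SGD hyperparameter values are illustrative only):

>>> worker = ...
>>> # One rollout_fragment_length batch, 2 SGD epochs, minibatches of 32.
>>> info, num_steps = worker.sample_and_learn(
...     expected_batch_size=100,
...     num_sgd_iter=2,
...     sgd_minibatch_size=32,
...     standardize_fields=[])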

compute_gradients(samples: Union[SampleBatch, MultiAgentBatch], single_agent: bool = None) Tuple[Union[List[Tuple[Any, Any]], List[Any]], dict][source]

Returns a gradient computed w.r.t. the specified samples.

Uses the Policy’s/ies’ compute_gradients method(s) to perform the calculations. Skips policies that are not trainable as per self.is_policy_to_train().

Parameters

samples – The SampleBatch or MultiAgentBatch to compute gradients for using this worker’s trainable policies.

Returns

In the single-agent case, a tuple consisting of ModelGradients and info dict of the worker’s policy. In the multi-agent case, a tuple consisting of a dict mapping PolicyID to ModelGradients and a dict mapping PolicyID to extra metadata info. Note that the first return value (grads) can be applied as is to a compatible worker using the worker’s apply_gradients() method.

Examples

>>> import gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v0"), 
...   policy_spec=PGTFPolicy) 
>>> batch = worker.sample() 
>>> grads, info = worker.compute_gradients(batch) 
apply_gradients(grads: Union[List[Tuple[Any, Any]], List[Any], Dict[str, Union[List[Tuple[Any, Any]], List[Any]]]]) None[source]

Applies the given gradients to this worker’s models.

Uses the Policy’s/ies’ apply_gradients method(s) to perform the operations.

Parameters

grads – Single ModelGradients (single-agent case) or a dict mapping PolicyIDs to the respective model gradients structs.

Examples

>>> import gym
>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
>>> worker = RolloutWorker( 
...   env_creator=lambda _: gym.make("CartPole-v0"), 
...   policy_spec=PGTFPolicy) 
>>> samples = worker.sample() 
>>> grads, info = worker.compute_gradients(samples) 
>>> worker.apply_gradients(grads) 
get_metrics() List[Union[ray.rllib.evaluation.metrics.RolloutMetrics, ray.rllib.offline.off_policy_estimator.OffPolicyEstimate]][source]

Returns the thus-far collected metrics from this worker’s rollouts.

Returns

List of RolloutMetrics and/or OffPolicyEstimate objects collected thus-far.
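For instance, assuming a worker created as in the examples above:

>>> worker = ...
>>> _ = worker.sample()
>>> # One entry per episode completed (and/or off-policy estimate produced)
>>> # since the last call.
>>> metrics = worker.get_metrics()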

foreach_env(func: Callable[[Any], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T][source]

Calls the given function with each sub-environment as arg.

Parameters

func – The function to call for each underlying sub-environment (as only arg).

Returns

The list of return values of all calls to func([env]).
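For example, to query each sub-environment’s observation space (sketch, assuming a worker created as above):

>>> worker = ...
>>> obs_spaces = worker.foreach_env(lambda env: env.observation_space)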

foreach_env_with_context(func: Callable[[Any, ray.rllib.env.env_context.EnvContext], ray.rllib.utils.typing.T]) List[ray.rllib.utils.typing.T][source]

Calls given function with each sub-env plus env_ctx as args.

Parameters

func – The function to call for each underlying sub-environment and its EnvContext (as the args).

Returns

The list of return values of all calls to func([env, ctx]).
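A small sketch that reads per-worker information from the EnvContext (for a local worker with a single sub-environment):

>>> worker = ...
>>> print(worker.foreach_env_with_context(
...     lambda env, ctx: ctx.worker_index))
[0]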

get_policy(policy_id: str = 'default_policy') Optional[ray.rllib.policy.policy.Policy][source]

Return policy for the specified id, or None.

Parameters

policy_id – ID of the policy to return. None for DEFAULT_POLICY_ID (in the single agent case).

Returns

The policy under the given ID (or None if not found).
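For example, on a single-agent worker created with policy_spec=PGTFPolicy as above:

>>> worker = ...
>>> policy = worker.get_policy()
>>> print(type(policy).__name__)
PGTFPolicy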

add_policy(*, policy_id: str, policy_cls: Type[ray.rllib.policy.policy.Policy], observation_space: Optional[gym.Space] = None, action_space: Optional[gym.Space] = None, config: Optional[dict] = None, policy_state: Optional[Dict[str, Union[Any, dict, tuple]]] = None, policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]]] = None) ray.rllib.policy.policy.Policy[source]

Adds a new policy to this RolloutWorker.

Parameters
  • policy_id – ID of the policy to add.

  • policy_cls – The Policy class to use for constructing the new Policy.

  • observation_space – The observation space of the policy to add.

  • action_space – The action space of the policy to add.

  • config – The config overrides for the policy to add.

  • policy_state – Optional state dict to apply to the new policy instance, right after its construction.

  • policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.

  • policies_to_train – An optional container of policy IDs to be trained or a callable taking PolicyID and - optionally - SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies, whose IDs are not in the list (or for which the callable returns False) will not be updated.

Returns

The newly added policy.

Raises

KeyError – If the given policy_id already exists in this worker’s PolicyMap.
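An illustrative sketch; the policy ID, spaces, and config are hypothetical and assume a multi-agent worker like the one in the class example:

>>> from gym.spaces import Box, Discrete
>>> worker = ...
>>> new_policy = worker.add_policy(
...     policy_id="car_policy3",
...     policy_cls=PGTFPolicy,
...     observation_space=Box(-1.0, 1.0, (4,)),
...     action_space=Discrete(2),
...     config={"gamma": 0.9})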

remove_policy(*, policy_id: str = 'default_policy', policy_mapping_fn: Optional[Callable[[Any], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Union[SampleBatch, MultiAgentBatch]], bool]]] = None) None[source]

Removes a policy from this RolloutWorker.

Parameters
  • policy_id – ID of the policy to be removed. None for DEFAULT_POLICY_ID.

  • policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.

  • policies_to_train – An optional container of policy IDs to be trained or a callable taking PolicyID and - optionally - SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies, whose IDs are not in the list (or for which the callable returns False) will not be updated.
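For instance, removing the hypothetical policy added in the add_policy() sketch above:

>>> worker = ...
>>> worker.remove_policy(policy_id="car_policy3")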

set_policy_mapping_fn(policy_mapping_fn: Optional[Callable[[Any, Episode], str]] = None) None[source]

Sets self.policy_mapping_fn to a new callable (if provided).

Parameters

policy_mapping_fn – The new mapping function to use. If None, will keep the existing mapping function in place.
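A short sketch re-using the mapping-function signature from the class example above:

>>> worker = ...
>>> worker.set_policy_mapping_fn(
...     lambda agent_id, episode, **kwargs: "traffic_light_policy")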

set_is_policy_to_train(is_policy_to_train: Union[Container[str], Callable[[str, Optional[Union[SampleBatch, MultiAgentBatch]]], bool]]) None[source]

Sets self.is_policy_to_train() to a new callable.

Parameters

is_policy_to_train – A container of policy IDs to be trained or a callable taking PolicyID and - optionally - SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies, whose IDs are not in the list (or for which the callable returns False) will not be updated.

get_policies_to_train(batch: Optional[Union[SampleBatch, MultiAgentBatch]] = None) Set[str][source]

Returns all policies-to-train, given an optional batch.

Loops through all policies currently in self.policy_map and checks the return value of self.is_policy_to_train(pid, batch).

Parameters

batch – An optional SampleBatchType for the self.is_policy_to_train(pid, [batch]?) check.

Returns

The set of currently trainable policy IDs, given the optional batch.
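For example (sketch, assuming the multi-agent worker from the class example):

>>> worker = ...
>>> batch = worker.sample()
>>> trainable_ids = worker.get_policies_to_train(batch)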

for_policy(func: Callable[[ray.rllib.policy.policy.Policy, Optional[Any]], ray.rllib.utils.typing.T], policy_id: Optional[str] = 'default_policy', **kwargs) ray.rllib.utils.typing.T[source]

Calls the given function with the specified policy as first arg.

Parameters
  • func – The function to call with the policy as first arg.

  • policy_id – The PolicyID of the policy to call the function with.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The return value of the function call.
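A small sketch calling a function on the default policy:

>>> worker = ...
>>> num_weight_tensors = worker.for_policy(
...     lambda policy: len(policy.get_weights()),
...     policy_id="default_policy")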

foreach_policy(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T][source]

Calls the given function with each (policy, policy_id) tuple.

Parameters

func – The function to call with each (policy, policy ID) tuple.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The list of return values of all calls to func([policy, pid, **kwargs]).
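For example, listing all policy IDs together with their policy classes (sketch for a single-agent PGTFPolicy worker):

>>> worker = ...
>>> print(worker.foreach_policy(
...     lambda policy, pid: (pid, type(policy).__name__)))
[('default_policy', 'PGTFPolicy')]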

foreach_policy_to_train(func: Callable[[ray.rllib.policy.policy.Policy, str, Optional[Any]], ray.rllib.utils.typing.T], **kwargs) List[ray.rllib.utils.typing.T][source]

Calls the given function with each (policy, policy_id) tuple.

Only those policies/IDs will be called on, for which self.is_policy_to_train() returns True.

Parameters

func – The function to call with each (policy, policy ID) tuple, for only those policies that self.is_policy_to_train returns True.

Keyword Arguments

kwargs – Additional kwargs to be passed to the call.

Returns

The list of return values of all calls to func([policy, pid, **kwargs]).

sync_filters(new_filters: dict) None[source]

Changes self’s filters to the given ones and rebases any accumulated delta.

Parameters

new_filters – Filters with new state to update local copy.

get_filters(flush_after: bool = False) Dict[source]

Returns a snapshot of filters.

Parameters

flush_after – Clears the filter buffer state.

Returns

The dict of serializable filters.
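A sketch of moving filter state from one worker to another (assuming two workers created as in the examples above):

>>> worker_1, worker_2 = ..., ...
>>> filters = worker_1.get_filters(flush_after=True)
>>> worker_2.sync_filters(filters)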

save() bytes[source]

Serializes this RolloutWorker’s current state and returns it.

Returns

The current state of this RolloutWorker as a serialized, pickled byte sequence.

restore(objs: bytes) None[source]

Restores this RolloutWorker’s state from a sequence of bytes.

Parameters

objs – The byte sequence to restore this worker’s state from.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> state = worker.save() 
>>> new_worker = RolloutWorker(...) 
>>> new_worker.restore(state) 
get_weights(policies: Optional[Container[str]] = None) Dict[str, dict][source]

Returns the model weights of each of this worker’s policies.

Parameters

policies – List of PolicyIDs to get the weights from. Use None for all policies.

Returns

Dict mapping PolicyIDs to ModelWeights.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> weights = worker.get_weights() 
>>> print(weights) 
{"default_policy": {"layer1": array(...), "layer2": ...}}
set_weights(weights: Dict[str, dict], global_vars: Optional[Dict] = None) None[source]

Sets the model weights of each of this worker’s policies.

Parameters
  • weights – Dict mapping PolicyIDs to the new weights to be used.

  • global_vars – An optional global vars dict to set this worker to. If None, do not update the global_vars.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> weights = worker.get_weights() 
>>> # Set `global_vars` (timestep) as well.
>>> worker.set_weights(weights, {"timestep": 42}) 
get_global_vars() dict[source]

Returns the current global_vars dict of this worker.

Returns

The current global_vars dict of this worker.

Examples

>>> from ray.rllib.evaluation.rollout_worker import RolloutWorker
>>> # Create a RolloutWorker.
>>> worker = ... 
>>> global_vars = worker.get_global_vars() 
>>> print(global_vars) 
{"timestep": 424242}
set_global_vars(global_vars: dict) None[source]

Updates this worker’s and all its policies’ global vars.

Parameters

global_vars – The new global_vars dict.

Examples

>>> worker = ... 
>>> global_vars = worker.set_global_vars( 
...     {"timestep": 4242})
stop() None[source]

Releases all resources used by this RolloutWorker.

apply(func: Callable[[ray.rllib.evaluation.rollout_worker.RolloutWorker, Optional[Any], Optional[Any]], ray.rllib.utils.typing.T], *args, **kwargs) ray.rllib.utils.typing.T[source]

Calls the given function with this rollout worker instance.

Useful for when the RolloutWorker class has been converted into an ActorHandle and the user needs to execute some functionality (e.g. add a property) on the underlying policy object.

Parameters
  • func – The function to call, with this RolloutWorker as first argument, followed by args, and kwargs.

  • args – Optional additional args to pass to the function call.

  • kwargs – Optional additional kwargs to pass to the function call.

Returns

The return value of the function call.
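For example (sketch; the same call works on an ActorHandle via worker.apply.remote(...)):

>>> worker = ...
>>> # Fetch the default policy's weights through a custom function.
>>> weights = worker.apply(lambda w: w.get_policy().get_weights())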

setup_torch_data_parallel(url: str, world_rank: int, world_size: int, backend: str) None[source]

Joins a torch process group for distributed SGD.

creation_args() dict[source]

Returns the kwargs dict used to create this worker.

get_host() str[source]

Returns the hostname of the process running this evaluator.

get_node_ip() str[source]

Returns the IP address of the node that this worker runs on.

find_free_port() int[source]

Finds a free port on the node that this worker runs on.