WorkerSet#

A set of RolloutWorker instances, containing n ray remote workers as well as a single “local” RolloutWorker.

WorkerSet exposes convenience methods that call methods on its individual workers in parallel, using e.g. ray.get().

class ray.rllib.evaluation.worker_set.WorkerSet(*, env_creator: Optional[Callable[[EnvContext], Optional[Any]]] = None, validate_env: Optional[Callable[[Any], None]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, config: Optional[AlgorithmConfig] = None, num_workers: int = 0, local_worker: bool = True, logdir: Optional[str] = None, _setup: bool = True, policy_class=-1, trainer_config=-1)[source]#

Set of RolloutWorkers with n @ray.remote workers and zero or one local worker.

Where: n >= 0.

__init__(*, env_creator: Optional[Callable[[EnvContext], Optional[Any]]] = None, validate_env: Optional[Callable[[Any], None]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, config: Optional[AlgorithmConfig] = None, num_workers: int = 0, local_worker: bool = True, logdir: Optional[str] = None, _setup: bool = True, policy_class=-1, trainer_config=-1)[source]#

Initializes a WorkerSet instance.

Parameters
  • env_creator – Function that returns an env given an env config (EnvContext).

  • validate_env – Optional callable to validate the generated environment (only on worker=0). This callable should raise an exception if the environment is invalid.

  • default_policy_class – An optional default Policy class to use inside the (multi-agent) policies dict. In case the PolicySpecs in there have no class defined, use this default_policy_class. If None, PolicySpecs will be using the Algorithm’s default Policy class.

  • config – Optional AlgorithmConfig (or config dict).

  • num_workers – Number of remote rollout workers to create.

  • local_worker – Whether to create a local (non @ray.remote) worker in the returned set as well (default: True). If num_workers is 0, always create a local worker.

  • logdir – Optional logging directory for workers.

  • _setup – Whether to set up workers. This is only for testing.
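The interplay of num_workers and local_worker can be sketched with a small, hypothetical helper (resolve_worker_layout is not an RLlib function; it only mirrors the rule stated above):

```python
# Hypothetical helper mirroring the documented rule: a local worker is
# always created when num_workers == 0, even if local_worker=False.
def resolve_worker_layout(num_workers: int, local_worker: bool = True) -> dict:
    has_local = local_worker or num_workers == 0
    return {"local": has_local, "remote": num_workers}

print(resolve_worker_layout(2))                      # local worker + 2 remote
print(resolve_worker_layout(0, local_worker=False))  # local worker forced
```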

local_worker() ray.rllib.evaluation.rollout_worker.RolloutWorker[source]#

Returns the local rollout worker.

healthy_worker_ids() List[int][source]#

Returns the list of IDs of all healthy remote workers.

num_remote_workers() int[source]#

Returns the number of remote rollout workers.

num_healthy_remote_workers() int[source]#

Returns the number of healthy remote workers.

num_healthy_workers() int[source]#

Returns the number of healthy workers, including local and remote workers.

num_in_flight_async_reqs() int[source]#

Returns the number of in-flight async requests.

num_remote_worker_restarts() int[source]#

Returns the total number of times the managed remote workers have been restarted.

sync_weights(policies: Optional[List[str]] = None, from_worker: Optional[ray.rllib.evaluation.rollout_worker.RolloutWorker] = None, to_worker_indices: Optional[List[int]] = None, global_vars: Optional[Dict[str, Union[numpy.array, tf.Tensor, torch.Tensor]]] = None, timeout_seconds: Optional[int] = 0) None[source]#

Syncs model weights from the local worker to all remote workers.

Parameters
  • policies – Optional list of PolicyIDs to sync weights for. If None (default), sync weights to/from all policies.

  • from_worker – Optional local RolloutWorker instance to sync from. If None (default), sync from this WorkerSet’s local worker.

  • to_worker_indices – Optional list of worker indices to sync the weights to. If None (default), sync to all remote workers.

  • global_vars – An optional global vars dict to set this worker to. If None, do not update the global_vars.

  • timeout_seconds – Timeout in seconds to wait for the sync weights calls to complete. Default is 0 (sync-and-forget, do not wait for any sync calls to finish). This significantly improves algorithm performance.
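The default weight-sync flow (from the local worker, to all remote workers, for all policies) can be illustrated with a minimal, hypothetical stand-in; FakeWorker and FakeWorkerSet below are not RLlib classes, only a sketch of the documented semantics:

```python
class FakeWorker:
    """Hypothetical stand-in holding a per-policy weights dict."""
    def __init__(self):
        self.weights = {}

    def get_weights(self, policies=None):
        if policies is None:
            return dict(self.weights)
        return {pid: w for pid, w in self.weights.items() if pid in policies}

    def set_weights(self, weights):
        self.weights.update(weights)


class FakeWorkerSet:
    def __init__(self, num_remote):
        self.local = FakeWorker()
        self.remote = [FakeWorker() for _ in range(num_remote)]

    def sync_weights(self, policies=None, to_worker_indices=None):
        # Mirrors the documented defaults: sync from the local worker,
        # to all remote workers, for all policies. Remote worker indices
        # start at 1 (index 0 is the local worker).
        src = self.local.get_weights(policies)
        if to_worker_indices is None:
            to_worker_indices = range(1, len(self.remote) + 1)
        for i in to_worker_indices:
            self.remote[i - 1].set_weights(src)


ws = FakeWorkerSet(num_remote=3)
ws.local.weights = {"default_policy": [1.0, 2.0]}
ws.sync_weights(to_worker_indices=[1, 3])  # sync only to workers 1 and 3
```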

add_policy(policy_id: str, policy_cls: Optional[Type[ray.rllib.policy.policy.Policy]] = None, policy: Optional[ray.rllib.policy.policy.Policy] = None, *, observation_space: Optional[gym.spaces.Space] = None, action_space: Optional[gym.spaces.Space] = None, config: Optional[Union[AlgorithmConfig, dict]] = None, policy_state: Optional[Dict[str, Union[numpy.array, tf.Tensor, torch.Tensor, dict, tuple]]] = None, policy_mapping_fn: Optional[Callable[[Any, int], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Optional[Union[SampleBatch, MultiAgentBatch]]], bool]]] = None, workers: Optional[List[Union[ray.rllib.evaluation.rollout_worker.RolloutWorker, ray.actor.ActorHandle]]] = -1) None[source]#

Adds a policy to this WorkerSet’s workers or a specific list of workers.

Parameters
  • policy_id – ID of the policy to add.

  • policy_cls – The Policy class to use for constructing the new Policy. Note: Exactly one of policy_cls or policy must be provided.

  • policy – The Policy instance to add to this WorkerSet. If not None, the given Policy object will be directly inserted into the local worker and clones of that Policy will be created on all remote workers. Note: Exactly one of policy_cls or policy must be provided.

  • observation_space – The observation space of the policy to add. If None, try to infer this space from the environment.

  • action_space – The action space of the policy to add. If None, try to infer this space from the environment.

  • config – The config object or overrides for the policy to add.

  • policy_state – Optional state dict to apply to the new policy instance, right after its construction.

  • policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.

  • policies_to_train – An optional list of policy IDs to be trained or a callable taking PolicyID and SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.

  • workers – A list of RolloutWorker/ActorHandles (remote RolloutWorkers) to add this policy to. If defined, will only add the given policy to these workers.

Raises

KeyError – If the given policy_id already exists in this WorkerSet.
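The two invariants above (exactly one of policy_cls/policy, and KeyError on a duplicate ID) can be sketched with a hypothetical registry; FakePolicyRegistry is not an RLlib class:

```python
class FakePolicyRegistry:
    """Hypothetical stand-in for the policy dict a WorkerSet manages."""
    def __init__(self):
        self.policies = {}

    def add_policy(self, policy_id, policy_cls=None, policy=None):
        # Exactly one of policy_cls or policy must be given.
        if (policy_cls is None) == (policy is None):
            raise ValueError("Provide exactly one of policy_cls or policy.")
        # Duplicate IDs raise KeyError, as documented for add_policy.
        if policy_id in self.policies:
            raise KeyError(f"Policy {policy_id!r} already exists.")
        self.policies[policy_id] = policy if policy is not None else policy_cls()


reg = FakePolicyRegistry()
reg.add_policy("pol1", policy_cls=object)
```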

add_workers(num_workers: int, validate: bool = False) None[source]#

Creates and adds a number of remote workers to this worker set.

Can be called several times on the same WorkerSet to add more RolloutWorkers to the set.

Parameters
  • num_workers – The number of remote Workers to add to this WorkerSet.

  • validate – Whether to validate remote workers after their construction process.

Raises
  • RayError – If any of the constructed remote workers is not up and running properly.

reset(new_remote_workers: List[ray.actor.ActorHandle]) None[source]#

Hard overrides the remote workers in this set with the given ones.

Parameters

new_remote_workers – A list of new RolloutWorkers (as ActorHandles) to use as remote workers.

stop() None[source]#

Calls stop on all rollout workers (including the local one).

is_policy_to_train(policy_id: str, batch: Optional[Union[SampleBatch, MultiAgentBatch]] = None) bool[source]#

Whether given PolicyID (optionally inside some batch) is trainable.

foreach_worker(func: Callable[[ray.rllib.evaluation.rollout_worker.RolloutWorker], ray.rllib.evaluation.worker_set.T], *, local_worker=True, healthy_only=False, remote_worker_ids: Optional[List[int]] = None, timeout_seconds: Optional[int] = None, return_obj_refs: bool = False) List[ray.rllib.evaluation.worker_set.T][source]#

Calls the given function with each worker instance as the argument.

Parameters
  • func – The function to call for each worker (as only arg).

  • local_worker – Whether to apply func to the local worker, too. Default is True.

  • healthy_only – Apply func on known active workers only. By default this will apply func on all workers regardless of their states.

  • remote_worker_ids – Apply func on a selected set of remote workers.

  • timeout_seconds – Time to wait for results. Default is None.

  • return_obj_refs – Whether to return ObjectRefs instead of actual results. Note: for fault tolerance reasons, these returned ObjectRefs should never be resolved with ray.get() outside of this WorkerSet.

Returns

The list of return values of all calls to func([worker]).
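The ordering of the returned list (local worker's result first, then the remote workers') can be sketched with a hypothetical stand-in; StubWorker and StubWorkerSet are not RLlib classes:

```python
class StubWorker:
    """Hypothetical worker holding a single sampled-steps counter."""
    def __init__(self, steps):
        self.steps = steps


class StubWorkerSet:
    """Hypothetical stand-in showing foreach_worker's result ordering."""
    def __init__(self, local, remote):
        self.local, self.remote = local, remote

    def foreach_worker(self, func, local_worker=True, remote_worker_ids=None):
        targets = [self.local] if local_worker else []
        # Remote worker IDs start at 1 (ID 0 is the local worker).
        ids = remote_worker_ids or range(1, len(self.remote) + 1)
        targets += [self.remote[i - 1] for i in ids]
        return [func(w) for w in targets]


ws = StubWorkerSet(StubWorker(100), [StubWorker(10), StubWorker(20)])
print(ws.foreach_worker(lambda w: w.steps))  # local result first: [100, 10, 20]
```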

foreach_worker_with_id(func: Callable[[int, ray.rllib.evaluation.rollout_worker.RolloutWorker], ray.rllib.evaluation.worker_set.T], *, local_worker=True, healthy_only=False, remote_worker_ids: Optional[List[int]] = None, timeout_seconds: Optional[int] = None) List[ray.rllib.evaluation.worker_set.T][source]#

Similar to foreach_worker(), but calls the function with the worker's ID as well.

Parameters
  • func – The function to call for each worker (as only arg).

  • local_worker – Whether to apply func to the local worker, too. Default is True.

  • healthy_only – Apply func on known active workers only. By default this will apply func on all workers regardless of their states.

  • remote_worker_ids – Apply func on a selected set of remote workers.

  • timeout_seconds – Time to wait for results. Default is None.

Returns

The list of return values of all calls to func([worker, id]).

foreach_worker_async(func: Callable[[ray.rllib.evaluation.rollout_worker.RolloutWorker], ray.rllib.evaluation.worker_set.T], *, healthy_only=False, remote_worker_ids: Optional[List[int]] = None) int[source]#

Calls the given function asynchronously with each worker as the argument.

foreach_worker_async() does not return results directly. Instead, fetch_ready_async_reqs() can be used to pull results in an async manner whenever they are available.

Parameters
  • func – The function to call for each worker (as only arg).

  • healthy_only – Apply func on known active workers only. By default this will apply func on all workers regardless of their states.

  • remote_worker_ids – Apply func on a selected set of remote workers.

Returns

The number of async requests that are currently in-flight.

fetch_ready_async_reqs(*, timeout_seconds: Optional[int] = 0, return_obj_refs: bool = False) List[Tuple[int, ray.rllib.evaluation.worker_set.T]][source]#

Fetches results from outstanding asynchronous requests that are ready.

Parameters

timeout_seconds – Time to wait for results. Default is 0, meaning only requests that are already ready are returned.

Returns

A list of results successfully returned from outstanding remote calls, paired with the indices of the callee workers.
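The async call/fetch cycle formed by foreach_worker_async() and fetch_ready_async_reqs() can be sketched with a hypothetical stand-in (AsyncStubWorkerSet is not an RLlib class; real requests are remote and complete over time, while this sketch completes them eagerly):

```python
from collections import deque


class AsyncStubWorkerSet:
    """Hypothetical stand-in for the async call / fetch cycle."""
    def __init__(self, num_remote):
        self.num_remote = num_remote
        self._in_flight = deque()  # (worker_id, result) pairs

    def foreach_worker_async(self, func, remote_worker_ids=None):
        ids = remote_worker_ids or range(1, self.num_remote + 1)
        for wid in ids:
            # In RLlib this would be a remote call; here we compute eagerly.
            self._in_flight.append((wid, func(wid)))
        # Returns the number of requests currently in flight.
        return len(self._in_flight)

    def fetch_ready_async_reqs(self):
        # Drains ready results, paired with the callee worker indices.
        ready = list(self._in_flight)
        self._in_flight.clear()
        return ready


ws = AsyncStubWorkerSet(num_remote=2)
n = ws.foreach_worker_async(lambda wid: wid * 10)
print(n)                            # 2 requests in flight
print(ws.fetch_ready_async_reqs())  # [(1, 10), (2, 20)]
```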

foreach_policy(func: Callable[[ray.rllib.policy.policy.Policy, str], ray.rllib.evaluation.worker_set.T]) List[ray.rllib.evaluation.worker_set.T][source]#

Calls func with each worker’s (policy, PolicyID) tuple.

Note that in the multi-agent case, each worker may have more than one policy.

Parameters

func – A function - taking a Policy and its ID - that is called on all workers’ Policies.

Returns

The list of return values of func over all workers’ policies. The length of this list is (num_workers + 1 (local worker)) * [num policies in the multi-agent config dict]. The local worker’s results come first, followed by all remote workers’ results.
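The flattened shape of the result (local worker first, one entry per policy per worker) can be sketched with a hypothetical stand-in; PolicyStubWorkerSet is not an RLlib class:

```python
class PolicyStubWorkerSet:
    """Hypothetical stand-in showing foreach_policy's flattened results."""
    def __init__(self, num_remote, policy_ids):
        # Every worker holds the same multi-agent policy dict;
        # entry 0 represents the local worker.
        self.worker_policies = [
            {pid: f"policy-{pid}" for pid in policy_ids}
            for _ in range(num_remote + 1)
        ]

    def foreach_policy(self, func):
        return [
            func(pol, pid)
            for policies in self.worker_policies  # local worker first
            for pid, pol in policies.items()
        ]


ws = PolicyStubWorkerSet(num_remote=2, policy_ids=["p0", "p1"])
results = ws.foreach_policy(lambda pol, pid: pid)
print(len(results))  # (2 remote + 1 local) * 2 policies = 6
```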

foreach_policy_to_train(func: Callable[[ray.rllib.policy.policy.Policy, str], ray.rllib.evaluation.worker_set.T]) List[ray.rllib.evaluation.worker_set.T][source]#

Applies func to all workers’ Policies for which worker.is_policy_to_train() returns True.

Parameters

func – A function - taking a Policy and its ID - that is called on all workers’ Policies, for which worker.is_policy_to_train() returns True.

Returns

The list of return values of all func([trainable policy], [ID]) calls.

Return type

List[any]

foreach_env(func: Callable[[Any], List[ray.rllib.evaluation.worker_set.T]]) List[List[ray.rllib.evaluation.worker_set.T]][source]#

Calls func with all workers’ sub-environments as args.

An “underlying sub environment” is a single clone of an env within a vectorized environment. func takes a single underlying sub environment as arg, e.g. a gym.Env object.

Parameters

func – A function - taking an EnvType (normally a gym.Env object) as arg and returning a list of lists of return values, one value per underlying sub-environment of each worker.

Returns

The list (workers) of lists (sub environments) of results.
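The nested return shape (outer list over workers, inner list over each worker's sub-environments) can be sketched with a hypothetical stand-in; EnvStubWorkerSet is not an RLlib class, and plain ints stand in for gym.Env objects:

```python
class EnvStubWorkerSet:
    """Hypothetical stand-in for foreach_env's nested return shape."""
    def __init__(self, envs_per_worker):
        # Outer list: workers; inner lists: each worker's sub-environments.
        self.envs_per_worker = envs_per_worker

    def foreach_env(self, func):
        return [[func(env) for env in envs] for envs in self.envs_per_worker]


# Two workers, each with two vectorized sub-environments.
ws = EnvStubWorkerSet([[1, 2], [3, 4]])
print(ws.foreach_env(lambda env: env * 10))  # [[10, 20], [30, 40]]
```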

foreach_env_with_context(func: Callable[[ray.rllib.env.base_env.BaseEnv, ray.rllib.env.env_context.EnvContext], List[ray.rllib.evaluation.worker_set.T]]) List[List[ray.rllib.evaluation.worker_set.T]][source]#

Calls func with all workers’ sub-environments and env_ctx as args.

An “underlying sub environment” is a single clone of an env within a vectorized environment. func takes a single underlying sub environment and the env_context as args.

Parameters

func – A function - taking a BaseEnv object and an EnvContext as arg - and returning a list of lists of return values over envs of the worker.

Returns

The list (one item per worker) of lists (one item per sub-environment) of results.

probe_unhealthy_workers() List[int][source]#

Checks for unhealthy workers and tries restoring their states.

Returns

IDs of the workers that were restored.