WorkerSet#
A set of RolloutWorkers, containing n @ray.remote workers as well as a single "local" RolloutWorker.
WorkerSet exposes convenience methods for calling its individual workers' own methods in parallel, e.g. using ray.get().
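To illustrate the shape of this API, here is a toy, pure-Python sketch of the WorkerSet idea: one "local" worker plus n stand-in "remote" workers, with a foreach-style helper that applies a function to every worker and collects the results. The `ToyWorker`/`ToyWorkerSet` names are illustrative only; real RolloutWorkers are Ray actors, and `ray.get()` would gather the remote results instead of the plain loop used here.

```python
class ToyWorker:
    """Stand-in for a RolloutWorker (normally a Ray actor)."""

    def __init__(self, index: int):
        self.index = index

    def sample(self) -> str:
        return f"batch-from-worker-{self.index}"


class ToyWorkerSet:
    """Stand-in for WorkerSet: one local worker plus n 'remote' workers."""

    def __init__(self, num_workers: int):
        self.local_worker = ToyWorker(0)  # index 0 = local worker
        self.remote_workers = [ToyWorker(i + 1) for i in range(num_workers)]

    def foreach_worker(self, func):
        # Local worker's result comes first, then all remote workers'.
        return [func(w) for w in [self.local_worker] + self.remote_workers]


workers = ToyWorkerSet(num_workers=2)
results = workers.foreach_worker(lambda w: w.sample())
print(results)  # ['batch-from-worker-0', 'batch-from-worker-1', 'batch-from-worker-2']
```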
- class ray.rllib.evaluation.worker_set.WorkerSet(*, env_creator: Optional[Callable[[EnvContext], Optional[Any]]] = None, validate_env: Optional[Callable[[Any], None]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, config: Optional[AlgorithmConfig] = None, num_workers: int = 0, local_worker: bool = True, logdir: Optional[str] = None, _setup: bool = True, policy_class=-1, trainer_config=-1)[source]#
Set of RolloutWorkers with n @ray.remote workers and zero or one local worker, where n >= 0.
- __init__(*, env_creator: Optional[Callable[[EnvContext], Optional[Any]]] = None, validate_env: Optional[Callable[[Any], None]] = None, default_policy_class: Optional[Type[ray.rllib.policy.policy.Policy]] = None, config: Optional[AlgorithmConfig] = None, num_workers: int = 0, local_worker: bool = True, logdir: Optional[str] = None, _setup: bool = True, policy_class=-1, trainer_config=-1)[source]#
Initializes a WorkerSet instance.
- Parameters
env_creator – Function that returns an env given an env config.
validate_env – Optional callable to validate the generated environment (only on worker=0). This callable should raise an exception if the environment is invalid.
default_policy_class – An optional default Policy class to use inside the (multi-agent) policies dict. PolicySpecs in that dict that have no class defined use this default_policy_class. If None, PolicySpecs use the Algorithm's default Policy class.
config – Optional AlgorithmConfig (or config dict).
num_workers – Number of remote rollout workers to create.
local_worker – Whether to also create a local (non-@ray.remote) worker in the returned set (default: True). If num_workers is 0, a local worker is always created.
logdir – Optional logging directory for workers.
_setup – Whether to set up workers. This is only for testing.
- local_worker() ray.rllib.evaluation.rollout_worker.RolloutWorker [source]#
Returns the local rollout worker.
- num_healthy_remote_workers() int [source]#
Returns the number of healthy remote workers.
- num_healthy_workers() int [source]#
Returns the number of healthy workers, including local and remote workers.
- num_remote_worker_restarts() int [source]#
Returns the total number of times the managed remote workers have been restarted.
- sync_weights(policies: Optional[List[str]] = None, from_worker: Optional[ray.rllib.evaluation.rollout_worker.RolloutWorker] = None, to_worker_indices: Optional[List[int]] = None, global_vars: Optional[Dict[str, Union[numpy.array, tf.Tensor, torch.Tensor]]] = None, timeout_seconds: Optional[int] = 0) None [source]#
Syncs model weights from the local worker to all remote workers.
- Parameters
policies – Optional list of PolicyIDs to sync weights for. If None (default), sync weights to/from all policies.
from_worker – Optional local RolloutWorker instance to sync from. If None (default), sync from this WorkerSet’s local worker.
to_worker_indices – Optional list of worker indices to sync the weights to. If None (default), sync to all remote workers.
global_vars – An optional global vars dict to set this worker to. If None, do not update the global_vars.
timeout_seconds – Timeout in seconds to wait for the sync weights calls to complete. Default is 0 (sync-and-forget, do not wait for any sync calls to finish). This significantly improves algorithm performance.
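As a minimal, pure-Python sketch of the weight-sync pattern that sync_weights() implements: read the (optionally policy-filtered) weights from a source worker and push copies to the chosen targets. The dict-based workers and the `sync_weights` helper below are illustrative stand-ins, not RLlib internals; RLlib transfers real model weights between Ray actors.

```python
import copy


def sync_weights(source, targets, policies=None, to_indices=None):
    """Copy (a subset of) the source worker's weights to selected targets."""
    weights = source["weights"]
    if policies is not None:  # filter to the requested policy IDs only
        weights = {pid: w for pid, w in weights.items() if pid in policies}
    picked = targets if to_indices is None else [targets[i] for i in to_indices]
    for t in picked:
        t["weights"].update(copy.deepcopy(weights))


local = {"weights": {"default_policy": [1.0, 2.0]}}
remotes = [{"weights": {}} for _ in range(3)]
sync_weights(local, remotes, to_indices=[0, 2])  # sync only workers 0 and 2
print(remotes[1])  # {'weights': {}} - worker 1 was not in to_indices
```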
- add_policy(policy_id: str, policy_cls: Optional[Type[ray.rllib.policy.policy.Policy]] = None, policy: Optional[ray.rllib.policy.policy.Policy] = None, *, observation_space: Optional[gym.spaces.Space] = None, action_space: Optional[gym.spaces.Space] = None, config: Optional[Union[AlgorithmConfig, dict]] = None, policy_state: Optional[Dict[str, Union[numpy.array, tf.Tensor, torch.Tensor, dict, tuple]]] = None, policy_mapping_fn: Optional[Callable[[Any, int], str]] = None, policies_to_train: Optional[Union[Container[str], Callable[[str, Optional[Union[SampleBatch, MultiAgentBatch]]], bool]]] = None, workers: Optional[List[Union[ray.rllib.evaluation.rollout_worker.RolloutWorker, ray.actor.ActorHandle]]] = -1) None [source]#
Adds a policy to this WorkerSet’s workers or a specific list of workers.
- Parameters
policy_id – ID of the policy to add.
policy_cls – The Policy class to use for constructing the new Policy. Note: Only one of policy_cls or policy must be provided.
policy – The Policy instance to add to this WorkerSet. If not None, the given Policy object is directly inserted into the local worker and clones of that Policy are created on all remote workers. Note: Only one of policy_cls or policy must be provided.
observation_space – The observation space of the policy to add. If None, try to infer this space from the environment.
action_space – The action space of the policy to add. If None, try to infer this space from the environment.
config – The config object or overrides for the policy to add.
policy_state – Optional state dict to apply to the new policy instance, right after its construction.
policy_mapping_fn – An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till the end of the episode.
policies_to_train – An optional list of policy IDs to be trained, or a callable taking a PolicyID and a SampleBatchType and returning a bool (trainable or not?). If None, will keep the existing setup in place. Policies whose IDs are not in the list (or for which the callable returns False) will not be updated.
workers – A list of RolloutWorker/ActorHandles (remote RolloutWorkers) to add this policy to. If defined, will only add the given policy to these workers.
- Raises
KeyError – If the given policy_id already exists in this WorkerSet.
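A toy sketch of what add_policy() conceptually does on each worker: register a new policy under its ID (raising KeyError if it already exists) and optionally swap in an updated policy_mapping_fn. The dict-based `worker` and the standalone `add_policy` helper are hypothetical simplifications, not RLlib's actual implementation.

```python
def add_policy(worker, policy_id, policy, policy_mapping_fn=None):
    """Register `policy` under `policy_id` on a (toy) worker."""
    if policy_id in worker["policy_map"]:
        raise KeyError(f"Policy {policy_id!r} already exists!")
    worker["policy_map"][policy_id] = policy
    if policy_mapping_fn is not None:
        # From here on, new episodes use the updated mapping function.
        worker["policy_mapping_fn"] = policy_mapping_fn


worker = {"policy_map": {"main": "main-policy"}, "policy_mapping_fn": None}
add_policy(
    worker,
    "opponent",
    "opponent-policy",
    policy_mapping_fn=lambda agent_id, episode: "opponent",
)
print(sorted(worker["policy_map"]))  # ['main', 'opponent']
```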
- add_workers(num_workers: int, validate: bool = False) None [source]#
Creates and adds a number of remote workers to this worker set.
Can be called several times on the same WorkerSet to add more RolloutWorkers to the set.
- Parameters
num_workers – The number of remote Workers to add to this WorkerSet.
validate – Whether to validate remote workers after their construction process.
- Raises
RayError – If any of the constructed remote workers is not up and running properly.
- reset(new_remote_workers: List[ray.actor.ActorHandle]) None [source]#
Hard overrides the remote workers in this set with the given ones.
- Parameters
new_remote_workers – A list of new RolloutWorkers (as ActorHandles) to use as remote workers.
- is_policy_to_train(policy_id: str, batch: Optional[Union[SampleBatch, MultiAgentBatch]] = None) bool [source]#
Whether the given PolicyID (optionally inside some batch) is trainable.
- foreach_worker(func: Callable[[ray.rllib.evaluation.rollout_worker.RolloutWorker], ray.rllib.evaluation.worker_set.T], *, local_worker=True, healthy_only=False, remote_worker_ids: Optional[List[int]] = None, timeout_seconds: Optional[int] = None, return_obj_refs: bool = False) List[ray.rllib.evaluation.worker_set.T] [source]#
Calls the given function with each worker instance as the argument.
- Parameters
func – The function to call for each worker (as only arg).
local_worker – Whether to apply func to the local worker too. Default is True.
healthy_only – Apply func only on known-healthy workers. By default, func is applied to all workers regardless of their states.
remote_worker_ids – Apply func on a selected set of remote workers.
timeout_seconds – Time to wait for results. Default is None.
return_obj_refs – Whether to return ObjectRefs instead of actual results. Note: For fault tolerance reasons, these returned ObjectRefs should never be resolved with ray.get() outside of this WorkerSet.
- Returns
The list of return values of all calls to func([worker]).
- foreach_worker_with_id(func: Callable[[int, ray.rllib.evaluation.rollout_worker.RolloutWorker], ray.rllib.evaluation.worker_set.T], *, local_worker=True, healthy_only=False, remote_worker_ids: Optional[List[int]] = None, timeout_seconds: Optional[int] = None) List[ray.rllib.evaluation.worker_set.T] [source]#
Similar to foreach_worker(), but also passes the worker's ID to the function.
- Parameters
func – The function to call for each worker, taking the worker's index and the worker itself as args.
local_worker – Whether to apply func to the local worker too. Default is True.
healthy_only – Apply func only on known-healthy workers. By default, func is applied to all workers regardless of their states.
remote_worker_ids – Apply func on a selected set of remote workers.
timeout_seconds – Time to wait for results. Default is None.
- Returns
The list of return values of all calls to func([worker, id]).
- foreach_worker_async(func: Callable[[ray.rllib.evaluation.rollout_worker.RolloutWorker], ray.rllib.evaluation.worker_set.T], *, healthy_only=False, remote_worker_ids: Optional[List[int]] = None) int [source]#
Calls the given function asynchronously with each worker as the argument.
foreach_worker_async() does not return results directly. Instead, fetch_ready_async_reqs() can be used to pull results in an async manner whenever they are available.
- Parameters
func – The function to call for each worker (as only arg).
healthy_only – Apply func only on known-healthy workers. By default, func is applied to all workers regardless of their states.
remote_worker_ids – Apply func on a selected set of remote workers.
- Returns
The number of async requests that are currently in-flight.
- fetch_ready_async_reqs(*, timeout_seconds: Optional[int] = 0, return_obj_refs: bool = False) List[Tuple[int, ray.rllib.evaluation.worker_set.T]] [source]#
Get results from outstanding asynchronous requests that are ready.
- Parameters
timeout_seconds – Time to wait for results. Default is 0, meaning only requests that are already ready are fetched.
- Returns
A list of results successfully returned from outstanding remote calls, paired with the indices of the callee workers.
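The foreach_worker_async() / fetch_ready_async_reqs() pair can be sketched with concurrent.futures standing in for Ray actor calls: submit one request per worker, then later collect only the results that are already done, paired with their worker indices. This is a hedged illustration of the pattern, not RLlib's implementation (which tracks in-flight Ray ObjectRefs).

```python
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    return x * x


pool = ThreadPoolExecutor(max_workers=4)

# "foreach_worker_async": launch one in-flight request per worker index.
in_flight = {i: pool.submit(slow_square, i) for i in range(4)}
pool.shutdown(wait=True)  # let everything finish for this demo

# "fetch_ready_async_reqs": pair each ready result with its worker index;
# with a timeout of 0, only already-completed requests would be returned.
ready = [(i, f.result()) for i, f in in_flight.items() if f.done()]
print(sorted(ready))  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```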
- foreach_policy(func: Callable[[ray.rllib.policy.policy.Policy, str], ray.rllib.evaluation.worker_set.T]) List[ray.rllib.evaluation.worker_set.T] [source]#
Calls func with each worker's (policy, PolicyID) tuple.
Note that in the multi-agent case, each worker may have more than one policy.
- Parameters
func – A function - taking a Policy and its ID - that is called on all workers’ Policies.
- Returns
The list of return values of func over all workers' Policies. The length of this list is (num_workers + 1 (local worker)) * [num policies in the multi-agent config dict]. The local worker's results come first, followed by all remote workers' results.
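The result ordering can be illustrated with a toy stand-in: for each worker (local first, then remotes), func is applied to every (policy, policy_id) pair, so the flat result list has (num_workers + 1) * num_policies entries. The `workers_policies` structure below is purely illustrative.

```python
workers_policies = [
    {"pol_a": "local", "pol_b": "local"},   # local worker's policies
    {"pol_a": "rem-1", "pol_b": "rem-1"},   # remote worker 1
    {"pol_a": "rem-2", "pol_b": "rem-2"},   # remote worker 2
]


def foreach_policy(func):
    # Flatten: workers in order (local first), then each worker's policies.
    return [func(pol, pid) for pols in workers_policies for pid, pol in pols.items()]


out = foreach_policy(lambda pol, pid: f"{pid}@{pol}")
print(len(out))  # 6 == (2 remote workers + 1 local) * 2 policies
```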
- foreach_policy_to_train(func: Callable[[ray.rllib.policy.policy.Policy, str], ray.rllib.evaluation.worker_set.T]) List[ray.rllib.evaluation.worker_set.T] [source]#
Applies func to all workers' Policies iff they are in policies_to_train.
- Parameters
func – A function - taking a Policy and its ID - that is called on all workers' Policies for which worker.is_policy_to_train() returns True.
- Returns
The list of n return values of all func([trainable policy], [ID]) calls.
- Return type
List[any]
- foreach_env(func: Callable[[Any], List[ray.rllib.evaluation.worker_set.T]]) List[List[ray.rllib.evaluation.worker_set.T]] [source]#
Calls func with all workers' sub-environments as args.
An "underlying sub-environment" is a single clone of an env within a vectorized environment. func takes a single underlying sub-environment as arg, e.g. a gym.Env object.
- Parameters
func – A function - taking an EnvType (normally a gym.Env object) as arg - and returning a list of lists of return values, one value per underlying sub-environment per worker.
- Returns
The list (workers) of lists (sub-environments) of results.
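The nested return shape can be sketched as follows: one inner list per worker, one entry per sub-environment of that worker's vectorized env. Plain strings stand in for gym.Env instances here; the structure, not the env type, is the point.

```python
workers_envs = [
    ["env-0a", "env-0b"],   # local worker's two sub-environments
    ["env-1a", "env-1b"],   # remote worker 1's two sub-environments
]


def foreach_env(func):
    # Outer list: workers; inner list: that worker's sub-environments.
    return [[func(env) for env in envs] for envs in workers_envs]


out = foreach_env(lambda env: env.upper())
print(out)  # [['ENV-0A', 'ENV-0B'], ['ENV-1A', 'ENV-1B']]
```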
- foreach_env_with_context(func: Callable[[ray.rllib.env.base_env.BaseEnv, ray.rllib.env.env_context.EnvContext], List[ray.rllib.evaluation.worker_set.T]]) List[List[ray.rllib.evaluation.worker_set.T]] [source]#
Calls func with all workers' sub-environments and env_ctx as args.
An "underlying sub-environment" is a single clone of an env within a vectorized environment. func takes a single underlying sub-environment and the env_context as args.
- Parameters
func – A function - taking a BaseEnv object and an EnvContext as args - and returning a list of lists of return values over the worker's envs.
- Returns
The list (1 item per worker) of lists (1 item per sub-environment) of results.