Distributed Execution API

ray.rllib.execution.synchronous_parallel_sample(*, worker_set: ray.rllib.evaluation.worker_set.WorkerSet, max_agent_steps: Optional[int] = None, max_env_steps: Optional[int] = None, concat: bool = True) Union[List[Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch]], ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch][source]

Runs parallel and synchronous rollouts on all remote workers.

Waits for all workers to return from the remote calls.

If no remote workers exist (num_workers == 0), the local worker is used for sampling.

As an alternative to calling worker.sample.remote(), the user can provide a remote_fn(), which is applied to the worker(s) instead.

Parameters
  • worker_set – The WorkerSet to use for sampling.

  • remote_fn – If provided, use worker.apply.remote(remote_fn) instead of worker.sample.remote() to generate the requests.

  • max_agent_steps – Optional number of agent steps to be included in the final batch.

  • max_env_steps – Optional number of environment steps to be included in the final batch.

  • concat – Whether to concatenate all resulting batches at the end and return the concatenated batch (an additional sketch after the examples below illustrates this).

Returns

The list of collected sample batches (one for each parallel rollout worker in the given worker_set), or a single concatenated batch if concat=True.

Examples

>>> from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
>>> # Define an RLlib Algorithm.
>>> algorithm = ...
>>> # 2 remote workers (num_workers=2), no concatenation:
>>> batches = synchronous_parallel_sample(
...     worker_set=algorithm.workers, concat=False)
>>> print(len(batches))
2
>>> print(batches[0])
SampleBatch(16: ['obs', 'actions', 'rewards', 'dones'])
>>> # 0 remote workers (num_workers=0): Using the local worker.
>>> batches = synchronous_parallel_sample(
...     worker_set=algorithm.workers, concat=False)
>>> print(len(batches))
1
ray.rllib.execution.train_one_step(algorithm, train_batch, policies_to_train=None) Dict[source]

Function that improves all policies in train_batch on the local worker.

Examples

>>> from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
>>> algo = ...
>>> train_batch = synchronous_parallel_sample(worker_set=algo.workers)
>>> # This trains the policy on one batch.
>>> results = train_one_step(algo, train_batch)
>>> results
{"default_policy": ...}

Updates the NUM_ENV_STEPS_TRAINED and NUM_AGENT_STEPS_TRAINED counters as well as the LEARN_ON_BATCH_TIMER timer of the algorithm object.
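
The policies_to_train argument (see the signature above) can restrict the update to a subset of policy IDs. A hedged sketch, reusing algo and train_batch from the example above:

>>> # Only the listed policies are updated and reported in the results dict.
>>> results = train_one_step(
...     algo, train_batch, policies_to_train=["default_policy"])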

class ray.rllib.execution.ApplyGradients(workers, policies: List[str] = frozenset({}), update_all=True)[source]

Callable that applies gradients and updates workers.

This should be used with the .for_each() operator.

Examples

>>> from ray.rllib.execution.train_ops import ApplyGradients
>>> grads_op, workers = ... 
>>> apply_op = grads_op.for_each(ApplyGradients(workers)) 
>>> print(next(apply_op)) 
None

Updates the STEPS_TRAINED_COUNTER counter in the local iterator context.

ray.rllib.execution.AsyncGradients(workers: ray.rllib.evaluation.worker_set.WorkerSet) ray.util.iter.LocalIterator[Tuple[Union[List[Tuple[Any, Any]], List[Any]], int]][source]

Operator to compute gradients in parallel from rollout workers.

Parameters

workers – set of rollout workers to use.

Returns

A local iterator over policy gradients computed on rollout workers.

Examples

>>> from ray.rllib.execution.rollout_ops import AsyncGradients
>>> workers = ... 
>>> grads_op = AsyncGradients(workers) 
>>> print(next(grads_op)) 
{"var_0": ..., ...}, 50  # grads, batch count

Updates the STEPS_SAMPLED_COUNTER counter and LEARNER_INFO field in the local iterator context.

class ray.rllib.execution.AverageGradients[source]

Callable that averages the gradients in a batch.

This should be used with the .for_each() operator after a set of gradients have been batched with .batch().

Examples

>>> from ray.rllib.execution.train_ops import AverageGradients
>>> grads_op = ... 
>>> batched_grads = grads_op.batch(32) 
>>> avg_grads = batched_grads.for_each(AverageGradients()) 
>>> print(next(avg_grads)) 
{"var_0": ..., ...}, 1600  # averaged grads, summed batch count
class ray.rllib.execution.CollectMetrics(workers: ray.rllib.evaluation.worker_set.WorkerSet, min_history: int = 100, timeout_seconds: int = 180, keep_per_episode_custom_metrics: bool = False, selected_workers: Optional[List[ray.actor.ActorHandle]] = None, by_steps_trained: bool = False)[source]

Callable that collects metrics from workers.

The metrics are smoothed over a given history window.

This should be used with the .for_each() operator. For a higher level API, consider using StandardMetricsReporting instead.

Examples

>>> from ray.rllib.execution.metric_ops import CollectMetrics
>>> train_op, workers = ... 
>>> output_op = train_op.for_each(CollectMetrics(workers)) 
>>> print(next(output_op)) 
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
class ray.rllib.execution.ComputeGradients(workers: ray.rllib.evaluation.worker_set.WorkerSet)[source]

Callable that computes gradients with respect to the policy loss.

This should be used with the .for_each() operator.

Examples

>>> from ray.rllib.execution.train_ops import ComputeGradients
>>> rollouts, workers = ... 
>>> grads_op = rollouts.for_each(ComputeGradients(workers)) 
>>> print(next(grads_op)) 
{"var_0": ..., ...}, 50  # grads, batch count

Updates the LEARNER_INFO info field in the local iterator context.

class ray.rllib.execution.ConcatBatches(min_batch_size: int, count_steps_by: str = 'env_steps', using_iterators=True)[source]

Callable used to merge batches into larger batches for training.

This should be used with the .combine() operator if using_iterators=True.

Examples

>>> from ray.rllib.execution import ParallelRollouts, ConcatBatches
>>> rollouts = ParallelRollouts(...) 
>>> rollouts = rollouts.combine(ConcatBatches( 
...    min_batch_size=10000, count_steps_by="env_steps")) 
>>> print(next(rollouts).count) 
10000
ray.rllib.execution.Concurrently(ops: List[ray.util.iter.LocalIterator], *, mode: str = 'round_robin', output_indexes: Optional[List[int]] = None, round_robin_weights: Optional[List[int]] = None) ray.util.iter.LocalIterator[Union[SampleBatch, MultiAgentBatch]][source]

Operator that runs the given parent iterators concurrently.

Parameters
  • mode – One of ‘round_robin’, ‘async’. In ‘round_robin’ mode, we alternate between pulling items from each parent iterator in order deterministically. In ‘async’ mode, we pull from each parent iterator as fast as they are produced. This is non-deterministic.

  • output_indexes – If specified, only output results from the given ops. For example, if output_indexes=[0], only results from the first op in ops will be returned.

  • round_robin_weights – List of weights to use for round robin mode. For example, [2, 1] will cause the iterator to pull twice as many items from the first iterator as from the second. [2, 1, *] will cause as many items as possible to be pulled from the third iterator without blocking. This is only allowed in round robin mode. (See the round-robin sketch after the example below.)

Examples

>>> from ray.rllib.execution import ParallelRollouts, Concurrently
>>> sim_op = ParallelRollouts(...).for_each(...) 
>>> replay_op = LocalReplay(...).for_each(...) 
>>> combined_op = Concurrently( 
...     [sim_op, replay_op], mode="async")
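
A round-robin variant is sketched below (hedged; it follows the round_robin_weights description above): items are pulled from sim_op twice as often as from replay_op, and only replay_op's results are emitted:

>>> combined_op = Concurrently(
...     [sim_op, replay_op], mode="round_robin",
...     round_robin_weights=[2, 1], output_indexes=[1])
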
ray.rllib.execution.Dequeue(input_queue: queue.Queue, check=<function <lambda>>) ray.util.iter.LocalIterator[Union[SampleBatch, MultiAgentBatch]][source]

Dequeue data items from a queue.Queue instance.

The dequeue is non-blocking, so Dequeue operations can run concurrently with Enqueue via the Concurrently() operator.

Parameters
  • input_queue – queue to pull items from.

  • check – liveness check. When this function returns false, Dequeue() will raise an error to halt execution.

Examples

>>> import queue
>>> from ray.rllib.execution import (
...     ParallelRollouts, Enqueue, Dequeue, Concurrently)
>>> q = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(q))
>>> read_op = Dequeue(q)
>>> combined_op = Concurrently(
...     [write_op, read_op], mode="async")
>>> next(combined_op) 
SampleBatch(...)
class ray.rllib.execution.Enqueue(output_queue: queue.Queue)[source]

Enqueue data items into a queue.Queue instance.

Returns the input item as output.

The enqueue is non-blocking, so Enqueue operations can run concurrently with Dequeue via the Concurrently() operator.

Examples

>>> import queue
>>> from ray.rllib.execution import (
...     ParallelRollouts, Enqueue, Dequeue, Concurrently)
>>> q = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(q))
>>> read_op = Dequeue(q)
>>> combined_op = Concurrently( 
...     [write_op, read_op], mode="async")
>>> next(combined_op) 
SampleBatch(...)
class ray.rllib.execution.LearnerThread(local_worker: ray.rllib.evaluation.rollout_worker.RolloutWorker, minibatch_buffer_size: int, num_sgd_iter: int, learner_queue_size: int, learner_queue_timeout: int)[source]

Background thread that updates the local model from sample trajectories.

The learner thread communicates with the main thread through Queues. This is needed since Ray operations can only be run on the main thread. In addition, moving heavyweight gradient computations (session runs) off the main thread improves overall throughput. See the usage sketch at the end of this entry.

run() None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

add_learner_metrics(result: Dict, overwrite_learner_info=True) Dict[source]

Add internal metrics to a result dict.
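
A hedged usage sketch (the inqueue attribute name and the availability of an Algorithm object algo and a train_batch are assumptions made here for illustration):

>>> from ray.rllib.execution import LearnerThread
>>> local_worker = algo.workers.local_worker()
>>> learner = LearnerThread(
...     local_worker, minibatch_buffer_size=1, num_sgd_iter=1,
...     learner_queue_size=16, learner_queue_timeout=300)
>>> learner.start()  # standard threading.Thread API; runs run() in the background
>>> learner.inqueue.put(train_batch)  # assumed feed queue for sample batches
>>> results = learner.add_learner_metrics({})  # merge learner stats into a result dict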

class ray.rllib.execution.MixInReplay(num_slots: int, replay_proportion: float)[source]

This operator adds replay to a stream of experiences.

It takes input batches, and returns a list of batches that include replayed data as well. The number of replayed batches is determined by the configured replay proportion. The max age of a batch is determined by the number of replay slots.
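
A hedged sketch (mirroring the other .for_each()-style examples in this section): with replay_proportion=2, each input batch is returned together with, on average, two replayed batches:

>>> from ray.rllib.execution import ParallelRollouts, MixInReplay
>>> rollouts = ParallelRollouts(...)
>>> replay_op = rollouts.for_each(
...     MixInReplay(num_slots=100, replay_proportion=2))
>>> next(replay_op)
[SampleBatch(<new>), SampleBatch(<replay>), SampleBatch(<replay>)]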

class ray.rllib.execution.MultiGPULearnerThread(local_worker: ray.rllib.evaluation.rollout_worker.RolloutWorker, num_gpus: int = 1, lr=None, train_batch_size: int = 500, num_multi_gpu_tower_stacks: int = 1, num_sgd_iter: int = 1, learner_queue_size: int = 16, learner_queue_timeout: int = 300, num_data_load_threads: int = 16, _fake_gpus: bool = False, minibatch_buffer_size=None)[source]

Learner that can use multiple GPUs and parallel loading.

This class is used for async sampling algorithms.

Example workflow: 2 GPUs and 3 multi-GPU tower stacks. On each GPU, there are 3 slots for batches, indexed 0, 1, and 2.

Workers collect data from the env and push it into the inqueue: Workers -> (data) -> self.inqueue

Two queues indicate which stacks are loaded and which are not:
  • idle_tower_stacks = [0, 1, 2] <- all 3 stacks are free at first.
  • ready_tower_stacks = [] <- none of the 3 stacks is loaded with data yet.

ready_tower_stacks is managed by ready_tower_stacks_buffer to allow several minibatch-SGD iterations per loaded batch (this avoids reloading the data from CPU to GPU for each SGD iteration).

The n (= num_data_load_threads) _MultiGPULoaderThreads: self.inqueue -get()-> policy.load_batch_into_buffer() -> ready_tower_stacks = [0, ...]

This thread: self.ready_tower_stacks_buffer -get()-> policy.learn_on_loaded_batch() -> once all SGD iterations are done, put the stack index back into the idle_tower_stacks queue.
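
The following Ray-free sketch illustrates only the queue bookkeeping described above (load_batch_into_buffer and learn_on_loaded_batch stand for the actual policy calls and are left as comments):

>>> import queue
>>> idle_tower_stacks = queue.Queue()
>>> ready_tower_stacks = queue.Queue()
>>> for i in range(3):
...     idle_tower_stacks.put(i)
>>> # Loader thread: grab a free stack, load a batch into it, mark it ready.
>>> idx = idle_tower_stacks.get()
>>> # policy.load_batch_into_buffer(batch, buffer_index=idx)
>>> ready_tower_stacks.put(idx)
>>> # Learner thread: take a ready stack, run the SGD iters, then free it again.
>>> idx = ready_tower_stacks.get()
>>> # policy.learn_on_loaded_batch(buffer_index=idx)
>>> idle_tower_stacks.put(idx)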

class ray.rllib.execution.OncePerTimeInterval(delay: Optional[float] = None)[source]

Callable that returns True once per given interval.

This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.

Examples

>>> import time
>>> from ray.rllib.execution.metric_ops import OncePerTimeInterval
>>> train_op = ... 
>>> throttled_op = train_op.filter(OncePerTimeInterval(5)) 
>>> start = time.time() 
>>> next(throttled_op) 
>>> print(time.time() - start) 
5.00001  # will be greater than 5 seconds
class ray.rllib.execution.OncePerTimestepsElapsed(delay_steps: int, by_steps_trained: bool = False)[source]

Callable that returns True once per given number of timesteps.

This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.

Examples

>>> from ray.rllib.execution.metric_ops import OncePerTimestepsElapsed
>>> train_op = ... 
>>> throttled_op = train_op.filter( 
...     OncePerTimestepsElapsed(1000))
>>> next(throttled_op) 
# will only return after 1000 steps have elapsed
ray.rllib.execution.ParallelRollouts(workers: ray.rllib.evaluation.worker_set.WorkerSet, *, mode='bulk_sync', num_async=1) ray.util.iter.LocalIterator[ray.rllib.policy.sample_batch.SampleBatch][source]

Operator to collect experiences in parallel from rollout workers.

If there are no remote workers, experiences will be collected serially from the local worker instance instead.

Parameters
  • workers – set of rollout workers to use.

  • mode – One of ‘async’, ‘bulk_sync’, ‘raw’. In ‘async’ mode, batches are returned as soon as they are computed by rollout workers with no order guarantees. In ‘bulk_sync’ mode, we collect one batch from each worker and concatenate them together into a large batch to return. In ‘raw’ mode, the ParallelIterator object is returned directly and the caller is responsible for implementing gather and updating the timesteps counter.

  • num_async – In async mode, the max number of async requests in flight per actor.

Returns

A local iterator over experiences collected in parallel.

Examples

>>> from ray.rllib.execution import ParallelRollouts
>>> workers = ... 
>>> rollouts = ParallelRollouts(workers, mode="async") 
>>> batch = next(rollouts) 
>>> print(batch.count) 
50  # config.rollout_fragment_length
>>> rollouts = ParallelRollouts(workers, mode="bulk_sync") 
>>> batch = next(rollouts) 
>>> print(batch.count) 
200  # config.rollout_fragment_length * config.num_workers

Updates the STEPS_SAMPLED_COUNTER counter in the local iterator context.

ray.rllib.execution.Replay(*, local_buffer: Optional[ray.rllib.utils.replay_buffers.multi_agent_replay_buffer.MultiAgentReplayBuffer] = None, num_items_to_replay: int = 1, actors: Optional[List[ray.actor.ActorHandle]] = None, num_async: int = 4) ray.util.iter.LocalIterator[Union[SampleBatch, MultiAgentBatch]][source]

Replay experiences from the given buffer or actors.

This should be combined with the StoreToReplayBuffer operation using the Concurrently() operator.

Parameters
  • local_buffer – Local buffer to use. Only one of this and actors can be specified.

  • num_items_to_replay – Number of items to sample from the buffer.

  • actors – List of replay actors. Only one of this and local_buffer can be specified.

  • num_async – In async mode, the max number of async requests in flight per actor.

Examples

>>> from ray.rllib.utils.replay_buffers import multi_agent_replay_buffer
>>> actors = [ 
...     multi_agent_replay_buffer.ReplayActor.remote() for _ in range(4)]
>>> replay_op = Replay(actors=actors, 
...     num_items_to_replay=batch_size)
>>> next(replay_op) 
SampleBatch(...)
class ray.rllib.execution.SelectExperiences(policy_ids: Optional[Container[str]] = None, local_worker: Optional[RolloutWorker] = None)[source]

Callable used to select experiences from a MultiAgentBatch.

This should be used with the .for_each() operator.

Examples

>>> from ray.rllib.execution import ParallelRollouts
>>> from ray.rllib.execution.rollout_ops import SelectExperiences
>>> rollouts = ParallelRollouts(...) 
>>> rollouts = rollouts.for_each( 
...     SelectExperiences(["pol1", "pol2"]))
>>> print(next(rollouts).policy_batches.keys()) 
{"pol1", "pol2"}
class ray.rllib.execution.SimpleReplayBuffer(num_slots: int, replay_proportion: Optional[float] = None)[source]

Simple replay buffer that operates over batches.
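
A minimal hedged sketch (add_batch() and replay() are assumed to be the buffer's store and sample methods; batch stands for any previously collected SampleBatch):

>>> from ray.rllib.execution import SimpleReplayBuffer
>>> buf = SimpleReplayBuffer(num_slots=100)
>>> batch = ...  # any SampleBatch collected elsewhere
>>> buf.add_batch(batch)  # assumed storage method
>>> buf.replay()  # assumed sampling method; returns one stored batch
SampleBatch(...)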

ray.rllib.execution.StandardMetricsReporting(train_op: ray.util.iter.LocalIterator[Any], workers: ray.rllib.evaluation.worker_set.WorkerSet, config: dict, selected_workers: Optional[List[ray.actor.ActorHandle]] = None, by_steps_trained: bool = False) ray.util.iter.LocalIterator[dict][source]

Operator to periodically collect and report metrics.

Parameters
  • train_op – Operator for executing training steps. We ignore the output values.

  • workers – Rollout workers to collect metrics from.

  • config – Algorithm configuration, used to determine the frequency of stats reporting.

  • selected_workers – Override the list of remote workers to collect metrics from.

  • by_steps_trained – If True, uses the STEPS_TRAINED_COUNTER instead of the STEPS_SAMPLED_COUNTER in metrics.

Returns

A local iterator over training results.

Return type

LocalIterator[dict]

Examples

>>> from ray.rllib.execution import (
...     ParallelRollouts, TrainOneStep, StandardMetricsReporting)
>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> metrics_op = StandardMetricsReporting( 
...     train_op, workers, config)
>>> next(metrics_op) 
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
class ray.rllib.execution.StandardizeFields(fields: List[str])[source]

Callable used to standardize fields of batches.

This should be used with the .for_each() operator. Note that the input may be mutated by this operator for efficiency.

Examples

>>> from ray.rllib.execution import ParallelRollouts
>>> from ray.rllib.execution.rollout_ops import StandardizeFields
>>> import numpy as np
>>> rollouts = ParallelRollouts(...) 
>>> rollouts = rollouts.for_each( 
...     StandardizeFields(["advantages"]))
>>> print(np.std(next(rollouts)["advantages"])) 
1.0
class ray.rllib.execution.StoreToReplayBuffer(*, local_buffer: Optional[ray.rllib.utils.replay_buffers.multi_agent_replay_buffer.MultiAgentReplayBuffer] = None, actors: Optional[List[ray.actor.ActorHandle]] = None)[source]

Callable that stores data into replay buffer actors.

If constructed with a local replay buffer, data will be stored into that buffer. If constructed with a list of replay actor handles, data will be stored randomly among those actors.

This should be used with the .for_each() operator on a rollouts iterator. The batch that was stored is returned.

Examples

>>> from ray.rllib.utils.replay_buffers import multi_agent_replay_buffer
>>> from ray.rllib.execution.replay_ops import StoreToReplayBuffer
>>> from ray.rllib.execution import ParallelRollouts
>>> actors = [ 
...     multi_agent_replay_buffer.ReplayActor.remote() for _ in range(4)]
>>> rollouts = ParallelRollouts(...) 
>>> store_op = rollouts.for_each( 
...     StoreToReplayBuffer(actors=actors))
>>> next(store_op) 
SampleBatch(...)
class ray.rllib.execution.UpdateTargetNetwork(workers: ray.rllib.evaluation.worker_set.WorkerSet, target_update_freq: int, by_steps_trained: bool = False, policies: List[str] = frozenset({}))[source]

Periodically call policy.update_target() on all trainable policies.

This should be used with the .for_each() operator after a training step has been taken.

Examples

>>> from ray.rllib.execution.train_ops import UpdateTargetNetwork
>>> from ray.rllib.execution import ParallelRollouts, TrainOneStep
>>> workers = ... 
>>> train_op = ParallelRollouts(...).for_each( 
...     TrainOneStep(...))
>>> update_op = train_op.for_each( 
...     UpdateTargetNetwork(workers, target_update_freq=500)) 
>>> print(next(update_op)) 
None

Updates the LAST_TARGET_UPDATE_TS and NUM_TARGET_UPDATES counters in the local iterator context. The value of the last update counter is used to track when we should update the target next.

class ray.rllib.execution.MinibatchBuffer(inqueue: queue.Queue, size: int, timeout: float, num_passes: int, init_num_passes: int = 1)[source]

Ring buffer of recent data batches for minibatch SGD.

This is for use with AsyncSamplesOptimizer.

get() Tuple[Any, bool][source]

Get a new batch from the internal ring buffer.

Returns

A tuple (buf, released): buf is the data item retrieved from the inqueue; released is True if the item has now been removed from the ring buffer. (A usage sketch follows below.)

Return type

Tuple[Any, bool]
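
A hedged usage sketch (the queue contents are arbitrary placeholders for real train batches):

>>> import queue
>>> from ray.rllib.execution import MinibatchBuffer
>>> inqueue = queue.Queue()
>>> inqueue.put("batch_0")  # placeholder for a real train batch
>>> buf = MinibatchBuffer(
...     inqueue, size=1, timeout=10.0, num_passes=2, init_num_passes=2)
>>> item, released = buf.get()  # the same item may be served up to num_passes times
>>> item
'batch_0'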