Distributed Execution API

ray.rllib.execution.synchronous_parallel_sample(worker_set: ray.rllib.evaluation.worker_set.WorkerSet, remote_fn: Optional[Callable[[RolloutWorker], None]] = None) → List[ray.rllib.policy.sample_batch.SampleBatch][source]

Runs parallel and synchronous rollouts on all remote workers.

Waits for all workers to return from the remote calls.

If no remote workers exist (num_workers == 0), the local worker is used for sampling.

Instead of calling worker.sample.remote(), the user can provide a remote_fn(), which will then be applied to the worker(s).

Parameters
  • worker_set – The WorkerSet to use for sampling.

  • remote_fn – If provided, use worker.apply.remote(remote_fn) instead of worker.sample.remote() to generate the requests.

Returns

The list of collected sample batch types (one for each parallel rollout worker in the given worker_set).

Examples

>>> # 2 remote workers (num_workers=2):
>>> batches = synchronous_parallel_sample(trainer.workers)
>>> print(len(batches))
2
>>> print(batches[0])
SampleBatch(16: ['obs', 'actions', 'rewards', 'dones'])
>>> # 0 remote workers (num_workers=0): Using the local worker.
>>> batches = synchronous_parallel_sample(trainer.workers)
>>> print(len(batches))
1
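A sketch of the remote_fn variant: the provided callable is applied to each worker via worker.apply.remote() instead of the default worker.sample.remote() call (here it simply calls sample() on the worker; the 2-worker setup from above is assumed):

>>> batches = synchronous_parallel_sample(
...     trainer.workers, remote_fn=lambda w: w.sample())
>>> print(len(batches))
2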
class ray.rllib.execution.ApplyGradients(workers, policies: List[str] = frozenset({}), update_all=True)[source]

Callable that applies gradients and updates workers.

This should be used with the .for_each() operator.

Examples

>>> apply_op = grads_op.for_each(ApplyGradients(workers))
>>> print(next(apply_op))
None

Updates the STEPS_TRAINED_COUNTER counter in the local iterator context.

ray.rllib.execution.AsyncGradients(workers: ray.rllib.evaluation.worker_set.WorkerSet) → ray.util.iter.LocalIterator[Tuple[Union[List[Tuple[Any, Any]], List[Any]], int]][source]

Operator to compute gradients in parallel from rollout workers.

Parameters

workers (WorkerSet) – set of rollout workers to use.

Returns

A local iterator over policy gradients computed on rollout workers.

Examples

>>> grads_op = AsyncGradients(workers)
>>> print(next(grads_op))
{"var_0": ..., ...}, 50  # grads, batch count

Updates the STEPS_SAMPLED_COUNTER counter and LEARNER_INFO field in the local iterator context.

class ray.rllib.execution.AverageGradients[source]

Callable that averages the gradients in a batch.

This should be used with the .for_each() operator after a set of gradients have been batched with .batch().

Examples

>>> batched_grads = grads_op.batch(32)
>>> avg_grads = batched_grads.for_each(AverageGradients())
>>> print(next(avg_grads))
{"var_0": ..., ...}, 1600  # averaged grads, summed batch count
class ray.rllib.execution.CollectMetrics(workers: ray.rllib.evaluation.worker_set.WorkerSet, min_history: int = 100, timeout_seconds: int = 180, keep_per_episode_custom_metrics: bool = False, selected_workers: Optional[List[ray.actor.ActorHandle]] = None, by_steps_trained: bool = False)[source]

Callable that collects metrics from workers.

The metrics are smoothed over a given history window.

This should be used with the .for_each() operator. For a higher-level API, consider using StandardMetricsReporting instead.

Examples

>>> output_op = train_op.for_each(CollectMetrics(workers))
>>> print(next(output_op))
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
class ray.rllib.execution.ComputeGradients(workers: ray.rllib.evaluation.worker_set.WorkerSet)[source]

Callable that computes gradients with respect to the policy loss.

This should be used with the .for_each() operator.

Examples

>>> grads_op = rollouts.for_each(ComputeGradients(workers))
>>> print(next(grads_op))
{"var_0": ..., ...}, 50  # grads, batch count

Updates the LEARNER_INFO info field in the local iterator context.

class ray.rllib.execution.ConcatBatches(min_batch_size: int, count_steps_by: str = 'env_steps')[source]

Callable used to merge batches into larger batches for training.

This should be used with the .combine() operator.

Examples

>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.combine(ConcatBatches(
...    min_batch_size=10000, count_steps_by="env_steps"))
>>> print(next(rollouts).count)
10000
ray.rllib.execution.Concurrently(ops: List[ray.util.iter.LocalIterator], *, mode: str = 'round_robin', output_indexes: Optional[List[int]] = None, round_robin_weights: Optional[List[int]] = None) → ray.util.iter.LocalIterator[Union[SampleBatch, MultiAgentBatch]][source]

Operator that runs the given parent iterators concurrently.

Parameters
  • mode (str) – One of ‘round_robin’, ‘async’. In ‘round_robin’ mode, we alternate between pulling items from each parent iterator in order deterministically. In ‘async’ mode, we pull from each parent iterator as fast as they are produced. This is non-deterministic.

  • output_indexes (list) – If specified, only output results from the given ops. For example, if output_indexes=[0], only results from the first op in ops will be returned.

  • round_robin_weights (list) – List of weights to use for round robin mode. For example, [2, 1] will cause the iterator to pull twice as many items from the first iterator as the second. [2, 1, *] will cause as many items to be pulled as possible from the third iterator without blocking. This is only allowed in round robin mode.

Examples

>>> sim_op = ParallelRollouts(...).for_each(...)
>>> replay_op = LocalReplay(...).for_each(...)
>>> combined_op = Concurrently([sim_op, replay_op], mode="async")
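To emit results from only some of the parent iterators, output_indexes can be combined with either mode (a sketch continuing the example above):

>>> # Deterministic round-robin, but only yield the replay op's results:
>>> combined_op = Concurrently(
...     [sim_op, replay_op], mode="round_robin", output_indexes=[1])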
ray.rllib.execution.Dequeue(input_queue: queue.Queue, check=<function <lambda>>) → ray.util.iter.LocalIterator[Union[SampleBatch, MultiAgentBatch]][source]

Dequeue data items from a queue.Queue instance.

The dequeue is non-blocking, so Dequeue operations can be executed in parallel with Enqueue operations via the Concurrently() operator.

Parameters
  • input_queue (Queue) – Queue to pull items from.

  • check (fn) – Liveness check. When this function returns False, Dequeue() will raise an error to halt execution.

Examples

>>> q = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(q))
>>> read_op = Dequeue(q)
>>> combined_op = Concurrently([write_op, read_op], mode="async")
>>> next(combined_op)
SampleBatch(...)
class ray.rllib.execution.Enqueue(output_queue: queue.Queue)[source]

Enqueue data items into a queue.Queue instance.

Returns the input item as output.

The enqueue is non-blocking, so Enqueue operations can be executed in parallel with Dequeue operations via the Concurrently() operator.

Examples

>>> q = queue.Queue(100)
>>> write_op = ParallelRollouts(...).for_each(Enqueue(q))
>>> read_op = Dequeue(q)
>>> combined_op = Concurrently([write_op, read_op], mode="async")
>>> next(combined_op)
SampleBatch(...)
class ray.rllib.execution.LearnerThread(local_worker: ray.rllib.evaluation.rollout_worker.RolloutWorker, minibatch_buffer_size: int, num_sgd_iter: int, learner_queue_size: int, learner_queue_timeout: int)[source]

Background thread that updates the local model from sample trajectories.

The learner thread communicates with the main thread through Queues. This is needed since Ray operations can only be run on the main thread. In addition, moving the heavyweight gradient ops (session runs) off the main thread improves overall throughput.
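A minimal construction sketch (arguments per the signature above; the local worker is assumed to come from trainer.workers.local_worker(), and the inqueue attribute is assumed per the MultiGPULearnerThread workflow notes below):

>>> learner = LearnerThread(
...     local_worker=trainer.workers.local_worker(),
...     minibatch_buffer_size=1, num_sgd_iter=1,
...     learner_queue_size=16, learner_queue_timeout=300)
>>> learner.start()             # runs run() in a background thread
>>> learner.inqueue.put(batch)  # rollout workers feed sample batches here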

run() → None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

add_learner_metrics(result: Dict) → Dict[source]

Add internal metrics to a trainer result dict.

class ray.rllib.execution.MixInReplay(num_slots: int, replay_proportion: float)[source]

This operator adds replay to a stream of experiences.

It takes input batches, and returns a list of batches that include replayed data as well. The number of replayed batches is determined by the configured replay proportion. The max age of a batch is determined by the number of replay slots.
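A usage sketch with the .for_each() operator (parameter values and the printed output are illustrative):

>>> rollouts = ParallelRollouts(...)
>>> replay_op = rollouts.for_each(
...     MixInReplay(num_slots=100, replay_proportion=0.5))
>>> print(next(replay_op))
[SampleBatch(...), SampleBatch(...)]  # input batch plus replayed batch(es)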

class ray.rllib.execution.MultiAgentReplayBuffer(num_shards: int = 1, learning_starts: int = 1000, capacity: int = 10000, replay_batch_size: int = 1, prioritized_replay_alpha: float = 0.6, prioritized_replay_beta: float = 0.4, prioritized_replay_eps: float = 1e-06, replay_mode: str = 'independent', replay_sequence_length: int = 1, replay_burn_in: int = 0, replay_zero_init_states: bool = True, buffer_size=-1, *, _ray_trace_ctx=None)[source]

A replay buffer shard storing data for all policies (in multiagent setup).

Ray actors are single-threaded, so for scalability, multiple replay actors may be created to increase parallelism.
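A local (non-actor) usage sketch based on the methods documented below (constructor values are illustrative):

>>> buffer = MultiAgentReplayBuffer(capacity=10000, learning_starts=1000)
>>> buffer.add_batch(batch)        # SampleBatch or MultiAgentBatch
>>> train_batch = buffer.replay()  # sample a training batch (see replay() below)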

static get_instance_for_testing()[source]

Return a MultiAgentReplayBuffer instance that has been previously instantiated.

Returns

The most recently instantiated MultiAgentReplayBuffer.

Return type

MultiAgentReplayBuffer

get_host(*, _ray_trace_ctx=None) → str[source]

Returns the computer’s network name.

Returns

The computer’s network name, or an empty string if the network name could not be determined.

add_batch(batch: Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch], *, _ray_trace_ctx=None) → None[source]

Adds a batch to the appropriate policy’s replay buffer.

Turns the batch into a MultiAgentBatch of the DEFAULT_POLICY_ID if it is not a MultiAgentBatch already. Subsequently adds the batch to the appropriate underlying replay buffer(s).

Parameters

batch (SampleBatchType) – The batch to be added.

replay(policy_id: Optional[str] = None, *, _ray_trace_ctx=None) → Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch][source]

If this buffer was given a fake batch, return it; otherwise, return a MultiAgentBatch with samples.

update_priorities(prio_dict: Dict, *, _ray_trace_ctx=None) → None[source]

Updates the priorities of underlying replay buffers.

Computes new priorities from td_errors and prioritized_replay_eps. These priorities are used to update underlying replay buffers per policy_id.

Parameters
  • prio_dict (Dict) – A dictionary containing td_errors for batches saved in the underlying replay buffers.

stats(debug: bool = False, *, _ray_trace_ctx=None) → Dict[source]

Returns the stats of this buffer and all underlying buffers.

Parameters
  • debug (bool) – If True, stats of the underlying replay buffers will be fetched with debug=True.

Returns

Dictionary of buffer stats.

Return type

Dict

class ray.rllib.execution.MultiGPULearnerThread(local_worker: ray.rllib.evaluation.rollout_worker.RolloutWorker, num_gpus: int = 1, lr=None, train_batch_size: int = 500, num_multi_gpu_tower_stacks: int = 1, num_sgd_iter: int = 1, learner_queue_size: int = 16, learner_queue_timeout: int = 300, num_data_load_threads: int = 16, _fake_gpus: bool = False, minibatch_buffer_size=None)[source]

Learner that can use multiple GPUs and parallel loading.

This class is used for async sampling algorithms.

Example workflow: 2 GPUs and 3 multi-GPU tower stacks. -> On each GPU, there are 3 slots for batches, indexed 0, 1, and 2.

Workers collect data from env and push it into inqueue: Workers -> (data) -> self.inqueue

We also have two queues indicating which stacks are loaded and which are not:
  • idle_tower_stacks = [0, 1, 2] <- all 3 stacks are free at first.
  • ready_tower_stacks = [] <- none of the 3 stacks is loaded with data yet.

ready_tower_stacks is managed by ready_tower_stacks_buffer for possible minibatch-SGD iterations per loaded batch (this avoids a reload from CPU to GPU for each SGD iter).

In n _MultiGPULoaderThreads: self.inqueue -get()-> policy.load_batch_into_buffer() -> ready_stacks = [0 …]

This thread: self.ready_tower_stacks_buffer -get()-> policy.learn_on_loaded_batch() -> if SGD-iters done, put stack index back in idle_tower_stacks queue.
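A minimal construction sketch matching the workflow above (2 GPUs, 3 tower stacks; the local worker is assumed to come from trainer.workers.local_worker()):

>>> learner = MultiGPULearnerThread(
...     local_worker=trainer.workers.local_worker(),
...     num_gpus=2, num_multi_gpu_tower_stacks=3)
>>> learner.start()  # starts the background learner loop (run())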

class ray.rllib.execution.OncePerTimeInterval(delay: Optional[float] = None)[source]

Callable that returns True once per given interval.

This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.

Examples

>>> throttled_op = train_op.filter(OncePerTimeInterval(5))
>>> start = time.time()
>>> next(throttled_op)
>>> print(time.time() - start)
5.00001  # will be greater than 5 seconds
class ray.rllib.execution.OncePerTimestepsElapsed(delay_steps: int, by_steps_trained: bool = False)[source]

Callable that returns True once per given number of timesteps.

This should be used with the .filter() operator to throttle / rate-limit metrics reporting. For a higher-level API, consider using StandardMetricsReporting instead.

Examples

>>> throttled_op = train_op.filter(OncePerTimestepsElapsed(1000))
>>> next(throttled_op)
# will only return after 1000 steps have elapsed
ray.rllib.execution.ParallelRollouts(workers: ray.rllib.evaluation.worker_set.WorkerSet, *, mode='bulk_sync', num_async=1) → ray.util.iter.LocalIterator[ray.rllib.policy.sample_batch.SampleBatch][source]

Operator to collect experiences in parallel from rollout workers.

If there are no remote workers, experiences will be collected serially from the local worker instance instead.

Parameters
  • workers (WorkerSet) – set of rollout workers to use.

  • mode (str) – One of ‘async’, ‘bulk_sync’, ‘raw’. In ‘async’ mode, batches are returned as soon as they are computed by rollout workers with no order guarantees. In ‘bulk_sync’ mode, we collect one batch from each worker and concatenate them together into a large batch to return. In ‘raw’ mode, the ParallelIterator object is returned directly and the caller is responsible for implementing gather and updating the timesteps counter.

  • num_async (int) – In async mode, the max number of async requests in flight per actor.

Returns

A local iterator over experiences collected in parallel.

Examples

>>> rollouts = ParallelRollouts(workers, mode="async")
>>> batch = next(rollouts)
>>> print(batch.count)
50  # config.rollout_fragment_length
>>> rollouts = ParallelRollouts(workers, mode="bulk_sync")
>>> batch = next(rollouts)
>>> print(batch.count)
200  # config.rollout_fragment_length * config.num_workers

Updates the STEPS_SAMPLED_COUNTER counter in the local iterator context.

ray.rllib.execution.Replay(*, local_buffer: Optional[ray.rllib.execution.buffers.multi_agent_replay_buffer.MultiAgentReplayBuffer] = None, actors: Optional[List[ray.actor.ActorHandle]] = None, num_async: int = 4) → ray.util.iter.LocalIterator[Union[SampleBatch, MultiAgentBatch]][source]

Replay experiences from the given buffer or actors.

This should be combined with the StoreToReplayBuffer operation using the Concurrently() operator.

Parameters
  • local_buffer – Local buffer to use. Only one of this and replay_actors can be specified.

  • actors – List of replay actors. Only one of this and local_buffer can be specified.

  • num_async – In async mode, the max number of async requests in flight per actor.

Examples

>>> actors = [ReplayActor.remote() for _ in range(4)]
>>> replay_op = Replay(actors=actors)
>>> next(replay_op)
SampleBatch(...)
class ray.rllib.execution.SelectExperiences(policy_ids: Optional[Container[str]] = None, local_worker: Optional[RolloutWorker] = None)[source]

Callable used to select experiences from a MultiAgentBatch.

This should be used with the .for_each() operator.

Examples

>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.for_each(SelectExperiences(["pol1", "pol2"]))
>>> print(next(rollouts).policy_batches.keys())
{"pol1", "pol2"}
class ray.rllib.execution.SimpleReplayBuffer(num_slots: int, replay_proportion: Optional[float] = None)[source]

Simple replay buffer that operates over batches.
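A usage sketch, assuming it exposes add_batch() and replay() analogously to MultiAgentReplayBuffer above (an assumption; these methods are not documented here):

>>> buffer = SimpleReplayBuffer(num_slots=100)
>>> buffer.add_batch(batch)     # assumed method, mirroring MultiAgentReplayBuffer
>>> replayed = buffer.replay()  # assumed method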

ray.rllib.execution.StandardMetricsReporting(train_op: ray.util.iter.LocalIterator[Any], workers: ray.rllib.evaluation.worker_set.WorkerSet, config: dict, selected_workers: Optional[List[ray.actor.ActorHandle]] = None, by_steps_trained: bool = False) → ray.util.iter.LocalIterator[dict][source]

Operator to periodically collect and report metrics.

Parameters
  • train_op (LocalIterator) – Operator for executing training steps. We ignore the output values.

  • workers (WorkerSet) – Rollout workers to collect metrics from.

  • config (dict) – Trainer configuration, used to determine the frequency of stats reporting.

  • selected_workers (list) – Override the list of remote workers to collect metrics from.

  • by_steps_trained (bool) – If True, uses the STEPS_TRAINED_COUNTER instead of the STEPS_SAMPLED_COUNTER in metrics.

Returns

A local iterator over training results.

Return type

LocalIterator[dict]

Examples

>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> metrics_op = StandardMetricsReporting(train_op, workers, config)
>>> next(metrics_op)
{"episode_reward_max": ..., "episode_reward_mean": ..., ...}
class ray.rllib.execution.StandardizeFields(fields: List[str])[source]

Callable used to standardize fields of batches.

This should be used with the .for_each() operator. Note that the input may be mutated by this operator for efficiency.

Examples

>>> rollouts = ParallelRollouts(...)
>>> rollouts = rollouts.for_each(StandardizeFields(["advantages"]))
>>> print(np.std(next(rollouts)["advantages"]))
1.0
class ray.rllib.execution.StoreToReplayBuffer(*, local_buffer: Optional[ray.rllib.execution.buffers.multi_agent_replay_buffer.MultiAgentReplayBuffer] = None, actors: Optional[List[ray.actor.ActorHandle]] = None)[source]

Callable that stores data into replay buffer actors.

If constructed with a local replay actor, data will be stored into that buffer. If constructed with a list of replay actor handles, data will be stored randomly among those actors.

This should be used with the .for_each() operator on a rollouts iterator. The batch that was stored is returned.

Examples

>>> actors = [ReplayActor.remote() for _ in range(4)]
>>> rollouts = ParallelRollouts(...)
>>> store_op = rollouts.for_each(StoreToReplayBuffer(actors=actors))
>>> next(store_op)
SampleBatch(...)
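For the single-process case, a local buffer can be used instead of replay actors (a sketch using MultiAgentReplayBuffer from above; the capacity value is illustrative):

>>> local_buffer = MultiAgentReplayBuffer(capacity=50000)
>>> rollouts = ParallelRollouts(...)
>>> store_op = rollouts.for_each(StoreToReplayBuffer(local_buffer=local_buffer))
>>> replay_op = Replay(local_buffer=local_buffer)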
class ray.rllib.execution.TrainOneStep(workers: ray.rllib.evaluation.worker_set.WorkerSet, policies: List[str] = frozenset({}), num_sgd_iter: int = 1, sgd_minibatch_size: int = 0)[source]

Callable that improves the policy and updates workers.

This should be used with the .for_each() operator. A tuple of the input and learner stats will be returned.

Examples

>>> rollouts = ParallelRollouts(...)
>>> train_op = rollouts.for_each(TrainOneStep(workers))
>>> print(next(train_op))  # This trains the policy on one batch.
SampleBatch(...), {"learner_stats": ...}

Updates the STEPS_TRAINED_COUNTER counter and LEARNER_INFO field in the local iterator context.

class ray.rllib.execution.MultiGPUTrainOneStep(*, workers: ray.rllib.evaluation.worker_set.WorkerSet, sgd_minibatch_size: int, num_sgd_iter: int, num_gpus: int, _fake_gpus: bool = False, shuffle_sequences=-1, framework=-1)[source]

Multi-GPU version of TrainOneStep.

This should be used with the .for_each() operator. A tuple of the input and learner stats will be returned.

Examples

>>> rollouts = ParallelRollouts(...)
>>> train_op = rollouts.for_each(MultiGPUTrainOneStep(workers, ...))
>>> print(next(train_op))  # This trains the policy on one batch.
SampleBatch(...), {"learner_stats": ...}

Updates the STEPS_TRAINED_COUNTER counter and LEARNER_INFO field in the local iterator context.

class ray.rllib.execution.UpdateTargetNetwork(workers: ray.rllib.evaluation.worker_set.WorkerSet, target_update_freq: int, by_steps_trained: bool = False, policies: List[str] = frozenset({}))[source]

Periodically call policy.update_target() on all trainable policies.

This should be used with the .for_each() operator after a training step has been taken.

Examples

>>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
>>> update_op = train_op.for_each(
...     UpdateTargetNetwork(workers, target_update_freq=500))
>>> print(next(update_op))
None

Updates the LAST_TARGET_UPDATE_TS and NUM_TARGET_UPDATES counters in the local iterator context. The value of the last update counter is used to track when we should update the target next.