ReplayBuffer API

The following classes do not take into account the separation of experiences from different policies; multi-agent replay buffers are explained further below.

ray.rllib.utils.replay_buffers.replay_buffer

class ray.rllib.utils.replay_buffers.replay_buffer.StorageUnit(value)[source]

Specifies how batches are structured in a ReplayBuffer.

timesteps: One buffer slot per timestep.
sequences: One buffer slot per sequence.
episodes: One buffer slot per episode.
fragments: One buffer slot per incoming batch.

DeveloperAPI: This API may change across minor Ray releases.
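
Both the enum members and their plain string names can be passed as the storage_unit argument of the buffers below (see the ReplayBuffer signature). A minimal sketch:

>>> from ray.rllib.utils.replay_buffers import ReplayBuffer, StorageUnit
>>> # These two buffers are configured identically: one episode per slot.
>>> buffer = ReplayBuffer(capacity=100, storage_unit=StorageUnit.EPISODES)
>>> buffer = ReplayBuffer(capacity=100, storage_unit="episodes")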

class ray.rllib.utils.replay_buffers.replay_buffer.ReplayBuffer(capacity: int = 10000, storage_unit: Union[str, ray.rllib.utils.replay_buffers.replay_buffer.StorageUnit] = 'timesteps', **kwargs)[source]

Bases: ray.util.iter.ParallelIteratorWorker

The lowest-level replay buffer interface used by RLlib.

This class implements a basic ring buffer with random sampling. ReplayBuffer is the base class for more advanced buffer types that add functionality while retaining compatibility through inheritance.

The following examples show how buffers behave with different storage_units and capacities. This behaviour is generally similar for other buffers, although they might not implement all storage_units.

Examples

>>> from ray.rllib.utils.replay_buffers import ReplayBuffer, StorageUnit
>>> from ray.rllib.policy.sample_batch import SampleBatch 
>>> # Store any batch as a whole
>>> buffer = ReplayBuffer(capacity=10,
...                         storage_unit=StorageUnit.FRAGMENTS) 
>>> buffer.add(SampleBatch({"a": [1], "b": [2, 3, 4]})) 
>>> print(buffer.sample(1)) 
>>> # SampleBatch(1: ['a', 'b'])
>>> # Store only complete episodes
>>> buffer = ReplayBuffer(capacity=10,
...                         storage_unit=StorageUnit.EPISODES) 
>>> buffer.add(SampleBatch({"c": [1, 2, 3, 4], 
...                        SampleBatch.T: [0, 1, 0, 1],
...                        SampleBatch.DONES: [False, True, False, True],
...                        SampleBatch.EPS_ID: [0, 0, 1, 1]})) 
>>> eps_n = buffer.sample(1) 
>>> print(eps_n[SampleBatch.EPS_ID]) 
>>> # [1 1]
>>> # Store single timesteps
>>> buffer = ReplayBuffer(capacity=2,  
...                         storage_unit=StorageUnit.TIMESTEPS) 
>>> buffer.add(SampleBatch({"a": [1, 2],
...                         SampleBatch.T: [0, 1]})) 
>>> t_n = buffer.sample(1) 
>>> print(t_n["a"]) 
>>> # [2]
>>> buffer.add(SampleBatch({"a": [3], SampleBatch.T: [2]})) 
>>> print(buffer._eviction_started) 
>>> # True
>>> t_n = buffer.sample(1) 
>>> print(t_n["a"]) 
>>> # [3] 
>>> buffer = ReplayBuffer(capacity=10, 
...                         storage_unit=StorageUnit.SEQUENCES) 
>>> buffer.add(SampleBatch({"c": [1, 2, 3], 
...                        SampleBatch.SEQ_LENS: [1, 2]})) 
>>> seq_n = buffer.sample(1) 
>>> print(seq_n["c"]) 
>>> # [1]

DeveloperAPI: This API may change across minor Ray releases.

add(batch: Union[SampleBatch, MultiAgentBatch], **kwargs) None[source]

Adds a batch of experiences to this buffer.

Splits batch into chunks of timesteps, sequences or episodes, depending on self._storage_unit. Calls self._add_single_batch to add resulting slices to the buffer storage.

Args:

batch: Batch to add.
**kwargs: Forward compatibility kwargs.

DeveloperAPI: This API may change across minor Ray releases.
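
To illustrate the splitting behavior, a small sketch (key names such as "obs" are arbitrary):

>>> from ray.rllib.utils.replay_buffers import ReplayBuffer, StorageUnit
>>> from ray.rllib.policy.sample_batch import SampleBatch
>>> buffer = ReplayBuffer(capacity=100, storage_unit=StorageUnit.TIMESTEPS)
>>> # A batch of 3 timesteps is split into 3 single-timestep slots.
>>> buffer.add(SampleBatch({"obs": [1, 2, 3], SampleBatch.T: [0, 1, 2]}))
>>> one_step = buffer.sample(1)
>>> print(len(one_step["obs"]))
>>> # 1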

sample(num_items: int, **kwargs) Optional[Union[SampleBatch, MultiAgentBatch]][source]

Samples num_items items from this buffer.

The items depend on the buffer’s storage_unit. Samples in the results may be repeated.

Examples for sampling results:

1) If storage unit ‘timesteps’ has been chosen and batches of size 5 have been added, sample(5) will yield a concatenated batch of 5 timesteps.

2) If storage unit ‘sequences’ has been chosen and sequences of different lengths have been added, sample(5) will yield a concatenated batch with a number of timesteps equal to the sum of timesteps in the 5 sampled sequences.

3) If storage unit ‘episodes’ has been chosen and episodes of different lengths have been added, sample(5) will yield a concatenated batch with a number of timesteps equal to the sum of timesteps in the 5 sampled episodes.

Args:

num_items: Number of items to sample from this buffer.
**kwargs: Forward compatibility kwargs.

Returns:

Concatenated batch of items.

DeveloperAPI: This API may change across minor Ray releases.
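
A short sketch of case (1) above:

>>> from ray.rllib.utils.replay_buffers import ReplayBuffer, StorageUnit
>>> from ray.rllib.policy.sample_batch import SampleBatch
>>> buffer = ReplayBuffer(capacity=100, storage_unit=StorageUnit.TIMESTEPS)
>>> # Two added batches of 5 timesteps each result in 10 buffer slots.
>>> buffer.add(SampleBatch({"a": [0, 1, 2, 3, 4],
...                         SampleBatch.T: [0, 1, 2, 3, 4]}))
>>> buffer.add(SampleBatch({"a": [5, 6, 7, 8, 9],
...                         SampleBatch.T: [0, 1, 2, 3, 4]}))
>>> batch = buffer.sample(5)
>>> print(len(batch["a"]))
>>> # 5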

stats(debug: bool = False) dict[source]

Returns the stats of this buffer.

Args:

debug: If True, adds sample eviction statistics to the returned stats dict.

Returns:

A dictionary of stats about this buffer.

DeveloperAPI: This API may change across minor Ray releases.

get_state() Dict[str, Any][source]

Returns all local state.

Returns:

The serializable local state.

DeveloperAPI: This API may change across minor Ray releases.

set_state(state: Dict[str, Any]) None[source]

Restores all local state to the provided state.

Args:

state: The new state to set this buffer to. Can be obtained by calling self.get_state().

DeveloperAPI: This API may change across minor Ray releases.
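
A minimal checkpoint-and-restore sketch combining get_state() and set_state():

>>> from ray.rllib.utils.replay_buffers import ReplayBuffer, StorageUnit
>>> from ray.rllib.policy.sample_batch import SampleBatch
>>> buffer = ReplayBuffer(capacity=100, storage_unit=StorageUnit.TIMESTEPS)
>>> buffer.add(SampleBatch({"a": [1, 2, 3], SampleBatch.T: [0, 1, 2]}))
>>> # Serialize the buffer's contents ...
>>> state = buffer.get_state()
>>> # ... and restore them into a freshly constructed buffer.
>>> new_buffer = ReplayBuffer(capacity=100, storage_unit=StorageUnit.TIMESTEPS)
>>> new_buffer.set_state(state)
>>> print(new_buffer.sample(1)["a"])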

get_host() str[source]

Returns the computer’s network name.

Returns:

The computer’s network name, or an empty string if the network name could not be determined.

DeveloperAPI: This API may change across minor Ray releases.

apply(func: Callable[[ray.rllib.utils.replay_buffers.replay_buffer.ReplayBuffer, Optional[Any], Optional[Any]], ray.rllib.utils.typing.T], *args, **kwargs) ray.rllib.utils.typing.T[source]

Calls the given function with this ReplayBuffer instance.

This is useful if we want to apply a function to a set of remote actors.

Args:

func: A callable that accepts the replay buffer itself, args, and kwargs.
*args: Any args to pass to func.
**kwargs: Any kwargs to pass to func.

Returns:

The return value of the function call.

DeveloperAPI: This API may change across minor Ray releases.
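
A small sketch of apply() on a local buffer; with remote buffer shards, which is where this method is mainly useful (see above), the same pattern is used through their actor handles:

>>> from ray.rllib.utils.replay_buffers import ReplayBuffer
>>> buffer = ReplayBuffer(capacity=100)
>>> # Call an arbitrary function with the buffer itself as first argument.
>>> print(buffer.apply(lambda buf: buf.stats()))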

ray.rllib.utils.replay_buffers.prioritized_replay_buffer

class ray.rllib.utils.replay_buffers.prioritized_replay_buffer.PrioritizedReplayBuffer(capacity: int = 10000, storage_unit: str = 'timesteps', alpha: float = 1.0, **kwargs)[source]

Bases: ray.rllib.utils.replay_buffers.replay_buffer.ReplayBuffer

This buffer implements Prioritized Experience Replay.

The algorithm is described by Tom Schaul et al. in “Prioritized Experience Replay”. See https://arxiv.org/pdf/1511.05952.pdf for the full paper.

DeveloperAPI: This API may change across minor Ray releases.

sample(num_items: int, beta: float, **kwargs) Optional[Union[SampleBatch, MultiAgentBatch]][source]

Samples num_items items from this buffer, including priority weights.

Samples in the results may be repeated.

Examples for sampling results:

1) If storage unit ‘timesteps’ has been chosen and batches of size 5 have been added, sample(5) will yield a concatenated batch of 5 timesteps.

2) If storage unit ‘sequences’ has been chosen and sequences of different lengths have been added, sample(5) will yield a concatenated batch with a number of timesteps equal to the sum of timesteps in the 5 sampled sequences.

3) If storage unit ‘episodes’ has been chosen and episodes of different lengths have been added, sample(5) will yield a concatenated batch with a number of timesteps equal to the sum of timesteps in the 5 sampled episodes.

Args:

num_items: Number of items to sample from this buffer.
beta: To what degree to use importance weights (0 - no corrections, 1 - full correction).
**kwargs: Forward compatibility kwargs.

Returns:

Concatenated SampleBatch of items, including “weights” and “batch_indexes” fields denoting the importance-sampling weight of each sampled transition and the original indices in the buffer of the sampled experiences.

DeveloperAPI: This API may change across minor Ray releases.

update_priorities(idxes: List[int], priorities: List[float]) None[source]

Update priorities of items at given indices.

Sets priority of item at index idxes[i] in buffer to priorities[i].

Args:

idxes: List of indices of the items to update.
priorities: List of updated priorities corresponding to the items at the indices given by idxes.

DeveloperAPI: This API may change across minor Ray releases.
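
A sketch of the typical prioritized-replay loop that combines sample() and update_priorities(); the TD-error values here are made up for illustration:

>>> from ray.rllib.policy.sample_batch import SampleBatch
>>> from ray.rllib.utils.replay_buffers.prioritized_replay_buffer import (
...     PrioritizedReplayBuffer)
>>> buffer = PrioritizedReplayBuffer(capacity=100, alpha=0.6)
>>> buffer.add(SampleBatch({"a": [1, 2, 3], SampleBatch.T: [0, 1, 2]}))
>>> batch = buffer.sample(2, beta=0.4)
>>> # "weights" holds the importance-sampling weights, "batch_indexes" the
>>> # positions of the sampled items inside the buffer.
>>> td_errors = [0.5, 0.1]  # hypothetical per-item TD-errors from a loss
>>> new_priorities = [abs(e) + 1e-6 for e in td_errors]
>>> buffer.update_priorities(batch["batch_indexes"], new_priorities)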

stats(debug: bool = False) Dict[source]

Returns the stats of this buffer.

Args:

debug: If True, adds sample eviction statistics to the returned stats dict.

Returns:

A dictionary of stats about this buffer.

DeveloperAPI: This API may change across minor Ray releases.

get_state() Dict[str, Any][source]

Returns all local state.

Returns:

The serializable local state.

DeveloperAPI: This API may change across minor Ray releases.

set_state(state: Dict[str, Any]) None[source]

Restores all local state to the provided state.

Args:

state: The new state to set this buffer to. Can be obtained by calling self.get_state().

DeveloperAPI: This API may change across minor Ray releases.

ray.rllib.utils.replay_buffers.reservoir_replay_buffer

class ray.rllib.utils.replay_buffers.reservoir_replay_buffer.ReservoirReplayBuffer(capacity: int = 10000, storage_unit: str = 'timesteps', **kwargs)[source]

Bases: ray.rllib.utils.replay_buffers.replay_buffer.ReplayBuffer

This buffer implements reservoir sampling.

The algorithm has been described by Jeffrey S. Vitter in “Random sampling with a reservoir”. See https://www.cs.umd.edu/~samir/498/vitter.pdf for the full paper.
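
A brief usage sketch; once more items have been added than fit into capacity, the buffer holds a uniform random sample of everything it has seen:

>>> from ray.rllib.policy.sample_batch import SampleBatch
>>> from ray.rllib.utils.replay_buffers.reservoir_replay_buffer import (
...     ReservoirReplayBuffer)
>>> buffer = ReservoirReplayBuffer(capacity=5, storage_unit="timesteps")
>>> for i in range(100):
...     buffer.add(SampleBatch({"a": [i], SampleBatch.T: [i]}))
>>> print(buffer.sample(2)["a"])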

stats(debug: bool = False) dict[source]

Returns the stats of this buffer.

Parameters

debug – If True, adds sample eviction statistics to the returned stats dict.

Returns

A dictionary of stats about this buffer.

get_state() Dict[str, Any][source]

Returns all local state.

Returns

The serializable local state.

set_state(state: Dict[str, Any]) None[source]

Restores all local state to the provided state.

Parameters

state – The new state to set this buffer. Can be obtained by calling self.get_state().

MultiAgentReplayBuffer classes

The following classes use the above “single-agent” buffers as underlying buffers to facilitate splitting up experiences between the different agents’ policies. In multi-agent RL, more than one agent exists in the environment, and not all of these agents may utilize the same policy (mapping M agents to N policies, where N <= M). This leads to the need for MultiAgentReplayBuffers that store the experiences of different policies separately.

ray.rllib.utils.replay_buffers.multi_agent_replay_buffer

class ray.rllib.utils.replay_buffers.multi_agent_replay_buffer.MultiAgentReplayBuffer(capacity: int = 10000, storage_unit: str = 'timesteps', num_shards: int = 1, learning_starts: int = 1000, replay_mode: str = 'independent', replay_sequence_override: bool = True, replay_sequence_length: int = 1, replay_burn_in: int = 0, replay_zero_init_states: bool = True, underlying_buffer_config: Optional[dict] = None, **kwargs)[source]

Bases: ray.rllib.utils.replay_buffers.replay_buffer.ReplayBuffer

A replay buffer shard for multi-agent setups.

This buffer is meant to be run in parallel to distribute experiences across num_shards shards. Unlike simpler buffers, it holds a set of buffers - one for each policy ID.

DeveloperAPI: This API may change across minor Ray releases.

replay(**kwargs)

DeveloperAPI: This API may change across minor Ray releases.

add(batch: Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch], **kwargs) None[source]

Adds a batch to the appropriate policy’s replay buffer.

Turns the batch into a MultiAgentBatch of the DEFAULT_POLICY_ID if it is not a MultiAgentBatch. Subsequently, adds the individual policy batches to the storage.

Args:

batch: The batch to be added.
**kwargs: Forward compatibility kwargs.

DeveloperAPI: This API may change across minor Ray releases.

sample(num_items: int, policy_id: Optional[str] = None, **kwargs) Optional[Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch]][source]

Samples a MultiAgentBatch with num_items items per policy buffer.

If fewer than num_items records are in the policy’s buffer, some samples in the results may be repeated to fulfil the requested batch size of num_items. Returns an empty batch if there are no items in the buffer.

Args:

num_items: Number of items to sample from a policy’s buffer.
policy_id: ID of the policy that created the experiences we sample. If None is given, sample from all policies.
**kwargs: Forward compatibility kwargs.

Returns:

Concatenated MultiAgentBatch of items.

DeveloperAPI: This API may change across minor Ray releases.
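
A minimal sketch of adding and sampling with the default policy; learning_starts=0 is used here so this tiny example can be sampled right away instead of waiting for the default warm-up of 1000 timesteps:

>>> from ray.rllib.policy.sample_batch import SampleBatch
>>> from ray.rllib.utils.replay_buffers.multi_agent_replay_buffer import (
...     MultiAgentReplayBuffer)
>>> buffer = MultiAgentReplayBuffer(capacity=100, learning_starts=0)
>>> # A plain SampleBatch is stored under the default policy ID.
>>> buffer.add(SampleBatch({"a": [1, 2, 3], SampleBatch.T: [0, 1, 2]}))
>>> ma_batch = buffer.sample(2)
>>> print(list(ma_batch.policy_batches.keys()))
>>> # ['default_policy']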

stats(debug: bool = False) Dict[source]

Returns the stats of this buffer and all underlying buffers.

Args:

debug: If True, stats of underlying replay buffers will be fetched with debug=True.

Returns:

Dictionary of buffer stats.

DeveloperAPI: This API may change across minor Ray releases.

get_state() Dict[str, Any][source]

Returns all local state.

Returns:

The serializable local state.

DeveloperAPI: This API may change across minor Ray releases.

set_state(state: Dict[str, Any]) None[source]

Restores all local state to the provided state.

Args:

state: The new state to set this buffer to. Can be obtained by calling self.get_state().

DeveloperAPI: This API may change across minor Ray releases.

ray.rllib.utils.replay_buffers.multi_agent_prioritized_replay_buffer

class ray.rllib.utils.replay_buffers.multi_agent_prioritized_replay_buffer.MultiAgentPrioritizedReplayBuffer(capacity: int = 10000, storage_unit: str = 'timesteps', num_shards: int = 1, learning_starts: int = 1000, replay_mode: str = 'independent', replay_sequence_length: int = 1, replay_burn_in: int = 0, replay_zero_init_states: bool = True, prioritized_replay_alpha: float = 0.6, prioritized_replay_beta: float = 0.4, prioritized_replay_eps: float = 1e-06, underlying_buffer_config: Optional[dict] = None, **kwargs)[source]

Bases: ray.rllib.utils.replay_buffers.multi_agent_replay_buffer.MultiAgentReplayBuffer, ray.rllib.utils.replay_buffers.prioritized_replay_buffer.PrioritizedReplayBuffer

A prioritized replay buffer shard for multi-agent setups.

This buffer is meant to be run in parallel to distribute experiences across num_shards shards. Unlike simpler buffers, it holds a set of buffers - one for each policy ID.

DeveloperAPI: This API may change across minor Ray releases.

update_priorities(prio_dict: Dict) None[source]

Updates the priorities of underlying replay buffers.

Computes new priorities from td_errors and prioritized_replay_eps. These priorities are used to update underlying replay buffers per policy_id.

Args:

prio_dict: A dictionary containing td_errors for batches saved in underlying replay buffers.

DeveloperAPI: This API may change across minor Ray releases.

stats(debug: bool = False) Dict[source]

Returns the stats of this buffer and all underlying buffers.

Args:

debug: If True, stats of underlying replay buffers will be fetched with debug=True.

Returns:

Dictionary of buffer stats.

DeveloperAPI: This API may change across minor Ray releases.

Utility Methods

utils.update_priorities_in_replay_buffer(replay_buffer: ray.rllib.utils.replay_buffers.replay_buffer.ReplayBuffer, config: dict, train_batch: Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch], train_results: dict) None

Updates the priorities in a prioritized replay buffer, given training results.

The abs(TD-error) from the loss (inside train_results) is used as new priorities for the row-indices that were sampled for the train batch.

Does nothing if the given buffer does not support prioritized replay.

Parameters
  • replay_buffer – The replay buffer, whose priority values to update. This may also be a buffer that does not support priorities.

  • config – The Algorithm’s config dict.

  • train_batch – The batch used for the training update.

  • train_results – A train results dict, generated by e.g. the train_one_step() utility.

utils.sample_min_n_steps_from_buffer(replay_buffer: ray.rllib.utils.replay_buffers.replay_buffer.ReplayBuffer, min_steps: int, count_by_agent_steps: bool) Optional[Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch]]

Samples a minimum of n timesteps from a given replay buffer.

This utility method is primarily used by the QMIX algorithm and helps with sampling a given number of timesteps from a buffer that stores samples in units of sequences or complete episodes. It samples batches from the replay buffer until the total number of timesteps reaches min_steps.

Parameters
  • replay_buffer – The replay buffer to sample from.

  • min_steps – The minimum number of timesteps to sample.

  • count_by_agent_steps – Whether to count agent steps or env steps.

Returns

A concatenated SampleBatch or MultiAgentBatch with samples from the buffer.
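
A small sketch of this sampling utility (argument order as in the parameter list above; assuming the helpers live in ray.rllib.utils.replay_buffers.utils, as the utils. prefix suggests):

>>> from ray.rllib.policy.sample_batch import SampleBatch
>>> from ray.rllib.utils.replay_buffers import ReplayBuffer, StorageUnit
>>> from ray.rllib.utils.replay_buffers.utils import (
...     sample_min_n_steps_from_buffer)
>>> buffer = ReplayBuffer(capacity=100, storage_unit=StorageUnit.EPISODES)
>>> buffer.add(SampleBatch({"a": [1, 2, 3, 4],
...                         SampleBatch.T: [0, 1, 0, 1],
...                         SampleBatch.DONES: [False, True, False, True],
...                         SampleBatch.EPS_ID: [0, 0, 1, 1]}))
>>> # Sample whole episodes until at least 3 (env) timesteps are gathered.
>>> batch = sample_min_n_steps_from_buffer(buffer, 3, False)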