ConnectorV2 API#

rllib.connectors.connector_v2.ConnectorV2#

class ray.rllib.connectors.connector_v2.ConnectorV2(input_observation_space: gymnasium.Space | None = None, input_action_space: gymnasium.Space | None = None, **kwargs)[source]#

Base class defining the API for an individual “connector piece”.

A ConnectorV2 (“connector piece”) is usually part of a series of connector pieces within a so-called connector pipeline, which itself also abides by this very API. For example, you might have a connector pipeline consisting of two connector pieces, A and B, both instances of subclasses of ConnectorV2 and each performing a particular transformation on its input data. The resulting connector pipeline (A->B) also abides by the ConnectorV2 API and could thus be part of yet another, higher-level connector pipeline, for example (A->B)->C->D.

Any ConnectorV2 instance (an individual piece or several pieces composed into a pipeline) is a callable, and users should override its __call__() method. When called, a ConnectorV2 takes the output of the previous connector piece (or an empty dict if there is no previous piece) plus all the data collected thus far in the ongoing episode(s) (for connectors used in EnvRunners) or retrieved from a replay buffer or an environment sampling step (for connectors used in Learner pipelines). From this input data, the ConnectorV2 then performs its transformation step.

There are three types of pipelines that a ConnectorV2 piece can belong to:

1) EnvToModulePipeline: Transforms environment data before it gets to the RLModule. An EnvRunner uses this type of pipeline to transform env output data into RLModule readable data (for the next RLModule forward pass). For example, such a pipeline would include observation postprocessors, observation filters, or any RNN preparation code related to time-sequences and zero-padding.

2) ModuleToEnvPipeline: An EnvRunner uses this type of pipeline to transform RLModule output data into env readable actions (for the next env.step() call). For example, in case the RLModule only outputs action distribution parameters (but not actual actions), the ModuleToEnvPipeline would take care of sampling the actions to be sent back to the env from the resulting distribution (made deterministic if exploration is off).

3) LearnerConnectorPipeline: Transforms data coming from an EnvRunner.sample() call or a replay buffer into the input for the RLModule's forward_train() method, in order to compute loss function inputs. A Learner worker uses this type of pipeline to transform raw training data (a batch or a list of episodes) into RLModule readable training data (for the next RLModule forward_train() call).

Some connectors might be stateful, for example to keep track of observation filtering stats (mean and stddev values). Any Algorithm that uses connectors is responsible for frequently synchronizing the states of all connectors and connector pipelines between the EnvRunners (owning the env-to-module and module-to-env pipelines) and the Learners (owning the Learner pipelines).

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
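
The following is a minimal sketch of this composition idea: two toy pieces chained into a ConnectorPipelineV2 (documented further below), which is then called like any other ConnectorV2. The AddOne and Double classes are hypothetical and exist only for this example.

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.connectors.connector_pipeline_v2 import ConnectorPipelineV2

class AddOne(ConnectorV2):
    # Hypothetical piece: adds 1.0 to each value in the "obs" column.
    def __call__(self, *, rl_module, batch, episodes, **kwargs):
        batch["obs"] = [o + 1.0 for o in batch["obs"]]
        return batch

class Double(ConnectorV2):
    # Hypothetical piece: doubles each value in the "obs" column.
    def __call__(self, *, rl_module, batch, episodes, **kwargs):
        batch["obs"] = [o * 2.0 for o in batch["obs"]]
        return batch

# The pipeline (AddOne->Double) abides by the same ConnectorV2 API.
pipeline = ConnectorPipelineV2(connectors=[AddOne(), Double()])
output = pipeline(rl_module=None, batch={"obs": [1.0, 2.0]}, episodes=[])
# output["obs"] is now [4.0, 6.0].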

recompute_output_observation_space(input_observation_space: gymnasium.Space, input_action_space: gymnasium.Space) gymnasium.Space[source]#

Re-computes a new (output) observation space based on the input spaces.

This method should be overridden by users to make sure a ConnectorPipelineV2 knows how the input spaces are transformed as they pass through its individual ConnectorV2 pieces.

from gymnasium.spaces import Box, Discrete
import numpy as np

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.utils.numpy import one_hot
from ray.rllib.utils.test_utils import check

class OneHotConnector(ConnectorV2):
    def recompute_output_observation_space(
        self,
        input_observation_space,
        input_action_space,
    ):
        return Box(0.0, 1.0, (input_observation_space.n,), np.float32)

    def __call__(
        self,
        *,
        rl_module,
        batch,
        episodes,
        explore=None,
        shared_data=None,
        metrics=None,
        **kwargs,
    ):
        assert "obs" in batch
        batch["obs"] = one_hot(batch["obs"])
        return batch

connector = OneHotConnector(input_observation_space=Discrete(2))
batch = {"obs": np.array([1, 0, 0], np.int32)}
output = connector(rl_module=None, batch=batch, episodes=None)

check(output, {"obs": np.array([[0.0, 1.0], [1.0, 0.0], [1.0, 0.0]])})

If this ConnectorV2 does not change the observation space in any way, leave this parent method implementation untouched.

Parameters:
  • input_observation_space – The input observation space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

  • input_action_space – The input action space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

Returns:

The new observation space (after data has passed through this ConnectorV2 piece).

recompute_output_action_space(input_observation_space: gymnasium.Space, input_action_space: gymnasium.Space) gymnasium.Space[source]#

Re-computes a new (output) action space based on the input space.

This method should be overridden by users to make sure a ConnectorPipelineV2 knows how the input spaces are transformed as they pass through its individual ConnectorV2 pieces.

If this ConnectorV2 does not change the action space in any way, leave this parent method implementation untouched.

Parameters:
  • input_observation_space – The input observation space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

  • input_action_space – The input action space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

Returns:

The new action space (after data has passed through this ConnectorV2 piece).

abstract __call__(*, rl_module: RLModule, batch: Dict[str, Any], episodes: List[SingleAgentEpisode | MultiAgentEpisode], explore: bool | None = None, shared_data: dict | None = None, metrics: MetricsLogger | None = None, **kwargs) Any[source]#

Method for transforming an input batch into an output batch.

Parameters:
  • rl_module – The RLModule object that the connector connects to or from.

  • batch – The input data to be transformed by this connector. Transformations may either be done in place, or a new structure may be returned. Note that the information in batch eventually becomes either the forward batch for the RLModule (env-to-module and Learner connectors) or the input to the env.step() call (module-to-env connectors). In the former case, the information in batch is discarded after the RLModule forward pass, so any transformation (e.g. observation preprocessing) performed only inside batch is lost, unless you write it back into the corresponding episodes during the connector pass.

  • episodes – The list of SingleAgentEpisode or MultiAgentEpisode objects, each corresponding to one slot in the vector env. Note that episodes can be read from (e.g. to place information into batch), but also written to. Only write (changed, transformed) information back into the episodes if you want these changes to be “permanent”. For example, if you sample from an environment, pick up observations from the episodes, place them into batch, and then transform these observations, the transformations are lost once batch is discarded after the RLModule forward pass. To make them permanent, write the transformed observations back into the episodes; this way, you don't have to perform the same transformation again on the Learner (or replay buffer) side, and the Learner works on the already changed episodes (compiling the train batch through the Learner connector). See the sketch below for an example.

  • explore – Whether exploration is currently switched on. Per convention, if True, the RLModule's forward_exploration method should be called; if False, the EnvRunner should call forward_inference instead.

  • shared_data – Optional additional context data that needs to be exchanged between different ConnectorV2 pieces (in the same pipeline) or across ConnectorV2 pipelines (meaning between env-to-module and module-to-env).

  • metrics – Optional MetricsLogger instance to log custom metrics to.

  • kwargs – Forward API-compatibility kwargs.

Returns:

The transformed connector output.
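
To make the write-back point above concrete, the following is a hedged sketch of a piece that clips the observations in batch and writes the clipped values back into the episodes. It assumes batch["obs"] holds exactly one item per episode and that SingleAgentEpisode exposes a set_observations(new_data=..., at_indices=...) setter mirroring its get_observations() getter; verify both assumptions against your Ray version.

import numpy as np

from ray.rllib.connectors.connector_v2 import ConnectorV2

class ClipObservations(ConnectorV2):
    # Hypothetical env-to-module piece: clips observations placed into
    # `batch` by a previous piece, then writes the clipped values back
    # into the episodes so the change also reaches the Learner side.
    def __call__(self, *, rl_module, batch, episodes, **kwargs):
        batch["obs"] = [np.clip(obs, -1.0, 1.0) for obs in batch["obs"]]
        for episode, clipped in zip(episodes, batch["obs"]):
            # Assumed setter; check the exact signature in your Ray version.
            episode.set_observations(new_data=clipped, at_indices=-1)
        return batch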

static single_agent_episode_iterator(episodes: List[SingleAgentEpisode | MultiAgentEpisode], agents_that_stepped_only: bool = True, zip_with_batch_column: List[Any] | Dict[Tuple, Any] | None = None) Iterator[SingleAgentEpisode][source]#

An iterator over a list of episodes that always yields SingleAgentEpisodes.

In case items in the list are MultiAgentEpisodes, these are broken down into their individual agents’ SingleAgentEpisodes and those are then yielded one after the other.

Useful for connectors that operate on both single-agent and multi-agent episodes.

Parameters:
  • episodes – The list of SingleAgent- or MultiAgentEpisode objects.

  • agents_that_stepped_only – If True (and in a multi-agent setup), only yields SingleAgentEpisodes of those agents that have just stepped in the actual MultiAgentEpisode (checked via MultiAgentEpisode.get_agents_to_act()). Note that this setting is ignored in single-agent setups because the agent steps at each timestep regardless.

  • zip_with_batch_column – If provided, must be a list of batch items corresponding to the given episodes (single agent case) or a dict mapping (AgentID, ModuleID) tuples to lists of individual batch items corresponding to this agent/module combination. The iterator will then yield tuples of SingleAgentEpisode objects (1st item) along with the data item (2nd item) that this episode was responsible for generating originally.

Yields:

All SingleAgentEpisodes in the input list, whereby MultiAgentEpisodes will be broken down into their individual SingleAgentEpisode components.
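
For example, a custom __call__ can combine this iterator with add_batch_item (documented below) to place each stepping agent's most recent observation into the batch. A minimal sketch; the LastObsConnector class is hypothetical:

from ray.rllib.connectors.connector_v2 import ConnectorV2

class LastObsConnector(ConnectorV2):
    # Works in single- and multi-agent setups: the iterator yields one
    # SingleAgentEpisode per (stepping) agent.
    def __call__(self, *, rl_module, batch, episodes, **kwargs):
        for sa_episode in self.single_agent_episode_iterator(episodes):
            self.add_batch_item(
                batch, "obs", sa_episode.get_observations(-1), sa_episode
            )
        return batch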

static add_batch_item(batch: Dict[str, Any], column: str, item_to_add: Any, single_agent_episode: SingleAgentEpisode | None = None) None[source]#

Adds a data item under column to the given batch.

The item_to_add is stored in the batch in the following manner:

1) If single_agent_episode is not provided (None), the item is stored in a list directly under column: column -> [item, item, …]

2) If single_agent_episode's agent_id and module_id properties are None (the single_agent_episode is not part of a multi-agent episode), item_to_add is appended to a list under a (<episodeID>,) key under column: column -> (<episodeID>,) -> [item, item, …]

3) If single_agent_episode's agent_id and module_id are NOT None (the single_agent_episode is part of a multi-agent episode), item_to_add is appended to a list under a (<episodeID>, <AgentID>, <ModuleID>) key under column: column -> (<episodeID>, <AgentID>, <ModuleID>) -> [item, item, …]

See these examples for clarification of the three cases:

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.env.multi_agent_episode import MultiAgentEpisode
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.utils.test_utils import check

# 1) Simple case (no episodes provided) -> Store data in a list directly
# under `column`:
batch = {}
ConnectorV2.add_batch_item(batch, "test_col", item_to_add=5)
ConnectorV2.add_batch_item(batch, "test_col", item_to_add=6)
check(batch, {"test_col": [5, 6]})
ConnectorV2.add_batch_item(batch, "test_col_2", item_to_add=-10)
check(batch, {
    "test_col": [5, 6],
    "test_col_2": [-10],
})

# 2) Single-agent case (SingleAgentEpisode provided) -> Store data in a list
# under the keys: `column` -> `(<eps_id>,)` -> [...]:
batch = {}
episode = SingleAgentEpisode(
    id_="SA-EPS0",
    observations=[0, 1, 2, 3],
    actions=[1, 2, 3],
    rewards=[1.0, 2.0, 3.0],
)
ConnectorV2.add_batch_item(batch, "test_col", 5, episode)
ConnectorV2.add_batch_item(batch, "test_col", 6, episode)
ConnectorV2.add_batch_item(batch, "test_col_2", -10, episode)
check(batch, {
    "test_col": {("SA-EPS0",): [5, 6]},
    "test_col_2": {("SA-EPS0",): [-10]},
})

# 3) Multi-agent case (SingleAgentEpisode provided that has `agent_id` and
# `module_id` information) -> Store data in a list under the keys:
# `column` -> `(<episodeID>,<AgentID>,<ModuleID>)` -> [...]:
batch = {}
ma_episode = MultiAgentEpisode(
    id_="MA-EPS1",
    observations=[
        {"ag0": 0, "ag1": 1}, {"ag0": 2, "ag1": 4}
    ],
    actions=[{"ag0": 0, "ag1": 1}],
    rewards=[{"ag0": -0.1, "ag1": -0.2}],
    # ag0 maps to mod0, ag1 maps to mod1, etc.
    agent_to_module_mapping_fn=lambda aid, eps: f"mod{aid[2:]}",
)
ConnectorV2.add_batch_item(
    batch,
    "test_col",
    item_to_add=5,
    single_agent_episode=ma_episode.agent_episodes["ag0"],
)
ConnectorV2.add_batch_item(
    batch,
    "test_col",
    item_to_add=6,
    single_agent_episode=ma_episode.agent_episodes["ag0"],
)
ConnectorV2.add_batch_item(
    batch,
    "test_col_2",
    item_to_add=10,
    single_agent_episode=ma_episode.agent_episodes["ag1"],
)
check(
    batch,
    {
        "test_col": {("MA-EPS1", "ag0", "mod0"): [5, 6]},
        "test_col_2": {("MA-EPS1", "ag1", "mod1"): [10]},
    },
)
Parameters:
  • batch – The batch to store item_to_add in.

  • column – The column name (str) within the batch to store item_to_add under.

  • item_to_add – The data item to store in the batch.

  • single_agent_episode – An optional SingleAgentEpisode. If provided and its agent_id and module_id properties are None, creates a further sub-dictionary under column, mapping from (<episodeID>,) to a list of data items (to which item_to_add is appended in this call). If provided and its agent_id and module_id properties are NOT None, creates a further sub-dictionary under column, mapping from (<episodeID>, <AgentID>, <ModuleID>) to a list of data items (to which item_to_add is appended in this call). If not provided, appends item_to_add to a list directly under column.

static add_n_batch_items(batch: Dict[str, Any], column: str, items_to_add: Any, num_items: int, single_agent_episode: SingleAgentEpisode | None = None) None[source]#

Adds a list of items (or batched item) under column to the given batch.

If items_to_add is not a list but an already batched struct (with np.ndarray leaves), it is appended as-is to possibly existing data under the same column. A subsequent BatchIndividualItems ConnectorV2 piece recognizes this and batches the data properly into a single (batched) item. This is much faster than first splitting up items_to_add and then adding each item individually.

If single_agent_episode is provided and its agent_id and module_id properties are None, creates a further sub-dictionary under column, mapping from (<episodeID>,) to a list of data items (to which items_to_add are appended in this call). If single_agent_episode is provided and its agent_id and module_id properties are NOT None, creates a further sub-dictionary under column, mapping from (<episodeID>, <AgentID>, <ModuleID>) to a list of data items (to which items_to_add are appended in this call). If single_agent_episode is not provided, appends items_to_add to a list directly under column.

import numpy as np

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.env.multi_agent_episode import MultiAgentEpisode
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.utils.test_utils import check

# Simple case (no episodes provided) -> Store data in a list directly under
# `column`:
batch = {}
ConnectorV2.add_n_batch_items(
    batch,
    "test_col",
    # List of (complex) structs.
    [{"a": np.array(3), "b": 4}, {"a": np.array(5), "b": 6}],
    num_items=2,
)
check(
    batch["test_col"],
    [{"a": np.array(3), "b": 4}, {"a": np.array(5), "b": 6}],
)
# In a new column (test_col_2), store some already batched items.
# This way, you may avoid having to disassemble an already batched item
# (e.g. a numpy array of shape (10, 2)) into its individual items (e.g.
# split the array into a list of len=10) and then adding these individually.
# The performance gains may be quite large when providing already batched
# items (such as numpy arrays with a batch dim):
ConnectorV2.add_n_batch_items(
    batch,
    "test_col_2",
    # One (complex) already batched struct.
    {"a": np.array([3, 5]), "b": np.array([4, 6])},
    num_items=2,
)
# Add more already batched items (this time with a different batch size)
ConnectorV2.add_n_batch_items(
    batch,
    "test_col_2",
    {"a": np.array([7, 7, 7]), "b": np.array([8, 8, 8])},
    num_items=3,  # <- in this case, this must be the batch size
)
check(
    batch["test_col_2"],
    [
        {"a": np.array([3, 5]), "b": np.array([4, 6])},
        {"a": np.array([7, 7, 7]), "b": np.array([8, 8, 8])},
    ],
)

# Single-agent case (SingleAgentEpisode provided) -> Store data in a list
# under the keys: `column` -> `(<eps_id>,)`:
batch = {}
episode = SingleAgentEpisode(
    id_="SA-EPS0",
    observations=[0, 1, 2, 3],
    actions=[1, 2, 3],
    rewards=[1.0, 2.0, 3.0],
)
ConnectorV2.add_n_batch_items(
    batch=batch,
    column="test_col",
    items_to_add=[5, 6, 7],
    num_items=3,
    single_agent_episode=episode,
)
check(batch, {
    "test_col": {("SA-EPS0",): [5, 6, 7]},
})

# Multi-agent case (SingleAgentEpisode provided that has `agent_id` and
# `module_id` information) -> Store data in a list under the keys:
# `column` -> `(<episodeID>,<AgentID>,<ModuleID>)`:
batch = {}
ma_episode = MultiAgentEpisode(
    id_="MA-EPS1",
    observations=[
        {"ag0": 0, "ag1": 1}, {"ag0": 2, "ag1": 4}
    ],
    actions=[{"ag0": 0, "ag1": 1}],
    rewards=[{"ag0": -0.1, "ag1": -0.2}],
    # ag0 maps to mod0, ag1 maps to mod1, etc.
    agent_to_module_mapping_fn=lambda aid, eps: f"mod{aid[2:]}",
)
ConnectorV2.add_n_batch_items(
    batch,
    "test_col",
    items_to_add=[5, 6],
    num_items=2,
    single_agent_episode=ma_episode.agent_episodes["ag0"],
)
ConnectorV2.add_n_batch_items(
    batch,
    "test_col_2",
    items_to_add=[10],
    num_items=1,
    single_agent_episode=ma_episode.agent_episodes["ag1"],
)
check(
    batch,
    {
        "test_col": {("MA-EPS1", "ag0", "mod0"): [5, 6]},
        "test_col_2": {("MA-EPS1", "ag1", "mod1"): [10]},
    },
)
Parameters:
  • batch – The batch to store the n items in items_to_add in.

  • column – The column name (str) within the batch to store items_to_add under.

  • items_to_add – The list of data items to store in the batch OR an already batched (possibly nested) struct. In the latter case, items_to_add is appended as-is to possibly existing data under the same column. A subsequent BatchIndividualItems ConnectorV2 piece recognizes this and batches the data properly into a single (batched) item. This is much faster than first splitting up items_to_add and then adding each item individually.

  • num_items – The number of items in items_to_add. This arg is mostly for asserting the correct usage of this method by checking whether the given data in items_to_add really contains the right number of individual items.

  • single_agent_episode – An optional SingleAgentEpisode. If provided and its agent_id and module_id properties are None, creates a further sub-dictionary under column, mapping from (<episodeID>,) to a list of data items (to which items_to_add are appended in this call). If provided and its agent_id and module_id properties are NOT None, creates a further sub-dictionary under column, mapping from (<episodeID>, <AgentID>, <ModuleID>) to a list of data items (to which items_to_add are appended in this call). If not provided, appends items_to_add to a list directly under column.

static foreach_batch_item_change_in_place(batch: Dict[str, Any], column: str | List[str] | Tuple[str], func: Callable[[Any, int | None, Any | None, str | None], Any]) None[source]#

Runs the provided func on all items under one or more columns in the batch.

Use this method to conveniently loop through all items in a batch and transform them in place.

func takes the following arguments:

  • The item itself. If column is a list of column names, this argument is a tuple of items.

  • The EpisodeID. This value might be None.

  • The AgentID. This value might be None in the single-agent case.

  • The ModuleID. This value might be None in the single-agent case.

The return value(s) of func are used to directly override the values in the given batch.

Parameters:
  • batch – The batch to process in-place.

  • column – A single column name (str) or a list thereof. If a list is provided, the first argument to func is a tuple of items. If a single str is provided, the first argument to func is an individual item.

  • func – The function to call on each item (or tuple of items).

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.utils.test_utils import check

# Simple case: Batch items are in lists directly under their column names.
batch = {
    "col1": [0, 1, 2, 3],
    "col2": [0, -1, -2, -3],
}
# Increase all ints by 1.
ConnectorV2.foreach_batch_item_change_in_place(
    batch=batch,
    column="col1",
    func=lambda item, *args: item + 1,
)
check(batch["col1"], [1, 2, 3, 4])

# Further increase all ints by 1 in col1 and flip sign in col2.
ConnectorV2.foreach_batch_item_change_in_place(
    batch=batch,
    column=["col1", "col2"],
    func=(lambda items, *args: (items[0] + 1, -items[1])),
)
check(batch["col1"], [2, 3, 4, 5])
check(batch["col2"], [0, 1, 2, 3])

# Single-agent case: Batch items are in lists under (eps_id,)-keys in a dict
# under their column names.
batch = {
    "col1": {
        ("eps1",): [0, 1, 2, 3],
        ("eps2",): [400, 500, 600],
    },
}
# Increase all ints of eps1 by 1 and divide all ints of eps2 by 100.
ConnectorV2.foreach_batch_item_change_in_place(
    batch=batch,
    column="col1",
    func=lambda item, eps_id, *args: (
        item + 1 if eps_id == "eps1" else item / 100
    ),
)
check(batch["col1"], {
    ("eps1",): [1, 2, 3, 4],
    ("eps2",): [4, 5, 6],
})

# Multi-agent case: Batch items are in lists under
# (eps_id, agent_id, module_id)-keys in a dict
# under their column names.
batch = {
    "col1": {
        ("eps1", "ag1", "mod1"): [1, 2, 3, 4],
        ("eps2", "ag1", "mod2"): [400, 500, 600],
        ("eps2", "ag2", "mod3"): [-1, -2, -3, -4, -5],
    },
}
# Decrease all ints of "eps1" by 1, divide all ints of "mod2" by 100, and
# flip sign of all ints of "ag2".
ConnectorV2.foreach_batch_item_change_in_place(
    batch=batch,
    column="col1",
    func=lambda item, eps_id, ag_id, mod_id: (
        item - 1
        if eps_id == "eps1"
        else item / 100
        if mod_id == "mod2"
        else -item
    ),
)
check(batch["col1"], {
    ("eps1", "ag1", "mod1"): [0, 1, 2, 3],
    ("eps2", "ag1", "mod2"): [4, 5, 6],
    ("eps2", "ag2", "mod3"): [1, 2, 3, 4, 5],
})
static switch_batch_from_column_to_module_ids(batch: Dict[str, Dict[str, Any]]) Dict[str, Dict[str, Any]][source]#

Switches the first two levels of a “column name -> ModuleID -> data” batch.

Assuming that the top level consists of column names as keys and the second level (under these columns) consists of ModuleID keys, the resulting batch will have these two reversed and thus map ModuleIDs to dicts mapping column names to data items.

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.utils.test_utils import check

batch = {
    "obs": {"module_0": [1, 2, 3]},
    "actions": {"module_0": [4, 5, 6], "module_1": [7]},
}
switched_batch = ConnectorV2.switch_batch_from_column_to_module_ids(batch)
check(
    switched_batch,
    {
        "module_0": {"obs": [1, 2, 3], "actions": [4, 5, 6]},
        "module_1": {"actions": [7]},
    },
)
Parameters:

batch – The batch to switch from being column name based (then ModuleIDs) to being ModuleID based (then column names).

Returns:

A new batch dict mapping ModuleIDs to dicts mapping column names (e.g. “obs”) to data.

get_state(components: str | Collection[str] | None = None, *, not_components: str | Collection[str] | None = None, **kwargs) Dict[str, Any][source]#

Returns the implementing class’s current state as a dict.

The returned dict must only contain msgpack-serializable data if you want to use the AlgorithmConfig._msgpack_checkpoints option. Consider returning your non-msgpack-serializable data from the Checkpointable.get_ctor_args_and_kwargs method instead.

Parameters:
  • components – An optional collection of string keys to be included in the returned state. This might be useful if getting certain components of the state is expensive (e.g. reading/compiling the weights of a large NN) while, at the same time, these components are not required by the caller.

  • not_components – An optional list of string keys to be excluded from the returned state, even if the same string is part of components. This is useful to get the complete state of the class except for one or a few components.

  • kwargs – Forward-compatibility kwargs.

Returns:

The current state of the implementing class (or only the specified components, without those in not_components).

set_state(state: Dict[str, Any]) None[source]#

Sets the implementing class's state to the given state dict.

If component keys are missing in state, these components of the implementing class will not be updated/set.

Parameters:

state – The state dict to restore the state from. Maps component keys to the corresponding subcomponent’s own state.
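
The following is a minimal sketch of a stateful piece overriding both get_state and set_state; the CountingConnector class and its state key are hypothetical, and the returned state deliberately contains only msgpack-serializable data.

from ray.rllib.connectors.connector_v2 import ConnectorV2

class CountingConnector(ConnectorV2):
    # Hypothetical stateful piece: counts how many times it has been called.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._num_calls = 0

    def __call__(self, *, rl_module, batch, episodes, **kwargs):
        self._num_calls += 1
        return batch

    def get_state(self, components=None, *, not_components=None, **kwargs):
        # Plain ints are msgpack-serializable.
        return {"num_calls": self._num_calls}

    def set_state(self, state):
        self._num_calls = state.get("num_calls", self._num_calls)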

get_ctor_args_and_kwargs() Tuple[Tuple, Dict[str, Any]][source]#

Returns the args/kwargs used to create self from its constructor.

Returns:

A tuple of the args (as a tuple) and kwargs (as a Dict[str, Any]) used to construct self from its class constructor.

reset_state() None[source]#

Resets the state of this ConnectorV2 to some initial value.

Note that this may NOT be the exact state that this ConnectorV2 was originally constructed with.

merge_states(states: List[Dict[str, Any]]) Dict[str, Any][source]#

Computes a resulting state given self’s state and a list of other states.

Algorithms should use this method for merging states between connectors running on parallel EnvRunner workers. For example, to synchronize the connector states of n remote workers and a local worker, one could (see the sketch below):

  • Gather all remote workers' connector states in a list.

  • Call self.merge_states() on the local worker, passing it the states list.

  • Broadcast the resulting local worker's connector state back to all remote workers.

After this, all workers (including the local one) hold a merged/synchronized new connector state.

Parameters:

states – The list of n other ConnectorV2 states to merge with self’s state into a single resulting state.

Returns:

The resulting state dict.
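
The following is a hedged sketch of this synchronization pattern, assuming local_connector and remote_connectors are references to ConnectorV2 (or pipeline) instances you hold; in practice, RLlib's Algorithm performs these steps for you.

# 1) Gather all remote workers' connector states in a list.
remote_states = [connector.get_state() for connector in remote_connectors]

# 2) Merge them on the local worker and apply the result there.
merged_state = local_connector.merge_states(remote_states)
local_connector.set_state(merged_state)

# 3) Broadcast the merged state back to all remote workers.
for connector in remote_connectors:
    connector.set_state(merged_state)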

property observation_space#

Getter for our (output) observation space.

Logic: Use the user-provided space (if set via the observation_space setter); otherwise, use the input space, assuming this connector piece does not alter the space.

property action_space#

Getter for our (output) action space.

Logic: Use the user-provided space (if set via the action_space setter); otherwise, use the input space, assuming this connector piece does not alter the space.

rllib.connectors.connector_pipeline_v2.ConnectorPipelineV2#

class ray.rllib.connectors.connector_pipeline_v2.ConnectorPipelineV2(input_observation_space: gymnasium.Space | None = None, input_action_space: gymnasium.Space | None = None, *, connectors: List[ConnectorV2] | None = None, **kwargs)[source]#

Utility class for quick manipulation of a connector pipeline.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

recompute_output_observation_space(input_observation_space: gymnasium.Space, input_action_space: gymnasium.Space) gymnasium.Space[source]#

Re-computes a new (output) observation space based on the input spaces.

This method should be overridden by users to make sure a ConnectorPipelineV2 knows how the input spaces are transformed as they pass through its individual ConnectorV2 pieces.

from gymnasium.spaces import Box, Discrete
import numpy as np

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.utils.numpy import one_hot
from ray.rllib.utils.test_utils import check

class OneHotConnector(ConnectorV2):
    def recompute_output_observation_space(
        self,
        input_observation_space,
        input_action_space,
    ):
        return Box(0.0, 1.0, (input_observation_space.n,), np.float32)

    def __call__(
        self,
        *,
        rl_module,
        batch,
        episodes,
        explore=None,
        shared_data=None,
        metrics=None,
        **kwargs,
    ):
        assert "obs" in batch
        batch["obs"] = one_hot(batch["obs"])
        return batch

connector = OneHotConnector(input_observation_space=Discrete(2))
batch = {"obs": np.array([1, 0, 0], np.int32)}
output = connector(rl_module=None, batch=batch, episodes=None)

check(output, {"obs": np.array([[0.0, 1.0], [1.0, 0.0], [1.0, 0.0]])})

If this ConnectorV2 does not change the observation space in any way, leave this parent method implementation untouched.

Parameters:
  • input_observation_space – The input observation space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

  • input_action_space – The input action space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

Returns:

The new observation space (after data has passed through this ConnectorV2 piece).

recompute_output_action_space(input_observation_space: gymnasium.Space, input_action_space: gymnasium.Space) gymnasium.Space[source]#

Re-computes a new (output) action space based on the input space.

This method should be overridden by users to make sure a ConnectorPipelineV2 knows how the input spaces are transformed as they pass through its individual ConnectorV2 pieces.

If this ConnectorV2 does not change the action space in any way, leave this parent method implementation untouched.

Parameters:
  • input_observation_space – The input observation space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

  • input_action_space – The input action space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

Returns:

The new action space (after data has passed through this ConnectorV2 piece).

remove(name_or_class: str | Type)[source]#

Remove a single connector piece from this pipeline by its name or class.

Parameters:

name_or_class – The name or class of the connector piece to be removed from the pipeline.

insert_before(name_or_class: str | type, connector: ConnectorV2) ConnectorV2[source]#

Insert a new connector piece before an existing piece (by name or class).

Parameters:
  • name_or_class – Name or class of the connector piece before which connector will get inserted.

  • connector – The new connector piece to be inserted.

Returns:

The ConnectorV2 before which connector has been inserted.

insert_after(name_or_class: str | Type, connector: ConnectorV2) ConnectorV2[source]#

Insert a new connector piece after an existing piece (by name or class).

Parameters:
  • name_or_class – Name or class of the connector piece after which connector will get inserted.

  • connector – The new connector piece to be inserted.

Returns:

The ConnectorV2 after which connector has been inserted.

prepend(connector: ConnectorV2) None[source]#

Prepend a new connector at the beginning of a connector pipeline.

Parameters:

connector – The new connector piece to be prepended to this pipeline.

append(connector: ConnectorV2) None[source]#

Append a new connector at the end of a connector pipeline.

Parameters:

connector – The new connector piece to be appended to this pipeline.
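
The following is a hedged sketch of these manipulation methods, reusing the OneHotConnector example piece from further above; MyFilter, MyLogger, and MyOtherFilter are hypothetical ConnectorV2 subclasses that exist only for this example.

from ray.rllib.connectors.connector_pipeline_v2 import ConnectorPipelineV2

# Start with a pipeline containing a single piece.
pipeline = ConnectorPipelineV2(connectors=[OneHotConnector()])

# Append one piece at the end and prepend another at the beginning.
pipeline.append(MyLogger())
pipeline.prepend(MyFilter())
# Pipeline is now: MyFilter -> OneHotConnector -> MyLogger.

# Insert another piece right before the OneHotConnector (by class) ...
pipeline.insert_before(OneHotConnector, MyOtherFilter())
# ... then remove it again (also by class).
pipeline.remove(MyOtherFilter)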

get_state(components: str | Collection[str] | None = None, *, not_components: str | Collection[str] | None = None, **kwargs) Dict[str, Any][source]#

Returns the implementing class’s current state as a dict.

The returned dict must only contain msgpack-serializable data if you want to use the AlgorithmConfig._msgpack_checkpoints option. Consider returning your non-msgpack-serializable data from the Checkpointable.get_ctor_args_and_kwargs method instead.

Parameters:
  • components – An optional collection of string keys to be included in the returned state. This might be useful if getting certain components of the state is expensive (e.g. reading/compiling the weights of a large NN) while, at the same time, these components are not required by the caller.

  • not_components – An optional list of string keys to be excluded from the returned state, even if the same string is part of components. This is useful to get the complete state of the class except for one or a few components.

  • kwargs – Forward-compatibility kwargs.

Returns:

The current state of the implementing class (or only the specified components, without those in not_components).

set_state(state: Dict[str, Any]) None[source]#

Sets the implementing class's state to the given state dict.

If component keys are missing in state, these components of the implementing class will not be updated/set.

Parameters:

state – The state dict to restore the state from. Maps component keys to the corresponding subcomponent’s own state.

get_checkpointable_components() List[Tuple[str, Checkpointable]][source]#

Returns the implementing class’s own Checkpointable subcomponents.

Returns:

A list of 2-tuples (name, subcomponent) describing the implementing class's subcomponents, all of which have to be Checkpointable themselves. Their state is therefore written into subdirectories (rather than into the main state file, self.STATE_FILE_NAME) when calling self.save_to_path().

get_ctor_args_and_kwargs() Tuple[Tuple, Dict[str, Any]][source]#

Returns the args/kwargs used to create self from its constructor.

Returns:

A tuple of the args (as a tuple) and kwargs (as a Dict[str, Any]) used to construct self from its class constructor.

reset_state() None[source]#

Resets the state of this ConnectorV2 to some initial value.

Note that this may NOT be the exact state that this ConnectorV2 was originally constructed with.

merge_states(states: List[Dict[str, Any]]) Dict[str, Any][source]#

Computes a resulting state given self’s state and a list of other states.

Algorithms should use this method for merging states between connectors running on parallel EnvRunner workers. For example, to synchronize the connector states of n remote workers and a local worker, one could:

  • Gather all remote workers' connector states in a list.

  • Call self.merge_states() on the local worker, passing it the states list.

  • Broadcast the resulting local worker's connector state back to all remote workers.

After this, all workers (including the local one) hold a merged/synchronized new connector state.

Parameters:

states – The list of n other ConnectorV2 states to merge with self’s state into a single resulting state.

Returns:

The resulting state dict.

property observation_space#

Getter for our (output) observation space.

Logic: Use the user-provided space (if set via the observation_space setter); otherwise, use the input space, assuming this connector piece does not alter the space.

property action_space#

Getter for our (output) action space.

Logic: Use the user-provided space (if set via the action_space setter); otherwise, use the input space, assuming this connector piece does not alter the space.

Observation preprocessors#

rllib.connectors.env_to_module.observation_preprocessor.SingleAgentObservationPreprocessor#

class ray.rllib.connectors.env_to_module.observation_preprocessor.SingleAgentObservationPreprocessor(input_observation_space: gymnasium.Space | None = None, input_action_space: gymnasium.Space | None = None, **kwargs)[source]#

Env-to-module connector preprocessing the most recent single-agent observation.

This is a convenience class that simplifies the writing of few-step preprocessor connectors.

Note that this class also works in a multi-agent setup, in which case RLlib calls this connector piece separately with each agent's observation and SingleAgentEpisode object.

Users must implement the preprocess() method, which reduces the usual procedure of extracting some data from a list of episodes and adding it to the batch to a mere “old observation -> transform -> new observation” step.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

recompute_output_observation_space(input_observation_space: gymnasium.Space, input_action_space: gymnasium.Space) gymnasium.Space[source]#

Re-computes a new (output) observation space based on the input spaces.

This method should be overridden by users to make sure a ConnectorPipelineV2 knows how the input spaces are transformed as they pass through its individual ConnectorV2 pieces.

from gymnasium.spaces import Box, Discrete
import numpy as np

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.utils.numpy import one_hot
from ray.rllib.utils.test_utils import check

class OneHotConnector(ConnectorV2):
    def recompute_output_observation_space(
        self,
        input_observation_space,
        input_action_space,
    ):
        return Box(0.0, 1.0, (input_observation_space.n,), np.float32)

    def __call__(
        self,
        *,
        rl_module,
        batch,
        episodes,
        explore=None,
        shared_data=None,
        metrics=None,
        **kwargs,
    ):
        assert "obs" in batch
        batch["obs"] = one_hot(batch["obs"])
        return batch

connector = OneHotConnector(input_observation_space=Discrete(2))
batch = {"obs": np.array([1, 0, 0], np.int32)}
output = connector(rl_module=None, batch=batch, episodes=None)

check(output, {"obs": np.array([[0.0, 1.0], [1.0, 0.0], [1.0, 0.0]])})

If this ConnectorV2 does not change the observation space in any way, leave this parent method implementation untouched.

Parameters:
  • input_observation_space – The input observation space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

  • input_action_space – The input action space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

Returns:

The new observation space (after data has passed through this ConnectorV2 piece).

abstract preprocess(observation, episode: SingleAgentEpisode)[source]#

Override to implement the preprocessing logic.

Parameters:
  • observation – A single (non-batched) observation item for a single agent to be preprocessed by this connector.

  • episode – The SingleAgentEpisode instance from which observation was taken. You can extract information on the particular AgentID and ModuleID through episode.agent_id and episode.module_id.

Returns:

The new observation for the agent after observation has been preprocessed.
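
The following is a hedged sketch of a concrete preprocessor, assuming a Box input observation space and that the piece can access its input space through self.input_observation_space (set via the constructor); the RescaleObservations class is illustrative only.

import numpy as np
from gymnasium.spaces import Box

from ray.rllib.connectors.env_to_module.observation_preprocessor import (
    SingleAgentObservationPreprocessor,
)

class RescaleObservations(SingleAgentObservationPreprocessor):
    # Hypothetical preprocessor: rescales Box observations into [0.0, 1.0].
    def recompute_output_observation_space(
        self, input_observation_space, input_action_space
    ):
        return Box(0.0, 1.0, input_observation_space.shape, np.float32)

    def preprocess(self, observation, episode):
        low = self.input_observation_space.low
        high = self.input_observation_space.high
        return ((observation - low) / (high - low)).astype(np.float32)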

rllib.connectors.env_to_module.observation_preprocessor.MultiAgentObservationPreprocessor#

class ray.rllib.connectors.env_to_module.observation_preprocessor.MultiAgentObservationPreprocessor(input_observation_space: gymnasium.Space | None = None, input_action_space: gymnasium.Space | None = None, **kwargs)[source]#

Env-to-module connector preprocessing the most recent multi-agent observation.

The observation is always a dict of individual agents’ observations.

This is a convenience class that simplifies the writing of few-step preprocessor connectors.

Users must implement the preprocess() method, which reduces the usual procedure of extracting some data from a list of episodes and adding it to the batch to a mere “old observation -> transform -> new observation” step.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

recompute_output_observation_space(input_observation_space: gymnasium.Space, input_action_space: gymnasium.Space) gymnasium.Space[source]#

Re-computes a new (output) observation space based on the input spaces.

This method should be overridden by users to make sure a ConnectorPipelineV2 knows how the input spaces are transformed as they pass through its individual ConnectorV2 pieces.

from gymnasium.spaces import Box, Discrete
import numpy as np

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.utils.numpy import one_hot
from ray.rllib.utils.test_utils import check

class OneHotConnector(ConnectorV2):
    def recompute_output_observation_space(
        self,
        input_observation_space,
        input_action_space,
    ):
        return Box(0.0, 1.0, (input_observation_space.n,), np.float32)

    def __call__(
        self,
        *,
        rl_module,
        batch,
        episodes,
        explore=None,
        shared_data=None,
        metrics=None,
        **kwargs,
    ):
        assert "obs" in batch
        batch["obs"] = one_hot(batch["obs"])
        return batch

connector = OneHotConnector(input_observation_space=Discrete(2))
batch = {"obs": np.array([1, 0, 0], np.int32)}
output = connector(rl_module=None, batch=batch, episodes=None)

check(output, {"obs": np.array([[0.0, 1.0], [1.0, 0.0], [1.0, 0.0]])})

If this ConnectorV2 does not change the observation space in any way, leave this parent method implementation untouched.

Parameters:
  • input_observation_space – The input observation space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

  • input_action_space – The input action space (either coming from the environment if self is the first connector piece in the pipeline or from the previous connector piece in the pipeline).

Returns:

The new observation space (after data has passed through this ConnectorV2 piece).

abstract preprocess(observations, episode: MultiAgentEpisode)[source]#

Override to implement the preprocessing logic.

Parameters:
  • observations – An observation dict containing each stepping agent's (non-batched) observation, to be preprocessed by this connector.

  • episode – The MultiAgentEpisode instance from which the observation dict originated.

Returns:

The new multi-agent observation dict after observations has been preprocessed.
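
The following is a hedged sketch of a multi-agent preprocessor; the CastObservations class and its logic are illustrative only.

import numpy as np

from ray.rllib.connectors.env_to_module.observation_preprocessor import (
    MultiAgentObservationPreprocessor,
)

class CastObservations(MultiAgentObservationPreprocessor):
    # Hypothetical preprocessor: casts every stepping agent's observation
    # to float32.
    def preprocess(self, observations, episode):
        # `observations` maps AgentIDs to individual (non-batched) items.
        return {
            agent_id: np.asarray(obs, dtype=np.float32)
            for agent_id, obs in observations.items()
        }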