Working with offline data#

Note

Ray 2.40 uses RLlib’s new API stack by default. The Ray team has mostly completed transitioning algorithms, example scripts, and documentation to the new code base.

If you’re still using the old API stack, see New API stack migration guide for details on how to migrate.

RLlib’s offline RL API enables you to work with experiences read from offline storage (for example, disk, cloud storage, streaming systems, Hadoop Distributed File System (HDFS). For example, you might want to read experiences saved from previous training runs, collected from experts, or gathered from policies deployed in web applications. You can also log new agent experiences produced during online training for future use.

RLlib represents trajectory sequences (for example, (s, a, r, s', ...) tuples) with SingleAgentEpisode objects (multi-agent offline training is currently not supported). Using this episode format allows for efficient encoding and compression of experiences, rewriting trajectories, and user-friendly data access through getter methods. During online training, RLlib uses SingleAgentEnvRunner actors to generate episodes of experiences in parallel using the current policy. However, RLlib uses this same episode format for reading experiences from and writing experiences to offline storage (see OfflineSingleAgentEnvRunner).

You can store experiences either directly in RLlib’s episode format or in table (columns) format. You should use the episode format when

  1. You need experiences grouped by their trajectory and ordered in time (for example, to train stateful modules).

  2. You want to use recorded experiences exclusively within RLlib (for example for offline RL or behavior cloning).

Contrary, you should prefer the table (columns) format, if

  1. You need to read the data easily with other data tools or ML libraries.

Note

RLlib’s new API stack incorporates principles that support standalone applications. Consequently, the SingleAgentEpisode class is usable outside of an RLlib context. To enable faster access through external data tools (for example, for data transformations), it’s recommended to use the table record format.

Most importantly, RLlib’s offline RL API builds on top of Ray Data and therefore features in general all read and write methods supported by Ray Data (for example read_parquet, read_json, etc.) with read_parquet and write_parquet being the default read and write methods. A core design principle of the API is to apply as many data transformations as possible on-the-fly prior to engaging the learner, allowing the latter to focus exclusively on model updates.

Hint

During the transition phase from old- to new API stack you can use the new offline RL API also with your SampleBatch data recorded with the old API stack. To enable this feature set config.offline_data(input_read_sample_batches=True).

Example: Training an expert policy#

In this example you train a PPO agent on the CartPole-v1 environment until it reaches an episode mean return of 450.0. You checkpoint this agent and then use its policy to record expert data to local disk.

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.utils.metrics import (
    ENV_RUNNER_RESULTS,
    EVALUATION_RESULTS,
    EPISODE_RETURN_MEAN,
)
from ray import train, tune

# Configure the PPO algorithm.
config = (
    PPOConfig()
    .environment("CartPole-v1")
    .training(
        lr=0.0003,
        # Run 6 SGD minibatch iterations on a batch.
        num_epochs=6,
        # Weigh the value function loss smaller than
        # the policy loss.
        vf_loss_coeff=0.01,
    )
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[32],
            fcnet_activation="linear",
            # Share encoder layers between value network
            # and policy.
            vf_share_layers=True,
        ),
    )
)

# Define the metric to use for stopping.
metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"

# Define the Tuner.
tuner = tune.Tuner(
    "PPO",
    param_space=config,
    run_config=train.RunConfig(
        stop={
            metric: 450.0,
        },
        name="docs_rllib_offline_pretrain_ppo",
        verbose=2,
        checkpoint_config=train.CheckpointConfig(
            checkpoint_frequency=1,
            checkpoint_at_end=True,
        ),
    ),
)
results = tuner.fit()

# Store the best checkpoint to use it later for recording
# an expert policy.
best_checkpoint = (
    results
    .get_best_result(
        metric=metric,
        mode="max"
    )
    .checkpoint.path
)

In this example, you saved a checkpoint from an agent that has become an expert at playing CartPole-v1. You use this checkpoint in the next example to record expert data to disk, which is later utilized for offline training to clone another agent.

Example: Record expert data to local disk#

After you train an expert policy to play CartPole-v1 you load its policy here to record expert data during evaluation. You use 5 OfflineSingleAgentEnvRunner instances to collect 50 complete episodes per sample() call. In this example you store experiences directly in RLlib’s SingleAgentEpisode objects with no more than 25 episode objects per Parquet file. Altogether you run 10 evaluation runs, which should result in 500 recorded episodes from the expert policy. You use these data in the next example to train a new policy through Offline RL that should reach a return of 450.0 when playing CartPole-v1.

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core import (
    COMPONENT_LEARNER_GROUP,
    COMPONENT_LEARNER,
    COMPONENT_RL_MODULE,
    DEFAULT_MODULE_ID,
)
from ray.rllib.core.rl_module import RLModuleSpec

# Store recording data under the following path.
data_path = "/tmp/docs_rllib_offline_recording"

# Configure the algorithm for recording.
config = (
    PPOConfig()
    # The environment needs to be specified.
    .environment(
        env="CartPole-v1",
    )
    # Make sure to sample complete episodes because
    # you want to record RLlib's episode objects.
    .env_runners(
        batch_mode="complete_episodes",
    )
    # Set up 5 evaluation `EnvRunners` for recording.
    # Sample 50 episodes in each evaluation rollout.
    .evaluation(
        evaluation_num_env_runners=5,
        evaluation_duration=50,
        evaluation_duration_unit="episodes",
    )
    # Use the checkpointed expert policy from the preceding PPO training.
    # Note, we have to use the same `model_config` as
    # the one with which the expert policy was trained, otherwise
    # the module state can't be loaded.
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[32],
            fcnet_activation="linear",
            # Share encoder layers between value network
            # and policy.
            vf_share_layers=True,
        ),
    )
    # Define the output path and format. In this example you
    # want to store data directly in RLlib's episode objects.
    # Each Parquet file should hold no more than 25 episodes.
    .offline_data(
        output=data_path,
        output_write_episodes=True,
        output_max_rows_per_file=25,
    )
)

# Build the algorithm.
algo = config.build()
# Load now the PPO-trained `RLModule` to use in recording.
algo.restore_from_path(
    best_checkpoint,
    # Load only the `RLModule` component here.
    component=COMPONENT_RL_MODULE,
)

# Run 10 evaluation iterations and record the data.
for i in range(10):
    print(f"Iteration {i + 1}")
    eval_results = algo.evaluate()
    print(eval_results)

# Stop the algorithm. Note, this is important for when
# defining `output_max_rows_per_file`. Otherwise,
# remaining episodes in the `EnvRunner`s buffer isn't written to disk.
algo.stop()

Note

RLlib formats The stored episode data as binary. Each episode is converted into its dictionary representation and serialized using msgpack-numpy, ensuring version compatibility.

RLlib’s recording process is efficient because it utilizes multiple OfflineSingleAgentEnvRunner instances during evaluation, enabling parallel data writing. You can explore the folder to review the stored Parquet data:

$ ls -la /tmp/docs_rllib_offline_recording/cartpole-v1

drwxr-xr-x. 22 user user 440 21. Nov 17:23 .
drwxr-xr-x.  3 user user  60 21. Nov 17:23 ..
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00004
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00009
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00012
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00016
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000002-00004
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000002-00007

Hint

RLlib stores records under a folder named by the RL environment. Therein, you see one folder of Parquet files for each OfflineSingleAgentEnvRunner and write operation. The write operation count is given in the second numbering. For example: above, env-runner 1 has sampled 25 episodes at its 4th sample() call and writes then (because output_max_rows_per_file=25) all sampled episodes to disk into file run-000001-00004.

Note

The number of write operations per worker may vary because policy rollouts aren’t evenly distributed. Faster workers collect more episodes, leading to differences in write operation counts. As a result, the second numbering may differ across files generated by different env-runner instances.

Example: Training on previously saved experiences#

In this example you are using behavior cloning with the previously recorded Parquet data from your expert policy playing CartPole-v1. The data needs to be linked in the configuration of the algorithm (through the input_ attribute).

from ray import train, tune
from ray.rllib.algorithms.bc import BCConfig

# Setup the config for behavior cloning.
config = (
    BCConfig()
    .environment(
        # Use the `CartPole-v1` environment from which the
        # data was recorded. This is merely for receiving
        # action and observation spaces and to use it during
        # evaluation.
        env="CartPole-v1",
    )
    .learners(
        # Use a single learner.
        num_learners=0,
    )
    .training(
        # This has to be defined in the new offline RL API.
        train_batch_size_per_learner=1024,
    )
    .offline_data(
        # Link the data.
        input_=[data_path],
        # You want to read in RLlib's episode format b/c this
        # is how you recorded data.
        input_read_episodes=True,
        # Read smaller batches from the data than the learner
        # trains on. Note, each batch element is an episode
        # with multiple timesteps.
        input_read_batch_size=512,
        # Create exactly 2 `DataWorkers` that transform
        # the data on-the-fly. Give each of them a single
        # CPU.
        map_batches_kwargs={
            "concurrency": 2,
            "num_cpus": 1,
        },
        # When iterating over the data, prefetch two batches
        # to improve the data pipeline. Don't shuffle the
        # buffer (the data is too small).
        iter_batches_kwargs={
            "prefetch_batches": 2,
            "local_shuffle_buffer_size": None,
        },
        # You must set this for single-learner setups.
        dataset_num_iters_per_learner=1,
    )
    .evaluation(
        # Run evaluation to see how well the learned policy
        # performs. Run every 3rd training iteration an evaluation.
        evaluation_interval=3,
        # Use a single `EnvRunner` for evaluation.
        evaluation_num_env_runners=1,
        # In each evaluation rollout, collect 5 episodes of data.
        evaluation_duration=5,
        # Evaluate the policy parallel to training.
        evaluation_parallel_to_training=True,
    )
)

# Set the stopping metric to be the evaluation episode return mean.
metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"

# Configure Ray Tune.
tuner = tune.Tuner(
    "BC",
    param_space=config,
    run_config=train.RunConfig(
        name="docs_rllib_offline_bc",
        # Stop behavior cloning when we reach 450 in return.
        stop={metric: 450.0},
        checkpoint_config=train.CheckpointConfig(
            # Only checkpoint at the end to be faster.
            checkpoint_frequency=0,
            checkpoint_at_end=True,
        ),
        verbose=2,
    )
)
# Run the experiment.
analysis = tuner.fit()

Behavior cloning in RLlib is highly performant, completing a single training iteration in approximately 2 milliseconds. The experiment’s results should resemble the following:

Episode mean return over the course of BC training.

It should take you around 98 seconds (456 iterations) to achieve the same episode return mean as the PPO agent. While this may not seem impressive compared to the PPO training time, it’s important to note that CartPole-v1 is a very simple environment to learn. In more complex environments, which require more sophisticated agents and significantly longer training times, pre-training through behavior cloning can be highly beneficial. Combining behavior cloning with subsequent fine-tuning using a reinforcement learning algorithm can substantially reduce training time, resource consumption, and associated costs.

Using external expert experiences#

Your expert data is often already available, either recorded from an operational system or directly provided by human experts. Typically, you might store this data in a tabular (columnar) format. RLlib’s new Offline RL API simplifies the use of such data by allowing direct ingestion through a specified schema that organizes the expert data. The API default schema for reading data is provided in SCHEMA.

Lets consider a simple example in which your expert data is stored with the schema: (o_t, a_t, r_t, o_tp1, d_t, i_t, logprobs_t). In this case you provide this schema as follows:

from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.core.columns import Columns

config = (
    BCConfig()
    ...
    .offline_data(
        input_=[<input_path>],
        # Provide the schema of your data (map to column names known to RLlib).
        input_read_schema={
            Columns.OBS: "o_t",
            Columns.ACTIONS: "a_t",
            Columns.REWARDS: "r_t",
            Columns.NEXT_OBS: "o_tp1",
            Columns.INFOS: "i_t",
            "done": "d_t",
        },
    )
)

Note

Internally, the legacy gym’s done signals are mapped to gymnasium’s terminated signals, with truncated values defaulting to False. RLlib’s SingleAgentEpisode structures align with gymnasium, adhering to the updated environment API standards in reinforcement learning.

Converting tabular data to RLlib’s episode format#

While the tabular format is widely compatible and seamlessly integrates with RLlib’s new Offline RL API, there are cases where you may prefer to use RLlib’s native episode format. As briefly mentioned earlier, such scenarios typically arise when full expert trajectories are required.

Note

RLlib processes tabular data in batches, converting each row into a single-step episode. This approach is primarily for procedural simplicity, as data can’t generally be assumed to arrive in time-ordered rows grouped by episodes, though this may occasionally be the case (however knowledge of such a structure resides with the user as RLlib can’t easily infer it automatically). While it’s possible to concatenate consecutive SingleAgentEpisode chunks, this can’t be done with chunks arriving in some scrambled order.

If you require full trajectories you can transform your tabular data into SingleAgentEpisode objects and store these in Parquet format. The next example shows how to do this. First, you store experiences of the preceding trained expert policy in tabular format (note the output_write_episodes=False setting below to activate tabular data output):

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core import (
    COMPONENT_LEARNER_GROUP,
    COMPONENT_LEARNER,
    COMPONENT_RL_MODULE,
    DEFAULT_MODULE_ID,
)
from ray.rllib.core.rl_module import RLModuleSpec

# Set up a path for the tabular data records.
tabular_data_path = "tmp/docs_rllib_offline_recording_tabular"

# Configure the algorithm for recording.
config = (
    PPOConfig()
    # The environment needs to be specified.
    .environment(
        env="CartPole-v1",
    )
    # Make sure to sample complete episodes because
    # you want to record RLlib's episode objects.
    .env_runners(
        batch_mode="complete_episodes",
    )
    # Set up 5 evaluation `EnvRunners` for recording.
    # Sample 50 episodes in each evaluation rollout.
    .evaluation(
        evaluation_num_env_runners=5,
        evaluation_duration=50,
    )
    # Use the checkpointed expert policy from the preceding PPO training.
    # Note, we have to use the same `model_config` as
    # the one with which the expert policy was trained, otherwise
    # the module state can't be loaded.
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[32],
            fcnet_activation="linear",
            # Share encoder layers between value network
            # and policy.
            vf_share_layers=True,
        ),
    )
    # Define the output path and format. In this example you
    # want to store data directly in RLlib's episode objects.
    .offline_data(
        output=tabular_data_path,
        # You want to store for this example tabular data.
        output_write_episodes=False,
    )
)

# Build the algorithm.
algo = config.build()
# Load the PPO-trained `RLModule` to use in recording.
algo.restore_from_path(
    best_checkpoint,
    # Load only the `RLModule` component here.
    component=COMPONENT_RL_MODULE,
)

# Run 10 evaluation iterations and record the data.
for i in range(10):
    print(f"Iteration {i + 1}")
    res_eval = algo.evaluate()
    print(res_eval)

# Stop the algorithm. Note, this is important for when
# defining `output_max_rows_per_file`. Otherwise,
# remaining episodes in the `EnvRunner`s buffer isn't written to disk.
algo.stop()

You may have noticed that recording data in tabular format takes significantly longer than recording in episode format. This slower performance is due to the additional post-processing required to convert episode data into a columnar format. To confirm that the recorded data is now in columnar format, you can print its schema:

from ray import data

# Read the tabular data into a Ray dataset.
ds = ray.data.read_parquet(tabular_data_path)
# Now, print its schema.
print("Tabular data schema of expert experiences:\n")
print(ds.schema())

# Column              Type
# ------              ----
# eps_id              string
# agent_id            null
# module_id           null
# obs                 numpy.ndarray(shape=(4,), dtype=float)
# actions             int32
# rewards             double
# new_obs             numpy.ndarray(shape=(4,), dtype=float)
# terminateds         bool
# truncateds          bool
# action_dist_inputs  numpy.ndarray(shape=(2,), dtype=float)
# action_logp         float
# weights_seq_no      int64

Note

infos aren’t stored to disk when they’re all empty.

If your expert data is given in columnar format and you need to train on full expert trajectories you can follow the code in the following example to convert your own data into RLlib’s SingleAgentEpisode objects:

import gymnasium as gym
import msgpack
import msgpack_numpy as mnp

from collections import defaultdict

from ray import data
from ray.rllib.env.single_agent_episode import SingleAgentEpisode

# Load the dataset with the tabular data.
ds = data.read_parquet(tabular_data_path)

# Build the environment from which the data was sampled to get the
# spaces.
env = gym.make("CartPole-v1")
# Define buffers for episode data.
eps_obs = []
eps_actions = []
eps_rewards = []
# Note, extra-model-outputs needs to be a dictionary with list
# values.
eps_extra_model_outputs = defaultdict(list)
# Define a buffer for unwritten episodes.
episodes = []

# Start iterating over the rows of your experience data.
for i, row in enumerate(ds.iter_rows(prefetch_batches=10)):
    # If the episode isn't terminated nor truncated, buffer the data.
    if not row["terminateds"] and not row["truncateds"]:
        eps_obs.append(row["obs"])
        eps_actions.append(row["actions"])
        eps_rewards.append(row["rewards"])
        eps_extra_model_outputs["action_dist_inputs"].append(row["action_dist_inputs"])
        eps_extra_model_outputs["action_logp"].append(row["action_logp"])
    # Otherwise, build the episode.
    else:
        eps_obs.append(row["new_obs"])
        episode = SingleAgentEpisode(
            id_=row["eps_id"],
            agent_id=row["agent_id"],
            module_id=row["module_id"],
            observations=eps_obs,
            # Use the spaces from the environment.
            observation_space=env.observation_space,
            action_space=env.action_space,
            actions=eps_actions,
            rewards=eps_rewards,
            # Set the starting timestep to zero.
            t_started=0,
            # You don't want to have a lookback buffer.
            len_lookback_buffer=0,
            terminated=row["terminateds"],
            truncated=row["truncateds"],
            extra_model_outputs=eps_extra_model_outputs,
        )
        # Store the ready-to-write episode to the episode buffer.
        episodes.append(msgpack.packb(episode.get_state(), default=mnp.encode))
        # Clear all episode data buffers.
        eps_obs.clear()
        eps_actions.clear()
        eps_rewards.clear()
        eps_extra_model_outputs = defaultdict(list)

    # Write episodes to disk when the episode buffer holds 50 episodes.
    if len(episodes) > 49:
        # Generate a Ray dataset from episodes.
        episodes_ds = data.from_items(episodes)
        # Write the Parquet data and compress it.
        episodes_ds.write_parquet(
            f"/tmp/test_converting/file-{i}".zfill(6),
            compression="gzip",
        )
        # Delete the dataset in memory and clear the episode buffer.
        del episodes_ds
        episodes.clear()

# If we are finished and have unwritten episodes, write them now.
if len(episodes) > 0:
    episodes_ds = data.from_items(episodes)
    episodes_ds.write_parquet(
        f"/tmp/test_converting/file-{i}".zfill(6),
        compression="gzip",
    )
    del episodes_ds
    episodes.clear()

Using old API stack SampleBatch recordings#

If you have expert data previously recorded using RLlib’s old API stack, it can be seamlessly utilized in the new stack’s Offline RL API by setting input_read_sample_batches=True. Alternatively, you can convert your SampleBatch recordings into SingleAgentEpisode format using RLlib’s OfflinePreLearner as demonstrated below:

import msgpack
import msgpack_numpy as mnp

from ray import data
from ray.rllib.offline.offline_prelearner import OfflinePreLearner

# Set up the data path to your `SampleBatch` expert data.
data_path = ...
# Set up the write path for the Parquet episode data.
output_data_path = "/tmp/sample_batch_data"

# Load the `SampleBatch` recordings.
ds = data.read_json(data_path)

# Iterate over batches (of `SampleBatch`es) and convert them to episodes.
for i, batch in enumerate(ds.iter_batches(batch_size=100, prefetch_batches=2)):
    # Use the RLlib's `OfflinePreLearner` to convert `SampleBatch`es to episodes.
    episodes = OfflinePreLearner._map_sample_batch_to_episode(False, batch)["episodes"]

    # Create a dataset from the episodes. Note, for storing episodes you need to
    # serialize them through `msgpack-numpy`.
    episode_ds = data.from_items([msgpack.packb(eps.get_state(), default=mnp.encode) for eps in episodes])
    # Write the batch of episodes to local disk.
    episode_ds.write_parquet(output_data_path + f"/file-{i}".zfill(6), compression="gzip")

print("Finished converting `SampleBatch` data to episode data.")

Note

RLlib considers your SampleBatch to represent a terminated/truncated episode and builds its SingleAgentEpisode according to this assumption.

Pre-processing, filtering and post-processing#

During recording, your expert policy may utilize pre-processing techniques for observations, such as frame-stacking, or filtering methods like mean-std filtering. Similarly, actions may undergo pre-processing, such as action sampling or scaling. In its EnvRunner instances, RLlib applies such pre-processing and filtering (through the env-to-module connector pipeline) before observations are passed to the RLModule. However, raw observations (as received directly from the environment) are stored in the episodes. Likewise, actions are recorded in their raw form (as output directly from the RLModule) while undergoing pre-processing (through RLlib’s module-to-env connectors) before being sent to the environment.

It’s crucial to carefully consider the pre-processing and filtering applied during the recording of experiences, as they significantly influence how the expert policy learns and subsequently performs in the environment. For example, if the expert policy uses mean-std filtering for observations, it learns a strategy based on the filtered observations, where the filter itself is highly dependent on the experiences collected during training. When deploying this expert policy, it’s essential to use the exact same filter during evaluation to avoid performance degradation. Similarly, a policy trained through behavior cloning may also require a mean-std filter for observations to accurately replicate the behavior of the expert policy.

Scaling I/O throughput#

Just as online training can be scaled, offline recording I/O throughput can also be increased by configuring the number of RLlib env-runners. Use the num_env_runners setting to scale recording during training or evaluation_num_env_runners for scaling during evaluation-only recording. Each worker operates independently, writing experiences in parallel, enabling linear scaling of I/O throughput for write operations. Within each OfflineSingleAgentEnvRunner, episodes are sampled and serialized before being written to disk.

Offline RL training in RLlib is highly parallelized, encompassing data reading, post-processing, and, if applicable, updates. When training on offline data, scalability is achieved by increasing the number of DataWorker instances used to transform offline experiences into a learner-compatible format (MultiAgentBatch). Ray Data optimizes reading operations under the hood by leveraging file metadata, predefined concurrency settings for batch post-processing, and available system resources. It’s strongly recommended not to override these defaults, as doing so may disrupt this optimization process.

Data processing in RLlib involves three key layers, all of which are highly scalable:

  1. Read Operations: This layer handles data ingestion from files in a specified folder. It’s automatically optimized by Ray Data and shouldn’t be manually scaled or adjusted.

  2. Post-processing (PreLearner): In this stage, batches are converted, if necessary, into RLlib’s SingleAgentEpisode format and passed through the learner connector pipeline. The processed data is then transformed into MultiAgentBatch objects for updating. This layer can be scaling the DataWorker instances.

  3. Updating (Learner): This stage involves updating the policy and associated modules. Scalability is achieved by increasing the number of learners (num_learners), enabling parallel processing of batches during updates.

The diagram below illustrates the layers and their scalability:

Key layers of RLlib's fully scalable Offline RL API.

Read operations are executed exclusively on the CPU and are primarily scaled by allocating additional resources (see How to tune performance for details), as they’re fully managed by Ray Data. Post-processing can be scaled by increasing the concurrency level specified in the keyword arguments for the mapping operation:

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": 10,
            "num_cpus": 4,
        }
    )
)

This initiates an actor pool with 10 DataWorker instances, each running an instance of RLlib’s callable OfflinePreLearner class to post-process batches for updating the RLModule.

Note

The num_cpus (and similarly the num_gpus) attribute defines the resources allocated to each DataWorker not the full actor pool.

You scale the number of learners in RLlib’s learners() configuration block:

config = (
    AlgorithmConfig()
    .learners(
        num_learners=4,
        num_gpus_per_learner=1,
    )
)

With this configuration you start an application with 4 (remote) Learner`s (see :ref:`Learner (Alpha) for more details about RLlib’s learners) each of them using a single GPU.

Using cloud storage#

Unlike RLlib’s previous stack, the new Offline RL API is cloud-agnostic and fully integrates with PyArrow. You can utilize any available cloud storage path or PyArrow-compatible filesystem. If using a PyArrow or compatible filesystem, ensure that your input_ path is a relative path within this filesystem. Similar to Ray Data, you can also use placeholders, lists of files or folders, or simply specify a single folder to read recursively from.

For example, to read from a storage bucket in GCS, you can specify the folder location as follows:

config=(
    AlgorithmConfig()
    .offline_data(
        input_="gs://<your-bucket>/dir1",
    )
)

This configuration allows RLlib to read data recursively from any folder beneath the specified path. If you are using a filesystem for GCS (for instance, due to authentication requirements), use the following syntax:

import pyarrow.fs

# Define the PyArrow filesystem
gcs = pyarrow.fs.GcsFilesystem(
    # This is needed to resolve the hostname for public buckets.
    anonymous=True,
    retry_time_limit=timedelta(seconds=15)
)

# Define the configuration.
config= (
    AlgorithmConfig()
    .offline_data(
        # NOTE: Use a relative file path now
        input_="<public-bucket>/dir1",
        input_filesystem=gcs,
    )
)

You can learn more about PyArrow’s filesystems, particularly regarding cloud filesystems and required authentication, in PyArrow Filesystem Interface.

Using cloud storage for recording#

You can use cloud storage in a similar way when recording experiences from an expert policy:

config= (
    AlgorithmConfig()
    .offline_data(
        output="gs://<your-bucket>/dir1",
    )
)

RLlib writes then directly into the folder in the cloud storage and creates it if not already existent in the bucket. The only difference to reading is that you can’t use multiple paths for writing. So something like

config= (
    AlgorithmConfig()
    .offline_data(
        output=["gs://<your-bucket>/dir1", "gs://<your-bucket>/dir2"],
    )
)

would not work. If the storage requires special permissions for creating folders and/or writing files, ensure that the cluster user is granted the necessary permissions. Failure to do so results in denied write access, causing the recording process to stop.

Note

When using cloud storage, Ray Data typically streams data, meaning it’s consumed in chunks. This allows postprocessing and training to begin after a brief warmup phase. More specifically, even if your cloud storage is large, the same amount of space isn’t required on the nodes running RLlib.

How to tune performance#

In RLlib’s Offline RL API the various key layers are managed by distinct modules and configurations, making it non-trivial to scale these layers effectively. It’s important to understand the specific parameters and their respective impact on system performance.

How to tune reading operations#

As noted earlier, the Reading Operations layer is automatically handled and dynamically optimized by Ray Data. It’s strongly recommended to avoid modifying this process. However, there are certain parameters that can enhance performance on this layer to some extent, including:

  1. Available resources (dedicated to the job).

  2. Data locality.

  3. Data sharding.

  4. Data pruning.

Available resources#

The scheduling strategy employed by Ray Data operates independently of any existing placement group, scheduling tasks and actors separately. Consequently, it’s essential to reserve adequate resources for other tasks and actors within your job. To optimize Ray Data’s scalability for read operations and improve reading performance, consider increasing the available resources in your cluster while preserving the resource allocation for existing tasks and actors. The key resources to monitor and provision are CPUs and object store memory. Insufficient object store memory, especially under heavy backpressure, may lead to objects being spilled to disk, which can severely impact application performance.

Bandwidth is a crucial factor influencing the throughput within your cluster. In some cases, scaling the number of nodes can increase bandwidth, thereby enhancing the flow of data from storage to consuming processes. Scenarios where this approach is beneficial include:

  • Independent connections to the network backbone: Nodes utilize dedicated bandwidth, avoiding shared up-links and potential bottlenecks (see for ex. here for AWS and here for GCP network bandwidth documentations).

  • Optimized cloud access: Employing features like S3 Transfer Acceleration, Google Cloud Storage FUSE , or parallel and accelerated data transfer methods to enhance performance.

Data locality#

Data locality is a critical factor in achieving fast data processing. For instance, if your data resides on GCP, running a Ray cluster on AWS S3 or a local machine inevitably results in low transfer rates and slow data processing. To ensure optimal performance, storing data within the same region, same zone and cloud provider as the Ray cluster is generally sufficient to enable efficient streaming for RLlib’s Offline RL API. Additional adjustments to consider include:

  • Multi-Region Buckets: Use multi-region storage to improve data availability and potentially enhance access speeds for distributed systems.

  • Storage class optimization within buckets: Use standard storage for frequent access and low-latency streaming. Avoid archival storage classes like AWS Glacier or GCP Archive for streaming workloads due to high retrieval times.

Data sharding#

Data sharding improves the efficiency of fetching, transferring, and reading data by balancing chunk sizes. If chunks are too large, they can cause delays during transfer and processing, leading to bottlenecks. Conversely, chunks that are too small can result in high metadata fetching overhead, slowing down overall performance. Finding an optimal chunk size is critical for balancing these trade-offs and maximizing throughput.

  • As a rule-of-thumb keep data file sizes in between 64MiB to 256MiB.

Data pruning#

If your data is in Parquet format (the recommended offline data format for RLlib), you can leverage data pruning to optimize performance. Ray Data supports pruning in its read_parquet() method through projection pushdown (column filtering) and filter pushdown (row filtering). These filters are applied directly during file scans, reducing the amount of unnecessary data loaded into memory.

For instance, if you only require specific columns from your offline data (for example, to avoid loading the infos column):

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.columns import Columns

config = (
    AlgorithmConfig()
    .offline_Data(
        input_read_method_kwargs={
            "columns": [
                Columns.EPS_ID,
                Columns.AGENT_ID,
                Columns.OBS,
                Columns.NEXT_OBS,
                Columns.REWARDS,
                Columns.ACTIONS,
                Columns.TERMINATED,
                Columns.TRUNCATED,
            ],
        },
    )
)

Similarly, if you only require specific rows from your dataset, you can apply pushdown filters as shown below:

import pyarrow.dataset

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.columns import Columns

config = (
    AlgorithmConfig()
    .offline_data(
        input_read_method_kwargs={
            "filter": pyarrow.dataset.field(Columns.AGENT_ID) == "agent_1",
        },
    )
)

How to tune post-processing (PreLearner)#

When enabling high throughput in Read Operations, it’s essential to ensure sufficient processing capacity in the Post-Processing (Pre-Learner) stage. Insufficient capacity in this stage can cause backpressure, leading to increased memory usage and, in severe cases, object spilling to disk or even Out-Of-Memory (see Out-Of-Memory Prevention) errors.

Tuning the Post-Processing (Pre-Learner) layer is generally more straightforward than optimizing the Read Operations layer. Tuning the Post-Processing (Pre-Learner) layer is generally more straightforward than optimizing the Read Operations layer. The following parameters can be adjusted to optimize its performance:

  • Actor Pool Size

  • Allocated Resources

  • Read Batch and Buffer Sizes.

Actor pool size#

Internally, the Post-Processing (PreLearner) layer is defined by a map_batches() operation that starts an _ActorPool. Each actor in this pool runs an OfflinePreLearner instances to transform batches on their way from disk to RLlib’s Learner. Obviously, the size of this _ActorPool defines the throughput of this layer and needs to be fine-tuned in regard to the pervious layer’s throughput to avoid backpressure. You can use the concurrency in RLlib’s map_batches_kwargs parameter to define this pool size:

from ray.rllib.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": 4,
        },
    )
)

With the preceding code you would enable Ray Data to start up to 4 parallel OfflinePreLearner actors that can post-process your data for training.

Note

Ray Data dynamically adjusts its read operations based on the parallelism of your Post-Processing (Pre-Learner) layer. It scales read operations up or down depending on the backpressure in the Post-Processing (Pre-Learner) stage. This means the throughput of your entire streaming pipeline is determined by the performance of the downstream tasks and the resources allocated to the Reading Operations layer (see How to tune reading operations). However, due to the overhead associated with scaling reading operations up or down, backpressure - and in severe cases, object spilling or Out-Of-Memory (OOM) errors - can’t always be entirely avoided.

You can also enable auto-scaling in your Post-Processing (PreLearner) by providing an interval instead of a straight number:

from ray.rllib.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": (4, 8),
        },
    )
)

This allows Ray Data to start up to 8 post-processing actors to downstream data faster, for example in case of backpressure.

Note

Implementing an autoscaled actor pool in the Post-Processing (Pre-Learner) layer doesn’t guarantee you the elimination of backpressure. Adding more OffLinePreLearner instances introduces additional overhead to the system. RLlib’s offline RL pipeline is optimized for streaming data, which typically exhibits stable throughput and resource usage, except in cases of imbalances between upstream and downstream tasks. As a rule of thumb, consider using autoscaling only under the following conditions: (1) throughput is expected to be highly variable, (2) Cluster resources are subject to fluctuations (for example, in shared or dynamic environments), and/or (3) workload characteristics are highly unpredictable.

Allocated resources#

Other than the number of post-processing actors you can tune performance on the Post-Processing (PreLearner) layer through defining resources to be allocated to each OffLinePreLearner in the actor pool. Such resources can be defined either through num_cpus and num_gpus or in the ray_remote_args.

Note

Typically, increasing the number of CPUs is sufficient for performance tuning in the post-processing stage of your pipeline. GPUs are only needed in specialized cases, such as in customized pipelines. For example, RLlib’s MARWIL implementation uses the GeneralAdvantageEstimation connector in its ConnectorPipelineV2 to apply General Advantage Estimation on experience batches. In these calculations, the value model of the algorithm’s RLModule is applied, which you can accelerate by running on a GPU.

As an example, to provide each of your 4 OfflinePreLearner in the Post-Processing (PreLearner) 2 CPUs you can use the following syntax:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": 4,
            "num_cpus": 2,
        },
    )
)

Warning

Don’t override the batch_size in RLlib’s map_batches_kwargs. This usually leads to high performance degradations. Note, this batch_size differs from the train_batch_size_per_learner: the former specifies the batch size in transformations of the streaming pipeline, while the latter defines the batch size used for training within each Learner (the batch size of the actual model forward- and backward passes performed for training).

Read batch- and buffer sizes#

When working with data from SingleAgentEpisode or the legacy SampleBatch format, fine-tuning the input_read_batch_size parameter provides additional optimization opportunities. This parameter controls the size of batches retrieved from data files. Its effectiveness is particularly notable when handling episodic or legacy SampleBatch data because the streaming pipeline utilizes for these data an EpisodeReplayBuffer to handle the multiple timesteps contained in each data row. All incoming data is converted into SingleAgentEpisode instances - if not already in this format - and stored in an episode replay buffer, which precisely manages the sampling of train_batch_size_per_learner for training.

The OfflinePreLearner converts and buffers episodes before sampling the batches used in learning.

Achieving an optimal balance between data ingestion efficiency and sampling variation in your streaming pipeline is crucial. Consider the following example: suppose each SingleAgentEpisode has a length of 100 timesteps, and your train_batch_size_per_learner is configured to be 1000. Each EpisodeReplayBuffer instance is set with a capacity of 1000:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        # Train on a batch of 1000 timesteps each iteration.
        train_batch_size_per_learner=1000,
    )
    .offline_data(
        # Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True
        # Define an input read batch size of 10 episodes.
        input_read_batch_size=10,
        # Set the replay buffer in the `OfflinePrelearner`
        # to 1,000 timesteps.
        prelearner_buffer_kwargs={
            "capacity": 1000,
        },
    )
)

If you configure input_read_batch_size to 10 as shown in the code, each of the 10 SingleAgentEpisode fit into the buffer, enabling sampling across a wide variety of timesteps from multiple episodes. This results in high sampling variation. Now, consider the case where the buffer capacity is reduced to 500:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        # Train on a batch of 1000 timesteps each iteration.
        train_batch_size_per_learner=1000,
    )
    .offline_data(
        # Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True
        # Define an input read batch size of 10 episodes.
        input_read_batch_size=10,
        # Set the replay buffer in the `OfflinePrelearner`
        # to 500 timesteps.
        prelearner_buffer_kwargs={
            "capacity": 500,
        },
    )
)

With the same input_read_batch_size, only 5 SingleAgentEpisode can be buffered at a time, causing inefficiencies as more data is read than can be retained for sampling.

In another scenario, if each SingleAgentEpisode still has a length of 100 timesteps and the train_batch_size_per_learner is set to 4000 timesteps as in the code below, the buffer holds 10 SingleAgentEpisode instances. This configuration results in lower sampling variation because many timesteps are repeatedly sampled, reducing diversity across training batches. These examples highlight the importance of tuning these parameters to balance data ingestion and sampling diversity in your offline streaming pipeline effectively.

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        # Train on a batch of 4000 timesteps each iteration.
        train_batch_size_per_learner=4000,
    )
    .offline_data(
        # Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True
        # Define an input read batch size of 10 episodes.
        input_read_batch_size=10,
        # Set the replay buffer in the `OfflinePrelearner`
        # to 1,000 timesteps.
        prelearner_buffer_kwargs={
            "capacity": 500,
        },
    )
)

Tip

To choose an adequate input_read_batch_size take a look at the length of your recorded episodes. In some cases each single episode is long enough to fulfill the train_batch_size_per_learner and you could choose a input_read_batch_size of 1. Most times it’s not and you need to consider how many episodes should be buffered to balance the amount of data digested from read input and the variation of data sampled from the EpisodeReplayBuffer instances in the OfflinePreLearner.

How to tune updating (Learner)#

Updating (Learner) is the final downstream task in RLlib’s Offline RL pipeline, and its consumption speed determines the overall throughput of the data pipeline. If the learning process is slow, it can cause backpressure in upstream layers, potentially leading to object spilling or Out-Of-Memory (OOM) errors. Therefore, it’s essential to fine-tune this layer in coordination with the upstream components. Several parameters can be adjusted to optimize the learning speed in your Offline algorithm:

  • Actor Pool Size

  • Allocated Resources

  • Scheduling Strategy

  • Batch Sizing

  • Batch Prefetching

  • Learner Iterations.

Actor pool size#

RLlib supports scaling Learner instances through the parameter num_learners. When this value is 0, RLlib uses a Learner instance in the local process, whereas for values >0, RLlib scales out using a backend_executor_BackendExecutor. This executor spawns your specified number of Learner instances, manages distributed training and aggregates intermediate results across Learner actors. Learner scaling increases training throughput and you should only apply it, if the upstream components in your Offline Data pipeline can supply data at a rate sufficient to match the increased training capacity. RLlib’s Offline API offers powerful scalability at its final layer by utilizing streaming_split. This functionality divides the data stream into multiple substreams, which are then processed by individual Learner instances, enabling efficient parallel consumption and enhancing overall throughput.

For example to set the number of learners to 4, you use the following syntax:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .learners(num_learners=4)
)

Allocated resources#

Just as with the Post-Processing (Pre-Learner) layer, allocating additional resources can help address slow training issues. The primary resource to leverage is the GPU, as training involves forward and backward passes through the RLModule, which GPUs can accelerate significantly. If your training already utilizes GPUs and performance still remains an issue, consider scaling up by either adding more GPUs to each Learner to increase GPU memory and computational capacity (set config.learners(num_gpus_per_learner=...)), or by adding additional Learner workers to further distribute the workload (by setting config.learners(num_learners=...)). Additionally, ensure that data throughput and upstream components are optimized to keep the learners fully utilized, as insufficient upstream capacity can bottleneck the training process.

To provide your learners with more compute use num_gpus_per_learner or num_cpus_per_learner as follows:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .learners(num_learners=4, num_gpus_per_learner=2)
)

Scheduling strategy#

The scheduling strategy in Ray plays a key role in task and actor placement by attempting to distribute them across multiple nodes in a cluster, thereby maximizing resource utilization and fault tolerance. When running on a single-node cluster (that’s: one large head node), the scheduling strategy has little to no noticeable impact. However, in a multi-node cluster, scheduling can significantly influence the performance of your Offline Data pipeline due to the importance of data locality. Data processing occurs across all nodes, and maintaining data locality during training can enhance performance.

In such scenarios, you can improve data locality by changing RLlib’s default scheduling strategy from "PACK" to "SPREAD". This strategy distributes the Learner actors across the cluster, allowing Ray Data <data> to take advantage of locality-aware bundle selection, which can improve efficiency.

Here is an example of how you can change the scheduling strategy:

"""Just for show-casing, don't run."""
import os
from ray import data
from ray.rllib.algorithms.algorithm_config.AlgorithmConfig

# Configure a "SPREAD" scheduling strategy for learners.
os.environ["TRAIN_ENABLE_WORKER_SPREAD_ENV"] = "1"

# Get the current data context.
data_context = data.DataContext.get_current()
# Set the execution options such that the Ray Data tries to match
# the locality of an output stream with where learners are located.
data_context.execution_options = data.ExecutionOptions(
    locality_with_output=True,
)

# Build the config.
config = (
    AlgorithmConfig()
    .learners(
        # Scale the learners.
        num_learners=4,
        num_gpus_per_learner=2,
    )
    .offline_data(
        ...,
        # Run in each RLlib training iteration 10
        # iterations per learner (each of them with
        # `train_batch_size_per_learner`).
        dataset_num_iters_per_learner=20,
    )
)

# Build the algorithm from the config.
algo = config.build()

# Train for 10 iterations.
for _ in range(10)
    res = algo.train()

Batch size#

Batch size is one of the simplest parameters to adjust for optimizing performance in RLlib’s new Offline RL API. Small batch sizes may under-utilize hardware, leading to inefficiencies, while overly large batch sizes can exceed memory limits. In a streaming pipeline, the selected batch size impacts how data is partitioned and processed across parallel workers. Larger batch sizes reduce the overhead of frequent task coordination, but if they exceed hardware constraints, they can slow down the entire pipeline. You can configure the training batch size using the train_batch_size_per_learner attribute as shown below.

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        train_batch_size_per_learner=1024,
    )
)

In Ray Data <data>, it’s common practice to use batch sizes that are powers of two. However, you are free to select any integer value for the batch size based on your needs.

Batch prefetching#

Batch prefetching allows you to control data consumption on the downstream side of your offline data pipeline. The primary goal is to ensure that learners remain active, maintaining a continuous flow of data. This is achieved by preparing the next batch while the learner processes the current one. Prefetching determines how many batches are kept ready for learners and should be tuned based on the time required to produce the next batch and the learner’s update speed. Prefetching too many batches can lead to memory inefficiencies and, in some cases, backpressure in upstream tasks.

You can configure batch prefetching in the iter_batches_kwargs:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        iter_batches_kwargs={
            "prefetch_batches": 2,
        }
    )
)

Warning

Don’t override the batch_size in RLlib’s map_batches_kwargs. This usually leads to high performance degradations. Note, this batch_size differs from the train_batch_size_per_learner: the former specifies the batch size in iterating over data output of the streaming pipeline, while the latter defines the batch size used for training within each Learner.

Learner iterations#

This tuning parameter is available only when using multiple instances of :Learner. In distributed learning, each Learner instance processes a sub-stream of the offline streaming pipeline, iterating over batches from that sub-stream. You can control the number of iterations each Learner instance runs per RLlib training iteration. Result reporting occurs after each RLlib training iteration. Setting this parameter too low results in inefficiencies, while excessively high values can hinder training monitoring and, in some cases - such as in RLlib’s MARWIL implementation - lead to stale training data. This happens because some data transformations rely on the same RLModule that the Learner instances are training on. The number of iterations per sub-stream is controlled by the attribute dataset_num_iters_per_learner, which has a default value of None, meaning it runs one epoch on the sub-stream.

You can modify this value as follows:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        # Train on 20 batches from the substream in each learner.
        dataset_num_iters_per_learner=20,
    )
)

Customization#

Customization of the Offline RL components in RLlib, such as the Algorithm, Learner, or RLModule, follows a similar process to that of their Online RL counterparts. For detailed guidance, refer to the documentation on Algorithms, Learners, and RLlib’s RLModule. The new stack Offline RL streaming pipeline in RLlib supports customization at various levels and locations within the dataflow, allowing for tailored solutions to meet the specific requirements of your offline RL algorithm.

  • Connector Level

  • PreLearner Level

  • Pipeline Level.

Connector level#

Small data transformations on instances of SingleAgentEpisode can be easily implemented by modifying the ConnectorPipelineV2, which is part of the OfflinePreLearner and prepares episodes for training. You can leverage any connector from RLlib’s library (see RLlib’s default connectors) or create a custom connector (see RLlib’s ConnectorV2 examples) to integrate into the Learner’s ConnectorPipelineV2. Careful consideration must be given to the order in which ConnectorV2 instances are applied, as demonstrated in the implementation of RLlib’s MARWIL algorithm (see the MARWIL paper).

The MARWIL algorithm computes a loss that extends beyond behavior cloning by improving the expert’s strategy during training using advantages. These advantages are calculated through General Advantage Estimation (GAE) using a value model. GAE is computed on-the-fly through the GeneralAdvantageEstimation connector. This connector has specific requirements: it processes a list of SingleAgentEpisode instances and must be one of the final components in the ConnectorPipelineV2. This is because it relies on fully prepared batches containing OBS, REWARDS, NEXT_OBS, TERMINATED, and TRUNCATED fields. Additionally, the incoming SingleAgentEpisode instances must already include one artificially elongated timestep.

To meet these requirements, the pipeline must include the following sequence of ConnectorV2 instances:

  1. ray.rllib.connectors.learner.add_one_ts_to_episodes_and_truncate.AddOneTsToEpisodesAndTruncate ensures the SingleAgentEpisode objects are elongated by one timestep.

  2. ray.rllib.connectors.common.add_observations_from_episodes_to_batch.AddObservationsFromEpisodesToBatch incorporates the observations (OBS) into the batch.

  3. ray.rllib.connectors.learner.add_next_observations_from_episodes_to_train_batch.AddNextObservationsFromEpisodesToTrainBatch adds the next observations (NEXT_OBS).

  4. Finally, the ray.rllib.connectors.learner.general_advantage_estimation.GeneralAdvantageEstimation connector piece is applied.

Below is the example code snippet from RLlib’s MARWIL algorithm demonstrating this setup:

@override(AlgorithmConfig)
def build_learner_connector(
    self,
    input_observation_space,
    input_action_space,
    device=None,
):
    pipeline = super().build_learner_connector(
        input_observation_space=input_observation_space,
        input_action_space=input_action_space,
        device=device,
    )

    # Before anything, add one ts to each episode (and record this in the loss
    # mask, so that the computations at this extra ts aren't used to compute
    # the loss).
    pipeline.prepend(AddOneTsToEpisodesAndTruncate())

    # Prepend the "add-NEXT_OBS-from-episodes-to-train-batch" connector piece (right
    # after the corresponding "add-OBS-..." default piece).
    pipeline.insert_after(
        AddObservationsFromEpisodesToBatch,
        AddNextObservationsFromEpisodesToTrainBatch(),
    )

    # At the end of the pipeline (when the batch is already completed), add the
    # GAE connector, which performs a vf forward pass, then computes the GAE
    # computations, and puts the results of this (advantages, value targets)
    # directly back in the batch. This is then the batch used for
    # `forward_train` and `compute_losses`.
    pipeline.append(
        GeneralAdvantageEstimation(gamma=self.gamma, lambda_=self.lambda_)
    )

    return pipeline

Define a primer LearnerConnector pipeline#

There are multiple ways to customize the LearnerConnectorPipeline. One approach, as demonstrated above, is to override the build_learner_connector method in the Algorithm. Alternatively, you can directly define a custom ConnectorV2 piece to the LearnerConnectorPipeline by utilizing the learner_connector attribute:

def _make_learner_connector(input_observation_space, input_action_space):
    # Create the learner connector.
    return CustomLearnerConnector(
        parameter_1=0.3,
        parameter_2=100,
    )

config = (
    AlgorithmConfig()
    .training(
        # Add the connector pipeline as the starting point for
        # the learner connector pipeline.
        learner_connector=_make_learner_connector,
    )
)

As noted in the comments, this approach to adding a ConnectorV2 piece to the LearnerConnectorPipeline is suitable only if you intend to manipulate raw episodes, as your ConnectorV2 piece serves as the foundation for building the remainder of the pipeline (including batching and other processing steps). If your goal is to modify data further along in the LearnerConnectorPipeline, you should either override the Algorithm’s build_learner_connector method or consider the third option: overriding the entire PreLearner.

PreLearner level#

If you need to perform data transformations at a deeper level - before your data reaches the SingleAgentEpisode stage - consider overriding the OfflinePreLearner. This class orchestrates the complete data transformation pipeline, converting raw input data into MultiAgentBatch objects ready for training. For instance, if your data is stored in specialized formats requiring pre-parsing and restructuring (for example, XML, HTML, Protobuf, images, or videos), you may need to handle these custom formats directly. You can leverage tools such as Ray Data's custom datasources <custom_datasource> (for example, read_binary_files()) to manage the ingestion process. To ensure this data is appropriately structured and sorted into SingleAgentEpisode objects, you can override the _map_to_episodes() static method.

For more extensive customization, you can rewrite the __call__ method to define custom transformation steps, implement a unique LearnerConnectorPipeline, and construct MultiAgentBatch instances for the Learner.

The following example demonstrates how to use a custom OfflinePreLearner to process text data and construct training batches:

import gymnasium as gym
import numpy as np
import uuid
from typing import Any, Dict, List, Optional, Union

from ray import data
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType

class TextOfflinePreLearner(OfflinePreLearner):

    @staticmethod
    @override(OfflinePreLearner)
    def _map_to_episodes(
        is_multi_agent: bool,
        batch: Dict[str, Union[list, np.ndarray]],
        schema: Dict[str, str] = SCHEMA,
        to_numpy: bool = False,
        input_compress_columns: Optional[List[str]] = None,
        observation_space: gym.Space = None,
        action_space: gym.Space = None,
        vocabulary: Dict[str, Any] = None,
        **kwargs: Dict[str, Any],
    ) -> Dict[str, List[EpisodeType]]:

        # If we have no vocabulary raise an error.
        if not vocabulary:
            raise ValueError(
                "No `vocabulary`. It needs a vocabulary in form of dictionary ",
                "mapping tokens to their IDs."
            )
        # Define container for episodes.
        episodes = []

        # Data comes in batches of string arrays under the `"text"` key.
        for text in batch["text"]:
            # Split the text and tokenize.
            tokens = text.split(" ")
            # Encode tokens.
            encoded = [vocabulary[token] for token in tokens]
            one_hot_vectors = np.zeros((len(tokens), len(vocabulary), 1, 1))
            for i, token in enumerate(tokens):
                if token in vocabulary:
                    one_hot_vectors[i][vocabulary[token] - 1] = 1.0

            # Build the `SingleAgentEpisode`.
            episode = SingleAgentEpisode(
                # Generate a unique ID.
                id_=uuid.uuid4().hex,
                # agent_id="default_policy",
                # module_id="default_policy",
                # We use the starting token with all added tokens as observations.
                observations=[ohv for ohv in one_hot_vectors],
                observation_space=observation_space,
                # Actions are defined to be the "chosen" follow-up token after
                # given the observation.
                actions=encoded[1:],
                action_space=action_space,
                # Rewards are zero until the end of a sequence.
                rewards=[0.0 for i in range(len(encoded) - 2)] + [1.0],
                # The episode is always terminated (as sentences in the dataset are).
                terminated=True,
                truncated=False,
                # No lookback. You want the episode to start at timestep zero.
                len_lookback_buffer=0,
                t_started=0,
            )

            # If episodes should be numpy'ized. Some connectors need this.
            if to_numpy:
                episode.to_numpy()

            # Append the episode to the list of episodes.
            episodes.append(episode)

        # Return a batch with key `"episodes"`.
        return {"episodes": episodes}

# Define the dataset.
ds = data.read_text("s3://anonymous@ray-example-data/this.txt")

# Create a vocabulary.
tokens = []
for b in ds.iter_rows():
    tokens.extend(b["text"].split(" "))
vocabulary = {token: idx for idx, token in enumerate(set(tokens), start=1)}

# Take a small batch of 10 from the dataset.
batch = ds.take_batch(10)

# Now use your `OfflinePreLearner`.
episodes = TextOfflinePreLearner._map_to_episodes(
    is_multi_agent=False,
    batch=batch,
    to_numpy=True,
    schema=None,
    input_compress_columns=False,
    action_space=None,
    observation_space=None,
    vocabulary=vocabulary,
)

# Show the constructed episodes.
print(f"Episodes: {episodes}")

The preceding example illustrates the flexibility of RLlib’s Offline RL API for custom data transformation. In this case, a customized OfflinePreLearner processes a batch of text data - organized as sentences - and converts each sentence into a SingleAgentEpisode. The static method returns a dictionary containing a list of these SingleAgentEpisode instances. Similarly, you can extend this functionality by overriding the __call__() method. For instance, you could implement a ray.rllib.connectors.learner.learner_connector_pipeline.LearnerConnectorPipeline that stacks multiple observations (for example, tokens) together. This can be achieved using RLlib’s FrameStackingLearner and is shown in the example below.

import gymnasium as gym
import numpy as np
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union

from ray import data
from ray.actor import ActorHandle
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.bc.bc_catalog import BCCatalog
from ray.rllib.algorithms.bc.torch.default_bc_torch_rl_module import DefaultBCTorchRLModule
from ray.rllib.connectors.common import AddObservationsFromEpisodesToBatch, BatchIndividualItems, NumpyToTensor, AgentToModuleMapping
from ray.rllib.connectors.learner.add_columns_from_episodes_to_train_batch import AddColumnsFromEpisodesToTrainBatch
from ray.rllib.connectors.learner.frame_stacking import FrameStackingLearner
from ray.rllib.connectors.learner.learner_connector_pipeline import LearnerConnectorPipeline
from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA

from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType, ModuleID

class TextOfflinePreLearner(OfflinePreLearner):

    @override(OfflinePreLearner)
    def __init__(
        self,
        config: "AlgorithmConfig",
        learner: Union[Learner, List[ActorHandle]] = None,
        locality_hints: Optional[List[str]] = None,
        spaces: Optional[Tuple[gym.Space, gym.Space]] = None,
        module_spec: Optional[MultiRLModuleSpec] = None,
        module_state: Optional[Dict[ModuleID, Any]] = None,
        vocabulary: Dict[str, Any] = None,
        **kwargs: Dict[str, Any],
    ):
        self.config = config
        self.spaces = spaces
        self.vocabulary = vocabulary
        self.vocabulary_size = len(self.vocabulary)

        # Build the `RLModule`.
        self._module = module_spec.build()
        if module_state:
            self._module.set_state(module_state)

        # Build the learner connector pipeline.
        self._learner_connector = LearnerConnectorPipeline(
            connectors=[
                FrameStackingLearner(
                    num_frames=4,
                )
            ],
            input_action_space=module_spec.action_space,
            input_observation_space=module_spec.observation_space,
        )
        self._learner_connector.append(
            AddObservationsFromEpisodesToBatch(as_learner_connector=True),
        )
        self._learner_connector.append(
            AddColumnsFromEpisodesToTrainBatch(),
        )
        self._learner_connector.append(
            BatchIndividualItems(multi_agent=False),
        )
        # Let us run exclusively on CPU, then we can convert here to Tensor.
        self._learner_connector.append(
            NumpyToTensor(as_learner_connector=True),
        )

    @override(OfflinePreLearner)
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, List[EpisodeType]]:

        # Convert raw data to episodes.
        episodes = TextOfflinePreLearner._map_to_episodes(
            is_multi_agent=False,
            batch=batch,
            to_numpy=True,
            schema=None,
            input_compress_columns=False,
            action_space=self.spaces[0],
            observation_space=self.spaces[1],
            vocabulary=self.vocabulary,
        )["episodes"]

        # Run the learner connector pipeline with the
        # `FrameStackLearner` piece.
        batch = self._learner_connector(
            rl_module=self._module,
            batch={},
            episodes=episodes,
            shared_data={},
        )

        # Convert to `MultiAgentBatch` for the learner.
        batch = MultiAgentBatch(
            {
                module_id: SampleBatch(module_data)
                for module_id, module_data in batch.items()
            },
            # TODO (simon): This can be run once for the batch and the
            # metrics, but we run it twice: here and later in the learner.
            env_steps=sum(e.env_steps() for e in episodes),
        )

        # Return the `MultiAgentBatch` under the `"batch"` key.
        return {"batch": batch}

    @staticmethod
    @override(OfflinePreLearner)
    def _map_to_episodes(
        is_multi_agent: bool,
        batch: Dict[str, Union[list, np.ndarray]],
        schema: Dict[str, str] = SCHEMA,
        to_numpy: bool = False,
        input_compress_columns: Optional[List[str]] = None,
        observation_space: gym.Space = None,
        action_space: gym.Space = None,
        vocabulary: Dict[str, Any] = None,
        **kwargs: Dict[str, Any],
    ) -> Dict[str, List[EpisodeType]]:

        # If we have no vocabulary raise an error.
        if not vocabulary:
            raise ValueError(
                "No `vocabulary`. It needs a vocabulary in form of dictionary ",
                "mapping tokens to their IDs."
            )
        # Define container for episodes.
        episodes = []

        # Data comes in batches of string arrays under the `"text"` key.
        for text in batch["text"]:
            # Split the text and tokenize.
            tokens = text.split(" ")
            # Encode tokens.
            encoded = [vocabulary[token] for token in tokens]
            one_hot_vectors = np.zeros((len(tokens), len(vocabulary), 1, 1))
            for i, token in enumerate(tokens):
                if token in vocabulary:
                    one_hot_vectors[i][vocabulary[token] - 1] = 1.0

            # Build the `SingleAgentEpisode`.
            episode = SingleAgentEpisode(
                # Generate a unique ID.
                id_=uuid.uuid4().hex,
                # agent_id="default_policy",
                # module_id="default_policy",
                # We use the starting token with all added tokens as observations.
                observations=[ohv for ohv in one_hot_vectors],
                observation_space=observation_space,
                # Actions are defined to be the "chosen" follow-up token after
                # given the observation.
                actions=encoded[1:],
                action_space=action_space,
                # Rewards are zero until the end of a sequence.
                rewards=[0.0 for i in range(len(encoded) - 2)] + [1.0],
                # The episode is always terminated (as sentences in the dataset are).
                terminated=True,
                truncated=False,
                # No lookback. You want the episode to start at timestep zero.
                len_lookback_buffer=0,
                t_started=0,
            )

            # If episodes should be numpy'ized. Some connectors need this.
            if to_numpy:
                episode.to_numpy()

            # Append the episode to the list of episodes.
            episodes.append(episode)

        # Return a batch with key `"episodes"`.
        return {"episodes": episodes}

# Define dataset on sample data.
ds = data.read_text("s3://anonymous@ray-example-data/this.txt")

# Create a vocabulary.
tokens = []
for b in ds.iter_rows():
    tokens.extend(b["text"].split(" "))
vocabulary = {token: idx for idx, token in enumerate(set(tokens), start=1)}

# Specify an `RLModule` and wrap it with a `MultiRLModuleSpec`. Note,
# on `Learner`` side any `RLModule` is an `MultiRLModule`.
module_spec = MultiRLModuleSpec(
    rl_module_specs={
        "default_policy": RLModuleSpec(
            model_config=DefaultModelConfig(
                conv_filters=[[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]],
                conv_activation="relu",
            ),
            inference_only=False,
            module_class=DefaultBCTorchRLModule,
            catalog_class=BCCatalog,
            action_space = gym.spaces.Discrete(len(vocabulary)),
            observation_space=gym.spaces.Box(0.0, 1.0, (len(vocabulary), 1, 1), np.float32),
        ),
    },
)

# Take a small batch.
batch = ds.take_batch(10)

# Build and instance your `OfflinePreLearner`.
oplr = TextOfflinePreLearner(
    config=AlgorithmConfig(),
    spaces=(
        gym.spaces.Discrete(len(vocabulary)),
        gym.spaces.Box(0.0, 1.0, (len(vocabulary), 1, 1), np.float32)),
    module_spec=module_spec,
    vocabulary=vocabulary,
)

# Run your `OfflinePreLearner`.
transformed = oplr(batch)

# Show the generated batch.
print(f"Batch: {batch}")

The ability to fully customize the OfflinePreLearner empowers you to design tailored data transformation workflows. This includes defining a specific learner connector pipeline and implementing raw data mapping, enabling multi-step processing of text data from its raw format to a MultiAgentBatch.

To integrate your custom OfflinePreLearner, simply specify it within your AlgorithmConfig:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        # Provide your custom `OfflinePreLearner`.
        prelearner_class=TextOfflinePreLearner,
        # Provide special keyword arguments your `OfflinePreLearner` needs.
        prelearner_kwargs={
            "vocabulary": vocabulary,
        },
    )
)

If these customization capabilities still don’t meet your requirements, consider moving to the Pipeline Level for even greater flexibility.

Pipeline level#

On this level of RLlib’s Offline RL API you can redefine your complete pipeline from data reading to batch iteration by overriding the OfflineData class. In most cases however the other two levels should be sufficient for your requirements. Manipulating the complete pipeline needs sensible handling because it could degrade performance of your pipeline to a high degree. Study carefully the OfflineData class to reach a good understanding of how the default pipeline works before going over to program your own one. There are mainly two methods that define this pipeline:

  • The __init__() method that defines the data reading process.

  • The sample() method that defines the data mapping and batch iteration.

For example consider overriding the __init__() method, if you have some foundational data transformations as for example transforming image files into numpy arrays.

import io
import logging
import numpy as np

from PIL import Image
from typing import Any, Dict

from ray import data
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.offline.offline_data import OfflineData
from ray.rllib.offline.offline_prelearner import OfflinePreLearner
from ray.rllib.utils.annotations import override

logger = logging.getLogger(__name__)


class ImageOfflineData(OfflineData):
    """This class overrides `OfflineData` to read in raw image data.

    The image data is from Ray Data`s S3 example bucket, namely
    `ray-example-data/batoidea/JPEGImages/`.
    To read in this data the raw bytes have to be decoded and then
    converted to `numpy` arrays. Each image array has a dimension
    (32, 32, 3).

    To just read in the raw image data and convert it to arrays it
    suffices to override the `OfflineData.__init__` method only.
    Note, that further transformations of the data - specifically
    into `SingleAgentEpisode` data - will be performed in a custom
    `OfflinePreLearner` defined in the `image_offline_prelearner`
    file. You could hard-code the usage of this prelearner here,
    but you will use the `prelearner_class` attribute in the
    `AlgorithmConfig` instead.
    """

    @override(OfflineData)
    def __init__(self, config: AlgorithmConfig):

        # Set class attributes.
        self.config = config
        self.is_multi_agent = self.config.is_multi_agent
        self.materialize_mapped_data = False
        self.path = self.config.input_

        self.data_read_batch_size = self.config.input_read_batch_size
        self.data_is_mapped = False

        # Define your function to map images to numpy arrays.
        def map_to_numpy(row: Dict[str, Any]) -> Dict[str, Any]:
            # Convert to byte stream.
            bytes_stream = io.BytesIO(row["bytes"])
            # Convert to image.
            image = Image.open(bytes_stream)
            # Return an array of the image.
            return {"array": np.array(image)}

        try:
            # Load the dataset and transform to arrays on-the-fly.
            self.data = data.read_binary_files(self.path).map(map_to_numpy)
        except Exception as e:
            logger.error(e)

        # Define further attributes needed in the `sample` method.
        self.batch_iterator = None
        self.map_batches_kwargs = self.config.map_batches_kwargs
        self.iter_batches_kwargs = self.config.iter_batches_kwargs
        # Use a custom OfflinePreLearner if needed.
        self.prelearner_class = self.config.prelearner_class or OfflinePreLearner

        # For remote learner setups.
        self.locality_hints = None
        self.learner_handles = None
        self.module_spec = None

In the code example provided, you define a custom OfflineData class to handle the reading and preprocessing of image data, converting it from a binary encoding format into numpy arrays. Additionally, you implement a custom OfflinePreLearner to process this data further, transforming it into a learner-ready MultiAgentBatch format.

import gymnasium as gym
import numpy as np
import random
import uuid

from typing import Any, Dict, List, Optional, Tuple, Union

from ray.actor import ActorHandle
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType, ModuleID


class ImageOfflinePreLearner(OfflinePreLearner):
    """This class transforms image data to `MultiAgentBatch`es.

    While the `ImageOfflineData` class transforms raw image
    bytes to `numpy` arrays, this class maps these data in
    `SingleAgentEpisode` instances through the learner connector
    pipeline and finally outputs a >`MultiAgentBatch` ready for
    training in RLlib's `Learner`s.

    Note, the basic transformation from images to `SingleAgentEpisode`
    instances creates synthetic data that does not rely on any MDP
    and therefore no agent can learn from it. However, this example
    should show how to transform data into this form through
    overriding the `OfflinePreLearner`.
    """

    def __init__(
        self,
        config: "AlgorithmConfig",
        learner: Union[Learner, List[ActorHandle]],
        spaces: Optional[Tuple[gym.Space, gym.Space]] = None,
        module_spec: Optional[MultiRLModuleSpec] = None,
        module_state: Optional[Dict[ModuleID, Any]] = None,
        **kwargs: Dict[str, Any],
    ):
        # Set up necessary class attributes.
        self.config = config
        self.action_space = spaces[1]
        self.observation_space = spaces[0]
        self.input_read_episodes = self.config.input_read_episodes
        self.input_read_sample_batches = self.config.input_read_sample_batches
        self._policies_to_train = "default_policy"
        self._is_multi_agent = False

        # Build the `MultiRLModule` needed for the learner connector.
        self._module = module_spec.build()

        # Build the learner connector pipeline.
        self._learner_connector = self.config.build_learner_connector(
            input_observation_space=self.observation_space,
            input_action_space=self.action_space,
        )

    @override(OfflinePreLearner)
    @staticmethod
    def _map_to_episodes(
        is_multi_agent: bool,
        batch: Dict[str, Union[list, np.ndarray]],
        schema: Dict[str, str] = SCHEMA,
        to_numpy: bool = False,
        input_compress_columns: Optional[List[str]] = None,
        observation_space: gym.Space = None,
        action_space: gym.Space = None,
        **kwargs: Dict[str, Any],
    ) -> Dict[str, List[EpisodeType]]:

        # Define a container for the episodes.
        episodes = []

        # Batches come in as numpy arrays.
        for i, obs in enumerate(batch["array"]):

            # Construct your episode.
            episode = SingleAgentEpisode(
                id_=uuid.uuid4().hex,
                observations=[obs, obs],
                observation_space=observation_space,
                actions=[action_space.sample()],
                action_space=action_space,
                rewards=[random.random()],
                terminated=True,
                truncated=False,
                len_lookback_buffer=0,
                t_started=0,
            )

            # Numpy'ize, if necessary.
            if to_numpy:
                episode.to_numpy()

            # Store the episode in the container.
            episodes.append(episode)

        return {"episodes": episodes}

This demonstrates how the entire Offline Data Pipeline can be customized with your own logic. You can run the example by using the following code:

"""Example showing how to customize an offline data pipeline.

This example:
    - demonstrates how you can customized your offline data pipeline.
    - shows how you can override the `OfflineData` to read raw image
    data and transform it into `numpy ` arrays.
    - explains how you can override the `OfflinePreLearner` to
    transform data further into `SingleAgentEpisode` instances that
    can be processes by the learner connector pipeline.

How to run this script
----------------------
`python [script file name].py --checkpoint-at-end`

For debugging, use the following additional command line options
`--no-tune --num-env-runners=0`
which should allow you to set breakpoints anywhere in the RLlib code and
have the execution stop there for inspection and debugging.

For logging to your WandB account, use:
`--wandb-key=[your WandB API key] --wandb-project=[some project name]
--wandb-run-name=[optional: WandB run name (within the defined project)]`

Results to expect
-----------------
2024-12-03 19:59:23,043 INFO streaming_executor.py:109 -- Execution plan
of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadBinary] ->
TaskPoolMapOperator[Map(map_to_numpy)] -> LimitOperator[limit=128]
✔️  Dataset execution finished in 10.01 seconds: 100%|███████████████████
███████████████████████████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- ReadBinary->SplitBlocks(11): Tasks: 0; Queued blocks: 0; Resources: 0.0
CPU, 0.0B object store: 100%|█████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- Map(map_to_numpy): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU,
0.0B object store: 100%|███████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- limit=128: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 3.0KB object
store: 100%|██████████████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
Batch: {'batch': [MultiAgentBatch({}, env_steps=3)]}
"""

import gymnasium as gym
import numpy as np

from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.algorithms.bc.bc_catalog import BCCatalog
from ray.rllib.algorithms.bc.torch.bc_torch_rl_module import BCTorchRLModule
from ray.rllib.core.rl_module.rl_module import RLModuleSpec, DefaultModelConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.examples.offline_rl.classes.image_offline_data import ImageOfflineData
from ray.rllib.examples.offline_rl.classes.image_offline_prelearner import (
    ImageOfflinePreLearner,
)

# Create an Algorithm configuration.
# TODO: Make this an actually running/learning example with RLunplugged
# data from S3 and add this to the CI.
config = (
    BCConfig()
    .environment(
        action_space=gym.spaces.Discrete(2),
        observation_space=gym.spaces.Box(0, 255, (32, 32, 3), np.float32),
    )
    .offline_data(
        input_=["s3://anonymous@ray-example-data/batoidea/JPEGImages/"],
        prelearner_class=ImageOfflinePreLearner,
    )
)

# Specify an `RLModule` and wrap it with a `MultiRLModuleSpec`. Note,
# on `Learner`` side any `RLModule` is an `MultiRLModule`.
module_spec = MultiRLModuleSpec(
    rl_module_specs={
        "default_policy": RLModuleSpec(
            model_config=DefaultModelConfig(
                conv_filters=[[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]],
                conv_activation="relu",
            ),
            inference_only=False,
            module_class=BCTorchRLModule,
            catalog_class=BCCatalog,
            action_space=gym.spaces.Discrete(2),
            observation_space=gym.spaces.Box(0, 255, (32, 32, 3), np.float32),
        ),
    },
)

# Construct your `OfflineData` class instance.
offline_data = ImageOfflineData(config)

# Check, how the data is transformed. Note, the
# example dataset has only 3 such images.
batch = offline_data.data.take_batch(3)

# Construct your `OfflinePreLearner`.
offline_prelearner = ImageOfflinePreLearner(
    config=config,
    learner=None,
    spaces=(
        config.observation_space,
        config.action_space,
    ),
    module_spec=module_spec,
)

# Transform the raw data to `MultiAgentBatch` data.
batch = offline_prelearner(batch)

# Show the transformed batch.
print(f"Batch: {batch}")

Tip

Consider this approach carefully: in many cases, fully transforming your data into a suitable format before engaging RLlib’s offline RL API can be more efficient. For instance, in the example above, you could preprocess the entire image dataset into numpy arrays beforehand and utilize RLlib’s default OfflineData class for subsequent steps.

Monitoring#

To effectively monitor your offline data pipeline, leverage Ray Data’s built-in monitoring capacities. Focus on ensuring that all stages of your offline data streaming pipeline are actively processing data. Additionally, keep an eye on the Learner instance, particularly the learner_update_timer, which should maintain low values - around 0.02 for small models - to indicate efficient data processing and model updates.

Note

RLlib doesn’t include Ray Data metrics in its results or display them in Tensorboard through Ray Tune’s TBXLoggerCallback. It’s strongly recommended to enable the Ray Dashboard, accessible at 127.0.0.1:8265, for comprehensive monitoring and insights.

Input API#

You can configure experience input for an agent using the following options:

def offline_data(
    self,
    *,
    # Specify how to generate experiences:
    # - A local directory or file glob expression (for example "/tmp/*.json").
    # - A cloud storage path or file glob expression (for example "gs://rl/").
    # - A list of individual file paths/URIs (for example ["/tmp/1.json",
    #   "s3://bucket/2.json"]).
    # - A file or directory path in a given `input_filesystem`.
    input_: Optional[Union[str, Callable[[IOContext], InputReader]]],
    # Read method for the `ray.data.Dataset` to read in the
    # offline data from `input_`. The default is `read_parquet` for Parquet
    # files. See https://docs.ray.io/en/latest/data/api/input_output.html for
    # more info about available read methods in `ray.data`.
    input_read_method: Optional[Union[str, Callable]],
    # Keyword args for `input_read_method`. These
    # are passed into the read method without checking. Use these
    # keyword args together with `map_batches_kwargs` and
    # `iter_batches_kwargs` to tune the performance of the data pipeline. It
    # is strongly recommended to rely on Ray Data's automatic read performance
    # tuning
    input_read_method_kwargs: Optional[Dict],
    # Table schema for converting offline data to episodes.
    # This schema maps the offline data columns to
    # `ray.rllib.core.columns.Columns`:
    # `{Columns.OBS: 'o_t', Columns.ACTIONS: 'a_t', ...}`. Columns in
    # the data set that aren't mapped through this schema are sorted into
    # episodes' `extra_model_outputs`. If no schema is passed in the default
    # schema used is `ray.rllib.offline.offline_data.SCHEMA`. If your data set
    # contains already the names in this schema, no `input_read_schema` is
    # needed. The same applies, if the offline data is in RLlib's
    # `EpisodeType` or old `SampleBatch` format
    input_read_schema: Optional[Dict[str, str]],
    # Whether offline data is already stored in RLlib's
    # `EpisodeType` format, i.e. `ray.rllib.env.SingleAgentEpisode` (multi
    # -agent is planned but not supported, yet). Reading episodes directly
    # avoids additional transform steps and is usually faster and
    # therefore the recommended format when your application remains fully
    # inside of RLlib's schema. The other format is a columnar format and is
    # agnostic to the RL framework used. Use the latter format, if you are
    # unsure when to use the data or in which RL framework. The default is
    # to read column data, i.e. `False`. `input_read_episodes` and
    # `input_read_sample_batches` can't be `True` at the same time. See
    # also `output_write_episodes` to define the output data format when
    # recording.
    input_read_episodes: Optional[bool],
    # Whether offline data is stored in RLlib's old
    # stack `SampleBatch` type. This is usually the case for older data
    # recorded with RLlib in JSON line format. Reading in `SampleBatch`
    # data needs extra transforms and might not concatenate episode chunks
    # contained in different `SampleBatch`es in the data. If possible avoid
    # to read `SampleBatch`es and convert them in a controlled form into
    # RLlib's `EpisodeType` (i.e. `SingleAgentEpisode`). The default is
    # `False`. `input_read_episodes` and `input_read_sample_batches` can't
    # be True at the same time.
    input_read_sample_batches: Optional[bool],
    # Batch size to pull from the data set. This could
    # differ from the `train_batch_size_per_learner`, if a dataset holds
    # `EpisodeType` (i.e. `SingleAgentEpisode`) or `SampleBatch`, or any
    # other data type that contains multiple timesteps in a single row of the
    # dataset. In such cases a single batch of size
    # `train_batch_size_per_learner` potentially pulls a multiple of
    # `train_batch_size_per_learner` timesteps from the offline dataset. The
    # default is `None` in which the `train_batch_size_per_learner` is pulled.
    input_read_batch_size: Optional[int],
    # A cloud filesystem to handle access to cloud storage when
    # reading experiences. Can be "gcs" for Google Cloud Storage, "s3" for AWS
    # S3 buckets, "abs" for Azure Blob Storage, or any filesystem supported
    # by PyArrow. In general the file path is sufficient for accessing data
    # from public or local storage systems. See
    # https://arrow.apache.org/docs/python/filesystems.html for details.
    input_filesystem: Optional[str],
    # A dictionary holding the kwargs for the filesystem
    # given by `input_filesystem`. See `gcsfs.GCSFilesystem` for GCS,
    # `pyarrow.fs.S3FileSystem`, for S3, and `ablfs.AzureBlobFilesystem` for
    # ABS filesystem arguments.
    input_filesystem_kwargs: Optional[Dict],
    # What input columns are compressed with LZ4 in the
    # input data. If data is stored in RLlib's `SingleAgentEpisode` (
    # `MultiAgentEpisode` not supported, yet). Note the providing
    # `rllib.core.columns.Columns.OBS` also tries to decompress
    # `rllib.core.columns.Columns.NEXT_OBS`.
    input_compress_columns: Optional[List[str]],
    # Whether the raw data should be materialized in memory.
    # This boosts performance, but requires enough memory to avoid an OOM, so
    # make sure that your cluster has the resources available. For very large
    # data you might want to switch to streaming mode by setting this to
    # `False` (default). If your algorithm doesn't need the RLModule in the
    # Learner connector pipeline or all (learner) connectors are stateless
    # you should consider setting `materialize_mapped_data` to `True`
    # instead (and set `materialize_data` to `False`). If your data doesn't
    # fit into memory and your Learner connector pipeline requires an RLModule
    # or is stateful, set both `materialize_data` and
    # `materialize_mapped_data` to `False`.
    materialize_data: Optional[bool],
    # Whether the data should be materialized after
    # running it through the Learner connector pipeline (i.e. after running
    # the `OfflinePreLearner`). This improves performance, but should only be
    # used in case the (learner) connector pipeline doesn't require an
    # RLModule and the (learner) connector pipeline is stateless. For example,
    # MARWIL's Learner connector pipeline requires the RLModule for value
    # function predictions and training batches would become stale after some
    # iterations causing learning degradation or divergence. Also ensure that
    # your cluster has enough memory available to avoid an OOM. If set to
    # `True`, make sure that `materialize_data` is set to `False` to
    # avoid materialization of two datasets. If your data doesn't fit into
    # memory and your Learner connector pipeline requires an RLModule or is
    # stateful, set both `materialize_data` and `materialize_mapped_data` to
    # `False`.
    materialize_mapped_data: Optional[bool],
    # Keyword args for the `map_batches` method. These are
    # passed into the `ray.data.Dataset.map_batches` method when sampling
    # without checking. If no arguments passed in the default arguments
    # `{'concurrency': max(2, num_learners), 'zero_copy_batch': True}` is
    # used. Use these keyword args together with `input_read_method_kwargs`
    # and `iter_batches_kwargs` to tune the performance of the data pipeline.
    map_batches_kwargs: Optional[Dict],
    # Keyword args for the `iter_batches` method. These are
    # passed into the `ray.data.Dataset.iter_batches` method when sampling
    # without checking. If no arguments are passed in, the default argument
    # `{'prefetch_batches': 2}` is used. Use these keyword args
    # together with `input_read_method_kwargs` and `map_batches_kwargs` to
    # tune the performance of the data pipeline.
    iter_batches_kwargs: Optional[Dict],
    # An optional `OfflinePreLearner` class that's used to
    # transform data batches in `ray.data.map_batches` used in the
    # `OfflineData` class to transform data from columns to batches that can
    # be used in the `Learner.update...()` methods. Override the
    # `OfflinePreLearner` class and pass your derived class in here, if you
    # need to make some further transformations specific for your data or
    # loss. The default is `None`` which uses the base `OfflinePreLearner`
    # defined in `ray.rllib.offline.offline_prelearner`.
    prelearner_class: Optional[Type],
    # An optional `EpisodeReplayBuffer` class is
    # used to buffer experiences when data is in `EpisodeType` or
    # RLlib's previous `SampleBatch` type format. In this case, a single
    # data row may contain multiple timesteps and the buffer serves two
    # purposes: (a) to store intermediate data in memory, and (b) to ensure
    # that exactly `train_batch_size_per_learner` experiences are sampled
    # per batch. The default is RLlib's `EpisodeReplayBuffer`.
    prelearner_buffer_class: Optional[Type],
    # Optional keyword arguments for intializing the
    # `EpisodeReplayBuffer`. In most cases this is simply the `capacity`
    # for the default buffer used (`EpisodeReplayBuffer`), but it may
    # differ if the `prelearner_buffer_class` uses a custom buffer.
    prelearner_buffer_kwargs: Optional[Dict],
    # Number of updates to run in each learner
    # during a single training iteration. If None, each learner runs a
    # complete epoch over its data block (the dataset is partitioned into
    # at least as many blocks as there are learners). The default is `None`.
    # This must be set to `1`, if a single (local) learner is used.
    dataset_num_iters_per_learner: Optional[int],
)

Output API#

You can configure experience output for an agent using the following options:

def offline_data(
    # Specify where experiences should be saved:
    # - None: don't save any experiences
    # - a path/URI to save to a custom output directory (for example, "s3://bckt/")
    output: Optional[str],
    # What sample batch columns to LZ4 compress in the output data.
    # Note that providing `rllib.core.columns.Columns.OBS` also
    # compresses `rllib.core.columns.Columns.NEXT_OBS`.
    output_compress_columns: Optional[List[str]],
    # Max output file size (in bytes) before rolling over to a new
    # file.
    output_max_file_size: Optional[float],
    # Max output row numbers before rolling over to a new file.
    output_max_rows_per_file: Optional[int],
    # Write method for the `ray.data.Dataset` to write the
    # offline data to `output`. The default is `read_parquet` for Parquet
    # files. See https://docs.ray.io/en/latest/data/api/input_output.html for
    # more info about available read methods in `ray.data`.
    output_write_method: Optional[str],
    # Keyword arguments for the `output_write_method`. These are
    # passed into the write method without checking.
    output_write_method_kwargs: Optional[Dict],
    # A cloud filesystem to handle access to cloud storage when
    # writing experiences. Can be "gcs" for Google Cloud Storage, "s3" for AWS
    # S3 buckets, "abs" for Azure Blob Storage, or any filesystem supported
    # by PyArrow. In general the file path is sufficient for accessing data
    # from public or local storage systems. See
    # https://arrow.apache.org/docs/python/filesystems.html for details.
    output_filesystem: Optional[str],
    # A dictionary holding the keyword arguments for the filesystem
    # given by `output_filesystem`. See `gcsfs.GCSFilesystem` for GCS,
    # `pyarrow.fs.S3FileSystem`, for S3, and `ablfs.AzureBlobFilesystem` for
    # ABS filesystem arguments.
    output_filesystem_kwargs: Optional[Dict],
    # If data should be recorded in RLlib's `EpisodeType`
    # format (i.e. `SingleAgentEpisode` objects). Use this format, if you
    # need data to be ordered in time and directly grouped by episodes for
    # example to train stateful modules or if you plan to use recordings
    # exclusively in RLlib. Otherwise data is recorded in tabular (columnar)
    # format. Default is `True`.
    output_write_episodes: Optional[bool],