Offline RL

InputReaders

The InputReader API is used by an individual RolloutWorker to produce batches of experiences either from a simulator/environment or from an offline source (e.g., a file).

Here, we introduce the generic API and its child classes used for reading offline data (for offline RL). For details on RLlib’s Sampler implementations for collecting data from simulators/environments, see the Sampler documentation.

class ray.rllib.offline.input_reader.InputReader

API for collecting and returning experiences during policy evaluation.

abstract next() → Union[SampleBatch, MultiAgentBatch]

Returns the next batch of read experiences.

Returns

The experience read (SampleBatch or MultiAgentBatch).

tf_input_ops(queue_size: int = 1) → Dict[str, Union[numpy.array, tf.Tensor, torch.Tensor]]

Returns TensorFlow queue ops for reading inputs from this reader.

The main use of these ops is for integration into custom model losses. For example, you can use tf_input_ops() to read from files of external experiences to add an imitation learning loss to your model.

This method creates a queue runner thread that will call next() on this reader repeatedly to feed the TensorFlow queue.

Parameters

queue_size – Max elements to allow in the TF queue.

Example

>>> from ray.rllib.models.modelv2 import ModelV2
>>> from ray.rllib.offline.json_reader import JsonReader
>>> imitation_loss = ... # doctest: +SKIP
>>> class MyModel(ModelV2): # doctest: +SKIP
...     def custom_loss(self, policy_loss, loss_inputs):
...         reader = JsonReader(...)
...         input_ops = reader.tf_input_ops()
...         logits, _ = self._build_layers_v2(
...             {"obs": input_ops["obs"]},
...             self.num_outputs, self.options)
...         il_loss = imitation_loss(logits, input_ops["action"])
...         return policy_loss + il_loss

You can find a runnable version of this in examples/custom_loss.py.

Returns

Dict of Tensors, one for each column of the read SampleBatch.

JsonReader (ray.rllib.offline.json_reader.JsonReader)

For reading data from offline files (for example, when no simulator/environment is available), you can use the built-in JsonReader class.

You will have to change the input config value from “sampler” (default) to a JSON file name (str), a list of JSON files, or a path (str) pointing to a directory that contains JSON files. Alternatively, you can specify a callable that takes an IOContext object as its only argument and returns a new InputReader instance, for example:

config = {
    "input": lambda io_ctx: MyReader([arg1], [arg2], [io_ctx]),
}
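
The simpler, non-callable forms of the input setting described above look like this (a minimal sketch; all file paths are hypothetical):

config = {
    "input": "/tmp/experiences/output-file.json",  # a single JSON file
}
config = {
    "input": ["/tmp/1.json", "/tmp/2.json"],  # a list of JSON files
}
config = {
    "input": "/tmp/experiences",  # a directory containing JSON files
}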

For details on the IOContext class, see below.

class ray.rllib.offline.json_reader.JsonReader(inputs: Union[str, List[str]], ioctx: Optional[ray.rllib.offline.io_context.IOContext] = None)

Reader object that loads experiences from JSON file chunks.

The input files will be read in random order.

__init__(inputs: Union[str, List[str]], ioctx: Optional[ray.rllib.offline.io_context.IOContext] = None)

Initializes a JsonReader instance.

Parameters
  • inputs – Either a glob expression for files, e.g., /tmp/**/*.json, or a list of single file paths or URIs, e.g., ["s3://bucket/file.json", "s3://bucket/file2.json"].

  • ioctx – Current IO context object or None.
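
As a minimal usage sketch (assuming matching JSON files actually exist at the hypothetical glob path):

from ray.rllib.offline.json_reader import JsonReader

reader = JsonReader("/tmp/experiences/*.json")
batch = reader.next()  # one SampleBatch (or MultiAgentBatch) of logged data
print(batch.count)  # number of timesteps in the batch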

next() → Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch]

Returns the next batch of read experiences.

Returns

The experience read (SampleBatch or MultiAgentBatch).

read_all_files() → Union[ray.rllib.policy.sample_batch.SampleBatch, ray.rllib.policy.sample_batch.MultiAgentBatch]

Reads through all files and yields one SampleBatchType per line.

When it reaches the end of the last file, it starts over from the beginning.

Yields

One SampleBatch or MultiAgentBatch per line in all input files.
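
Because the generator restarts at the first file after the last one, a consumer should bound the iteration itself. A short sketch (the glob path is hypothetical):

from itertools import islice

from ray.rllib.offline.json_reader import JsonReader

reader = JsonReader("/tmp/experiences/*.json")
# Take at most 100 batches; iterating read_all_files() alone may never stop.
for batch in islice(reader.read_all_files(), 100):
    print(batch.count)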

MixedInput (ray.rllib.offline.mixed_input.MixedInput)

To mix several input readers at custom ratios, you can use the MixedInput reader. This reader is chosen automatically by RLlib when you provide a dict under the input config key that maps input reader specifiers to probabilities, e.g.:

"input": {
   "sampler": 0.4,  # 40% of samples will come from environment
   "/tmp/experiences/*.json": 0.4,  # the rest from different JSON files
   "s3://bucket/expert.json": 0.2,
}
class ray.rllib.offline.mixed_input.MixedInput(dist: Dict[ray.rllib.offline.json_reader.JsonReader, float], ioctx: ray.rllib.offline.io_context.IOContext)

Mixes input from a number of other input sources.

Examples

>>> from ray.rllib.offline.io_context import IOContext
>>> from ray.rllib.offline.mixed_input import MixedInput
>>> ioctx = IOContext(...) # doctest: +SKIP
>>> MixedInput({ # doctest: +SKIP
...    "sampler": 0.4,
...    "/tmp/experiences/*.json": 0.4,
...    "s3://bucket/expert.json": 0.2,
... }, ioctx)

__init__(dist: Dict[ray.rllib.offline.json_reader.JsonReader, float], ioctx: ray.rllib.offline.io_context.IOContext)

Initializes a MixedInput instance.

Parameters
  • dist – Dict mapping JsonReader paths or “sampler” to probabilities. The probabilities must sum to 1.0.

  • ioctx – current IO context object.

next() → Union[SampleBatch, MultiAgentBatch]

Returns the next batch of read experiences.

Returns

The experience read (SampleBatch or MultiAgentBatch).

D4RLReader (ray.rllib.offline.d4rl_reader.D4RLReader)

class ray.rllib.offline.d4rl_reader.D4RLReader(inputs: str, ioctx: Optional[ray.rllib.offline.io_context.IOContext] = None)

Reader object that loads experiences from a D4RL dataset.

__init__(inputs: str, ioctx: Optional[ray.rllib.offline.io_context.IOContext] = None)

Initializes a D4RLReader instance.

Parameters
  • inputs – String corresponding to the D4RL environment name.

  • ioctx – Current IO context object.
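
A short usage sketch (this assumes the d4rl package is installed; “halfcheetah-medium-v0” is one of the standard D4RL dataset names):

from ray.rllib.offline.d4rl_reader import D4RLReader

reader = D4RLReader("halfcheetah-medium-v0")
batch = reader.next()  # SampleBatch converted from the D4RL dataset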

next() → Union[SampleBatch, MultiAgentBatch]

Returns the next batch of read experiences.

Returns

The experience read (SampleBatch or MultiAgentBatch).

IOContext

IOContext instances are used in every InputReader and OutputWriter class. They serve as simple containers for the properties log_dir, config, worker_index, and worker.

class ray.rllib.offline.io_context.IOContext(log_dir: Optional[str] = None, config: Optional[dict] = None, worker_index: int = 0, worker: Optional[RolloutWorker] = None)

Class containing attributes to pass to input/output class constructors.

RLlib auto-sets these attributes when constructing input/output classes, such as InputReaders and OutputWriters.

__init__(log_dir: Optional[str] = None, config: Optional[dict] = None, worker_index: int = 0, worker: Optional[RolloutWorker] = None)

Initializes an IOContext object.

Parameters
  • log_dir – The logging directory to read from/write to.

  • config – The Algorithm’s main config dict.

  • worker_index – When there are multiple workers created, this uniquely identifies the current worker. 0 for the local worker, >0 for any of the remote workers.

  • worker – The RolloutWorker object reference.
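
To illustrate how these attributes are typically consumed, here is a hedged sketch of a custom InputReader whose constructor receives the auto-built IOContext (MyReader and its attribute choices are illustrative only, not an official recipe):

from ray.rllib.offline.input_reader import InputReader

class MyReader(InputReader):
    def __init__(self, ioctx):
        # RLlib fills in these attributes when it builds the IOContext.
        self.worker_index = ioctx.worker_index
        self.log_dir = ioctx.log_dir

    def next(self):
        # Return a SampleBatch read from your own storage here.
        raise NotImplementedError

config = {
    "input": lambda ioctx: MyReader(ioctx),
}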

default_sampler_input() → Optional[SamplerInput]

Returns the RolloutWorker’s SamplerInput object, if any.

Returns None if the RolloutWorker has no SamplerInput. Note that, by default, a local worker does not create a SamplerInput object when one or more remote workers exist.

Returns

The RolloutWorker’s SamplerInput object, or None if none exists.