ray.rllib.evaluation.sampler.SyncSampler.tf_input_ops#

SyncSampler.tf_input_ops(queue_size: int = 1) Dict[str, numpy.array | jnp.ndarray | tf.Tensor | torch.Tensor]#

Returns TensorFlow queue ops for reading inputs from this reader.

The main use of these ops is for integration into custom model losses. For example, you can use tf_input_ops() to read from files of external experiences to add an imitation learning loss to your model.

This method creates a queue runner thread that will call next() on this reader repeatedly to feed the TensorFlow queue.

Parameters:

queue_size – Max elements to allow in the TF queue.

from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.offline.json_reader import JsonReader
imitation_loss = ...
class MyModel(ModelV2):
    def custom_loss(self, policy_loss, loss_inputs):
        reader = JsonReader(...)
        input_ops = reader.tf_input_ops()
        logits, _ = self._build_layers_v2(
            {"obs": input_ops["obs"]},
            self.num_outputs, self.options)
        il_loss = imitation_loss(logits, input_ops["action"])
        return policy_loss + il_loss

You can find a runnable version of this in examples/custom_loss.py.

Returns:

Dict of Tensors, one for each column of the read SampleBatch.