Using Predictors for Inference#
Tip
Refer to the Model Batch Inference in Ray blog post for an overview of batch inference strategies and additional examples.

After you train a model, you will often want to use it for inference and prediction. To do so, you can use a Ray AIR Predictor. In this guide, we’ll cover how to use Predictors on different types of data.
What are predictors?#
A Ray AIR Predictor is a class that loads a model from a Checkpoint to perform inference.
Predictors are used by BatchPredictor and PredictorDeployment to do large-scale scoring or online inference.
Let’s walk through basic usage of a Predictor. In the example below, we create a Checkpoint object from a model definition.
Checkpoints can be generated in a variety of ways – see the Checkpoints user guide for more details.
The checkpoint is then used to create a framework-specific Predictor (in our example, a TensorflowPredictor), which can then be used for inference:
import numpy as np
import tensorflow as tf

import ray
from ray.train.tensorflow import (
    TensorflowCheckpoint,
    TensorflowPredictor,
)


def build_model() -> tf.keras.Model:
    model = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=()),
            # Add feature dimension, expanding (batch_size,) to (batch_size, 1).
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(1),
        ]
    )
    return model


model = build_model()
checkpoint = TensorflowCheckpoint.from_model(model)
predictor = TensorflowPredictor.from_checkpoint(
    checkpoint, model_definition=build_model
)

data = np.array([1, 2, 3, 4])
predictions = predictor.predict(data)
print(predictions)
# [[-1.6930283]
#  [-3.3860567]
#  [-5.079085 ]
#  [-6.7721133]]
Predictors expose a predict method that accepts an input batch of type DataBatchType (a typing union of standard Python ecosystem data types, such as a pandas DataFrame or a NumPy ndarray) and outputs predictions of the same type as the input batch.
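For example, predict also accepts a pandas DataFrame and returns the predictions as a DataFrame. A minimal sketch reusing the predictor defined above (the column name feature_1 is arbitrary here, since the model takes a single scalar feature):

import pandas as pd

# A single-column DataFrame is fed to the model just like the NumPy input above,
# and the predictions come back as a pandas DataFrame.
df = pd.DataFrame({"feature_1": [1.0, 2.0, 3.0]})
df_predictions = predictor.predict(df)
print(type(df_predictions))
# <class 'pandas.core.frame.DataFrame'>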
Life of a prediction: Under the hood, when the Predictor.predict method is called, the following occurs:

1. The input batch is converted into a pandas DataFrame. Tensor input (like a np.ndarray) is converted into a single-column pandas DataFrame.
2. If there is a Preprocessor saved in the provided Checkpoint, the preprocessor is used to transform the DataFrame.
3. The transformed DataFrame is passed to the model for inference.
4. The predictions are output by predict in the same type as the original input.
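To illustrate step 2, a Preprocessor can be bundled into the checkpoint so that predict applies it automatically before inference. A minimal sketch reusing model and build_model from above, with a stateless BatchMapper as the preprocessor (the doubling transform is just for illustration):

from ray.data.preprocessors import BatchMapper

# A stateless preprocessor that doubles every feature value.
double = BatchMapper(lambda df: df * 2, batch_format="pandas")

# Bundle the preprocessor with the model in the checkpoint.
checkpoint_with_prep = TensorflowCheckpoint.from_model(model, preprocessor=double)
predictor_with_prep = TensorflowPredictor.from_checkpoint(
    checkpoint_with_prep, model_definition=build_model
)

# `predict` now transforms the inputs with the preprocessor before
# passing them to the model.
print(predictor_with_prep.predict(np.array([1, 2, 3, 4])))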
Batch Prediction#
Ray AIR provides a BatchPredictor utility for large-scale batch inference. The BatchPredictor takes in a checkpoint and a predictor class, and executes large-scale batch prediction on a given dataset in a parallel/distributed fashion when calling predict().
Note
predict() will load the entire given dataset into memory, which may be a problem if your dataset size is larger than your available cluster memory. See the Lazy/Pipelined Prediction (experimental) section for a workaround.
import pandas as pd

from ray.train.batch_predictor import BatchPredictor

batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint, TensorflowPredictor, model_definition=build_model
)

# Create a dummy dataset.
ds = ray.data.from_pandas(pd.DataFrame({"feature_1": [1, 2, 3], "label": [1, 2, 3]}))

# Use `feature_columns` to specify the input columns to your model.
predictions = batch_predictor.predict(ds, feature_columns=["feature_1"])
predictions.show()
# {'predictions': array([-1.2789773], dtype=float32)}
# {'predictions': array([-2.5579545], dtype=float32)}
# {'predictions': array([-3.8369317], dtype=float32)}
Additionally, you can compute metrics from the predictions. Do this by:

- specifying a function for computing metrics
- using keep_columns to keep the label column in the returned dataset
- using map_batches to compute metrics on a batch-by-batch basis
- aggregating the batch metrics via mean()
def calculate_accuracy(df):
    # Compare each prediction (cast to an int) against its label.
    preds = df["predictions"].apply(lambda arr: int(arr[0]))
    return pd.DataFrame({"correct": preds == df["label"]})


predictions = batch_predictor.predict(
    ds, feature_columns=["feature_1"], keep_columns=["label"]
)
predictions.show()
# {'predictions': array([-1.2789773], dtype=float32), 'label': 1}
# {'predictions': array([-2.5579545], dtype=float32), 'label': 2}
# {'predictions': array([-3.8369317], dtype=float32), 'label': 3}

correct = predictions.map_batches(calculate_accuracy)
print("Final accuracy: ", correct.mean(on="correct"))
# Final accuracy:  0.0
Batch Inference Examples#
Below, we provide examples of using common frameworks to do batch inference for different data types:
Tabular#
XGBoost

import ray
from ray.data.preprocessors import StandardScaler
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostTrainer, XGBoostPredictor
from ray.air.config import ScalingConfig

# Split data into train and validation.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
test_dataset = valid_dataset.drop_columns(["target"])

columns_to_scale = ["mean radius", "mean texture"]
preprocessor = StandardScaler(columns=columns_to_scale)

trainer = XGBoostTrainer(
    label_column="target",
    num_boost_round=20,
    scaling_config=ScalingConfig(num_workers=2),
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": train_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()

# You can also create a checkpoint from a trained model using
# `XGBoostCheckpoint.from_model`.
# import xgboost as xgb
# from ray.train.xgboost import XGBoostCheckpoint
# model = xgb.Booster()
# model.load_model(...)
# checkpoint = XGBoostCheckpoint.from_model(model, path=".")
checkpoint = result.checkpoint

batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)

predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
PyTorch

from typing import List

import numpy as np
import torch.nn as nn

import ray
from ray.data.preprocessors import Concatenator
from ray.train.torch import TorchCheckpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor


def create_model(input_features: int):
    return nn.Sequential(
        nn.Linear(in_features=input_features, out_features=16),
        nn.ReLU(),
        nn.Linear(16, 16),
        nn.ReLU(),
        nn.Linear(16, 1),
        nn.Sigmoid(),
    )


dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
all_features: List[str] = dataset.schema().names
all_features.remove("target")
num_features = len(all_features)

prep = Concatenator(dtype=np.float32)

checkpoint = TorchCheckpoint.from_model(
    model=create_model(num_features), preprocessor=prep
)
# You can also fetch a checkpoint from a Trainer:
# checkpoint = best_result.checkpoint

batch_predictor = BatchPredictor.from_checkpoint(checkpoint, TorchPredictor)

predicted_probabilities = batch_predictor.predict(dataset, feature_columns=all_features)
predicted_probabilities.show()
# {'predictions': array([1.], dtype=float32)}
# {'predictions': array([0.], dtype=float32)}
TensorFlow

from typing import List

import numpy as np

import ray
from ray.data.preprocessors import Concatenator
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowPredictor
from ray.train.batch_predictor import BatchPredictor


def create_model(input_features):
    from tensorflow import keras  # this is needed for tf<2.9
    from tensorflow.keras import layers

    return keras.Sequential(
        [
            keras.Input(shape=(input_features,)),
            layers.Dense(16, activation="relu"),
            layers.Dense(16, activation="relu"),
            layers.Dense(1, activation="sigmoid"),
        ]
    )


dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
all_features: List[str] = dataset.schema().names
all_features.remove("target")
num_features = len(all_features)

prep = Concatenator(dtype=np.float32)

checkpoint = TensorflowCheckpoint.from_model(
    model=create_model(num_features), preprocessor=prep
)
# You can also fetch a checkpoint from a Trainer:
# checkpoint = trainer.fit().checkpoint

batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint, TensorflowPredictor, model_definition=lambda: create_model(num_features)
)

predicted_probabilities = batch_predictor.predict(dataset, feature_columns=all_features)
predicted_probabilities.show()
# {'predictions': array([1.], dtype=float32)}
# {'predictions': array([0.], dtype=float32)}
Image#
PyTorch

import numpy as np
import torch
from torchvision import transforms
from torchvision.models import resnet18

import ray
from ray.train.torch import TorchCheckpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor
from ray.data.preprocessors import BatchMapper


def preprocess(image_batch: np.ndarray) -> np.ndarray:
    """Transform a batch of images, where the outer dimension is the batch size."""
    transform = transforms.Compose(
        [
            # Torchvision's ToTensor does not accept an outer batch dimension,
            # so the batch is converted to a tensor manually below.
            transforms.CenterCrop(224),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
            ),
        ]
    )
    # Move channels first: (10, 256, 256, 3) -> (10, 3, 256, 256).
    transposed_torch_tensor = torch.Tensor(image_batch.transpose(0, 3, 1, 2))
    return transform(transposed_torch_tensor).numpy()


data_url = "s3://anonymous@air-example-data-2/1G-image-data-synthetic-raw"
print(f"Running GPU batch prediction with 1GB data from {data_url}")
dataset = ray.data.read_images(data_url, size=(256, 256)).limit(10)

model = resnet18(pretrained=True)

preprocessor = BatchMapper(preprocess, batch_format="numpy")
ckpt = TorchCheckpoint.from_model(model=model, preprocessor=preprocessor)

predictor = BatchPredictor.from_checkpoint(ckpt, TorchPredictor)
predictor.predict(dataset, batch_size=80)
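Note that predict runs on CPU workers by default. To actually score on GPUs, request GPU workers when calling predict. A minimal sketch, assuming your cluster has GPUs available (num_gpus_per_worker is an argument to BatchPredictor.predict):

# Request one GPU per scoring worker; TorchPredictor moves the model
# to the GPU automatically when one is assigned.
predictor.predict(dataset, batch_size=80, num_gpus_per_worker=1)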
TensorFlow

Coming soon!
Text#
Coming soon!
Lazy/Pipelined Prediction (experimental)#
If you have a large dataset but not a lot of available memory, you can use the predict_pipelined method.

Unlike predict(), which loads the entire dataset into memory, predict_pipelined creates a DatasetPipeline object, which lazily loads the data and performs inference on a smaller batch of data at a time.

The lazy loading of the data allows you to operate on datasets much larger than your available memory. Execution can be triggered by pulling from the pipeline, as shown in the example below.
import pandas as pd

import ray
from ray.train.batch_predictor import BatchPredictor

# Create a BatchPredictor that always returns `42` for each input.
batch_pred = BatchPredictor.from_pandas_udf(
    lambda data: pd.DataFrame({"a": [42] * len(data)})
)

# Create a dummy dataset.
ds = ray.data.range_tensor(200, parallelism=4)

# Set up a prediction pipeline.
pipeline = batch_pred.predict_pipelined(ds, blocks_per_window=1)
for batch in pipeline.iter_batches():
    print("Pipeline result", batch)
    # 0    42
    # 1    42
    # ...
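The window size controls the trade-off between memory usage and parallelism. Rather than a fixed block count, windows can also be sized by bytes, assuming your Ray version supports the bytes_per_window argument (it mirrors the corresponding Dataset.window argument):

# Target roughly 1 GiB of input data per window instead of one block.
pipeline = batch_pred.predict_pipelined(ds, bytes_per_window=1024**3)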
Online Inference#
Check out Deploying Predictors with Serve for details on how to perform online inference with AIR.
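As a quick preview, a trained Predictor can be served over HTTP with PredictorDeployment. A minimal sketch reusing the TensorFlow checkpoint and build_model from the first example; it assumes predictor keyword arguments such as model_definition are forwarded to from_checkpoint, so double-check against your Ray version's API reference:

from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import json_to_ndarray

# Serve the predictor behind an HTTP endpoint. The adapter converts
# incoming JSON requests into ndarrays before they reach `predict`.
serve.run(
    PredictorDeployment.options(name="tf-predictor").bind(
        TensorflowPredictor,
        checkpoint,
        model_definition=build_model,
        http_adapter=json_to_ndarray,
    )
)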
Developer Guide: Implementing your own Predictor#
To implement a new Predictor for your particular framework, you should subclass the base Predictor and implement the following two methods:

- _predict_pandas: Given a pandas.DataFrame input, return a pandas.DataFrame containing predictions.
- from_checkpoint: Logic for creating a Predictor from an AIR Checkpoint.

Optionally, implement _predict_numpy for better performance when working with tensor data, to avoid extra copies from pandas conversions.