Using Predictors for Inference


After you train a model, you will often want to use the model to do inference and prediction. To do so, you can use a Ray AIR Predictor. In this guide, we’ll cover how to use the Predictor on different types of data.

What are predictors?

A Ray AIR Predictor is a class that loads a model from a Checkpoint to perform inference.

BatchPredictor uses Predictors for large-scale batch scoring, and PredictorDeployment uses them for online inference.

Let’s walk through a basic usage of the Predictor. In the example below, we create a Checkpoint object from a model. Checkpoints can be created in a variety of ways; see the Checkpoints user guide for more details.

The checkpoint is then used to create a framework-specific Predictor (in our example, a TensorflowPredictor), which can then be used for inference:

import numpy as np
import tensorflow as tf

import ray
from ray.train.tensorflow import (
    TensorflowCheckpoint,
    TensorflowPredictor,
)


def build_model() -> tf.keras.Model:
    model = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=()),
            # Add feature dimension, expanding (batch_size,) to (batch_size, 1).
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(1),
        ]
    )
    return model


model = build_model()
checkpoint = TensorflowCheckpoint.from_model(model)
predictor = TensorflowPredictor.from_checkpoint(
    checkpoint, model_definition=build_model
)

data = np.array([1, 2, 3, 4])
predictions = predictor.predict(data)
print(predictions)
# Output (values vary between runs because the model's weights are randomly initialized):
# [[-1.6930283]
#  [-3.3860567]
#  [-5.079085 ]
#  [-6.7721133]]

Predictors expose a predict method that accepts an input batch of type DataBatchType (a typing union of standard Python ecosystem data types, such as a Pandas DataFrame or a NumPy array) and returns predictions of the same type as the input batch.

Life of a prediction: Under the hood, the following occurs when the Predictor.predict method is called:

  • The input batch is converted into a Pandas DataFrame. Tensor input (like a np.ndarray) is converted into a single-column Pandas DataFrame.

  • If a Preprocessor is saved in the provided Checkpoint, it is used to transform the DataFrame.

  • The transformed DataFrame is passed to the model for inference.

  • predict returns the predictions in the same type as the original input.
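The four steps above can be illustrated with a small, dependency-free sketch. It is not Ray's implementation: to avoid requiring Ray or pandas, a dict of column lists stands in for a DataFrame and a plain list stands in for a tensor, and `ToyPredictor`, `to_table`, `TENSOR_COLUMN`, `double`, and `model` are hypothetical names.

```python
from typing import Callable, Dict, List, Optional, Union

# Stand-ins (assumptions for this sketch): a "DataFrame" is a dict of column
# lists, and a "tensor" batch is a plain list of numbers.
Table = Dict[str, List[float]]
Batch = Union[List[float], Table]

TENSOR_COLUMN = "__value__"  # hypothetical name for the single tensor column


def to_table(batch: Batch) -> Table:
    """Step 1: tensor input becomes a single-column table."""
    if isinstance(batch, list):
        return {TENSOR_COLUMN: list(batch)}
    return {name: list(col) for name, col in batch.items()}


class ToyPredictor:
    """Hypothetical predictor illustrating the flow; not Ray's implementation."""

    def __init__(
        self,
        model: Callable[[Table], List[float]],
        preprocessor: Optional[Callable[[Table], Table]] = None,
    ):
        self.model = model
        self.preprocessor = preprocessor

    def predict(self, batch: Batch) -> Batch:
        table = to_table(batch)
        if self.preprocessor is not None:
            # Step 2: apply the preprocessor stored alongside the model.
            table = self.preprocessor(table)
        # Step 3: run the model on the transformed table.
        predictions = self.model(table)
        # Step 4: return predictions in the same type as the input batch.
        if isinstance(batch, list):
            return predictions
        return {"predictions": predictions}


def double(table: Table) -> Table:
    # Toy "preprocessor": multiply every value by 2.
    return {name: [2 * x for x in col] for name, col in table.items()}


def model(table: Table) -> List[float]:
    # Toy "model": add 1 to every value of the first column.
    first_column = next(iter(table.values()))
    return [x + 1 for x in first_column]


p = ToyPredictor(model, preprocessor=double)
print(p.predict([1, 2, 3]))      # [3, 5, 7]
print(p.predict({"x": [1, 2]}))  # {'predictions': [3, 5]}
```

Passing a list returns a list and passing a table returns a table, mirroring the same-type-in, same-type-out contract of predict.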

Batch Prediction

Ray AIR provides a BatchPredictor utility for large-scale batch inference.

BatchPredictor takes a checkpoint and a predictor class and, when you call predict(), runs batch prediction on a given dataset in a parallel, distributed fashion.

Note

predict() loads the entire dataset into memory, which can be a problem if the dataset is larger than your available cluster memory. See the Lazy/Pipelined Prediction (experimental) section for a workaround.

import pandas as pd
from ray.train.batch_predictor import BatchPredictor

batch_predictor = BatchPredictor(
    checkpoint, TensorflowPredictor, model_definition=build_model
)
# Create a dummy dataset.
ds = ray.data.from_pandas(pd.DataFrame({"feature_1": [1, 2, 3], "label": [1, 2, 3]}))

# Use `feature_columns` to specify the input columns to your model.
predictions = batch_predictor.predict(ds, feature_columns=["feature_1"])
predictions.show()
# Example output (values vary because the model's weights are randomly initialized):
# {'predictions': array([-1.2789773], dtype=float32)}
# {'predictions': array([-2.5579545], dtype=float32)}
# {'predictions': array([-3.8369317], dtype=float32)}
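Conceptually, BatchPredictor.predict() splits the dataset into batches and scores them in parallel. The sketch below approximates that with Python threads on a single machine; Ray actually distributes the work across cluster workers, and `toy_batch_predict` and `split_into_batches` are hypothetical helpers, not Ray APIs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def split_into_batches(rows: Sequence[float], batch_size: int) -> List[List[float]]:
    """Cut the rows into consecutive batches of at most batch_size items."""
    return [list(rows[i : i + batch_size]) for i in range(0, len(rows), batch_size)]


def toy_batch_predict(
    rows: Sequence[float],
    predict_fn: Callable[[List[float]], List[float]],
    batch_size: int = 2,
    max_workers: int = 4,
) -> List[float]:
    batches = split_into_batches(rows, batch_size)
    # Score batches concurrently; ThreadPoolExecutor.map yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(predict_fn, batches))
    # Flatten per-batch predictions back into a single list.
    return [pred for batch_preds in results for pred in batch_preds]


print(toy_batch_predict([1, 2, 3, 4, 5], lambda batch: [x * 10 for x in batch]))
# [10, 20, 30, 40, 50]
```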

Additionally, you can compute metrics from the predictions. To do so:

  1. Specify a function for computing metrics.

  2. Use keep_columns to keep the label column in the returned dataset.

  3. Use map_batches to compute metrics on a batch-by-batch basis.

  4. Aggregate the results with mean().

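def calculate_accuracy(df):
    # Truncate each row's prediction to an int and compare it with that row's label.
    predicted = [int(p[0]) for p in df["predictions"]]
    return pd.DataFrame(
        {"correct": [int(p == label) for p, label in zip(predicted, df["label"])]}
    )


predictions = batch_predictor.predict(
    ds, feature_columns=["feature_1"], keep_columns=["label"]
)
predictions.show()
# Example output (prediction values vary with the random weights):
# {'predictions': array([-1.2789773], dtype=float32), 'label': 1}
# {'predictions': array([-2.5579545], dtype=float32), 'label': 2}
# {'predictions': array([-3.8369317], dtype=float32), 'label': 3}

correct = predictions.map_batches(calculate_accuracy, batch_format="pandas")
print("Final accuracy: ", correct.mean(on="correct"))
# Final accuracy:  0.0  (for the example predictions shown above)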
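Because calculate_accuracy emits one correctness flag per row, `correct.mean(on="correct")` averages over rows, not over batches. The stdlib sketch below, using made-up predictions and labels, shows why that distinction matters when the last batch is smaller than the rest; `calculate_correct` and `batches` are hypothetical helpers.

```python
from typing import List, Sequence


def calculate_correct(preds: Sequence[float], labels: Sequence[int]) -> List[bool]:
    # Per-row correctness, mirroring calculate_accuracy: truncate the prediction to an int.
    return [int(p) == label for p, label in zip(preds, labels)]


def batches(seq: Sequence, size: int) -> List[Sequence]:
    return [seq[i : i + size] for i in range(0, len(seq), size)]


preds = [0.9, 2.2, 2.7, 4.1, 5.5]  # made-up predictions
labels = [1, 2, 3, 4, 5]

# Map step: compute the per-row "correct" flags batch by batch (batch size 2).
correct: List[bool] = []
for p_batch, l_batch in zip(batches(preds, 2), batches(labels, 2)):
    correct.extend(calculate_correct(p_batch, l_batch))

# Aggregate step: average over rows, like correct.mean(on="correct").
row_accuracy = sum(correct) / len(correct)

# For contrast: averaging per-batch accuracies over-weights the short last batch.
batch_accs = [
    sum(calculate_correct(p, l)) / len(p)
    for p, l in zip(batches(preds, 2), batches(labels, 2))
]
mean_of_batch_accs = sum(batch_accs) / len(batch_accs)

print(row_accuracy)        # 0.6
print(mean_of_batch_accs)  # 0.6666666666666666
```

Averaging per-batch accuracies would give the final one-row batch the same weight as the full batches, which is why the per-row flags are aggregated directly.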

Batch Inference Examples

Below, we provide examples of using common frameworks to do batch inference for different data types:

Tabular

XGBoost

import ray
from ray.data.preprocessors import StandardScaler
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostTrainer, XGBoostPredictor
from ray.air.config import ScalingConfig

# Split data into train and validation.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
test_dataset = valid_dataset.drop_columns(["target"])

columns_to_scale = ["mean radius", "mean texture"]
preprocessor = StandardScaler(columns=columns_to_scale)

trainer = XGBoostTrainer(
    label_column="target",
    num_boost_round=20,
    scaling_config=ScalingConfig(num_workers=2),
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": train_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()

# You can also create a checkpoint from a trained model using
# `XGBoostCheckpoint.from_model`.

# import xgboost as xgb
# from ray.train.xgboost import XGBoostCheckpoint
# model = xgb.Booster()
# model.load_model(...)
# checkpoint = XGBoostCheckpoint.from_model(model, path=".")
checkpoint = result.checkpoint

batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)

predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
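The XGBoost example attaches a StandardScaler to the trainer, so the fitted preprocessor is stored in the checkpoint and reapplied at prediction time, as described in Life of a prediction. The sketch below is a hypothetical, dependency-free stand-in (`ToyStandardScaler` is not Ray's class) and assumes scaling by the population standard deviation; check the StandardScaler documentation for its exact behavior.

```python
import statistics
from typing import Dict, List, Tuple

Table = Dict[str, List[float]]


class ToyStandardScaler:
    """Hypothetical scaler: learns (mean, std) at fit time, reuses them at predict time."""

    def __init__(self, columns: List[str]):
        self.columns = columns
        self.stats_: Dict[str, Tuple[float, float]] = {}

    def fit(self, table: Table) -> "ToyStandardScaler":
        for col in self.columns:
            # Assumption: population standard deviation (ddof=0).
            self.stats_[col] = (statistics.fmean(table[col]), statistics.pstdev(table[col]))
        return self

    def transform(self, table: Table) -> Table:
        out = dict(table)
        for col in self.columns:
            mean, std = self.stats_[col]
            # This sketch does not handle zero-variance columns.
            out[col] = [(x - mean) / std for x in table[col]]
        return out


train = {"mean radius": [10.0, 14.0], "mean texture": [1.0, 3.0]}
test = {"mean radius": [12.0, 16.0], "mean texture": [2.0, 4.0]}

scaler = ToyStandardScaler(["mean radius", "mean texture"]).fit(train)
print(scaler.transform(test))
# {'mean radius': [0.0, 2.0], 'mean texture': [0.0, 2.0]}
print(ToyStandardScaler(["mean radius"]).fit(test).stats_)
# {'mean radius': (14.0, 2.0)}
```

Test rows are scaled with statistics learned from the training data; refitting on the test data would shift the mean radius to 14.0 and produce different features.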

PyTorch

from typing import List
import numpy as np
import torch.nn as nn

import ray
from ray.data.preprocessors import Concatenator
from ray.train.torch import TorchCheckpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor


def create_model(input_features: int):
    return nn.Sequential(
        nn.Linear(in_features=input_features, out_features=16),
        nn.ReLU(),
        nn.Linear(16, 16),
        nn.ReLU(),
        nn.Linear(16, 1),
        nn.Sigmoid(),
    )


dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
all_features: List[str] = dataset.schema().names
all_features.remove("target")

num_features = len(all_features)

prep = Concatenator(dtype=np.float32)

checkpoint = TorchCheckpoint.from_model(
    model=create_model(num_features), preprocessor=prep
)
# You can also fetch a checkpoint from a Trainer:
# checkpoint = trainer.fit().checkpoint

batch_predictor = BatchPredictor.from_checkpoint(checkpoint, TorchPredictor)

predicted_probabilities = batch_predictor.predict(dataset, feature_columns=all_features)
predicted_probabilities.show()
# {'predictions': array([1.], dtype=float32)}
# {'predictions': array([0.], dtype=float32)}
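The Concatenator in the PyTorch and TensorFlow examples merges the selected feature columns into a single vector column, which is why the models are built with input_features=num_features. A rough stdlib sketch of that transformation follows; `concatenate` and the output column name `features` are hypothetical, and Python floats stand in for np.float32.

```python
from typing import Dict, List


def concatenate(
    table: Dict[str, list], columns: List[str], output_column: str = "features"
) -> Dict[str, list]:
    """Merge `columns` into one list-of-vectors column; keep the other columns."""
    n = len(table[columns[0]])
    # One vector per row, with features in the order given by `columns`.
    rows = [[float(table[c][i]) for c in columns] for i in range(n)]
    out = {name: values for name, values in table.items() if name not in columns}
    out[output_column] = rows
    return out


batch = {"a": [1, 2], "b": [3, 4], "target": [0, 1]}
print(concatenate(batch, ["a", "b"]))
# {'target': [0, 1], 'features': [[1.0, 3.0], [2.0, 4.0]]}
```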

TensorFlow

from typing import List
import numpy as np

import ray
from ray.data.preprocessors import Concatenator
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowPredictor
from ray.train.batch_predictor import BatchPredictor


def create_model(input_features):
    from tensorflow import keras  # this is needed for tf<2.9
    from tensorflow.keras import layers

    return keras.Sequential(
        [
            keras.Input(shape=(input_features,)),
            layers.Dense(16, activation="relu"),
            layers.Dense(16, activation="relu"),
            layers.Dense(1, activation="sigmoid"),
        ]
    )


dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
all_features: List[str] = dataset.schema().names
all_features.remove("target")
num_features = len(all_features)

prep = Concatenator(dtype=np.float32)

checkpoint = TensorflowCheckpoint.from_model(
    model=create_model(num_features), preprocessor=prep
)
# You can also fetch a checkpoint from a Trainer
# checkpoint = trainer.fit().checkpoint

batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint, TensorflowPredictor, model_definition=lambda: create_model(num_features)
)

predicted_probabilities = batch_predictor.predict(dataset, feature_columns=all_features)
predicted_probabilities.show()
# {'predictions': array([1.], dtype=float32)}
# {'predictions': array([0.], dtype=float32)}

Image

PyTorch

import pandas as pd

from torchvision import transforms
from torchvision.models import resnet18

import ray
from ray.train.torch import TorchCheckpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor
from ray.data.preprocessors import BatchMapper
from ray.data.datasource import ImageFolderDatasource


def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """
    User PyTorch code to transform the images. Pandas is still used as the
    intermediate format, serving as a shorthand for a Python dictionary of images.
    """
    transform = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    )
    df["image"] = [transform(x).numpy() for x in df["image"]]
    return df


data_url = "s3://anonymous@air-example-data-2/1G-image-data-synthetic-raw"
print(f"Running batch prediction on 1GB of data from {data_url}")
dataset = ray.data.read_datasource(
    ImageFolderDatasource(), root=data_url, size=(256, 256)
)

model = resnet18(pretrained=True)

preprocessor = BatchMapper(preprocess)
ckpt = TorchCheckpoint.from_model(model=model, preprocessor=preprocessor)

batch_predictor = BatchPredictor.from_checkpoint(ckpt, TorchPredictor)
# To score on GPUs, you can pass num_gpus_per_worker=1 to predict().
predictions = batch_predictor.predict(dataset, feature_columns=["image"])
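To make the Resize(256) and CenterCrop(224) steps in preprocess concrete, here is a hypothetical stdlib sketch of a center crop on a small 2D grid. It assumes the offset rule round((dim - size) / 2) and ignores channels and interpolation; `center_crop` here is illustrative, not torchvision's function.

```python
from typing import List


def center_crop(image: List[List[int]], size: int) -> List[List[int]]:
    """Crop a size x size window from the middle of a 2D image (rows of pixels)."""
    height, width = len(image), len(image[0])
    # Assumed offset rule: round((dim - size) / 2) on each axis.
    top = int(round((height - size) / 2.0))
    left = int(round((width - size) / 2.0))
    return [row[left : left + size] for row in image[top : top + size]]


# A 6x6 "image" whose pixel value encodes its position: 10 * row + col.
image = [[10 * r + c for c in range(6)] for r in range(6)]
print(center_crop(image, 4))
# [[11, 12, 13, 14], [21, 22, 23, 24], [31, 32, 33, 34], [41, 42, 43, 44]]

# Offset for cropping a 256-pixel side down to 224, as in the example above.
print(int(round((256 - 224) / 2.0)))  # 16
```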

TensorFlow

Coming soon!

Text

Coming soon!

Lazy/Pipelined Prediction (experimental)

If you have a large dataset but not a lot of available memory, you can use the predict_pipelined method.

Unlike predict(), which loads the entire dataset into memory, predict_pipelined creates a DatasetPipeline object that lazily loads the data and performs inference on a small window of data at a time.

Lazy loading lets you operate on datasets much larger than your available memory. Execution is triggered by pulling from the pipeline, as shown in the example below.

import pandas as pd
import ray
from ray.train.batch_predictor import BatchPredictor

# Create a BatchPredictor that always returns `42` for each input.
batch_pred = BatchPredictor.from_pandas_udf(
    lambda data: pd.DataFrame({"a": [42] * len(data)})
)

# Create a dummy dataset.
ds = ray.data.range_tensor(200, parallelism=4)
# Set up a prediction pipeline.
pipeline = batch_pred.predict_pipelined(ds, blocks_per_window=1)
for batch in pipeline.iter_batches():
    print("Pipeline result", batch)
    # 0    42
    # 1    42
    # ...
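The windowed execution described above can be approximated with a Python generator. This is only a sketch of the idea, not how DatasetPipeline is implemented: `read_block` and `toy_predict_pipelined` are hypothetical, and the example uses 4 blocks of 50 rows with one block per window to echo the 200-row, parallelism=4 dataset above.

```python
from typing import Callable, Iterator, List


def read_block(index: int, block_size: int) -> List[int]:
    """Hypothetical loader: pretend each call reads one block from storage."""
    return list(range(index * block_size, (index + 1) * block_size))


def toy_predict_pipelined(
    num_blocks: int,
    block_size: int,
    blocks_per_window: int,
    predict_fn: Callable[[List[int]], List[int]],
    window_log: List[int],
) -> Iterator[List[int]]:
    """Yield per-block predictions, loading only one window of blocks at a time."""
    for start in range(0, num_blocks, blocks_per_window):
        stop = min(start + blocks_per_window, num_blocks)
        window = [read_block(i, block_size) for i in range(start, stop)]
        window_log.append(len(window))  # record how many blocks were resident
        for block in window:
            yield predict_fn(block)


log: List[int] = []
pipeline = toy_predict_pipelined(4, 50, 1, lambda b: [42] * len(b), log)
print(log)  # []  (nothing is read until the pipeline is pulled)
first = next(pipeline)
print(len(first), log)  # 50 [1]
rest = list(pipeline)
print(len(rest), log)  # 3 [1, 1, 1, 1]
```

Nothing is loaded until the first next() call, and at most blocks_per_window blocks are resident at once, which is what lets pipelined prediction scale past available memory.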

Online Inference

Check out Deploying Predictors with Serve for details on how to perform online inference with AIR.

Developer Guide: Implementing your own Predictor

To implement a new Predictor for your particular framework, subclass the base Predictor and implement the following two methods, plus an optional third:

  1. _predict_pandas: Given a pandas.DataFrame input, return a pandas.DataFrame containing predictions.

  2. from_checkpoint: Logic for creating a Predictor from an AIR Checkpoint.

  3. (Optional) _predict_arrow: Implement this for better performance when working with tensor data, since it avoids extra copies from Pandas conversions.
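The structure above (an alternate constructor plus a private prediction hook called by a public predict method) can be sketched without Ray as follows. `MiniPredictor`, `LinearPredictor`, and `_predict_table` are hypothetical: `_predict_table` plays the role of _predict_pandas, a dict of column lists stands in for a pandas.DataFrame, and a plain dict stands in for an AIR Checkpoint. A real implementation would subclass ray.train.predictor.Predictor instead.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

# Stand-in for pandas.DataFrame: a dict mapping column names to lists.
Table = Dict[str, List[float]]


class MiniPredictor(ABC):
    """Hypothetical base class mirroring the shape of a Predictor."""

    @classmethod
    @abstractmethod
    def from_checkpoint(cls, checkpoint: Dict[str, Any], **kwargs) -> "MiniPredictor":
        """Build a predictor from a checkpoint (here, a plain dict)."""

    @abstractmethod
    def _predict_table(self, data: Table) -> Table:
        """Plays the role of _predict_pandas: table in, predictions table out."""

    def predict(self, data: Table) -> Table:
        # Public entry point; shared, framework-agnostic logic would live here.
        return self._predict_table(data)


class LinearPredictor(MiniPredictor):
    """Example subclass: a linear model with per-column weights and a bias."""

    def __init__(self, weights: Dict[str, float], bias: float):
        self.weights = weights
        self.bias = bias

    @classmethod
    def from_checkpoint(cls, checkpoint: Dict[str, Any], **kwargs) -> "LinearPredictor":
        return cls(weights=checkpoint["weights"], bias=checkpoint["bias"])

    def _predict_table(self, data: Table) -> Table:
        n = len(next(iter(data.values())))
        preds = [
            self.bias + sum(w * data[col][i] for col, w in self.weights.items())
            for i in range(n)
        ]
        return {"predictions": preds}


predictor = LinearPredictor.from_checkpoint({"weights": {"x": 2.0}, "bias": 1.0})
print(predictor.predict({"x": [0.0, 1.0, 2.0]}))  # {'predictions': [1.0, 3.0, 5.0]}
```

Keeping predict on the base class and only the framework-specific hook in the subclass is the same division of labor the list above describes.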