# End-to-end: Offline Batch Inference

## Contents

# End-to-end: Offline Batch Inference#

Tip

Get in touch to get help using Ray Data, the industry’s fastest and cheapest solution for offline batch inference.

Offline batch inference is a process for generating model predictions on a fixed set of input data. Ray Data offers an efficient and scalable solution for batch inference, providing faster execution and cost-effectiveness for deep learning applications.

For an overview on why you should use Ray Data for offline batch inference, and how it compares to alternatives, see the Ray Data Overview.

## Quickstart#

To start, install Ray Data:

```
pip install -U "ray[data]"
```

Using Ray Data for offline inference involves four basic steps:

**Step 1:**Load your data into a Ray Dataset. Ray Data supports many different datasources and formats. For more details, see Loading Data.**Step 2:**Define a Python class to load the pre-trained model.**Step 3:**Transform your dataset using the pre-trained model by calling`ds.map_batches()`

. For more details, see Transforming Data.**Step 4:**Get the final predictions by either iterating through the output or saving the results. For more details, see the Iterating over data and Saving data user guides.

For more in-depth examples for your use case, see the batch inference examples. For how to configure batch inference, see the configuration guide.

```
from typing import Dict
import numpy as np
import ray
# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))
# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class HuggingFacePredictor:
def __init__(self):
from transformers import pipeline
# Initialize a pre-trained GPT2 Huggingface pipeline.
self.model = pipeline("text-generation", model="gpt2")
# Logic for inference on 1 batch of data.
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
# Get the predictions from the input batch.
predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
# `predictions` is a list of length-one lists. For example:
# [[{'generated_text': 'output_1'}], ..., [{'generated_text': 'output_2'}]]
# Modify the output to get it into the following format instead:
# ['output_1', 'output_2']
batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
return batch
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Step 3: Map the Predictor over the Dataset to get predictions.
predictions = ds.map_batches(HuggingFacePredictor, compute=scale)
# Step 4: Show one prediction output.
predictions.show(limit=1)
```

```
{'data': 'Complete this', 'output': 'Complete this information or purchase any item from this site.\n\nAll purchases are final and non-'}
```

```
from typing import Dict
import numpy as np
import torch
import torch.nn as nn
import ray
# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))
# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class TorchPredictor:
def __init__(self):
# Load a dummy neural network.
# Set `self.model` to your pre-trained PyTorch model.
self.model = nn.Sequential(
nn.Linear(in_features=100, out_features=1),
nn.Sigmoid(),
)
self.model.eval()
# Logic for inference on 1 batch of data.
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
tensor = torch.as_tensor(batch["data"], dtype=torch.float32)
with torch.inference_mode():
# Get the predictions from the input batch.
return {"output": self.model(tensor).numpy()}
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Step 3: Map the Predictor over the Dataset to get predictions.
predictions = ds.map_batches(TorchPredictor, compute=scale)
# Step 4: Show one prediction output.
predictions.show(limit=1)
```

```
{'output': array([0.5590901], dtype=float32)}
```

```
from typing import Dict
import numpy as np
import ray
# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))
# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class TFPredictor:
def __init__(self):
from tensorflow import keras
# Load a dummy neural network.
# Set `self.model` to your pre-trained Keras model.
input_layer = keras.Input(shape=(100,))
output_layer = keras.layers.Dense(1, activation="sigmoid")
self.model = keras.Sequential([input_layer, output_layer])
# Logic for inference on 1 batch of data.
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
# Get the predictions from the input batch.
return {"output": self.model(batch["data"]).numpy()}
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Step 3: Map the Predictor over the Dataset to get predictions.
predictions = ds.map_batches(TFPredictor, compute=scale)
# Step 4: Show one prediction output.
predictions.show(limit=1)
```

```
{'output': array([0.625576], dtype=float32)}
```

## More examples#

## Configuration and troubleshooting#

### Using GPUs for inference#

To use GPUs for inference, make the following changes to your code:

Update the class implementation to move the model and data to and from GPU.

Specify

`num_gpus=1`

in the`ds.map_batches()`

call to indicate that each actor should use 1 GPU.Specify a

`batch_size`

for inference. For more details on how to configure the batch size, see Configuring Batch Size.

The remaining is the same as the Quickstart.

```
from typing import Dict
import numpy as np
import ray
ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))
class HuggingFacePredictor:
def __init__(self):
from transformers import pipeline
# Set "cuda:0" as the device so the Huggingface pipeline uses GPU.
self.model = pipeline("text-generation", model="gpt2", device="cuda:0")
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
return batch
# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
HuggingFacePredictor,
num_gpus=1,
# Specify the batch size for inference.
# Increase this for larger datasets.
batch_size=1,
# Set the ActorPool size to the number of GPUs in your cluster.
compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)
```

```
{'data': 'Complete this', 'output': 'Complete this poll. Which one do you think holds the most promise for you?\n\nThank you'}
```

```
from typing import Dict
import numpy as np
import torch
import torch.nn as nn
import ray
ds = ray.data.from_numpy(np.ones((1, 100)))
class TorchPredictor:
def __init__(self):
# Move the neural network to GPU device by specifying "cuda".
self.model = nn.Sequential(
nn.Linear(in_features=100, out_features=1),
nn.Sigmoid(),
).cuda()
self.model.eval()
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
# Move the input batch to GPU device by specifying "cuda".
tensor = torch.as_tensor(batch["data"], dtype=torch.float32, device="cuda")
with torch.inference_mode():
# Move the prediction output back to CPU before returning.
return {"output": self.model(tensor).cpu().numpy()}
# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
TorchPredictor,
num_gpus=1,
# Specify the batch size for inference.
# Increase this for larger datasets.
batch_size=1,
# Set the ActorPool size to the number of GPUs in your cluster.
compute=ray.data.ActorPoolStrategy(size=2)
)
predictions.show(limit=1)
```

```
{'output': array([0.5590901], dtype=float32)}
```

```
from typing import Dict
import numpy as np
import tensorflow as tf
from tensorflow import keras
import ray
ds = ray.data.from_numpy(np.ones((1, 100)))
class TFPredictor:
def __init__(self):
# Move the neural network to GPU by specifying the GPU device.
with tf.device("GPU:0"):
input_layer = keras.Input(shape=(100,))
output_layer = keras.layers.Dense(1, activation="sigmoid")
self.model = keras.Sequential([input_layer, output_layer])
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
# Move the input batch to GPU by specifying GPU device.
with tf.device("GPU:0"):
return {"output": self.model(batch["data"]).numpy()}
# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
TFPredictor,
num_gpus=1,
# Specify the batch size for inference.
# Increase this for larger datasets.
batch_size=1,
# Set the ActorPool size to the number of GPUs in your cluster.
compute=ray.data.ActorPoolStrategy(size=2)
)
predictions.show(limit=1)
```

```
{'output': array([0.625576], dtype=float32)}
```

### Configuring Batch Size#

Configure the size of the input batch that’s passed to `__call__`

by setting the `batch_size`

argument for `ds.map_batches()`

Increasing batch size results in faster execution because inference is a vectorized operation. For GPU inference, increasing batch size increases GPU utilization. Set the batch size to as large possible without running out of memory. If you encounter out-of-memory errors, decreasing `batch_size`

may help.

```
import numpy as np
import ray
ds = ray.data.from_numpy(np.ones((10, 100)))
def assert_batch(batch: Dict[str, np.ndarray]):
assert len(batch) == 2
return batch
# Specify that each input batch should be of size 2.
ds.map_batches(assert_batch, batch_size=2)
```

Caution

The default `batch_size`

of `4096`

may be too large for datasets with large rows
(for example, tables with many columns or a collection of large images).

### Handling GPU out-of-memory failures#

If you run into CUDA out-of-memory issues, your batch size is likely too large. Decrease the batch size by following these steps.

If your batch size is already set to 1, then use either a smaller model or GPU devices with more memory.

For advanced users working with large models, you can use model parallelism to shard the model across multiple GPUs.

### Optimizing expensive CPU preprocessing#

If your workload involves expensive CPU preprocessing in addition to model inference, you can optimize throughput by separating the preprocessing and inference logic into separate stages. This separation allows inference on batch \(N\) to execute concurrently with preprocessing on batch \(N+1\).

For an example where preprocessing is done in a separate `map`

call, see Image Classification Batch Inference with PyTorch ResNet18.

### Handling CPU out-of-memory failures#

If you run out of CPU RAM, you likely that you have too many model replicas that are running concurrently on the same node. For example, if a model
uses 5 GB of RAM when created / run, and a machine has 16 GB of RAM total, then no more
than three of these models can be run at the same time. The default resource assignments
of one CPU per task/actor might lead to `OutOfMemoryError`

from Ray in this situation.

Suppose your cluster has 4 nodes, each with 16 CPUs. To limit to at most 3 of these actors per node, you can override the CPU or memory:

```
from typing import Dict
import numpy as np
import ray
ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))
class HuggingFacePredictor:
def __init__(self):
from transformers import pipeline
self.model = pipeline("text-generation", model="gpt2")
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
return batch
predictions = ds.map_batches(
HuggingFacePredictor,
# Require 5 CPUs per actor (so at most 3 can fit per 16 CPU node).
num_cpus=5,
# 3 actors per node, with 4 nodes in the cluster means ActorPool size of 12.
compute=ray.data.ActorPoolStrategy(size=12)
)
predictions.show(limit=1)
```

## Using models from Ray Train#

Models that have been trained with Ray Train can then be used for batch inference with Ray Data via the `Checkpoint`

that is returned by Ray Train.

**Step 1:** Train a model with Ray Train.

```
import ray
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
trainer = XGBoostTrainer(
scaling_config=ScalingConfig(
num_workers=2,
use_gpu=False,
),
label_column="target",
num_boost_round=20,
params={
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
},
datasets={"train": train_dataset, "valid": valid_dataset},
)
result = trainer.fit()
```

**Step 2:** Extract the `Checkpoint`

from the training `Result`

.

```
checkpoint = result.checkpoint
```

**Step 3:** Use Ray Data for batch inference. To load in the model from the `Checkpoint`

inside the Python class, use the methodology corresponding to the Trainer used to train the model.

**Deep Learning Trainers:**Saving and Loading Checkpoints**Tree-Based Trainers:**Save and load XGBoost and LightGBM checkpoints

In this case, use `XGBoostTrainer.get_model()`

to load the model.

The rest of the logic looks the same as in the Quickstart.

```
from typing import Dict
import pandas as pd
import numpy as np
import xgboost
from ray.train import Checkpoint
from ray.train.xgboost import XGBoostTrainer
test_dataset = valid_dataset.drop_columns(["target"])
class XGBoostPredictor:
def __init__(self, checkpoint: Checkpoint):
self.model = XGBoostTrainer.get_model(checkpoint)
def __call__(self, data: pd.DataFrame) -> Dict[str, np.ndarray]:
dmatrix = xgboost.DMatrix(data)
return {"predictions": self.model.predict(dmatrix)}
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Map the Predictor over the Dataset to get predictions.
predictions = test_dataset.map_batches(
XGBoostPredictor,
compute=scale,
batch_format="pandas",
# Pass in the Checkpoint to the XGBoostPredictor constructor.
fn_constructor_kwargs={"checkpoint": checkpoint}
)
predictions.show(limit=1)
```

```
{'predictions': 0.9969483017921448}
```