Using preprocessors

This page describes how to perform data preprocessing in Ray AIR.

Data preprocessing is a common technique for transforming raw data into features that will be input to a machine learning model. In general, you may want to apply the same preprocessing logic to your offline training data and online inference data. Ray AIR provides several common preprocessors out of the box as well as interfaces that enable you to define your own custom logic.


Ray AIR exposes a Preprocessor class for preprocessing. The Preprocessor has four methods that make up its core interface.

  1. fit(): Compute state information about a Dataset (e.g. the mean or standard deviation of a column) and save it to the Preprocessor. transform() then uses this saved state. This is typically called on the training dataset.

  2. transform(): Apply a transformation to a Dataset. If the Preprocessor is stateful, then fit() must be called first. This is typically called on the training, validation, and test datasets.

  3. transform_batch(): Apply a transformation to a single batch of data. This is typically called on online or offline inference data.

  4. fit_transform(): Syntactic sugar for calling both fit() and transform() on a Dataset.

To show these in action, let’s walk through a basic example. First we’ll set up two simple Ray Datasets.

import pandas as pd
import ray
from ray.data.preprocessors import MinMaxScaler

# Generate two simple datasets.
dataset = ray.data.from_items([{"value": i} for i in range(8)])
dataset1, dataset2 = dataset.split(2)
# dataset1: [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
# dataset2: [{'value': 4}, {'value': 5}, {'value': 6}, {'value': 7}]

Next, fit the Preprocessor on one Dataset, and transform both Datasets with this fitted information.

# Fit the preprocessor on dataset1, and transform both dataset1 and dataset2.
preprocessor = MinMaxScaler(["value"])

dataset1_transformed = preprocessor.fit_transform(dataset1)
# [{'value': 0.0}, {'value': 0.3333333333333333}, {'value': 0.6666666666666666}, {'value': 1.0}]

dataset2_transformed = preprocessor.transform(dataset2)
# [{'value': 1.3333333333333333}, {'value': 1.6666666666666667}, {'value': 2.0}, {'value': 2.3333333333333335}]

Finally, call transform_batch on a single batch of data.

batch = pd.DataFrame({"value": list(range(8, 12))})
batch_transformed = preprocessor.transform_batch(batch)
#       value
# 0  2.666667
# 1  3.000000
# 2  3.333333
# 3  3.666667
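
The outputs above follow directly from min-max arithmetic. The following is a minimal plain-Python sketch, not Ray's implementation: TinyMinMaxScaler is a hypothetical stand-in that works on lists of numbers rather than Datasets, and it assumes the fitted maximum is larger than the fitted minimum. The fitted state is just the minimum and maximum of dataset1, which is why values from dataset2 and from the batch land outside the range [0, 1].

```python
class TinyMinMaxScaler:
    """Hypothetical stand-in for a stateful preprocessor (plain lists, no Ray)."""

    def __init__(self):
        self.stats_ = None  # populated by fit()

    def fit(self, values):
        # State is computed once, from the data passed to fit().
        self.stats_ = {"min": min(values), "max": max(values)}
        return self

    def transform(self, values):
        if self.stats_ is None:
            raise RuntimeError("fit() must be called before transform()")
        lo, hi = self.stats_["min"], self.stats_["max"]
        return [(v - lo) / (hi - lo) for v in values]

    def fit_transform(self, values):
        # Convenience wrapper: fit, then transform the same data.
        return self.fit(values).transform(values)


scaler = TinyMinMaxScaler()
first = scaler.fit_transform([0, 1, 2, 3])  # fitted state: min=0, max=3
second = scaler.transform([4, 5, 6, 7])     # reuses the fitted min and max
batch = scaler.transform([8, 9, 10, 11])    # same state applied to a new batch
print(first, second, batch)
```

Because transform() never recomputes the statistics, training data and inference data are scaled with exactly the same parameters.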

Life of an AIR preprocessor

Now that we’ve gone over the basics, let’s dive into how Preprocessors fit into an end-to-end application built with AIR. A Preprocessor goes through three main steps:

  1. Passed into a Trainer to fit and transform input Datasets.

  2. Saved as a Checkpoint.

  3. Reconstructed in a Predictor to call transform_batch on batches of data.


Throughout this section we’ll go through this workflow in more detail, with code examples using XGBoost. The same logic is applicable to other integrations as well.


The journey of the Preprocessor starts with the Trainer. If the Trainer is instantiated with a Preprocessor, then the following logic will be executed when Trainer.fit() is called:

  1. If a "train" Dataset is passed in, then the Preprocessor will call fit() on it.

  2. The Preprocessor will then call transform() on all Datasets, including the "train" Dataset.

  3. The Trainer will then perform training on the preprocessed Datasets.

import ray

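from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(0, 32, 3)])
valid_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(1, 32, 3)])

preprocessor = MinMaxScaler(["x"])

trainer = XGBoostTrainer(
    label_column="y",  # the column to predict
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=2),  # worker count chosen for illustration
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()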
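
The first two steps the Trainer runs (fit on the "train" Dataset, then transform every Dataset) can be sketched in plain Python. This is only an illustration of the ordering, not Ray code: preprocess_for_training, min_max_fit, and min_max_transform are hypothetical helpers operating on lists of dicts.

```python
def min_max_fit(rows, column):
    """Compute the state a min-max scaler needs from one dataset."""
    values = [row[column] for row in rows]
    return {"min": min(values), "max": max(values)}


def min_max_transform(rows, column, stats):
    """Scale one column using previously fitted stats; other columns pass through."""
    span = stats["max"] - stats["min"]
    return [{**row, column: (row[column] - stats["min"]) / span} for row in rows]


def preprocess_for_training(datasets, column):
    """Hypothetical helper mirroring steps 1 and 2: fit on "train", transform all."""
    stats = min_max_fit(datasets["train"], column)
    return {
        name: min_max_transform(rows, column, stats)
        for name, rows in datasets.items()
    }


datasets = {
    "train": [{"x": x, "y": 2 * x} for x in range(0, 32, 3)],
    "valid": [{"x": x, "y": 2 * x} for x in range(1, 32, 3)],
}
prepared = preprocess_for_training(datasets, "x")
print(prepared["train"][-1])  # x is 1.0 because 30 is the training maximum
print(prepared["valid"][-1])  # x is slightly above 1.0 because 31 exceeds it
```

The validation rows are scaled with the training statistics, so no information from the validation set leaks into the fitted state.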


If you’re passing a Preprocessor that is already fitted, it will be refitted on the "train" Dataset. Support for passing in an already fitted Preprocessor without refitting is being tracked separately.


If you’re using Ray Tune for hyperparameter optimization, be aware that each Trial will instantiate its own copy of the Preprocessor and the fitting and transformation logic will occur once per Trial.

Trainer.fit() returns a Result object that contains a Checkpoint. If a Preprocessor was passed into the Trainer, then it will be saved in the Checkpoint along with any fitted state.

As a sanity check, let’s confirm the Preprocessor is available in the Checkpoint. In practice you should not need to do this.

import os
import ray.cloudpickle as cpickle
from ray.air.constants import PREPROCESSOR_KEY

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_path:
    path = os.path.join(checkpoint_path, PREPROCESSOR_KEY)
    with open(path, "rb") as f:
        preprocessor = cpickle.load(f)
# MinMaxScaler(columns=['x'], stats={'min(x)': 0, 'max(x)': 30})


A Predictor can be constructed from a saved Checkpoint. If the Checkpoint contains a Preprocessor, then the Preprocessor will be used to call transform_batch on input batches prior to performing inference.

In the following example, we show the Batch Predictor flow. The same logic applies to the Online Inference flow.

from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor

test_dataset = ray.data.from_items([{"x": x} for x in range(2, 32, 3)])

batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
predicted_probabilities = batch_predictor.predict(test_dataset)
# {'predictions': 0.09843720495700836}
# {'predictions': 5.604666709899902}
# {'predictions': 11.405311584472656}
# {'predictions': 15.684700012207031}
# {'predictions': 23.990947723388672}
# {'predictions': 29.900211334228516}
# {'predictions': 34.59944152832031}
# {'predictions': 40.6968994140625}
# {'predictions': 45.68107604980469}

Types of preprocessors

Built-in preprocessors

Ray AIR provides a handful of preprocessors out of the box.

Generic preprocessors

BatchMapper: Apply an arbitrary operation to a dataset.

Chain: Combine multiple preprocessors into a single Preprocessor.

Concatenator: Combine numeric columns into a column of type TensorDtype.

Preprocessor: Implements an ML preprocessing operation.

SimpleImputer: Replace missing values with imputed values.

Categorical encoders

Categorizer: Convert columns to pd.CategoricalDtype.

LabelEncoder: Encode labels as integer targets.

MultiHotEncoder: Multi-hot encode categorical data.

OneHotEncoder: One-hot encode categorical data.

OrdinalEncoder: Encode values within columns as ordered integer values.

Feature scalers

MaxAbsScaler: Scale each column by its absolute max value.

MinMaxScaler: Scale each column by its range.

Normalizer: Scale each sample to have unit norm.

PowerTransformer: Apply a power transform to make your data more normally distributed.

RobustScaler: Scale and translate each column using quantiles.

StandardScaler: Translate and scale each column by its mean and standard deviation, respectively.

Text encoders

CountVectorizer: Count the frequency of tokens in a column of strings.

HashingVectorizer: Count the frequency of tokens using the hashing trick.

Tokenizer: Replace each string with a list of tokens.

FeatureHasher: Apply the hashing trick to a table that describes token frequencies.

Utilities

Dataset.train_test_split: Split the dataset into train and test subsets.

Which preprocessor should you use?

The type of preprocessor you use depends on what your data looks like. This section provides tips on handling common data formats.

Categorical data

Most models expect numerical inputs. To represent your categorical data in a way your model can understand, encode categories using one of the preprocessors described below.

Categorical Data Type    Example                                                   Preprocessor
Labels                   "cat", "dog", "airplane"                                  LabelEncoder
Ordered categories       "bs", "md", "phd"                                         OrdinalEncoder
Unordered categories     "red", "green", "blue"                                    OneHotEncoder
Lists of categories      ("sci-fi", "action"), ("action", "comedy", "animated")    MultiHotEncoder



If you’re using LightGBM, you don’t need to encode your categorical data. Instead, use Categorizer to convert your data to pandas.CategoricalDtype.
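
To make the difference between these encodings concrete, here is a small plain-Python sketch of ordinal, one-hot, and multi-hot encoding. It illustrates the general techniques only; the function names are hypothetical, the category order is supplied by hand, and the exact output layout of Ray's encoders may differ.

```python
def ordinal_encode(values, ordered_categories):
    """Map each category to its position in a caller-supplied order."""
    index = {category: i for i, category in enumerate(ordered_categories)}
    return [index[value] for value in values]


def one_hot_encode(values, categories):
    """One indicator column per category; exactly one 1 per row."""
    return [[int(value == category) for category in categories] for value in values]


def multi_hot_encode(rows, categories):
    """One indicator column per category; a row may contain several 1s."""
    return [[int(category in row) for category in categories] for row in rows]


degrees = ordinal_encode(["md", "bs", "phd"], ["bs", "md", "phd"])
colors = one_hot_encode(["green", "red"], ["red", "green", "blue"])
genres = multi_hot_encode(
    [("sci-fi", "action"), ("action", "comedy", "animated")],
    ["action", "animated", "comedy", "sci-fi"],
)
print(degrees)  # [1, 0, 2]
print(colors)   # [[0, 1, 0], [1, 0, 0]]
print(genres)   # [[1, 0, 0, 1], [1, 1, 1, 0]]
```

Ordinal encoding preserves the order between categories in a single integer column, while one-hot and multi-hot encoding avoid implying any order at the cost of one column per category.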

Numerical data

To ensure your model behaves properly, normalize your numerical data. Reference the table below to determine which preprocessor to use.

Data Property                                    Preprocessor
Your data is approximately normal                StandardScaler
Your data is sparse                              MaxAbsScaler
Your data contains many outliers                 RobustScaler
Your data isn’t normal, but you need it to be    PowerTransformer
You need unit-norm rows                          Normalizer
You aren’t sure what your data looks like        MinMaxScaler
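
As a rough illustration of why outliers matter when choosing a scaler, the sketch below compares mean and standard deviation scaling with median and interquartile-range scaling on a column containing one extreme value. It uses only the Python standard library; the helper names are hypothetical, and Ray's scalers may compute quantiles differently.

```python
import statistics


def standard_scale(values):
    """Center on the mean and divide by the population standard deviation."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]


def robust_scale(values):
    """Center on the median and divide by the interquartile range."""
    q1, median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    return [(v - median) / (q3 - q1) for v in values]


values = [1.0, 2.0, 3.0, 4.0, 100.0]  # one extreme outlier
standard = standard_scale(values)
robust = robust_scale(values)
print([round(v, 2) for v in standard])
print([round(v, 2) for v in robust])
```

With mean and standard deviation scaling, the outlier inflates the standard deviation and the four typical values are squeezed into a very narrow band. The quantile-based version keeps them well separated because the median and interquartile range are barely affected by the outlier.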



These preprocessors operate on numeric columns. If your dataset contains columns of type TensorDtype, you may need to implement a custom preprocessor.

Additionally, if your model expects a tensor or ndarray, create a tensor using Concatenator.


Built-in feature scalers like StandardScaler don’t work on TensorDtype columns, so apply Concatenator after feature scaling. Combine feature scaling and concatenation into a single preprocessor with Chain.

import ray
from ray.data.preprocessors import Chain, Concatenator, StandardScaler

# Generate a simple dataset.
dataset = ray.data.from_items([{"X": 1.0, "Y": 2.0}, {"X": 4.0, "Y": 0.0}])
# [{'X': 1.0, 'Y': 2.0}, {'X': 4.0, 'Y': 0.0}]

preprocessor = Chain(StandardScaler(columns=["X", "Y"]), Concatenator())
dataset_transformed = preprocessor.fit_transform(dataset)
# [{'concat_out': array([-1.,  1.])}, {'concat_out': array([ 1., -1.])}]
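
The values in the output above can be checked by hand. This plain-Python sketch standardizes each column and then gathers each row's scaled values into one vector. It assumes the population standard deviation (dividing by n), which is what reproduces the numbers shown; it is not Ray's implementation.

```python
import statistics

rows = [{"X": 1.0, "Y": 2.0}, {"X": 4.0, "Y": 0.0}]
columns = ["X", "Y"]

# "Fit": per-column mean and population standard deviation.
stats = {}
for column in columns:
    column_values = [row[column] for row in rows]
    stats[column] = (statistics.fmean(column_values), statistics.pstdev(column_values))

# "Transform": standardize each column, then gather each row's scaled values
# into a single vector, which is the role Concatenator plays in the Chain.
vectors = [
    [(row[column] - stats[column][0]) / stats[column][1] for column in columns]
    for row in rows
]
print(vectors)  # [[-1.0, 1.0], [1.0, -1.0]]
```

Column X has mean 2.5 and standard deviation 1.5, and column Y has mean 1.0 and standard deviation 1.0, so every scaled value is exactly one standard deviation from its column mean.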

Text data

A document-term matrix is a table that describes text data, often used in natural language processing.

To generate a document-term matrix from a collection of documents, use HashingVectorizer or CountVectorizer. If you already know the frequency of tokens and want to store the data in a document-term matrix, use FeatureHasher.



Requirement                              Preprocessor
You care about memory efficiency         HashingVectorizer
You care about model interpretability    CountVectorizer
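
The trade-off between the two approaches can be sketched in plain Python. The count-based matrix has one named column per vocabulary token, while the hashed matrix has a fixed number of anonymous columns. This is an illustration only: hashed_counts is a hypothetical helper, whitespace tokenization is a simplification, and zlib.crc32 is an assumed stand-in for whatever hash function a real implementation uses.

```python
import zlib
from collections import Counter

documents = ["the cat sat", "the dog sat on the cat"]

# Count-based: one column per vocabulary token, so columns are interpretable
# but the table grows with the vocabulary.
vocabulary = sorted({token for doc in documents for token in doc.split()})
count_matrix = [
    [Counter(doc.split())[token] for token in vocabulary] for doc in documents
]


def hashed_counts(doc, num_features):
    """Hashing trick: bucket = hash(token) % num_features, so width is fixed."""
    row = [0] * num_features
    for token in doc.split():
        row[zlib.crc32(token.encode("utf-8")) % num_features] += 1
    return row


hashed_matrix = [hashed_counts(doc, num_features=4) for doc in documents]
print(vocabulary)     # ['cat', 'dog', 'on', 'sat', 'the']
print(count_matrix)   # [[1, 0, 0, 1, 1], [1, 1, 1, 1, 2]]
print(hashed_matrix)  # two rows of width 4; different tokens may share a bucket
```

Hashing needs no stored vocabulary, which saves memory, but a bucket index cannot be mapped back to a single token, which is why the count-based form is easier to interpret.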


Filling in missing values

If your dataset contains missing values, replace them with SimpleImputer.

import ray
from ray.data.preprocessors import SimpleImputer

# Generate a simple dataset.
dataset = ray.data.from_items([{"value": 1.0}, {"value": None}, {"value": 3.0}])
# [{'value': 1.0}, {'value': None}, {'value': 3.0}]

imputer = SimpleImputer(columns=["value"], strategy="mean")
dataset_transformed = imputer.fit_transform(dataset)
# [{'value': 1.0}, {'value': 2.0}, {'value': 3.0}]
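
The "mean" strategy used above amounts to the following arithmetic, shown here as a plain-Python sketch on a list rather than a Dataset (an illustration, not Ray's implementation):

```python
import statistics

values = [1.0, None, 3.0]

# "Fit": the mean is computed from the observed (non-missing) values only.
observed = [v for v in values if v is not None]
fill_value = statistics.fmean(observed)

# "Transform": every missing entry is replaced by the fitted mean.
imputed = [fill_value if v is None else v for v in values]
print(imputed)  # [1.0, 2.0, 3.0]
```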

Chaining preprocessors

If you need to apply more than one preprocessor, compose them together with Chain.

Chain applies fit and transform sequentially. For example, if you construct Chain(preprocessorA, preprocessorB), then preprocessorB.transform is applied to the result of preprocessorA.transform.

import ray
from ray.data.preprocessors import Chain, MinMaxScaler, SimpleImputer

# Generate one simple dataset.
dataset = ray.data.from_items(
    [{"value": 0}, {"value": 1}, {"value": 2}, {"value": 3}, {"value": None}]
)
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}, {'value': None}]

preprocessor = Chain(SimpleImputer(["value"]), MinMaxScaler(["value"]))

dataset_transformed = preprocessor.fit_transform(dataset)
# [{'value': 0.0}, {'value': 0.3333333333333333}, {'value': 0.6666666666666666}, {'value': 1.0}, {'value': 0.5}]
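
The sequential behavior of the chained example can be traced step by step in plain Python. In this sketch (hypothetical helper functions, lists instead of Datasets), the second stage is fitted on the output of the first stage, which is consistent with the result shown above: the missing value becomes the mean 1.5 and is then scaled to 0.5.

```python
def fit_mean_imputer(values):
    """Return the mean of the non-missing values."""
    observed = [v for v in values if v is not None]
    return sum(observed) / len(observed)


def apply_mean_imputer(values, mean):
    """Replace missing values with the fitted mean."""
    return [mean if v is None else v for v in values]


def fit_min_max(values):
    """Return the (min, max) pair used for scaling."""
    return min(values), max(values)


def apply_min_max(values, lo, hi):
    """Scale values into [0, 1] relative to the fitted range."""
    return [(v - lo) / (hi - lo) for v in values]


values = [0, 1, 2, 3, None]

# Stage A (imputer) is fitted and applied first.
mean = fit_mean_imputer(values)
imputed = apply_mean_imputer(values, mean)

# Stage B (scaler) is fitted on A's output, so it never sees a missing value.
lo, hi = fit_min_max(imputed)
scaled = apply_min_max(imputed, lo, hi)
print(imputed)  # [0, 1, 2, 3, 1.5]
print(scaled)
```

Reversing the order would not work in this sketch: the scaler would have to compute a minimum and maximum over a list that still contains None.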

Implementing custom preprocessors

If you want to implement a custom preprocessor that needs to be fit, extend the Preprocessor base class.

from typing import Dict
import ray
from pandas import DataFrame
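from ray.data.preprocessor import Preprocessor
from ray.data import Dataset
from ray.data.aggregate import Max

class CustomPreprocessor(Preprocessor):
    def _fit(self, dataset: Dataset) -> Preprocessor:
        # Store the column maximum as fitted state.
        self.stats_ = dataset.aggregate(Max("value"))
        return self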

    def _transform_pandas(self, df: DataFrame) -> DataFrame:
        return df * self.stats_["max(value)"]

# Generate a simple dataset.
dataset = ray.data.from_items([{"value": i} for i in range(4)])
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]

# Create a stateful preprocessor that finds the max value and scales each value by it.
preprocessor = CustomPreprocessor()
dataset_transformed = preprocessor.fit_transform(dataset)
# [{'value': 0}, {'value': 3}, {'value': 6}, {'value': 9}]

If your preprocessor doesn’t need to be fit, construct a BatchMapper. BatchMapper can drop, add, or modify columns.

import ray
from ray.data.preprocessors import BatchMapper

# Generate a simple dataset.
dataset = ray.data.from_items([{"value": i} for i in range(4)])
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]

# Create a stateless preprocessor that multiplies values by 2.
preprocessor = BatchMapper(lambda df: df * 2)
dataset_transformed = preprocessor.transform(dataset)
# [{'value': 0}, {'value': 2}, {'value': 4}, {'value': 6}]