Using Preprocessors#
Data preprocessing is a common technique for transforming raw data into features for a machine learning model. In general, you may want to apply the same preprocessing logic to your offline training data and online inference data. Ray AIR provides several common preprocessors out of the box and interfaces to define your own custom logic.
Overview#
The most common way of using a preprocessor is by passing it as an argument to the constructor of a Trainer in conjunction with a Ray Dataset. For example, the following code trains a model with a preprocessor that normalizes the data.
import ray
from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
train_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(0, 32, 3)])
valid_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(1, 32, 3)])
preprocessor = MinMaxScaler(["x"])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()
The Preprocessor class has four public methods that can be used separately from a Trainer:

- fit(): Compute state information about a Dataset (e.g., the mean or standard deviation of a column) and save it to the Preprocessor. This information is used to perform transform(), and the method is typically called on a training dataset.
- transform(): Apply a transformation to a Dataset. If the Preprocessor is stateful, then fit() must be called first. This method is typically called on training, validation, and test datasets.
- transform_batch(): Apply a transformation to a single batch of data. This method is typically called on online or offline inference data.
- fit_transform(): Syntactic sugar for calling both fit() and transform() on a Dataset.
To show these methods in action, let's walk through a basic example. First, we'll set up two simple Ray Datasets.
import pandas as pd
import ray
from ray.data.preprocessors import MinMaxScaler
# Generate two simple datasets.
dataset = ray.data.range_table(8)
dataset1, dataset2 = dataset.split(2)
print(dataset1.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
print(dataset2.take())
# [{'value': 4}, {'value': 5}, {'value': 6}, {'value': 7}]
Next, fit the Preprocessor on one Dataset, and then transform both Datasets with this fitted information.
# Fit the preprocessor on dataset1, and transform both dataset1 and dataset2.
preprocessor = MinMaxScaler(["value"])
dataset1_transformed = preprocessor.fit_transform(dataset1)
print(dataset1_transformed.take())
# [{'value': 0.0}, {'value': 0.3333333333333333}, {'value': 0.6666666666666666}, {'value': 1.0}]
dataset2_transformed = preprocessor.transform(dataset2)
print(dataset2_transformed.take())
# [{'value': 1.3333333333333333}, {'value': 1.6666666666666667}, {'value': 2.0}, {'value': 2.3333333333333335}]
Finally, call transform_batch on a single batch of data.
batch = pd.DataFrame({"value": list(range(8, 12))})
batch_transformed = preprocessor.transform_batch(batch)
print(batch_transformed)
# value
# 0 2.666667
# 1 3.000000
# 2 3.333333
# 3 3.666667
Life of an AIR preprocessor#
Now that we've gone over the basics, let's dive into how Preprocessors fit into an end-to-end application built with AIR. A Preprocessor goes through these main steps:

1. Passed into a Trainer to fit and transform input Datasets.
2. Saved as a Checkpoint.
3. Reconstructed in a Predictor to call transform_batch on batches of data.
The rest of this section goes through this workflow in more detail, with code examples using XGBoost. The same logic applies to other machine learning framework integrations as well.
Trainer#
The journey of the Preprocessor starts with the Trainer. If the Trainer is instantiated with a Preprocessor, then the following logic is executed when Trainer.fit() is called:
1. If a "train" Dataset is passed in, then the Preprocessor calls fit() on it.
2. The Preprocessor then calls transform() on all Datasets, including the "train" Dataset.
3. The Trainer then performs training on the preprocessed Datasets.
import ray
from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
train_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(0, 32, 3)])
valid_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(1, 32, 3)])
preprocessor = MinMaxScaler(["x"])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()
Note
If you're passing a Preprocessor that is already fitted, it is refitted on the "train" Dataset. Adding the functionality to support passing in a fitted Preprocessor is being tracked here.
Tune#
If you're using Ray Tune for hyperparameter optimization, be aware that each Trial instantiates its own copy of the Preprocessor, and the fitting and transforming logic occurs once per Trial.
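For example, here's a minimal sketch of passing the trainer defined above to a Tuner; the max_depth search space is a made-up, illustrative choice:

from ray import tune
from ray.tune import Tuner

# Each Trial receives its own copy of the trainer, including its Preprocessor,
# so fit() and transform() run independently within every Trial.
tuner = Tuner(
    trainer,
    param_space={"params": {"max_depth": tune.randint(2, 8)}},
)
results = tuner.fit()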
Checkpoint#
Trainer.fit() returns a Result object which contains a Checkpoint. If a Preprocessor is passed into the Trainer, then it is saved in the Checkpoint along with any fitted state.
As a sanity check, let's confirm the Preprocessor is available in the Checkpoint. In practice, you don't need to check.
import os
import ray.cloudpickle as cpickle
from ray.air.constants import PREPROCESSOR_KEY
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_path:
    path = os.path.join(checkpoint_path, PREPROCESSOR_KEY)
    with open(path, "rb") as f:
        preprocessor = cpickle.load(f)
print(preprocessor)
# MinMaxScaler(columns=['x'], stats={'min(x)': 0, 'max(x)': 30})
Predictor#
A Predictor can be constructed from a saved Checkpoint. If the Checkpoint contains a Preprocessor, then the Preprocessor calls transform_batch on input batches prior to performing inference.
In the following example, we show the Batch Predictor flow. The same logic applies to the Online Inference flow.
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor
test_dataset = ray.data.from_items([{"x": x} for x in range(2, 32, 3)])
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
predictions = batch_predictor.predict(test_dataset)
predictions.show()
# {'predictions': 0.09843720495700836}
# {'predictions': 5.604666709899902}
# {'predictions': 11.405311584472656}
# {'predictions': 15.684700012207031}
# {'predictions': 23.990947723388672}
# {'predictions': 29.900211334228516}
# {'predictions': 34.59944152832031}
# {'predictions': 40.6968994140625}
# {'predictions': 45.68107604980469}
Types of preprocessors#
Built-in preprocessors#
Ray AIR provides a handful of preprocessors out of the box.
Generic preprocessors

| Preprocessor | Description |
|---|---|
| BatchMapper | Apply an arbitrary operation to a dataset. |
| Chain | Combine multiple preprocessors into a single Preprocessor. |
| Concatenator | Combine numeric columns into a column of type TensorDtype. |
| Preprocessor | Implements an ML preprocessing operation. |
| SimpleImputer | Replace missing values with imputed values. |

Categorical encoders

| Preprocessor | Description |
|---|---|
| Categorizer | Convert columns to pd.CategoricalDtype. |
| LabelEncoder | Encode labels as integer targets. |
| MultiHotEncoder | Multi-hot encode categorical data. |
| OneHotEncoder | One-hot encode categorical data. |
| OrdinalEncoder | Encode values within columns as ordered integer values. |

Feature scalers

| Preprocessor | Description |
|---|---|
| MaxAbsScaler | Scale each column by its absolute max value. |
| MinMaxScaler | Scale each column by its range. |
| Normalizer | Scale each sample to have unit norm. |
| PowerTransformer | Apply a power transform to make your data more normally distributed. |
| RobustScaler | Scale and translate each column using quantiles. |
| StandardScaler | Translate and scale each column by its mean and standard deviation, respectively. |

Text encoders

| Preprocessor | Description |
|---|---|
| CountVectorizer | Count the frequency of tokens in a column of strings. |
| HashingVectorizer | Count the frequency of tokens using the hashing trick. |
| Tokenizer | Replace each string with a list of tokens. |
| FeatureHasher | Apply the hashing trick to a table that describes token frequencies. |

Utilities

| Utility | Description |
|---|---|
| train_test_split | Split the dataset into train and test subsets. |
Which preprocessor should you use?#
The type of preprocessor you use depends on what your data looks like. This section provides tips on handling common data formats.
Categorical data#
Most models expect numerical inputs. To represent your categorical data in a way your model can understand, encode categories using one of the preprocessors described below.
| Categorical Data Type | Example | Preprocessor |
|---|---|---|
| Labels | "cat", "dog", "airplane" | LabelEncoder |
| Ordered categories | "low", "medium", "high" | OrdinalEncoder |
| Unordered categories | "red", "green", "blue" | OneHotEncoder |
| Lists of categories | ["comedy", "drama"], ["documentary"] | MultiHotEncoder |
Note
If you're using LightGBM, you don't need to encode your categorical data. Instead, use Categorizer to convert your data to pandas.CategoricalDtype.
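As a quick illustration, here's a minimal sketch of one-hot encoding an unordered categorical column; the column name and category values are made up for this example:

import ray
from ray.data.preprocessors import OneHotEncoder

# A small dataset with an unordered categorical column.
dataset = ray.data.from_items(
    [{"color": "red"}, {"color": "green"}, {"color": "blue"}, {"color": "red"}]
)
# fit learns the set of categories; transform replaces the column with one-hot features.
encoder = OneHotEncoder(columns=["color"])
dataset_transformed = encoder.fit_transform(dataset)
print(dataset_transformed.take())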
Numerical data#
To ensure your model behaves properly, normalize your numerical data. Refer to the table below to determine which preprocessor to use.
| Data Property | Preprocessor |
|---|---|
| Your data is approximately normal | StandardScaler |
| Your data is sparse | MaxAbsScaler |
| Your data contains many outliers | RobustScaler |
| Your data isn't normal, but you need it to be | PowerTransformer |
| You need unit-norm rows | Normalizer |
| You aren't sure what your data looks like | MinMaxScaler |
Warning
These preprocessors operate on numeric columns. If your dataset contains columns of type TensorDtype, you may need to implement a custom preprocessor.
Additionally, if your model expects a tensor or ndarray, create a tensor using Concatenator.
Tip
Built-in feature scalers like StandardScaler don't work on TensorDtype columns, so apply Concatenator after feature scaling. Combine feature scaling and concatenation into a single preprocessor with Chain.
import ray
from ray.data.preprocessors import Chain, Concatenator, StandardScaler
# Generate a simple dataset.
dataset = ray.data.from_items([{"X": 1.0, "Y": 2.0}, {"X": 4.0, "Y": 0.0}])
print(dataset.take())
# [{'X': 1.0, 'Y': 2.0}, {'X': 4.0, 'Y': 0.0}]
preprocessor = Chain(StandardScaler(columns=["X", "Y"]), Concatenator())
dataset_transformed = preprocessor.fit_transform(dataset)
print(dataset_transformed.take())
# [{'concat_out': array([-1., 1.])}, {'concat_out': array([ 1., -1.])}]
Text data#
A document-term matrix is a table that describes text data, often used in natural language processing.
To generate a document-term matrix from a collection of documents, use HashingVectorizer or CountVectorizer. If you already know the frequency of tokens and want to store the data in a document-term matrix, use FeatureHasher.
| Requirement | Preprocessor |
|---|---|
| You care about memory efficiency | HashingVectorizer |
| You care about model interpretability | CountVectorizer |
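For illustration, here's a minimal sketch that builds a document-term matrix with CountVectorizer; the column name and sentences are made up for this example:

import ray
from ray.data.preprocessors import CountVectorizer

# A tiny corpus stored in a single string column.
dataset = ray.data.from_items(
    [{"text": "the cat sat"}, {"text": "the cat ran"}]
)
# fit learns the token vocabulary; transform produces one count column per token.
vectorizer = CountVectorizer(columns=["text"])
dataset_transformed = vectorizer.fit_transform(dataset)
print(dataset_transformed.take())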
Filling in missing values#
If your dataset contains missing values, replace them with SimpleImputer.
import ray
from ray.data.preprocessors import SimpleImputer
# Generate a simple dataset.
dataset = ray.data.from_items([{"value": 1.0}, {"value": None}, {"value": 3.0}])
print(dataset.take())
# [{'value': 1.0}, {'value': None}, {'value': 3.0}]
imputer = SimpleImputer(columns=["value"], strategy="mean")
dataset_transformed = imputer.fit_transform(dataset)
print(dataset_transformed.take())
# [{'value': 1.0}, {'value': 2.0}, {'value': 3.0}]
Chaining preprocessors#
If you need to apply more than one preprocessor, compose them together with Chain.
Chain applies fit and transform sequentially. For example, if you construct Chain(preprocessorA, preprocessorB), then preprocessorB.transform is applied to the result of preprocessorA.transform.
import ray
from ray.data.preprocessors import Chain, MinMaxScaler, SimpleImputer
# Generate one simple dataset.
dataset = ray.data.from_items(
    [{"value": 0}, {"value": 1}, {"value": 2}, {"value": 3}, {"value": None}]
)
print(dataset.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}, {'value': None}]
preprocessor = Chain(SimpleImputer(["value"]), MinMaxScaler(["value"]))
dataset_transformed = preprocessor.fit_transform(dataset)
print(dataset_transformed.take())
# [{'value': 0.0}, {'value': 0.3333333333333333}, {'value': 0.6666666666666666}, {'value': 1.0}, {'value': 0.5}]
Implementing custom preprocessors#
If you want to implement a custom preprocessor that needs to be fit, extend the Preprocessor base class.
import ray
from pandas import DataFrame
from ray.data.preprocessor import Preprocessor
from ray.data import Dataset
from ray.data.aggregate import Max

class CustomPreprocessor(Preprocessor):
    def _fit(self, dataset: Dataset) -> Preprocessor:
        # Compute and store the max of the "value" column as fitted state.
        self.stats_ = dataset.aggregate(Max("value"))
        return self

    def _transform_pandas(self, df: DataFrame) -> DataFrame:
        # Scale each value by the max computed during fit.
        return df * self.stats_["max(value)"]
# Generate a simple dataset.
dataset = ray.data.range_table(4)
print(dataset.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
# Create a stateful preprocessor that finds the max value and scales each value by it.
preprocessor = CustomPreprocessor()
dataset_transformed = preprocessor.fit_transform(dataset)
print(dataset_transformed.take())
# [{'value': 0}, {'value': 3}, {'value': 6}, {'value': 9}]
If your preprocessor doesn't need to be fit, construct a BatchMapper to apply a UDF in parallel over your data. BatchMapper can drop, add, or modify columns, and you can specify a batch_size to control the size of the data batches provided to your UDF.
import ray
from ray.data.preprocessors import BatchMapper
# Generate a simple dataset.
dataset = ray.data.range_table(4)
print(dataset.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
# Create a stateless preprocessor that multiplies values by 2.
preprocessor = BatchMapper(lambda df: df * 2, batch_size=2, batch_format="pandas")
dataset_transformed = preprocessor.transform(dataset)
print(dataset_transformed.take())
# [{'value': 0}, {'value': 2}, {'value': 4}, {'value': 6}]