Preprocessing Data¶
This page describes how to perform data preprocessing in Ray AIR.
Data preprocessing is a common technique for transforming raw data into features that will be input to a machine learning model. In general, you may want to apply the same preprocessing logic to your offline training data and online inference data. Ray AIR provides several common preprocessors out of the box as well as interfaces that enable you to define your own custom logic.
Overview¶
Ray AIR exposes a Preprocessor class for preprocessing. The Preprocessor has four methods that make up its core interface:

fit(): Compute state information about a Dataset (e.g. the mean or standard deviation of a column) and save it to the Preprocessor. This information is then used to perform transform(). This is typically called on the training dataset.
transform(): Apply a transformation to a Dataset. If the Preprocessor is stateful, then fit() must be called first. This is typically called on the training, validation, and test datasets.
transform_batch(): Apply a transformation to a single batch of data. This is typically called on online or offline inference data.
fit_transform(): Syntactic sugar for calling both fit() and transform() on a Dataset.
To show these in action, let’s walk through a basic example. First we’ll set up two simple Ray Datasets.
import pandas as pd
import ray
from ray.data.preprocessors import MinMaxScaler
# Generate two simple datasets.
dataset = ray.data.range_table(8)
dataset1, dataset2 = dataset.split(2)
print(dataset1.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
print(dataset2.take())
# [{'value': 4}, {'value': 5}, {'value': 6}, {'value': 7}]
Next, fit the Preprocessor on one Dataset, and transform both Datasets with this fitted information.
# Fit the preprocessor on dataset1, and transform both dataset1 and dataset2.
preprocessor = MinMaxScaler(["value"])
dataset1_transformed = preprocessor.fit_transform(dataset1)
print(dataset1_transformed.take())
# [{'value': 0.0}, {'value': 0.3333333333333333}, {'value': 0.6666666666666666}, {'value': 1.0}]
dataset2_transformed = preprocessor.transform(dataset2)
print(dataset2_transformed.take())
# [{'value': 1.3333333333333333}, {'value': 1.6666666666666667}, {'value': 2.0}, {'value': 2.3333333333333335}]
Finally, call transform_batch
on a single batch of data.
batch = pd.DataFrame({"value": list(range(8, 12))})
batch_transformed = preprocessor.transform_batch(batch)
print(batch_transformed)
# value
# 0 2.666667
# 1 3.000000
# 2 3.333333
# 3 3.666667
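Note that fit_transform() is simply shorthand for fit() followed by transform(). A minimal equivalent sketch of the first transformation above:

# Equivalent to preprocessor.fit_transform(dataset1).
preprocessor = MinMaxScaler(["value"])
preprocessor.fit(dataset1)
dataset1_transformed = preprocessor.transform(dataset1)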
Life of an AIR Preprocessor¶
Now that we’ve gone over the basics, let’s dive into how Preprocessors fit into an end-to-end application built with AIR.
A Preprocessor goes through the following main steps in an end-to-end application:

1. Passed into a Trainer to fit and transform the input Datasets.
2. Saved as part of a Checkpoint.
3. Reconstructed in a Predictor to call transform_batch on batches of data.
The rest of this section goes through this workflow in more detail, with code examples using XGBoost. The same logic applies to other integrations as well.
Trainer¶
The journey of the Preprocessor starts with the Trainer.
If the Trainer is instantiated with a Preprocessor, then the following logic will be executed when Trainer.fit() is called:
1. If a "train" Dataset is passed in, then the Preprocessor will call fit() on it.
2. The Preprocessor will then call transform() on all Datasets, including the "train" Dataset.
3. The Trainer will then perform training on the preprocessed Datasets.
import ray
from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer
train_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(0, 32, 3)])
valid_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(1, 32, 3)])
preprocessor = MinMaxScaler(["x"])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config={"num_workers": 2},
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()
result = trainer.fit()
Note
If you’re passing a Preprocessor that is already fitted, it will be refitted on the "train" Dataset. Adding the functionality to support passing in a fitted Preprocessor is being tracked in an open issue.
Tune¶
If you’re using Ray Tune for hyperparameter optimization, be aware that each Trial will instantiate its own copy of the Preprocessor, and the fitting and transformation logic will occur once per Trial.
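As an illustration, here is a minimal sketch using Ray Tune’s Tuner API with the trainer from the previous section (the param_space keys are assumptions for this example):

from ray import tune
from ray.tune import Tuner

# Each Trial instantiates its own copy of the trainer, including the
# Preprocessor, so the MinMaxScaler is fitted once per Trial.
tuner = Tuner(
    trainer,
    param_space={"params": {"max_depth": tune.randint(2, 8)}},
)
results = tuner.fit()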
Checkpoint¶
Trainer.fit() returns a Result object which contains a Checkpoint.

If a Preprocessor was passed into the Trainer, then it will be saved in the Checkpoint along with any fitted state.
As a sanity check, let’s confirm the Preprocessor is available in the Checkpoint. In practice, you should not need to do this.
import os
import ray.cloudpickle as cpickle
from ray.air.constants import PREPROCESSOR_KEY
checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_path:
    path = os.path.join(checkpoint_path, PREPROCESSOR_KEY)
    with open(path, "rb") as f:
        preprocessor = cpickle.load(f)
print(preprocessor)
# MinMaxScaler(columns=['x'], stats={'min(x)': 0, 'max(x)': 30})
Predictor¶
A Predictor can be constructed from a saved Checkpoint. If the Checkpoint contains a Preprocessor, then the Preprocessor will be used to call transform_batch on input batches prior to performing inference.
In the following example, we show the Batch Predictor flow. The same logic applies to the Online Inference flow.
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor
test_dataset = ray.data.from_items([{"x": x} for x in range(2, 32, 3)])
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
# {'predictions': 0.09843720495700836}
# {'predictions': 5.604666709899902}
# {'predictions': 11.405311584472656}
# {'predictions': 15.684700012207031}
# {'predictions': 23.990947723388672}
# {'predictions': 29.900211334228516}
# {'predictions': 34.59944152832031}
# {'predictions': 40.6968994140625}
# {'predictions': 45.68107604980469}
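For online inference, the same Checkpoint can be loaded into a Predictor directly; a minimal sketch (assuming XGBoostPredictor.from_checkpoint accepts the checkpoint produced above):

import pandas as pd
from ray.train.xgboost import XGBoostPredictor

# The Preprocessor stored in the checkpoint scales "x" before inference.
predictor = XGBoostPredictor.from_checkpoint(checkpoint)
online_batch = pd.DataFrame({"x": [3, 6]})
print(predictor.predict(online_batch))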
Types of Preprocessors¶
Basic Preprocessors¶
Ray AIR provides a handful of Preprocessors that you can use out of the box, and more will be added over time. Contributions are welcome!
Coming soon!
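As one example, the sketch below assumes OneHotEncoder is among the provided preprocessors; it follows the same fit/transform interface shown earlier:

import ray
from ray.data.preprocessors import OneHotEncoder

# Generate a small categorical dataset.
dataset = ray.data.from_items(
    [{"color": "red"}, {"color": "green"}, {"color": "red"}]
)
preprocessor = OneHotEncoder(["color"])
dataset_transformed = preprocessor.fit_transform(dataset)
print(dataset_transformed.take())
# Each category becomes an indicator column, e.g. 'color_green' and 'color_red'.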
Chaining Preprocessors¶
More often than not, your preprocessing logic will contain multiple logical steps or apply different transformations to each column.
A simple Chain Preprocessor is provided, which can be used to apply individual Preprocessor operations sequentially.
import ray
from ray.data.preprocessors import Chain, MinMaxScaler, SimpleImputer
# Generate one simple dataset.
dataset = ray.data.from_items(
    [{"value": 0}, {"value": 1}, {"value": 2}, {"value": 3}, {"value": None}]
)
print(dataset.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}, {'value': None}]
preprocessor = Chain(SimpleImputer(["value"]), MinMaxScaler(["value"]))
dataset_transformed = preprocessor.fit_transform(dataset)
print(dataset_transformed.take())
# [{'value': 0.0}, {'value': 0.3333333333333333}, {'value': 0.6666666666666666}, {'value': 1.0}, {'value': 0.5}]
Tip
Keep in mind that the operations are sequential. For example, if you define a Preprocessor Chain([preprocessorA, preprocessorB]), then preprocessorB.transform() will be applied to the result of preprocessorA.transform().
Custom Preprocessors¶
Stateless Preprocessors: Stateless preprocessors can be implemented with the BatchMapper.
import ray
from ray.data.preprocessors import BatchMapper
# Generate a simple dataset.
dataset = ray.data.range_table(4)
print(dataset.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
# Create a stateless preprocessor that multiplies values by 2.
preprocessor = BatchMapper(lambda df: df * 2)
dataset_transformed = preprocessor.transform(dataset)
print(dataset_transformed.take())
# [{'value': 0}, {'value': 2}, {'value': 4}, {'value': 6}]
Stateful Preprocessors: Stateful preprocessors can be implemented with the CustomStatefulPreprocessor.
from typing import Dict
import ray
from pandas import DataFrame
from ray.data.preprocessors import CustomStatefulPreprocessor
from ray.data import Dataset
from ray.data.aggregate import Max
def get_max(ds: Dataset):
    return ds.aggregate(Max("value"))

def scale_by_max(df: DataFrame, stats: Dict):
    return df * stats["max(value)"]
# Generate a simple dataset.
dataset = ray.data.range_table(4)
print(dataset.take())
# [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
# Create a stateful preprocessor that finds the max value and scales each value by it.
preprocessor = CustomStatefulPreprocessor(get_max, scale_by_max)
dataset_transformed = preprocessor.fit_transform(dataset)
print(dataset_transformed.take())
# [{'value': 0}, {'value': 3}, {'value': 6}, {'value': 9}]
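A fitted custom Preprocessor carries its state with it, so it can also be applied to individual batches; a minimal sketch (assuming transform_batch behaves as in the MinMaxScaler example above):

import pandas as pd

batch = pd.DataFrame({"value": [4, 5]})
print(preprocessor.transform_batch(batch))
# Each value is multiplied by the fitted max of 3:
#    value
# 0     12
# 1     15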