Key Concepts

Here, we cover the main concepts in AIR.

Preprocessors

Preprocessors are primitives that can be used to transform input data into features.

A preprocessor is fitted during training and then applied to data batches in the same way at runtime, in both training and serving (see the fit/transform sketch after the snippet below). AIR comes with a collection of built-in preprocessors, and you can also define your own with simple templates.

Preprocessors operate on Ray Datasets, which makes them scalable and compatible with a variety of data sources and dataframe libraries.

import ray
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

from ray.ml.preprocessors import StandardScaler

# Load the breast cancer dataset and split it into train/test pandas DataFrames.
data_raw = load_breast_cancer()
dataset_df = pd.DataFrame(data_raw["data"], columns=data_raw["feature_names"])
dataset_df["target"] = data_raw["target"]
train_df, test_df = train_test_split(dataset_df, test_size=0.3)

# Convert the DataFrames into Ray Datasets; the test set has its labels dropped.
train_dataset = ray.data.from_pandas(train_df)
valid_dataset = ray.data.from_pandas(test_df)
test_dataset = ray.data.from_pandas(test_df.drop("target", axis=1))

# Scale two of the feature columns to zero mean and unit variance.
columns_to_scale = ["mean radius", "mean texture"]
preprocessor = StandardScaler(columns=columns_to_scale)
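
While a Trainer will fit a preprocessor for you automatically (see the next section), you can also drive the fit/transform cycle by hand. A minimal sketch, assuming the standard fit() and transform() methods on the Preprocessor base class:

# Fit the scaler's statistics (mean, std) on the training dataset...
preprocessor.fit(train_dataset)
# ...then apply the same transformation to any dataset.
scaled_train = preprocessor.transform(train_dataset)
scaled_valid = preprocessor.transform(valid_dataset)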

Trainers

Trainers are wrapper classes around third-party training frameworks such as XGBoost and PyTorch. They are built to integrate with core Ray actors (for distribution), Ray Tune, and Ray Datasets.

See the documentation on Trainers.

from ray.ml.train.integrations.xgboost import XGBoostTrainer

num_workers = 2
use_gpu = False
# XGBoost specific params
params = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "max_depth": 2,
}

trainer = XGBoostTrainer(
    # Distribute training across the configured number of workers.
    scaling_config={
        "num_workers": num_workers,
        "use_gpu": use_gpu,
    },
    label_column="target",
    params=params,
    datasets={"train": train_dataset, "valid": valid_dataset},
    # The preprocessor is fitted on the "train" dataset and applied to all datasets.
    preprocessor=preprocessor,
    # Number of boosting rounds for XGBoost.
    num_boost_round=5,
)

result = trainer.fit()

Calling .fit() on a Trainer produces a Result object, which contains training metrics as well as checkpoints for retrieving the best model.

print(result.metrics)
print(result.checkpoint)
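
A checkpoint can be restored into a standalone predictor at any time. A minimal sketch, assuming (as AIR predictors are designed to do) that the checkpoint also carries the fitted preprocessor:

from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

# Restore a single-process predictor from the training checkpoint.
predictor = XGBoostPredictor.from_checkpoint(result.checkpoint)

# Score an in-memory batch; raw feature columns can be passed directly
# because the fitted preprocessor is applied inside predict().
sample_scores = predictor.predict(test_df.drop("target", axis=1).head())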

Tuner

Tuners offer scalable hyperparameter tuning as part of Ray Tune.

Tuners work seamlessly with any Trainer, and they also support arbitrary training functions (see the sketch after the snippet below).

from ray import tune
from ray.tune.tuner import Tuner, TuneConfig

tuner = Tuner(
    trainer,
    # Search over XGBoost's max_depth, overriding the trainer's "params" entry.
    param_space={"params": {"max_depth": tune.randint(1, 9)}},
    tune_config=TuneConfig(num_samples=5, metric="train-logloss", mode="min"),
)
result_grid = tuner.fit()
best_result = result_grid.get_best_result()
print(best_result)
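
Because a Tuner accepts any Tune trainable, plain Python functions can be tuned the same way. A minimal sketch with a toy objective, assuming the standard tune.report() API for function trainables:

def train_fn(config):
    # Toy objective: distance of the sampled "x" from 3, to be minimized.
    score = (config["x"] - 3.0) ** 2
    tune.report(score=score)


fn_tuner = Tuner(
    train_fn,
    param_space={"x": tune.uniform(0.0, 10.0)},
    tune_config=TuneConfig(num_samples=5, metric="score", mode="min"),
)
fn_results = fn_tuner.fit()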

Batch Predictor

You can take a trained model and do batch inference using the BatchPredictor object.

from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, XGBoostPredictor)

predicted_labels = (
    # Compute raw model outputs (probabilities) for the unlabeled test set.
    batch_predictor.predict(test_dataset)
    # Threshold at 0.5 to turn probabilities into 0/1 class labels.
    .map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    .to_pandas(limit=float("inf"))
)
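
As a quick sanity check, the thresholded labels can be compared against the held-out targets from the original split. A minimal sketch; the "predictions" output column name is an assumption about the predictor's convention, so adjust it if your output differs:

# NOTE: "predictions" is an assumed output column name, not confirmed above.
accuracy = (
    predicted_labels["predictions"].values == test_df["target"].values
).mean()
print(f"Holdout accuracy: {accuracy:.3f}")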

Online Inference

Deploy the model as an inference service by using Ray Serve and the ModelWrapperDeployment class.

from ray import serve
from fastapi import Request
from ray.serve.model_wrappers import ModelWrapperDeployment


# Adapter: convert the JSON request body into a pandas DataFrame for the predictor.
async def adapter(request: Request):
    content = await request.json()
    print(content)
    return pd.DataFrame.from_dict(content)


serve.start(detached=True)

# Wrap the predictor class and checkpoint in a Serve deployment.
deployment = ModelWrapperDeployment.options(name="XGBoostService")

deployment.deploy(
    XGBoostPredictor, result.checkpoint, batching_params=False, http_adapter=adapter
)

print(deployment.url)

After deploying the service, you can send requests to it.

import requests

# Take one row from the test dataset and convert it to a JSON-serializable dict.
sample_input = test_dataset.take(1)
sample_input = dict(sample_input[0])

output = requests.post(deployment.url, json=[sample_input]).json()
print(output)