Ray AI Runtime (alpha)

Ray AI Runtime (AIR) is an open-source toolkit for building end-to-end ML applications. By leveraging Ray and its library ecosystem, it brings scalability and programmability to ML platforms.

[Figure: overview of the Ray AI Runtime and its libraries]

The main focuses of the Ray AI Runtime:

  • Ray AIR provides the compute layer for ML workloads.

  • It is designed to interoperate with other systems for storage and metadata needs.

Ray AIR consists of five key components: Data processing (Ray Data), Model Training (Ray Train), Reinforcement Learning (Ray RLlib), Hyperparameter Tuning (Ray Tune), and Model Serving (Ray Serve).

Users can use these libraries individually or together to scale different parts of standard ML workflows.

To get started, install Ray AIR via pip install -U "ray[air]"
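
Once installed, a quick way to confirm the runtime works is to start Ray locally; ray.init() launches a single-node cluster if none is already running:

import ray

ray.init()  # start (or connect to) a local Ray runtime
print(ray.cluster_resources())  # the CPUs/GPUs Ray can schedule on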

Tip

Getting involved with Ray AIR. We’ll be holding office hours, development sprints, and other activities as we get closer to the Ray AIR Beta/GA release. Want to join us? Fill out this short form!

Quick Start

Preprocess your data with a Preprocessor.

import ray
from ray.ml.preprocessors import StandardScaler

import pandas as pd

from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

data_raw = load_breast_cancer()
dataset_df = pd.DataFrame(data_raw["data"], columns=data_raw["feature_names"])
dataset_df["target"] = data_raw["target"]
train_df, test_df = train_test_split(dataset_df, test_size=0.3)
train_dataset = ray.data.from_pandas(train_df)
valid_dataset = ray.data.from_pandas(test_df)
test_dataset = ray.data.from_pandas(test_df.drop("target", axis=1))


# Create a preprocessor to scale some columns
columns_to_scale = ["mean radius", "mean texture"]
preprocessor = StandardScaler(columns=columns_to_scale)
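
The preprocessor is fit on the training dataset when it is passed to the Trainer below. If you want to apply it eagerly instead, here is a minimal sketch, assuming the alpha Preprocessor.fit_transform API:

# Fit the scaler on the training dataset and return a transformed copy
# (for inspection only; the Trainer below fits the preprocessor itself).
scaled_train = preprocessor.fit_transform(train_dataset)
scaled_train.show(3)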

Train a model with an XGBoostTrainer.

from ray.ml.train.integrations.xgboost import XGBoostTrainer

# XGBoost specific params
params = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

num_workers = 2
use_gpu = False  # set to True to train on GPUs

trainer = XGBoostTrainer(
    scaling_config={
        "num_workers": num_workers,
        "use_gpu": use_gpu,
    },
    label_column="target",
    params=params,
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
    num_boost_round=20,
)
result = trainer.fit()
print(result.metrics)

Use the trained model for batch prediction with a BatchPredictor.

from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, XGBoostPredictor)

# predict() returns probabilities for the binary:logistic objective;
# threshold at 0.5 to turn them into class labels.
predicted_labels = (
    batch_predictor.predict(test_dataset)
    .map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    .to_pandas(limit=float("inf"))
)
print("PREDICTED LABELS")
print(predicted_labels)

# pred_contribs=True is forwarded to XGBoost to return per-feature SHAP
# contributions instead of predictions.
shap_values = batch_predictor.predict(test_dataset, pred_contribs=True).to_pandas(
    limit=float("inf")
)
print("SHAP VALUES")
print(shap_values)

The same workflow applies to PyTorch. Create your dataset.


from torchvision import datasets
from torchvision.transforms import ToTensor

# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="~/data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="~/data",
    train=False,
    download=True,
    transform=ToTensor(),
)

Train a model with a TorchTrainer.

import torch
from torch import nn
from torch.utils.data import DataLoader
import ray.train as train
from ray.ml.train.integrations.torch import TorchTrainer

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),  # output raw logits; CrossEntropyLoss applies log-softmax internally
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_epoch(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) // train.world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // train.world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss


def train_func(config):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    worker_batch_size = batch_size // train.world_size()

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    # Create model.
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        train.report(loss=loss)


num_workers = 2
use_gpu = False

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 4},
    scaling_config={"num_workers": num_workers, "use_gpu": use_gpu},
)
result = trainer.fit()
print(f"Last result: {result.metrics}")

The workflow also applies to TensorFlow. Create your Ray Dataset.

import ray

a = 5
b = 10
size = 1000

items = [i / size for i in range(size)]
dataset = ray.data.from_items([{"x": x, "y": a * x + b} for x in items])
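
A quick sanity check before training is to look at a few records:

dataset.show(3)  # prints records like {'x': 0.0, 'y': 10.0}, {'x': 0.001, 'y': 10.005}, ...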

Train a model with a TensorflowTrainer.

import tensorflow as tf
from tensorflow.keras.callbacks import Callback

import ray.train as train
from ray.train.tensorflow import prepare_dataset_shard
from ray.ml.train.integrations.tensorflow import TensorflowTrainer


def build_model() -> tf.keras.Model:
    model = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(1,)),
            tf.keras.layers.Dense(10),
            tf.keras.layers.Dense(1),
        ]
    )
    return model


class TrainCheckpointReportCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        train.save_checkpoint(model=self.model.get_weights())
        train.report(**logs)


def train_func(config: dict):
    batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_model()
        multi_worker_model.compile(
            optimizer=tf.keras.optimizers.SGD(learning_rate=config.get("lr", 1e-3)),
            loss=tf.keras.losses.mean_squared_error,
            metrics=[tf.keras.metrics.mean_squared_error],
        )

    dataset = train.get_dataset_shard("train")

    results = []
    for _ in range(epochs):
        tf_dataset = prepare_dataset_shard(
            dataset.to_tf(
                label_column="y",
                output_signature=(
                    tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
                    tf.TensorSpec(shape=(None,), dtype=tf.float32),
                ),
                batch_size=batch_size,
            )
        )
        history = multi_worker_model.fit(
            tf_dataset, callbacks=[TrainCheckpointReportCallback()]
        )
        results.append(history.history)
    return results


num_workers = 2
use_gpu = False

config = {"lr": 1e-3, "batch_size": 32, "epochs": 4}

trainer = TensorflowTrainer(
    train_loop_per_worker=train_func,
    train_loop_config=config,
    scaling_config=dict(num_workers=num_workers, use_gpu=use_gpu),
    datasets={"train": dataset},
)
result = trainer.fit()
print(result.metrics)

Use the trained model for batch prediction with a BatchPredictor.

import numpy as np

from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor


batch_predictor = BatchPredictor.from_checkpoint(
    result.checkpoint, TensorflowPredictor, model_definition=build_model
)

items = [{"x": np.random.uniform(0, 1)} for _ in range(10)]
prediction_dataset = ray.data.from_items(items)

predictions = batch_predictor.predict(prediction_dataset, dtype=tf.float32)

pandas_predictions = predictions.to_pandas(limit=float("inf"))

print(f"PREDICTIONS\n{pandas_predictions}")

See the Key Concepts to learn more about what Ray AIR has to offer.

Why Ray AIR?

There are many machine learning frameworks, platforms, and tools available today. What makes Ray AIR different?

We believe Ray AIR provides unique value derived from Ray.