Ray AI Runtime (alpha)

Ray AI Runtime (AIR) is an open-source toolkit for building end-to-end ML applications. By leveraging Ray and its library ecosystem, it brings scalability and programmability to ML platforms.

The main focus areas of Ray AIR:

  • Ray AIR provides the compute layer for ML workloads.

  • It is designed to interoperate with other systems for storage and metadata needs.

Ray AIR consists of 5 key components – Data processing (Ray Data), Model Training (Ray Train), Reinforcement Learning (Ray RLlib), Hyperparameter Tuning (Ray Tune), and Model Serving (Ray Serve).

Users can use these libraries interchangeably to scale different parts of standard ML workflows.

Tip

Getting involved with Ray AIR. We’ll be holding office hours, development sprints, and other activities as we get closer to the Ray AIR Beta/GA release. Want to join us? Fill out this short form!

Components

Preprocessors

class ray.ml.preprocessor.Preprocessor[source]

Implements an ML preprocessing operation.

Preprocessors are stateful objects that can be fitted against a Dataset and used to transform both local data batches and distributed datasets. For example, a Normalization preprocessor may calculate the mean and stdev of a field during fitting, and use these attributes to implement its normalization transform.
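
As an illustration, a minimal sketch of this fit/transform workflow using the StandardScaler documented below (the dataset and column name are illustrative):

import ray
from ray.ml.preprocessors import StandardScaler

ds = ray.data.from_items([{"value": float(i)} for i in range(8)])

scaler = StandardScaler(columns=["value"])
scaler.fit(ds)                                        # computes and stores mean/std as fitted state
transformed = scaler.transform(ds)                    # applies (value - mean) / std to the whole dataset
local_batch = scaler.transform_batch(ds.to_pandas())  # same transform applied to a local pandas batch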

fit(dataset: ray.data.dataset.Dataset) ray.ml.preprocessor.Preprocessor[source]

Fit this Preprocessor to the Dataset.

Fitted state attributes will be directly set in the Preprocessor.

Parameters

dataset – Input dataset.

Returns

The fitted Preprocessor with state attributes.

Return type

Preprocessor

fit_transform(dataset: ray.data.dataset.Dataset) ray.data.dataset.Dataset[source]

Fit this Preprocessor to the Dataset and then transform the Dataset.

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

transform(dataset: ray.data.dataset.Dataset) ray.data.dataset.Dataset[source]

Transform the given dataset.

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

transform_batch(df: Union[pd.DataFrame, np.ndarray]) Union[pd.DataFrame, np.ndarray][source]

Transform a single batch of data.

Parameters

df (DataBatchType) – Input data batch.

Returns

The transformed data batch.

Return type

DataBatchType

check_is_fitted() bool[source]

Returns whether this preprocessor is fitted.

We use the convention that attributes with a trailing _ are set after fitting is complete.

class ray.ml.preprocessors.Chain(*preprocessors: ray.ml.preprocessor.Preprocessor)[source]

Bases: ray.ml.preprocessor.Preprocessor

Chain multiple Preprocessors into a single Preprocessor.

Calling fit will invoke fit_transform on the input preprocessors, so that one preprocessor can fit based on columns/values produced by the transform of a preceding preprocessor.

Parameters

preprocessors – The preprocessors that should be executed sequentially.
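
A short sketch chaining two of the preprocessors documented below (the column name is illustrative):

import ray
from ray.ml.preprocessors import Chain, SimpleImputer, StandardScaler

ds = ray.data.from_items([{"x": 1.0}, {"x": None}, {"x": 3.0}])

# Impute missing values first, then scale the imputed column.
chain = Chain(SimpleImputer(columns=["x"]), StandardScaler(columns=["x"]))
transformed = chain.fit_transform(ds)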

fit_transform(ds: ray.data.dataset.Dataset) ray.data.dataset.Dataset[source]

Fit this Preprocessor to the Dataset and then transform the Dataset.

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

check_is_fitted() bool[source]

Returns whether this preprocessor is fitted.

We use the convention that attributes with a trailing _ are set after fitting is complete.

class ray.ml.preprocessors.LabelEncoder(label_column: str)[source]

Bases: ray.ml.preprocessor.Preprocessor

Encode values within a label column as ordered integer values.

Currently, order within a column is based on the values from the fitted dataset in sorted order.

Transforming values not included in the fitted dataset will be encoded as None.

Parameters

label_column – The label column that will be encoded.

class ray.ml.preprocessors.MinMaxScaler(columns: List[str])[source]

Bases: ray.ml.preprocessor.Preprocessor

Scale values within columns based on min and max values.

For each column, each value will be transformed to (value - min) / (max - min), where min and max are calculated from the fitted dataset.

When transforming the fitted dataset, transformed values will be in the range [0, 1].

Parameters

columns – The columns that will individually be scaled.

class ray.ml.preprocessors.OneHotEncoder(columns: List[str])[source]

Bases: ray.ml.preprocessor.Preprocessor

Encode columns as new columns using one-hot encoding.

The transformed dataset will have a new column in the form {column}_{value} for each of the values from the fitted dataset. The value of a column will be set to 1 if the value matches, otherwise 0.

Transforming values not included in the fitted dataset will result in all of the encoded column values being 0.

Parameters

columns – The columns that will individually be encoded.
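
For example, a sketch of the encoded output described above (values are illustrative):

import ray
from ray.ml.preprocessors import OneHotEncoder

ds = ray.data.from_items(
    [{"color": "red"}, {"color": "green"}, {"color": "red"}])

encoder = OneHotEncoder(columns=["color"])
encoded = encoder.fit_transform(ds)
# The encoded dataset has columns of the form color_green and color_red,
# set to 1 where the value matches and 0 otherwise.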

class ray.ml.preprocessors.OrdinalEncoder(columns: List[str])[source]

Bases: ray.ml.preprocessor.Preprocessor

Encode values within columns as ordered integer values.

Currently, order within a column is based on the values from the fitted dataset in sorted order.

Transforming values not included in the fitted dataset will be encoded as None.

Parameters

columns – The columns that will individually be encoded.

class ray.ml.preprocessors.SimpleImputer(columns: List[str], strategy: str = 'mean', fill_value: Optional[Union[str, numbers.Number]] = None)[source]

Bases: ray.ml.preprocessor.Preprocessor

Populate missing values within columns.

Parameters
  • columns – The columns that will individually be imputed.

  • strategy – The strategy to compute the value to impute.

      – “mean”: The mean of the column (numeric only).

      – “most_frequent”: The most frequent value of the column (string or numeric).

      – “constant”: The value of fill_value (string or numeric).

  • fill_value – The value to use when strategy is “constant”.
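
A minimal sketch (the column name and values are illustrative):

import ray
from ray.ml.preprocessors import SimpleImputer

ds = ray.data.from_items([{"x": 1.0}, {"x": None}, {"x": 5.0}])

# With strategy="mean", missing values are replaced by the column mean
# computed during fitting (3.0 here).
imputer = SimpleImputer(columns=["x"], strategy="mean")
imputed = imputer.fit_transform(ds)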

class ray.ml.preprocessors.StandardScaler(columns: List[str], ddof=0)[source]

Bases: ray.ml.preprocessor.Preprocessor

Scale values within columns based on mean and standard deviation.

For each column, each value will be transformed to (value-mean)/std, where mean and std are calculated from the fitted dataset.

Parameters
  • columns – The columns that will individually be scaled.

  • ddof – The delta degrees of freedom used to calculate standard deviation.

Trainer

class ray.ml.trainer.Trainer(*args, **kwargs)[source]

Defines interface for distributed training on Ray.

Note: The base Trainer class cannot be instantiated directly. Only one of its subclasses can be used.

How does a trainer work?

  • First, initialize the Trainer. The initialization runs locally, so heavyweight setup should not be done in __init__.

  • Then, when you call trainer.fit(), the Trainer is serialized and copied to a remote Ray actor. The following methods are then called in sequence on the remote actor.

  • trainer.setup(): Any heavyweight Trainer setup should be specified here.

  • trainer.preprocess_datasets(): The provided ray.data.Datasets are preprocessed with the provided ray.ml.preprocessor.Preprocessor.

  • trainer.training_loop(): Executes the main training logic.

  • Calling trainer.fit() will return a ray.result.Result object where you can access metrics from your training run, as well as any checkpoints that may have been saved.

How do I create a new Trainer?

Subclass ray.ml.trainer.Trainer, override the training_loop method, and optionally setup.

import torch

from ray.ml.train import Trainer
from ray import tune


class MyPytorchTrainer(Trainer):
    def setup(self):
        self.model = torch.nn.Linear(1, 1)
        self.optimizer = torch.optim.SGD(
            self.model.parameters(), lr=0.1)

    def training_loop(self):
        # You can access any Trainer attributes directly in this method.
        # self.datasets["train"] has already been
        # preprocessed by self.preprocessor
        dataset = self.datasets["train"]

        torch_ds = dataset.to_torch(label_column="y")
        loss_fn = torch.nn.MSELoss()

        for epoch_idx in range(10):
            loss = 0
            num_batches = 0
            for X, y in iter(torch_ds):
                # Compute prediction error
                pred = self.model(X)
                batch_loss = loss_fn(pred, y.float())

                # Backpropagation
                self.optimizer.zero_grad()
                batch_loss.backward()
                self.optimizer.step()

                loss += batch_loss.item()
                num_batches += 1
            loss /= num_batches

            # Use Tune functions to report intermediate
            # results.
            tune.report(loss=loss, epoch=epoch_idx)

How do I use an existing Trainer or one of my custom Trainers?

Initialize the Trainer, and call Trainer.fit()

import ray
train_dataset = ray.data.from_items(
    [{"x": i, "y": i} for i in range(3)])
my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = my_trainer.fit()
Parameters
  • scaling_config – Configuration for how to scale training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

setup() None[source]

Called during fit() to perform initial setup on the Trainer.

Note: this method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note: This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the training dataset.

Then, the Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

abstract training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note: this method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.

Example
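
A minimal sketch of a custom training_loop, assuming a subclass as described in “How do I create a new Trainer?” above (the class name and reported values are illustrative, and the import path follows the class path documented here):

from ray import tune
from ray.ml.trainer import Trainer

class MySimpleTrainer(Trainer):
    def training_loop(self):
        # Report one result per epoch to Tune. Any preprocessed datasets
        # are available via self.datasets at this point.
        for epoch in range(3):
            tune.report(epoch=epoch, loss=1.0 / (epoch + 1))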

fit() ray.ml.result.Result[source]

Runs training.

Returns:

A Result object containing the training result.

Raises:

TrainingFailedError: If any failures occur during the execution of self.as_trainable().

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

as_trainable() Type[ray.tune.trainable.Trainable][source]

Convert self to a tune.Trainable class.

class ray.ml.train.integrations.xgboost.XGBoostTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel XGBoost training.

This Trainer runs the XGBoost training loop in a distributed manner using multiple Ray Actors.

Example

import ray

from ray.ml.train.integrations.xgboost import XGBoostTrainer

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – XGBoost training parameters. Refer to XGBoost documentation for a list of possible parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to xgboost.DMatrix objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to xgboost.train() function.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

class ray.ml.train.integrations.lightgbm.LightGBMTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel LightGBM training.

This Trainer runs the LightGBM training loop in a distributed manner using multiple Ray Actors.

Example

import ray

from ray.ml.train.integrations.lightgbm import LightGBMTrainer

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – LightGBM training parameters passed to lightgbm.train(). Refer to LightGBM documentation for a list of possible parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to lightgbm.Dataset objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to lightgbm.train() function.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

class ray.ml.train.integrations.tensorflow.TensorflowTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Tensorflow training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary TensorFlow process group configured for distributed TensorFlow training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()

You can also use any of the TensorFlow specific function utils.

def train_loop_per_worker():
    # Turns off autosharding for a dataset.
    # You should use this if you are doing
    # `train.get_dataset_shard(...).to_tf(...)`
    # as the data will be already sharded.
    train.tensorflow.prepare_dataset_shard(...)

To save a model to use for the TensorflowPredictor, you must save it under the “model” kwarg in train.save_checkpoint().

Example:

import tensorflow as tf

import ray
from ray import train
from ray.train.tensorflow import prepare_dataset_shard

from ray.ml.train.integrations.tensorflow import TensorflowTrainer

input_size = 1

def build_model():
    # toy neural network : 1-layer
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(
            1, activation="linear", input_shape=(input_size,))]
    )

def train_loop_for_worker(config):
    dataset_shard = train.get_dataset_shard("train")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()
        model.compile(
            optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    for epoch in range(config["num_epochs"]):
        tf_dataset = prepare_dataset_shard(
            dataset_shard.to_tf(
                label_column="y",
                output_signature=(
                    tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
                    tf.TensorSpec(shape=(None), dtype=tf.float32),
                ),
                batch_size=1,
            )
        )
        model.fit(tf_dataset)
        train.save_checkpoint(
            epoch=epoch, model_weights=model.get_weights())

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_for_worker,
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset},
    train_loop_config={"num_epochs": 2})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • tensorflow_config – Configuration for setting up the TensorFlow backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

class ray.ml.train.integrations.torch.TorchTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel PyTorch training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()

You can also use any of the Torch specific function utils.

def train_loop_per_worker():
    # Prepares model for distributed training by wrapping in
    # `DistributedDataParallel` and moving to correct device.
    train.torch.prepare_model(...)

    # Configures the dataloader for distributed training by adding a
    # `DistributedSampler`.
    # You should NOT use this if you are doing
    # `train.get_dataset_shard(...).to_torch(...)`
    train.torch.prepare_data_loader(...)

    # Returns the current torch device.
    train.torch.get_device()

To save a model to use for the TorchPredictor, you must save it under the “model” kwarg in train.save_checkpoint().

Example

import torch
import torch.nn as nn

import ray
from ray import train
from ray.ml.train.integrations.torch import TorchTrainer

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model = train.torch.prepare_model(model)

    for epoch in range(num_epochs):
        for inputs, labels in iter(
                dataset_shard.to_torch(label_column="y", batch_size=32)):
            output = model(inputs)
            loss = loss_fn(output, labels.float())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        train.save_checkpoint(model=model.state_dict())

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
scaling_config = {"num_workers": 3}
# If using GPUs, use the below scaling config instead.
# scaling_config = {"num_workers": 3, "use_gpu": True}
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

class ray.ml.train.data_parallel_trainer.DataParallelTrainer(*args, **kwargs)[source]

Bases: ray.ml.trainer.Trainer

A Trainer for data parallel training.

You should subclass this Trainer if your Trainer follows SPMD (single program, multiple data) programming paradigm - you want multiple processes to run the same function, but on different data.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()

How do I use ``DataParallelTrainer`` or any of its subclasses?

Example:

import ray
from ray import train
from ray.ml.train.data_parallel_trainer import DataParallelTrainer

def train_loop_for_worker():
    dataset_shard_for_this_worker = train.get_dataset_shard("train")

    assert len(dataset_shard_for_this_worker) == 1

train_dataset = ray.data.from_items([1, 2, 3])
assert len(train_dataset) == 3
trainer = DataParallelTrainer(
    train_loop_per_worker=train_loop_for_worker,
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset})
result = trainer.fit()

How do I develop on top of ``DataParallelTrainer``?

In many cases, using DataParallelTrainer directly is sufficient to execute functions on multiple actors.

However, you may want to subclass DataParallelTrainer and create a custom Trainer for the following 2 use cases:

  • Use Case 1: You want to do data parallel training, but want to have a predefined training_loop_per_worker.

  • Use Case 2: You want to implement a custom Training backend that automatically handles additional setup or teardown logic on each actor, so that the users of this new trainer do not have to implement this logic. For example, a TensorflowTrainer can be built on top of DataParallelTrainer that automatically handles setting the proper environment variables for distributed Tensorflow on each actor.

For 1, you can set a predefined training loop in __init__

from ray.ml.train.data_parallel_trainer import DataParallelTrainer

class MyDataParallelTrainer(DataParallelTrainer):
    def __init__(self, *args, **kwargs):
        predefined_train_loop_per_worker = lambda: 1
        super().__init__(predefined_train_loop_per_worker, *args, **kwargs)

For 2, you can implement the ray.train.Backend and ray.train.BackendConfig interfaces.

from dataclasses import dataclass
from ray.train.backend import Backend, BackendConfig

class MyBackend(Backend):
    def on_start(self, worker_group, backend_config):
        def set_env_var(env_var_value):
            import os
            os.environ["MY_ENV_VAR"] = env_var_value

        worker_group.execute(set_env_var, backend_config.env_var)

@dataclass
class MyBackendConfig(BackendConfig):
    env_var: str = "default_value"

    def backend_cls(self):
        return MyBackend

class MyTrainer(DataParallelTrainer):
    def __init__(self, train_loop_per_worker, my_backend_config:
        MyBackendConfig, **kwargs):

        super().__init__(train_loop_per_worker, my_backend_config, **kwargs)
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • backend_config – Configuration for setting up a Backend (e.g. Torch, Tensorflow, Horovod) on each worker to enable distributed communication. If no Backend should be set up, then set this to None.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note: this method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.


class ray.ml.train.gbdt_trainer.GBDTTrainer(*args, **kwargs)[source]

Bases: ray.ml.trainer.Trainer

Common logic for gradient-boosting decision tree (GBDT) frameworks like XGBoost-Ray and LightGBM-Ray.

Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – Framework specific training parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to framework train() function.

DeveloperAPI: This API may change across minor Ray releases.

preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note: This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the training dataset.

Then, the Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note: this method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.


as_trainable() Type[ray.tune.trainable.Trainable][source]

Convert self to a tune.Trainable class.

Tuner

class ray.tune.tuner.Tuner(trainable: Optional[Union[str, Callable, Type[ray.tune.trainable.Trainable], Type[ray.ml.trainer.Trainer], ray.ml.trainer.Trainer]] = None, param_space: Optional[Dict[str, Any]] = None, tune_config: Optional[ray.tune.tune_config.TuneConfig] = None, run_config: Optional[ray.ml.config.RunConfig] = None, _tuner_internal: Optional[ray.tune.impl.tuner_internal.TunerInternal] = None)[source]

Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune.

Parameters
  • trainable – The trainable to be tuned.

  • param_space – Search space of the tuning job. One thing to note is that both preprocessor and dataset can be tuned here.

  • tune_config – Tuning algorithm specific configs. Refer to ray.tune.tune_config.TuneConfig for more info.

  • run_config – Runtime configuration that is specific to individual trials. Refer to ray.ml.config.RunConfig for more info.

Usage pattern:

# This snippet assumes that `trainer`, the preprocessors `prep_v1`/`prep_v2`,
# and the datasets `ds1`/`ds2` have been defined, and that RunConfig is
# imported from ray.ml.config.

param_space = {
    "scaling_config": {
        "num_actors": tune.grid_search([2, 4]),
        "cpus_per_actor": 2,
        "gpus_per_actor": 0,
    },
    "preprocessor": tune.grid_search([prep_v1, prep_v2]),
    "datasets": {
        "train_dataset": tune.grid_search([ds1, ds2]),
    },
    "params": {
        "objective": "binary:logistic",
        "tree_method": "approx",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}
tuner = Tuner(trainable=trainer, param_space=param_space,
    run_config=RunConfig(name="my_tune_run"))
analysis = tuner.fit()

To retry a failed tune run, you can then do

tuner = Tuner.restore(experiment_checkpoint_dir)
tuner.fit()

experiment_checkpoint_dir can be easily located near the end of the console output of your first failed run.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod restore(path: str) ray.tune.tuner.Tuner[source]

Restores Tuner after a previously failed run.

Parameters

path – The path where the previous failed run is checkpointed. This information can be found near the end of the console output of the previous run. Note: depending on whether Ray client mode is used or not, this path may or may not exist on your local machine.

fit() ray.tune.result_grid.ResultGrid[source]

Executes hyperparameter tuning job as configured and returns result.

Failure handling: Exceptions that happen during the execution of a trial can be inspected, together with their stack traces, through the returned result grid; see ResultGrid for reference. Each trial may fail up to a certain number of times, which is configured by RunConfig.FailureConfig.max_failures.

Exceptions that happen outside of trials will also be thrown by this method. In such cases, an instruction like the following is printed at the end of the console output to inform users how to resume:

Please use tuner = Tuner.restore("~/ray_results/tuner_resume") to resume.

Exceptions that happen in non-essential integration blocks, such as when invoking callbacks, will not crash the whole run.

Raises

TuneError – If errors occur executing the experiment that originate from Tune.

class ray.tune.result_grid.ResultGrid(experiment_analysis: ray.tune.analysis.experiment_analysis.ExperimentAnalysis)[source]

A set of Result objects returned from a call to tuner.fit().

You can use it to inspect the trials run as well as obtaining the best result.

The constructor is a private API.

Usage pattern:

result_grid = tuner.fit()
for i in range(len(result_grid)):
    result = result_grid[i]
    if not result.error:
        print(f"Trial finished successfully with metrics {result.metrics}.")
    else:
        print(f"Trial errored out with {result.error}.")
best_result = result_grid.get_best_result()
best_checkpoint = best_result.checkpoint
best_metrics = best_result.metrics

Note that trials of all statuses are included in the final result grid. If a trial is not in a terminated state, its latest result and checkpoint as seen by Tune will be provided.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

get_best_result() ray.ml.result.Result[source]

Get the best result from all the trials run.

Note, “best” here is determined by “metric” and “mode” specified in your Tuner’s TuneConfig.

Trials are compared using their “last” results. In a similar notion, the last checkpoint of the best trial is returned as part of the result.
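
A sketch of configuring the metric and mode used by get_best_result(), assuming a trainer has been constructed as in the Trainer examples above (the metric name and search space are illustrative):

from ray import tune
from ray.tune.tuner import Tuner
from ray.tune.tune_config import TuneConfig

tuner = Tuner(
    trainable=trainer,  # any Trainer instance
    param_space={"params": {"eta": tune.loguniform(1e-4, 1e-1)}},
    tune_config=TuneConfig(metric="loss", mode="min"),
)
result_grid = tuner.fit()
best_result = result_grid.get_best_result()
print(best_result.metrics, best_result.checkpoint)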

Predictors

class ray.ml.predictor.Predictor[source]

Predictors load models from checkpoints to perform inference.
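
A minimal sketch of the checkpoint-to-prediction flow, assuming result.checkpoint comes from the XGBoostTrainer example above:

import pandas as pd
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

predictor = XGBoostPredictor.from_checkpoint(result.checkpoint)
predictions = predictor.predict(pd.DataFrame({"x": [0.0, 1.0, 2.0]}))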

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, **kwargs) ray.ml.predictor.Predictor[source]

Create a specific predictor from a checkpoint.

Parameters
  • checkpoint – Checkpoint to load predictor data from.

  • kwargs – Arguments specific to predictor implementations.

Returns

Predictor object.

Return type

Predictor

predict(data: Union[pd.DataFrame, np.ndarray], **kwargs) Union[pd.DataFrame, np.ndarray][source]

Perform inference on a batch of data.

Parameters
  • data – A batch of input data. Either a pandas Dataframe or numpy array.

  • kwargs – Arguments specific to predictor implementations.

Returns

Prediction result.

Return type

DataBatchType

class ray.ml.predictors.integrations.xgboost.XGBoostPredictor(model: xgboost.core.Booster, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for XGBoost models.

Parameters
  • model – The XGBoost booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.xgboost.xgboost_predictor.XGBoostPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of XGBoostTrainer.

Parameters

checkpoint (Checkpoint) – The checkpoint to load the model and preprocessor from. It is expected to be from the result of an XGBoostTrainer run.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, dmatrix_kwargs: Optional[Dict[str, Any]] = None, **predict_kwargs) pandas.core.frame.DataFrame[source]

Run inference on data batch.

The data is converted into an XGBoost DMatrix before being inputted to the model.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • dmatrix_kwargs – Dict of keyword arguments passed to xgboost.DMatrix.

  • **predict_kwargs – Keyword arguments passed to xgboost.Booster.predict.

Examples:

import numpy as np
import xgboost as xgb
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = xgb.XGBClassifier().fit(train_X, train_y)
predictor = XGBoostPredictor(model=model.get_booster())

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])
import pandas as pd
import xgboost as xgb
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = xgb.XGBClassifier().fit(train_X, train_y)
predictor = XGBoostPredictor(model=model.get_booster())

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Return type

pd.DataFrame

class ray.ml.predictors.integrations.lightgbm.LightGBMPredictor(model: lightgbm.basic.Booster, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for LightGBM models.

Parameters
  • model – The LightGBM booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.lightgbm.lightgbm_predictor.LightGBMPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of LightGBMTrainer.

Parameters

checkpoint (Checkpoint) – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) pandas.core.frame.DataFrame[source]

Run inference on data batch.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • **predict_kwargs – Keyword arguments passed to lightgbm.Booster.predict.

Examples:

import numpy as np
import lightgbm as lgbm
from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = lgbm.LGBMClassifier().fit(train_X, train_y)
predictor = LightGBMPredictor(model=model.booster_)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])
import pandas as pd
import lightgbm as lgbm
from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = lgbm.LGBMClassifier().fit(train_X, train_y)
predictor = LightGBMPredictor(model=model.booster_)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Return type

pd.DataFrame

class ray.ml.predictors.integrations.tensorflow.TensorflowPredictor(model_definition: Union[Callable[[], tf.keras.Model], Type[tf.keras.Model]], preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None, model_weights: Optional[list] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for TensorFlow models.

Parameters
  • model_definition – A callable that returns a TensorFlow Keras model to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • model_weights – List of weights to use for the model.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model_definition: Union[Callable[[], tf.keras.Model], Type[tf.keras.Model]]) ray.ml.predictors.integrations.tensorflow.tensorflow_predictor.TensorflowPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TensorflowTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.

  • model_definition – A callable that returns a TensorFlow Keras model to use. Model weights will be loaded from the checkpoint.

predict(data: Union[pd.DataFrame, np.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, dtype: Optional[tf.dtypes.DType] = None) Union[pd.DataFrame, np.ndarray][source]

Run inference on data batch.

The data is converted into a TensorFlow Tensor before being inputted to the model.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • dtype – The TensorFlow dtype to use when creating the TensorFlow tensor. If set to None, then automatically infer the dtype.

Examples:

import numpy as np
import tensorflow as tf
from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor

def build_model():
    return tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(1,)),
            tf.keras.layers.Dense(1),
        ]
    )

predictor = TensorflowPredictor(model_definition=build_model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=[0])
import pandas as pd
import tensorflow as tf
from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor

def build_model():
    return tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(1,)),
            tf.keras.layers.Dense(1),
        ]
    )

predictor = TensorflowPredictor(model_definition=build_model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])

predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=["A"])
Returns

Prediction result.

Return type

DataBatchType

class ray.ml.predictors.integrations.torch.TorchPredictor(model: torch.nn.Module, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for PyTorch models.

Parameters
  • model – The torch module to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Optional[torch.nn.Module] = None) ray.ml.predictors.integrations.torch.torch_predictor.TorchPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TorchTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.

  • model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[List[str]], List[int], List[List[int]]]] = None, dtype: Optional[torch.dtype] = None) Union[pandas.core.frame.DataFrame, numpy.ndarray][source]

Run inference on data batch.

The data is converted into a torch Tensor before being inputted to the model.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If this arg is a list of lists, then the data batch will be converted into a multiple tensors which are then concatenated before feeding into the model. This is useful for multi-input models. If None, then use all columns in data.

  • dtype – The torch dtype to use when creating the torch tensor. If set to None, then automatically infer the dtype.

Examples:

import numpy as np
import torch
from ray.ml.predictors.integrations.torch import TorchPredictor

model = torch.nn.Linear(1, 1)
predictor = TorchPredictor(model=model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=[0])
import pandas as pd
import torch
from ray.ml.predictors.integrations.torch import TorchPredictor

model = torch.nn.Linear(1, 1)
predictor = TorchPredictor(model=model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])

predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=["A"])
Returns

Prediction result.

Return type

DataBatchType

Serving

ray.serve.model_wrappers.ModelWrapperDeployment

alias of Deployment(name=ModelWrapperDeployment,version=None,route_prefix=/ModelWrapperDeployment)

class ray.serve.model_wrappers.ModelWrapper(predictor_cls: Union[str, Type[ray.ml.predictor.Predictor]], checkpoint: Union[ray.ml.checkpoint.Checkpoint, Dict], input_schema: Union[str, Callable[[Any], Any]] = 'ray.serve.http_adapters.json_to_ndarray', batching_params: Optional[Union[Dict[str, int], bool]] = None)[source]

Serve any Ray AIR predictor from an AIR checkpoint.

Parameters
  • predictor_cls (str, Type[Predictor]) – The class or path for the predictor class. The type must be a subclass of ray.ml.predictor.Predictor.

  • checkpoint (Checkpoint, dict) –

    The checkpoint object or a dictionary describing the object.

    • The checkpoint object must be a subclass of ray.ml.checkpoint.Checkpoint.

    • The dictionary should be in the form of {"checkpoint_cls": "import.path.MyCheckpoint", "uri": "uri_to_load_from"}. Serve will then call MyCheckpoint.from_uri("uri_to_load_from") to instantiate the object.

  • input_schema (str, InputSchemaFn, None) – The FastAPI input conversion function. By default, Serve will use the NdArray schema and convert to a numpy array. You can pass in any FastAPI dependency resolver that returns an array. When you pass in a string, Serve will import it. Please refer to the Serve HTTP adapters documentation to learn more.

  • batching_params (dict, None, False) – Override the default parameters to ray.serve.batch(). Pass False to disable batching.
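
A minimal serving sketch, assuming Serve is running locally, that result.checkpoint comes from an XGBoostTrainer run, and that keyword arguments passed to deploy() are forwarded to the ModelWrapper constructor (the deployment name is illustrative):

from ray import serve
from ray.serve.model_wrappers import ModelWrapperDeployment
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

serve.start()

# predictor_cls and checkpoint are the ModelWrapper arguments documented above.
ModelWrapperDeployment.options(name="XGBoostService").deploy(
    predictor_cls=XGBoostPredictor,
    checkpoint=result.checkpoint,
)

Requests sent to the deployment's route prefix are then converted by the default json_to_ndarray input schema into a numpy array before prediction.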

Outputs

class ray.ml.checkpoint.Checkpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Ray ML Checkpoint.

This implementation provides methods to translate between different checkpoint storage locations: Local storage, external storage (e.g. cloud storage), and data dict representations.

The constructor is a private API, instead the from_ methods should be used to create checkpoint objects (e.g. Checkpoint.from_directory()).

When converting between different checkpoint formats, it is guaranteed that a full round trip of conversions (e.g. directory –> dict –> obj ref –> directory) will recover the original checkpoint data. There are no guarantees made about compatibility of intermediate representations.

Examples

Example for an arbitrary data checkpoint:

from ray.ml.checkpoint import Checkpoint

# Create checkpoint data dict
checkpoint_data = {"data": 123}

# Create checkpoint object from data
checkpoint = Checkpoint.from_dict(checkpoint_data)

# Save checkpoint to temporary location
path = checkpoint.to_directory()

# This path can then be passed around, e.g. to a different function

# At some other location, recover Checkpoint object from path
checkpoint = Checkpoint.from_directory(path)

# Convert into dictionary again
recovered_data = checkpoint.to_dict()

# It is guaranteed that the original data has been recovered
assert recovered_data == checkpoint_data

Example using MLflow for saving and loading a classifier:

from ray.ml.checkpoint import Checkpoint
from sklearn.ensemble import RandomForestClassifier
import mlflow.sklearn

# Create an sklearn classifier
clf = RandomForestClassifier(max_depth=7, random_state=0)
# ... e.g. train model with clf.fit()
# Save model using MLflow
mlflow.sklearn.save_model(clf, "model_directory")

# Create checkpoint object from path
checkpoint = Checkpoint.from_directory("model_directory")

# Convert into dictionary
checkpoint_dict = checkpoint.to_dict()

# This dict can then be passed around, e.g. to a different function

# At some other location, recover checkpoint object from dict
checkpoint = Checkpoint.from_dict(checkpoint_dict)

# Convert into a directory again
checkpoint.to_directory("other_directory")

# We can now use MLflow to re-load the model
clf = mlflow.sklearn.load_model("other_directory")

# It is guaranteed that the original data was recovered
assert isinstance(clf, RandomForestClassifier)

Checkpoints can be pickled and sent to remote processes. Please note that checkpoints pointing to local directories will be pickled as data representations, so the full checkpoint data will be contained in the checkpoint object. If you want to avoid this, consider passing only the checkpoint directory to the remote task and re-construct your checkpoint object in that function. Note that this will only work if the “remote” task is scheduled on the same node or a node that also has access to the local data path (e.g. on a shared file system like NFS).

Checkpoints pointing to object store references will keep the object reference intact - this means that these checkpoints cannot be properly deserialized on other Ray clusters or outside a Ray cluster. If you need persistence across clusters, use the to_uri() or to_directory() methods to persist your checkpoints to disk.
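
For example, a persistence sketch using the URI methods documented below (the bucket URI is illustrative):

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"model_state": 123})

# Persist to external storage and restore it elsewhere (e.g. on another cluster).
uri = checkpoint.to_uri("s3://my-bucket/experiment/checkpoint_000")
restored = Checkpoint.from_uri(uri)
assert restored.to_dict()["model_state"] == 123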

classmethod from_bytes(data: bytes) ray.ml.checkpoint.Checkpoint[source]

Create a checkpoint from the given byte string.

Parameters

data (bytes) – Data object containing pickled checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_bytes() bytes[source]

Return Checkpoint serialized as bytes object.

Returns

Bytes object containing checkpoint data.

Return type

bytes

classmethod from_dict(data: dict) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from dictionary.

Parameters

data (dict) – Dictionary containing checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_dict() dict[source]

Return checkpoint data as dictionary.

Returns

Dictionary containing checkpoint data.

Return type

dict

classmethod from_object_ref(obj_ref: ray.ObjectRef) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from object reference.

Parameters

obj_ref (ray.ObjectRef) – ObjectRef pointing to checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_object_ref() ray.ObjectRef[source]

Return checkpoint data as object reference.

Returns

ObjectRef pointing to checkpoint data.

Return type

ray.ObjectRef

classmethod from_directory(path: str) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from directory.

Parameters

path (str) – Directory containing checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_directory(path: Optional[str] = None) str[source]

Write checkpoint data to directory.

Parameters

path (str) – Target directory to restore data in.

Returns

Directory containing checkpoint data.

Return type

str

classmethod from_uri(uri: str) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from location URI (e.g. cloud storage).

Valid locations currently include AWS S3 (s3://), Google cloud storage (gs://), HDFS (hdfs://), and local files (file://).

Parameters

uri (str) – Source location URI to read data from.

Returns

checkpoint object.

Return type

Checkpoint

to_uri(uri: str) str[source]

Write checkpoint data to location URI (e.g. cloud storage).

Parameters

uri (str) – Target location URI to write data to.

Returns

Cloud location containing checkpoint data.

Return type

str

get_internal_representation() Tuple[str, Union[dict, str, ray.ObjectRef]][source]

Return tuple of (type, data) for the internal representation.

The internal representation can be used e.g. to compare checkpoint objects for equality or to access the underlying data storage.

The returned type is a string and one of ["local_path", "data_dict", "uri", "object_ref"].

The data is the respective data value.

Note that paths converted from file://... will be returned as local_path (without the file:// prefix) and not as uri.

Returns:

Tuple of type and data.
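
For example (a sketch; the exact contents follow the convention described above):

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"data": 123})
kind, data = checkpoint.get_internal_representation()
# For a dict-backed checkpoint, `kind` is expected to be "data_dict"
# and `data` the underlying dictionary.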

DeveloperAPI: This API may change across minor Ray releases.

class ray.ml.result.Result(metrics: Any, checkpoint: Optional[ray.ml.checkpoint.Checkpoint], error: Optional[Exception] = None)[source]

The final result of an ML training run or a Tune trial.

This is the class produced by Trainer.fit() or Tuner.fit() (through ResultGrid). It contains a checkpoint, which can be used for resuming training and for creating a Predictor object. It also contains a metrics object describing training metrics. error is included so that unsuccessful runs and trials can be represented as well.

The constructor is a private API.

Parameters
  • metrics – The final metrics as reported by a Trainable.

  • checkpoint – The final checkpoint of the Trainable.

  • error – The execution error of the Trainable run, if the trial finishes in error.
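
A short sketch of how a Result is typically consumed, assuming trainer is any configured Trainer from the sections above:

result = trainer.fit()

print(result.metrics)     # final reported metrics
print(result.checkpoint)  # final Checkpoint, usable with a Predictor
if result.error:
    print(f"Run failed with: {result.error}")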

Configs

class ray.ml.config.ScalingConfigDataClass(trainer_resources: Optional[Dict] = None, num_workers: Optional[int] = None, use_gpu: bool = False, resources_per_worker: Optional[Dict] = None, placement_strategy: str = 'PACK')[source]

Configuration for scaling training.

This is the schema for the scaling_config dict, and after beta, this will be the actual representation for Scaling config objects.

trainer_resources: Resources to allocate for the trainer. If none is provided, will default to 1 CPU.

num_workers: The number of workers (Ray actors) to launch. Each worker will reserve 1 CPU by default. The number of CPUs reserved by each worker can be overridden with the resources_per_worker argument.

use_gpu: If True, training will be done on GPUs (1 per worker). Defaults to False. The number of GPUs reserved by each worker can be overridden with the resources_per_worker argument.

resources_per_worker: If specified, the resources defined in this Dict will be reserved for each worker. The CPU and GPU keys (case-sensitive) can be defined to override the number of CPUs/GPUs used by each worker.

placement_strategy: The placement strategy to use for the placement group of the Ray actors. See Placement Group Strategies for the possible options.
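
A sketch of a scaling_config dict matching this schema, as accepted by the Trainers above (values are illustrative, and train_loop_per_worker/train_dataset are assumed to be defined as in the TorchTrainer example):

scaling_config = {
    "num_workers": 2,                    # two Ray actors
    "use_gpu": True,                     # one GPU per worker by default
    "resources_per_worker": {"CPU": 4},  # override the per-worker CPU reservation
    "trainer_resources": {"CPU": 1},     # resources reserved for the trainer itself
    "placement_strategy": "PACK",
}
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset})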

property num_cpus_per_worker

The number of CPUs to set per worker.

property num_gpus_per_worker

The number of GPUs to set per worker.

property additional_resources_per_worker

Resources per worker, not including CPU or GPU resources.

as_placement_group_factory() ray.tune.utils.placement_groups.PlacementGroupFactory[source]

Returns a PlacementGroupFactory to specify resources for Tune.

class ray.ml.config.FailureConfig[source]

Configuration related to failure handling of each run/trial.

Parameters

max_failures – Tries to recover a run at least this many times. Will recover from the latest checkpoint if present. Setting to -1 will lead to infinite recovery retries. Setting to 0 will disable retries. Defaults to 0.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

class ray.ml.config.RunConfig(name: Optional[str] = None, local_dir: Optional[str] = None, callbacks: Optional[List[ray.tune.callback.Callback]] = None, failure: Optional[ray.ml.config.FailureConfig] = None)[source]

Runtime configuration for individual trials that are run.

This contains information that applies to individual runs of Trainable classes. This includes both running a Trainable by itself and running a hyperparameter tuning job on top of a Trainable (applies to each trial).

Parameters
  • name – Name of the trial or experiment. If not provided, will be deduced from the Trainable.

  • local_dir – Local dir to save training results to. Defaults to ~/ray_results.

  • callbacks – Callbacks to invoke. Refer to ray.tune.callback.Callback for more info.

  • failure – Failure handling configuration for the run; refer to ray.ml.config.FailureConfig for more info.
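
A sketch combining RunConfig with FailureConfig, assuming FailureConfig accepts max_failures as a constructor argument per the parameter documented above (the name and directory are illustrative):

from ray.ml.config import FailureConfig, RunConfig

run_config = RunConfig(
    name="my_experiment",
    local_dir="~/ray_results",
    failure=FailureConfig(max_failures=2),  # retry a failing run up to 2 times
)
# Pass it to a Trainer or a Tuner, e.g. Tuner(trainable=trainer, run_config=run_config).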

PublicAPI (alpha): This API is in alpha and may change before becoming stable.