AIR API

Components

Preprocessors

class ray.ml.preprocessor.Preprocessor[source]

Implements an ML preprocessing operation.

Preprocessors are stateful objects that can be fitted against a Dataset and used to transform both local data batches and distributed datasets. For example, a Normalization preprocessor may calculate the mean and stdev of a field during fitting, and use these attributes to implement its normalization transform.

class FitStatus(value)[source]

The fit status of the preprocessor.

transform_stats() Optional[str][source]

Return Dataset stats for the most recent transform call, if any.

TODO(ekl) we should also be able to provide stats for fit().

fit(dataset: ray.data.dataset.Dataset) ray.ml.preprocessor.Preprocessor[source]

Fit this Preprocessor to the Dataset.

Fitted state attributes will be directly set in the Preprocessor.

Calling it more than once will overwrite all previously fitted state: preprocessor.fit(A).fit(B) is equivalent to preprocessor.fit(B).

Parameters

dataset – Input dataset.

Returns

The fitted Preprocessor with state attributes.

Return type

Preprocessor

fit_transform(dataset: ray.data.dataset.Dataset) ray.data.dataset.Dataset[source]

Fit this Preprocessor to the Dataset and then transform the Dataset.

Calling it more than once will overwrite all previously fitted state: preprocessor.fit_transform(A).fit_transform(B) is equivalent to preprocessor.fit_transform(B).

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

transform(dataset: ray.data.dataset.Dataset) ray.data.dataset.Dataset[source]

Transform the given dataset.

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

Raises

PreprocessorNotFittedException – If fit has not been called yet.

transform_batch(df: Union[pd.DataFrame, np.ndarray]) Union[pd.DataFrame, np.ndarray][source]

Transform a single batch of data.

Parameters

df – Input data batch.

Returns

The transformed data batch.

Return type

DataBatchType
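
A minimal end-to-end sketch of the fit/transform workflow, assuming the Ray 1.x ray.ml API and a small in-memory dataset (MinMaxScaler is documented below; the column name is illustrative):

import pandas as pd
import ray
from ray.ml.preprocessors import MinMaxScaler

# Build a small demo dataset with a single numeric column "value".
ds = ray.data.from_items([{"value": v} for v in [0.0, 5.0, 10.0]])

# Fit the preprocessor on the Dataset, then transform it.
scaler = MinMaxScaler(columns=["value"])
scaler.fit(ds)
transformed = scaler.transform(ds)  # values scaled into [0, 1]

# The same fitted state can be applied to a local batch.
batch = pd.DataFrame({"value": [2.5, 7.5]})
print(scaler.transform_batch(batch))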

class ray.ml.preprocessors.BatchMapper(fn: Callable[[pandas.DataFrame], pandas.DataFrame])[source]

Bases: ray.ml.preprocessor.Preprocessor

Apply fn to batches of records of the given dataset.

This is meant to be generic and supports low-level operations on records. For example, this preprocessor can be used to add a new column or modify a column in place.

Parameters

fn – The user-defined function (UDF) to apply to each batch of records.
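
For illustration, a sketch that adds a derived column with BatchMapper, assuming BatchMapper is stateless and can be applied without a prior fit (column names are arbitrary):

import pandas as pd
import ray
from ray.ml.preprocessors import BatchMapper

def add_squared(batch: pd.DataFrame) -> pd.DataFrame:
    # Derive a new column from an existing one, in place.
    batch["x_squared"] = batch["x"] ** 2
    return batch

ds = ray.data.from_items([{"x": x} for x in range(4)])
mapper = BatchMapper(fn=add_squared)
mapper.transform(ds).show()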

class ray.ml.preprocessors.Categorizer(columns: Union[List[str], Dict[str, Optional[pandas.core.dtypes.dtypes.CategoricalDtype]]])[source]

Bases: ray.ml.preprocessor.Preprocessor

Transform Dataset columns to Categorical data type.

Note that in the case of automatic inference, you will most likely want to run this preprocessor on the entire dataset before splitting it (e.g. into train and test sets), so that all of the categories are inferred. There is no risk of data leakage when using this preprocessor.

Parameters

columns – The columns whose data type to change. Can be either a list of columns, in which case the categories will be inferred automatically from the data, or a dict of column:pd.CategoricalDtype or None - if specified, the dtype will be applied, and if not, it will be automatically inferred.
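
A sketch showing both forms of the columns argument described above (inferred categories vs. an explicit pd.CategoricalDtype); the column names are illustrative:

import pandas as pd
import ray
from ray.ml.preprocessors import Categorizer

ds = ray.data.from_items(
    [{"color": "red", "size": "S"},
     {"color": "blue", "size": "M"},
     {"color": "red", "size": "L"}])

# Infer categories for "color"; pin an explicit ordered dtype for "size".
categorizer = Categorizer(columns={
    "color": None,
    "size": pd.CategoricalDtype(["S", "M", "L"], ordered=True),
})
ds = categorizer.fit_transform(ds)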

class ray.ml.preprocessors.CountVectorizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None, max_features: Optional[int] = None)[source]

Bases: ray.ml.preprocessor.Preprocessor

Tokenize string columns and convert into token columns.

The created columns will have names in the format {column_name}_{token}.

Token features will be sorted by count in descending order.

Parameters
  • columns – The columns that will individually be tokenized.

  • tokenization_fn – The tokenization function to use. If not specified, a simple string.split(" ") will be used.

  • max_features – If specified, limit the number of tokens. The tokens with the largest counts will be kept.

class ray.ml.preprocessors.Chain(*preprocessors: ray.ml.preprocessor.Preprocessor)[source]

Bases: ray.ml.preprocessor.Preprocessor

Chain multiple Preprocessors into a single Preprocessor.

Calling fit will invoke fit_transform on the input preprocessors, so that one preprocessor can fit based on columns/values produced by the transform of a preceding preprocessor.

Parameters

preprocessors – The preprocessors that should be executed sequentially.
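
A sketch composing two of the preprocessors documented on this page (SimpleImputer and StandardScaler; the column name is illustrative):

import ray
from ray.ml.preprocessors import Chain, SimpleImputer, StandardScaler

ds = ray.data.from_items(
    [{"value": 1.0}, {"value": None}, {"value": 3.0}])

# Impute missing values first, then standardize the same column.
chain = Chain(
    SimpleImputer(columns=["value"], strategy="mean"),
    StandardScaler(columns=["value"]),
)
transformed = chain.fit_transform(ds)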

fit_transform(ds: ray.data.dataset.Dataset) ray.data.dataset.Dataset[source]

Fit this Preprocessor to the Dataset and then transform the Dataset.

Calling it more than once will overwrite all previously fitted state: preprocessor.fit_transform(A).fit_transform(B) is equivalent to preprocessor.fit_transform(B).

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

class ray.ml.preprocessors.FeatureHasher(columns: List[str], num_features: int)[source]

Bases: ray.ml.preprocessor.Preprocessor

Hash the features of the specified columns.

The created columns will have names in the format hash_{column_names}_{hash}, e.g. hash_column1_column2_0, hash_column1_column2_1, …

Note: Currently sparse matrices are not supported. Therefore, it is recommended to not use a large num_features.

Parameters
  • columns – The columns of features that should be projected onto a single hashed feature vector.

  • num_features – The size of the hashed feature vector.

class ray.ml.preprocessors.HashingVectorizer(columns: List[str], num_features: int, tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]

Bases: ray.ml.preprocessor.Preprocessor

Tokenize and hash string columns into token hash columns.

The created columns will have names in the format hash_{column_name}_{hash}.

Parameters
  • columns – The columns that will individually be hashed.

  • num_features – The size of each hashed feature vector.

  • tokenization_fn – The tokenization function to use. If not specified, a simple string.split(" ") will be used.

class ray.ml.preprocessors.LabelEncoder(label_column: str)[source]

Bases: ray.ml.preprocessor.Preprocessor

Encode values within a label column as ordered integer values.

Currently, order within a column is based on the values from the fitted dataset in sorted order.

Transforming values not included in the fitted dataset will be encoded as None.

Parameters

label_column – The label column that will be encoded.
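
A minimal sketch, assuming a string label column (the column name is illustrative):

import ray
from ray.ml.preprocessors import LabelEncoder

ds = ray.data.from_items(
    [{"label": "cat"}, {"label": "dog"}, {"label": "cat"}])

encoder = LabelEncoder(label_column="label")
# Values are encoded in sorted order: "cat" -> 0, "dog" -> 1.
encoded = encoder.fit_transform(ds)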

class ray.ml.preprocessors.MaxAbsScaler(columns: List[str])[source]

Bases: ray.ml.preprocessor.Preprocessor

Scale values within columns based on the absolute max value.

For each column, each value will be transformed to value / abs_max, where abs_max is calculated from the fitted dataset.

When transforming the fitted dataset, transformed values will be in the range [-1, 1].

Parameters

columns – The columns that will individually be scaled.

class ray.ml.preprocessors.MinMaxScaler(columns: List[str])[source]

Bases: ray.ml.preprocessor.Preprocessor

Scale values within columns based on min and max values.

For each column, each value will be transformed to (value - min) / (max - min), where min and max are calculated from the fitted dataset.

When transforming the fitted dataset, transformed values will be in the range [0, 1].

Parameters

columns – The columns that will individually be scaled.

class ray.ml.preprocessors.Normalizer(columns: List[str], norm='l2')[source]

Bases: ray.ml.preprocessor.Preprocessor

Normalize each record to have unit norm.

Supports the following normalization types:
  • l1: Sum of the absolute values.

  • l2: Square root of the sum of the squared values.

  • max: Maximum value.

Parameters
  • columns – The columns that in combination define the record to normalize.

  • norm – “l1”, “l2”, or “max”. Defaults to “l2”.

class ray.ml.preprocessors.OneHotEncoder(columns: List[str], limit: Optional[Dict[str, int]] = None)[source]

Bases: ray.ml.preprocessor.Preprocessor

Encode columns as new columns using one-hot encoding.

The transformed dataset will have a new column in the form {column}_{value} for each of the values from the fitted dataset. The value of a column will be set to 1 if the value matches, otherwise 0.

Transforming values not included in the fitted dataset or not among the most popular values (see limit) will result in all of the encoded column values being 0.

Example:

ohe = OneHotEncoder(
    columns=[
        "trip_start_hour",
        "trip_start_day",
        "trip_start_month",
        "dropoff_census_tract",
        "pickup_community_area",
        "dropoff_community_area",
        "payment_type",
        "company",
    ],
    limit={
        "dropoff_census_tract": 25,
        "pickup_community_area": 20,
        "dropoff_community_area": 20,
        "payment_type": 2,
        "company": 7,
    },
)
Parameters
  • columns – The columns that will individually be encoded.

  • limit – If set, only the top “limit” number of most popular values become categorical variables. The less frequent ones will result in all the encoded column values being 0. This is a dict of column to its corresponding limit. The column in this dictionary has to be in columns.

class ray.ml.preprocessors.OrdinalEncoder(columns: List[str])[source]

Bases: ray.ml.preprocessor.Preprocessor

Encode values within columns as ordered integer values.

Currently, order within a column is based on the values from the fitted dataset in sorted order.

Transforming values not included in the fitted dataset will be encoded as None.

Parameters

columns – The columns that will individually be encoded.

class ray.ml.preprocessors.PowerTransformer(columns: List[str], power: float, method: str = 'yeo-johnson')[source]

Bases: ray.ml.preprocessor.Preprocessor

Apply power function to make data more normally distributed.

See https://en.wikipedia.org/wiki/Power_transform.

Supports the following methods:
  • Yeo-Johnson (positive and negative numbers)

  • Box-Cox (positive numbers only)

Currently, this requires the user to specify the power parameter. In the future, an optimal value can be determined in fit().

Parameters
  • columns – The columns that will individually be transformed.

  • power – The power parameter which is used as the exponent.

  • method – Supports “yeo-johnson” and “box-cox”. Defaults to “yeo-johnson”.

class ray.ml.preprocessors.RobustScaler(columns: List[str], quantile_range: Tuple[float, float] = (0.25, 0.75))[source]

Bases: ray.ml.preprocessor.Preprocessor

Scale values within columns based on their quantile range.

For each column, each value will be transformed to (value - median) / (high_quantile - low_quantile), where median, high_quantile, and low_quantile are calculated from the fitted dataset.

Parameters
  • columns – The columns that will be scaled individually.

  • quantile_range – A tuple that defines the lower and upper quantile to scale to. Defaults to the 1st and 3rd quartiles: (0.25, 0.75).

class ray.ml.preprocessors.SimpleImputer(columns: List[str], strategy: str = 'mean', fill_value: Optional[Union[str, numbers.Number]] = None)[source]

Bases: ray.ml.preprocessor.Preprocessor

Populate missing values within columns.

Parameters
  • columns – The columns that will individually be imputed.

  • strategy – The strategy to compute the value to impute.

    • “mean”: The mean of the column (numeric only).

    • “most_frequent”: The most used value of the column (string or numeric).

    • “constant”: The value of fill_value (string or numeric).

  • fill_value – The value to use when strategy is “constant”.
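
A sketch of the “mean” and “constant” strategies (the column name is illustrative):

import ray
from ray.ml.preprocessors import SimpleImputer

ds = ray.data.from_items(
    [{"value": 1.0}, {"value": None}, {"value": 5.0}])

# Fill missing entries with the column mean (here: 3.0).
mean_imputer = SimpleImputer(columns=["value"], strategy="mean")
ds_mean = mean_imputer.fit_transform(ds)

# Alternatively, fill with a constant.
constant_imputer = SimpleImputer(
    columns=["value"], strategy="constant", fill_value=0.0)
ds_const = constant_imputer.fit_transform(ds)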

class ray.ml.preprocessors.StandardScaler(columns: List[str], ddof=0)[source]

Bases: ray.ml.preprocessor.Preprocessor

Scale values within columns based on mean and standard deviation.

For each column, each value will be transformed to (value-mean)/std, where mean and std are calculated from the fitted dataset.

Parameters
  • columns – The columns that will individually be scaled.

  • ddof – The delta degrees of freedom used to calculate standard deviation.

class ray.ml.preprocessors.Tokenizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]

Bases: ray.ml.preprocessor.Preprocessor

Tokenize string columns.

Each string entry will be replaced with a list of tokens.

Parameters
  • columns – The columns that will individually be tokenized.

  • tokenization_fn – The tokenization function to use. If not specified, a simple string.split(" ") will be used.

ray.ml.train_test_split(dataset: ray.data.dataset.Dataset, test_size: Union[int, float], *, shuffle: bool = False, seed: Optional[int] = None) Tuple[ray.data.dataset.Dataset, ray.data.dataset.Dataset][source]

Split a Dataset into train and test subsets.

Example

import ray
from ray.ml import train_test_split

ds = ray.data.range(8)
train, test = train_test_split(ds, test_size=0.25)
print(train.take())  # [0, 1, 2, 3, 4, 5]
print(test.take())  # [6, 7]
Parameters
  • dataset – Dataset to split.

  • test_size – If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. The train split will always be the complement of the test split.

  • shuffle – Whether or not to globally shuffle the dataset before splitting. Defaults to False. This may be a very expensive operation with large datasets.

  • seed – Fix the random seed to use for shuffle, otherwise one will be chosen based on system randomness. Ignored if shuffle=False.

Returns

Train and test subsets as two Datasets.

Trainer

class ray.ml.trainer.Trainer(*args, **kwargs)[source]

Defines interface for distributed training on Ray.

Note: The base Trainer class cannot be instantiated directly. Only one of its subclasses can be used.

How does a trainer work?

  • First, initialize the Trainer. The initialization runs locally, so heavyweight setup should not be done in __init__.

  • Then, when you call trainer.fit(), the Trainer is serialized and copied to a remote Ray actor. The following methods are then called in sequence on the remote actor.

  • trainer.setup(): Any heavyweight Trainer setup should be specified here.

  • trainer.preprocess_datasets(): The provided ray.data.Dataset instances are preprocessed with the provided ray.ml.preprocessor.Preprocessor.

  • trainer.train_loop(): Executes the main training logic.

  • Calling trainer.fit() will return a ray.ml.result.Result object where you can access metrics from your training run, as well as any checkpoints that may have been saved.

How do I create a new Trainer?

Subclass ray.ml.trainer.Trainer, override the training_loop method, and optionally setup.

import torch

from ray.ml.train import Trainer
from ray import tune


class MyPytorchTrainer(Trainer):
    def setup(self):
        self.model = torch.nn.Linear(1, 1)
        self.optimizer = torch.optim.SGD(
            self.model.parameters(), lr=0.1)

    def training_loop(self):
        # You can access any Trainer attributes directly in this method.
        # self.datasets["train"] has already been
        # preprocessed by self.preprocessor
        dataset = self.datasets["train"]

        torch_ds = dataset.to_torch(label_column="y")
        loss_fn = torch.nn.MSELoss()

        for epoch_idx in range(10):
            loss = 0
            num_batches = 0
            for X, y in iter(torch_ds):
                # Compute prediction error
                pred = self.model(X)
                batch_loss = loss_fn(pred, y.float())

                # Backpropagation
                self.optimizer.zero_grad()
                batch_loss.backward()
                self.optimizer.step()

                loss += batch_loss.item()
                num_batches += 1
            loss /= num_batches

            # Use Tune functions to report intermediate
            # results.
            tune.report(loss=loss, epoch=epoch_idx)

How do I use an existing Trainer or one of my custom Trainers?

Initialize the Trainer and call Trainer.fit():

import ray
train_dataset = ray.data.from_items(
    [{"x": i, "y": i} for i in range(3)])
my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = my_trainer.fit()
Parameters
  • scaling_config – Configuration for how to scale training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

setup() None[source]

Called during fit() to perform initial setup on the Trainer.

Note: this method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note: This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

abstract training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note: this method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.

Example
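
A minimal sketch of overriding training_loop in a hypothetical subclass (see the MyPytorchTrainer example above for a complete implementation):

from ray import tune
from ray.ml.train import Trainer

class MyTrainer(Trainer):
    def training_loop(self):
        # self.datasets has already been preprocessed by self.preprocessor.
        for epoch_idx in range(5):
            # ... run one training epoch here ...
            tune.report(epoch=epoch_idx)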

fit() ray.ml.result.Result[source]

Runs training.

Returns

A Result object containing the training result.

Raises

TrainingFailedError – If any failures occur during the execution of self.as_trainable().

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

as_trainable() Type[ray.tune.trainable.Trainable][source]

Convert self to a tune.Trainable class.

class ray.ml.train.integrations.xgboost.XGBoostTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel XGBoost training.

This Trainer runs the XGBoost training loop in a distributed manner using multiple Ray Actors.

Example

import ray

from ray.ml.train.integrations.xgboost import XGBoostTrainer

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – XGBoost training parameters. Refer to XGBoost documentation for a list of possible parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to xgboost.DMatrix objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to xgboost.train() function.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.ml.train.integrations.xgboost.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) Tuple[xgboost.core.Booster, Optional[ray.ml.preprocessor.Preprocessor]][source]

Load a Checkpoint from XGBoostTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of an XGBoostTrainer run.

Returns

The model and AIR preprocessor contained within.
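
A usage sketch, assuming result is the Result object returned by trainer.fit() in the example above and that it exposes the checkpoint as result.checkpoint:

from ray.ml.train.integrations.xgboost import load_checkpoint

# Recover the trained xgboost.Booster and the fitted preprocessor (if any).
booster, preprocessor = load_checkpoint(result.checkpoint)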

class ray.ml.train.integrations.lightgbm.LightGBMTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel LightGBM training.

This Trainer runs the LightGBM training loop in a distributed manner using multiple Ray Actors.

If you would like to take advantage of LightGBM’s built-in handling for features with the categorical data type, consider using the Categorizer preprocessor to set the dtypes in the dataset.

Example

import ray

from ray.ml.train.integrations.lightgbm import LightGBMTrainer

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – LightGBM training parameters passed to lightgbm.train(). Refer to LightGBM documentation for a list of possible parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to lightgbm.Dataset objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to lightgbm.train() function.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.ml.train.integrations.lightgbm.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) Tuple[lightgbm.basic.Booster, Optional[ray.ml.preprocessor.Preprocessor]][source]

Load a Checkpoint from LightGBMTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.

Returns

The model and AIR preprocessor contained within.

class ray.ml.train.integrations.tensorflow.TensorflowTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Tensorflow training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary TensorFlow process group configured for distributed TensorFlow training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()

You can also use any of the TensorFlow specific function utils.

def train_loop_per_worker():
    # Turns off autosharding for a dataset.
    # You should use this if you are doing
    # `train.get_dataset_shard(...).to_tf(...)`
    # as the data will be already sharded.
    train.tensorflow.prepare_dataset_shard(...)

To save a model to use for the TensorflowPredictor, you must save it under the “model” kwarg in train.save_checkpoint().

Example:

import tensorflow as tf

import ray
from ray import train
from ray.train.tensorflow import prepare_dataset_shard

from ray.ml.train.integrations.tensorflow import TensorflowTrainer

input_size = 1

def build_model():
    # toy neural network : 1-layer
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(
            1, activation="linear", input_shape=(input_size,))]
    )

def train_loop_for_worker(config):
    dataset_shard = train.get_dataset_shard("train")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()
        model.compile(
            optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    for epoch in range(config["num_epochs"]):
        tf_dataset = prepare_dataset_shard(
            dataset_shard.to_tf(
                label_column="y",
                output_signature=(
                    tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
                    tf.TensorSpec(shape=(None), dtype=tf.float32),
                ),
                batch_size=1,
            )
        )
        model.fit(tf_dataset)
        train.save_checkpoint(
            epoch=epoch, model=model.get_weights())

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_for_worker,
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset},
    train_loop_config={"num_epochs": 2})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • tensorflow_config – Configuration for setting up the TensorFlow backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.ml.train.integrations.tensorflow.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Union[Callable[[], tf.keras.Model], Type[tf.keras.Model], tf.keras.Model]) Tuple[tf.keras.Model, Optional[ray.ml.preprocessor.Preprocessor]][source]

Load a Checkpoint from TensorflowTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.

  • model – A callable that returns a TensorFlow Keras model to use, or an instantiated model. Model weights will be loaded from the checkpoint.

Returns

The model with set weights and AIR preprocessor contained within.

class ray.ml.train.integrations.torch.TorchTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel PyTorch training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()

You can also use any of the Torch specific function utils.

def train_loop_per_worker():
    # Prepares model for distributed training by wrapping in
    # `DistributedDataParallel` and moving to correct device.
    train.torch.prepare_model(...)

    # Configures the dataloader for distributed training by adding a
    # `DistributedSampler`.
    # You should NOT use this if you are doing
    # `train.get_dataset_shard(...).to_torch(...)`
    train.torch.prepare_data_loader(...)

    # Returns the current torch device.
    train.torch.get_device()

To save a model to use for the TorchPredictor, you must save it under the “model” kwarg in train.save_checkpoint().

Example

import torch
import torch.nn as nn

import ray
from ray import train
from ray.ml.train.integrations.torch import TorchTrainer

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model = train.torch.prepare_model(model)

    for epoch in range(num_epochs):
        for inputs, labels in iter(dataset_shard.to_torch(batch_size=32)):
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        train.save_checkpoint(model=model.state_dict())

train_dataset = ray.data.from_items([1, 2, 3])
scaling_config = {"num_workers": 3}
# If using GPUs, use the below scaling config instead.
# scaling_config = {"num_workers": 3, "use_gpu": True}
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.ml.train.integrations.torch.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None) Tuple[torch.nn.modules.module.Module, Optional[ray.ml.preprocessor.Preprocessor]][source]

Load a Checkpoint from TorchTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.

  • model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model.

Returns

The model with set weights and AIR preprocessor contained within.
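
A usage sketch, assuming result comes from the TorchTrainer example above; since that checkpoint stores a state dict, a model instance is passed in to receive the weights:

from ray.ml.train.integrations.torch import load_checkpoint

# NeuralNetwork is the module class defined in the example above.
model, preprocessor = load_checkpoint(result.checkpoint, model=NeuralNetwork())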

class ray.ml.train.integrations.huggingface.HuggingFaceTrainer(*args, **kwargs)[source]

Bases: ray.ml.train.integrations.torch.torch_trainer.TorchTrainer

A Trainer for data parallel HuggingFace Transformers on PyTorch training.

This Trainer runs the transformers.Trainer.train() method on multiple Ray Actors. The training is carried out in a distributed fashion through PyTorch DDP. These actors already have the necessary torch process group configured for distributed PyTorch training.

The training function run on every Actor will first invoke the specified trainer_init_per_worker function to obtain an instantiated transformers.Trainer object. The trainer_init_per_worker function will have access to the preprocessed train and evaluation datasets.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards, with each Actor training on a single shard. All the other datasets will not be split.

Please note that if you use a custom transformers.Trainer subclass, its get_train_dataloader method will be wrapped to disable sharding by transformers.IterableDatasetShard, as the dataset will already be sharded on the Ray AIR side.

HuggingFace loggers will be automatically disabled, and the local_rank argument in TrainingArguments will be automatically set. Please note that if you want to use CPU training, you will need to set the no_cuda argument in TrainingArguments manually - otherwise, an exception (segfault) may be thrown. Furthermore, the ‘steps’ value for save_strategy, logging_strategy and evaluation_strategy is not yet supported.

This Trainer requires the transformers>=4.19.0 package.

Example

# Based on
# huggingface/notebooks/examples/language_modeling_from_scratch.ipynb

# Hugging Face imports
from datasets import load_dataset
import transformers
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

import ray
from ray.ml.train.integrations.huggingface import HuggingFaceTrainer

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
block_size = 128

datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)

def tokenize_function(examples):
    return tokenizer(examples["text"])

tokenized_datasets = datasets.map(
    tokenize_function, batched=True, num_proc=1, remove_columns=["text"]
)

def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {
        k: sum(examples[k], []) for k in examples.keys()
    }
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model
    # supported it.
    # instead of this drop, you can customize this part to your needs.
    total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len.
    result = {
        k: [
            t[i : i + block_size]
            for i in range(0, total_length, block_size)
        ]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result

lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    batch_size=1000,
    num_proc=1,
)
ray_train_ds = ray.data.from_huggingface(lm_datasets["train"])
ray_evaluation_ds = ray.data.from_huggingface(
    lm_datasets["validation"]
)

def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    model_config = AutoConfig.from_pretrained(model_checkpoint)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{model_checkpoint}-wikitext2",
        evaluation_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

scaling_config = {"num_workers": 3}
# If using GPUs, use the below scaling config instead.
# scaling_config = {"num_workers": 3, "use_gpu": True}
trainer = HuggingFaceTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_evaluation_ds},
)
result = trainer.fit()
Parameters
  • trainer_init_per_worker – The function that returns an instantiated transformers.Trainer object and takes in the following arguments: train Torch.Dataset, optional evaluation Torch.Dataset and config as kwargs. The Torch Datasets are automatically created by converting the Ray Datasets internally before they are passed into the function.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset and (optionally) key “evaluation” to denote the evaluation dataset. Can only contain a training dataset and up to one extra dataset to be used for evaluation. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • trainer_init_config – Configurations to pass into trainer_init_per_worker as kwargs.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer. Same as in TorchTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

setup() None[source]

Called during fit() to perform initial setup on the Trainer.

Note: this method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

as_trainable() Type[ray.tune.trainable.Trainable][source]

Convert self to a tune.Trainable class.

ray.ml.train.integrations.huggingface.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Union[Type[transformers.PreTrainedModel], torch.nn.modules.module.Module], tokenizer: Optional[Type[transformers.PreTrainedTokenizer]] = None, *, tokenizer_kwargs: Optional[Dict[str, Any]] = None, **pretrained_model_kwargs) Tuple[Union[transformers.PreTrainedModel, torch.nn.modules.module.Module], transformers.TrainingArguments, Optional[transformers.PreTrainedTokenizer], Optional[ray.ml.preprocessor.Preprocessor]][source]

Load a Checkpoint from HuggingFaceTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a HuggingFaceTrainer run.

  • model – Either a transformers.PreTrainedModel class (eg. AutoModelForCausalLM), or a PyTorch model to load the weights to. This should be the same model used for training.

  • tokenizer – A transformers.PreTrainedTokenizer class to load the model tokenizer to. If not specified, the tokenizer will not be loaded. Will throw an exception if specified, but no tokenizer was found in the checkpoint.

  • tokenizer_kwargs – Dict of kwargs to pass to tokenizer.from_pretrained call. Ignored if tokenizer is None.

  • **pretrained_model_kwargs – Kwargs to pass to the model.from_pretrained call. Ignored if model is not a transformers.PreTrainedModel class.

Returns

The model, TrainingArguments, tokenizer and AIR preprocessor contained within. Those can be used to initialize a transformers.Trainer object locally.
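
A usage sketch, assuming result comes from the HuggingFaceTrainer example above and reusing the same model and tokenizer classes that were used for training:

from transformers import AutoModelForCausalLM, AutoTokenizer

from ray.ml.train.integrations.huggingface import load_checkpoint

model, training_args, tokenizer, preprocessor = load_checkpoint(
    result.checkpoint,
    AutoModelForCausalLM,
    tokenizer=AutoTokenizer,
)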

class ray.ml.train.integrations.sklearn.SklearnTrainer(*args, **kwargs)[source]

Bases: ray.ml.trainer.Trainer

A Trainer for scikit-learn estimator training.

This Trainer runs the fit method of the given estimator in a non-distributed manner on a single Ray Actor.

By default, the n_jobs (or thread_count) estimator parameters will be set to match the number of CPUs assigned to the Ray Actor. This behavior can be disabled by setting set_estimator_cpus=False.

If you wish to use GPU-enabled estimators (eg. cuML), make sure to set "GPU": 1 in scaling_config.trainer_resources.

The results are reported all at once and not in an iterative fashion. No checkpointing is done during training. This may be changed in the future.

Example

import ray

from ray.ml.train.integrations.sklearn import SklearnTrainer
from sklearn.ensemble import RandomForestRegressor

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = SklearnTrainer(
    sklearn_estimator=RandomForestRegressor,
    label_column="y",
    scaling_config={
        "trainer_resources": {"CPU": 4}
    },
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • estimator – A scikit-learn compatible estimator to use.

  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting separate metrics.

  • label_column – Name of the label column. A column with this name must be present in the training dataset. If None, no validation will be performed.

  • params – Optional dict of params to be set on the estimator before fitting. Useful for hyperparameter tuning.

  • scoring

    Strategy to evaluate the performance of the model on the validation sets and for cross-validation. Same as in sklearn.model_selection.cross_validate. If scoring represents a single score, one can use:

    • a single string;

    • a callable that returns a single value.

    If scoring represents multiple scores, one can use:

    • a list or tuple of unique strings;

    • a callable returning a dictionary where the keys are the metric names and the values are the metric scores;

    • a dictionary with metric names as keys and callables as values.

  • cv

    Determines the cross-validation splitting strategy. If specified, cross-validation will be run on the train dataset, in addition to computing metrics for the validation datasets. Same as in sklearn.model_selection.cross_validate, with the exception of None. Possible inputs for cv are:

    • None, to skip cross-validation.

    • int, to specify the number of folds in a (Stratified)KFold,

    • CV splitter,

    • An iterable yielding (train, test) splits as arrays of indices.

    For int/None inputs, if the estimator is a classifier and y is either binary or multiclass, StratifiedKFold is used. In all other cases, KFold is used. These splitters are instantiated with shuffle=False so the splits will be the same across calls.

    If you provide a “cv_groups” column in the train dataset, it will be used as group labels for the samples while splitting the dataset into train/test sets. Only used in conjunction with a “Group” cv instance (e.g., GroupKFold). This corresponds to the groups argument in sklearn.model_selection.cross_validate.

  • return_train_score_cv – Whether to also return train scores during cross-validation. Ignored if cv is None.

  • parallelize_cv – If set to True, will parallelize cross-validation instead of the estimator. If set to None, will detect if the estimator has any parallelism-related params (n_jobs or thread_count) and parallelize cross-validation if there are none. If False, will not parallelize cross-validation. Cannot be set to True if there are any GPUs assigned to the trainer. Ignored if cv is None.

  • set_estimator_cpus – If set to True, will automatically set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to match the number of available CPUs.

  • scaling_config – Configuration for how to scale training. Only the trainer_resources key can be provided, as the training is not distributed.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • **fit_params – Additional kwargs passed to estimator.fit() method.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note: this method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.

ray.ml.train.integrations.sklearn.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) Tuple[sklearn.base.BaseEstimator, Optional[ray.ml.preprocessor.Preprocessor]][source]

Load a Checkpoint from SklearnTrainer.

Parameters

checkpoint – The checkpoint to load the estimator and preprocessor from. It is expected to be from the result of a SklearnTrainer run.

Returns

The estimator and AIR preprocessor contained within.

class ray.ml.train.data_parallel_trainer.DataParallelTrainer(*args, **kwargs)[source]

Bases: ray.ml.trainer.Trainer

A Trainer for data parallel training.

You should subclass this Trainer if your Trainer follows SPMD (single program, multiple data) programming paradigm - you want multiple processes to run the same function, but on different data.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()

How do I use ``DataParallelTrainer`` or any of its subclasses?

Example:

import ray
from ray import train
from ray.ml.train.data_parallel_trainer import DataParallelTrainer

def train_loop_for_worker():
    dataset_shard_for_this_worker = train.get_dataset_shard("train")

    assert dataset_shard_for_this_worker.count() == 1

train_dataset = ray.data.from_items([1, 2, 3])
assert train_dataset.count() == 3
trainer = DataParallelTrainer(
    train_loop_per_worker=train_loop_for_worker,
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset})
result = trainer.fit()

How do I develop on top of ``DataParallelTrainer``?

In many cases, using DataParallelTrainer directly is sufficient to execute functions on multiple actors.

However, you may want to subclass DataParallelTrainer and create a custom Trainer for the following 2 use cases:

  • Use Case 1: You want to do data parallel training, but want to have a predefined training_loop_per_worker.

  • Use Case 2: You want to implement a custom Training backend that automatically handles additional setup or teardown logic on each actor, so that the users of this new trainer do not have to implement this logic. For example, a TensorflowTrainer can be built on top of DataParallelTrainer that automatically handles setting the proper environment variables for distributed Tensorflow on each actor.

For 1, you can set a predefined training loop in __init__

from ray.ml.train.data_parallel_trainer import DataParallelTrainer

class MyDataParallelTrainer(DataParallelTrainer):
    def __init__(self, *args, **kwargs):
        predefined_train_loop_per_worker = lambda: 1
        super().__init__(predefined_train_loop_per_worker, *args, **kwargs)

For 2, you can implement the ray.train.Backend and ray.train.BackendConfig interfaces.

from dataclasses import dataclass
from ray.train.backend import Backend, BackendConfig

class MyBackend(Backend):
    def on_start(self, worker_group, backend_config):
        def set_env_var(env_var_value):
            import os
            os.environ["MY_ENV_VAR"] = env_var_value

        worker_group.execute(set_env_var, backend_config.env_var)

@dataclass
class MyBackendConfig(BackendConfig):
    env_var: str = "default_value"

    def backend_cls(self):
        return MyBackend

class MyTrainer(DataParallelTrainer):
    def __init__(self, train_loop_per_worker, my_backend_config:
        MyBackendConfig, **kwargs):

        super().__init__(
            train_loop_per_worker,
            backend_config=my_backend_config, **kwargs)
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • backend_config – Configuration for setting up a Backend (e.g. Torch, Tensorflow, Horovod) on each worker to enable distributed communication. If no Backend should be set up, then set this to None.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note: this method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.

class ray.ml.train.gbdt_trainer.GBDTTrainer(*args, **kwargs)[source]

Bases: ray.ml.trainer.Trainer

Common logic for gradient-boosting decision tree (GBDT) frameworks like XGBoost-Ray and LightGBM-Ray.

Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – Framework specific training parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to framework train() function.

DeveloperAPI: This API may change across minor Ray releases.

preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note: This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note: this method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.

as_trainable() Type[ray.tune.trainable.Trainable][source]

Convert self to a tune.Trainable class.

Tuner

class ray.tune.tuner.Tuner(trainable: Optional[Union[str, Callable, Type[ray.tune.trainable.Trainable], ray.ml.trainer.Trainer]] = None, param_space: Optional[Dict[str, Any]] = None, tune_config: Optional[ray.tune.tune_config.TuneConfig] = None, run_config: Optional[ray.ml.config.RunConfig] = None, _tuner_kwargs: Optional[Dict] = None, _tuner_internal: Optional[ray.tune.impl.tuner_internal.TunerInternal] = None)[source]

Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune.

Parameters
  • trainable – The trainable to be tuned.

  • param_space – Search space of the tuning job. Note that both the preprocessor and the datasets can be tuned here.

  • tune_config – Tuning algorithm specific configs. Refer to ray.tune.tune_config.TuneConfig for more info.

  • run_config – Runtime configuration that is specific to individual trials. If passed, this will overwrite the run config passed to the Trainer, if applicable. Refer to ray.ml.config.RunConfig for more info.

Usage pattern:

from sklearn.datasets import load_breast_cancer

from ray import tune
from ray.data import from_pandas
from ray.ml.config import RunConfig
from ray.ml.train.integrations.xgboost import XGBoostTrainer
from ray.tune.tuner import Tuner

def get_dataset():
    data_raw = load_breast_cancer(as_frame=True)
    dataset_df = data_raw["data"]
    dataset_df["target"] = data_raw["target"]
    dataset = from_pandas(dataset_df)
    return dataset

trainer = XGBoostTrainer(
    label_column="target",
    params={},
    datasets={"train": get_dataset()},
)

param_space = {
    "scaling_config": {
        "num_workers": tune.grid_search([2, 4]),
        "resources_per_worker": {
            "CPU": tune.grid_search([1, 2]),
        },
    },
    # You can even grid search various datasets in Tune.
    # "datasets": {
    #     "train": tune.grid_search(
    #         [ds1, ds2]
    #     ),
    # },
    "params": {
        "objective": "binary:logistic",
        "tree_method": "approx",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}
tuner = Tuner(trainable=trainer, param_space=param_space,
    run_config=RunConfig(name="my_tune_run"))
analysis = tuner.fit()

To retry a failed tune run, you can then do

tuner = Tuner.restore(experiment_checkpoint_dir)
tuner.fit()

experiment_checkpoint_dir can be found near the end of the console output of your first failed run.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod restore(path: str) ray.tune.tuner.Tuner[source]

Restores Tuner after a previously failed run.

Parameters

path – The path where the previous failed run is checkpointed. This path can be found near the end of the console output of the previous run. Note: depending on whether Ray client mode is used, this path may or may not exist on your local machine.

fit() ray.tune.result_grid.ResultGrid[source]

Executes hyperparameter tuning job as configured and returns result.

Failure handling: Exceptions that occur during the execution of a trial can be inspected, together with their stack traces, through the returned result grid. See ResultGrid for reference. Each trial may fail up to a certain number of times; this is configured by RunConfig.FailureConfig.max_failures.

Exceptions that occur outside of trials are raised by this method as well. In such cases, an instruction like the following is printed at the end of the console output to explain how to resume:

Please use tuner = Tuner.restore(“~/ray_results/tuner_resume”) to resume.

Raises

RayTaskError – if the exception occurs inside the trainable; otherwise, TuneError.

class ray.tune.result_grid.ResultGrid(experiment_analysis: ray.tune.analysis.experiment_analysis.ExperimentAnalysis)[source]

A set of Result objects returned from a call to tuner.fit().

You can use it to inspect the trials that were run, as well as to obtain the best result.

The constructor is a private API.

Usage pattern:

result_grid = tuner.fit()
for i in range(len(result_grid)):
    result = result_grid[i]
    if not result.error:
        print(f"Trial finishes successfully with metric {result.metric}.")
    else:
        print(f"Trial errors out with {result.error}.")
best_result = result_grid.get_best_result()
best_checkpoint = best_result.checkpoint
best_metric = best_result.metric

Note that trials of all statuses are included in the final result grid. If a trial is not in a terminated state, its latest result and checkpoint as seen by Tune will be provided.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

get_best_result(metric: Optional[str] = None, mode: Optional[str] = None, scope: str = 'last', filter_nan_and_inf: bool = True) ray.ml.result.Result[source]

Get the best result from all the trials run.

Parameters
  • metric – Key for trial info to order on. Defaults to the metric specified in your Tuner’s TuneConfig.

  • mode – One of [min, max]. Defaults to the mode specified in your Tuner’s TuneConfig.

  • scope – One of [all, last, avg, last-5-avg, last-10-avg]. If scope=last, only look at each trial’s final step for metric, and compare across trials based on mode=[min,max]. If scope=avg, consider the simple average over all steps for metric and compare across trials based on mode=[min,max]. If scope=last-5-avg or scope=last-10-avg, consider the simple average over the last 5 or 10 steps for metric and compare across trials based on mode=[min,max]. If scope=all, find each trial’s min/max score for metric based on mode, and compare trials based on mode=[min,max].

  • filter_nan_and_inf – If True (default), NaN or infinite values are disregarded and these trials are never selected as the best trial.
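For illustration, a hedged usage sketch (assuming result_grid comes from tuner.fit() and that no default metric/mode was set in the Tuner's TuneConfig, so they are passed explicitly; the metric name "loss" is illustrative):

best_result = result_grid.get_best_result(metric="loss", mode="min")
print(best_result.checkpoint)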

get_dataframe(filter_metric: Optional[str] = None, filter_mode: Optional[str] = None) pandas.core.frame.DataFrame[source]

Return dataframe of all trials with their configs and reported results.

By default, this returns the last reported results for each trial.

If filter_metric and filter_mode are set, the results from each trial are filtered for this metric and mode. For example, if filter_metric="some_metric" and filter_mode="max", for each trial, every received result is checked, and the one where some_metric is maximal is returned.

Example

result_grid = tuner.fit()

# Get last reported results per trial
df = result_grid.get_dataframe()

# Get best ever reported accuracy per trial
df = result_grid.get_dataframe(filter_metric="accuracy", filter_mode="max")
Parameters
  • filter_metric – Metric to filter best result for.

  • filter_mode – If filter_metric is given, one of ["min", "max"] to specify if we should find the minimum or maximum result.

Returns

Pandas DataFrame with each trial as a row and their results as columns.

Predictors

class ray.ml.predictor.Predictor[source]

Predictors load models from checkpoints to perform inference.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, **kwargs) ray.ml.predictor.Predictor[source]

Create a specific predictor from a checkpoint.

Parameters
  • checkpoint – Checkpoint to load predictor data from.

  • kwargs – Arguments specific to predictor implementations.

Returns

Predictor object.

Return type

Predictor

predict(data: Union[pd.DataFrame, np.ndarray], **kwargs) Union[pd.DataFrame, np.ndarray][source]

Perform inference on a batch of data.

Parameters
  • data – A batch of input data. Either a pandas Dataframe or numpy array.

  • kwargs – Arguments specific to predictor implementations.

Returns

Prediction result.

Return type

DataBatchType

class ray.ml.batch_predictor.BatchPredictor(checkpoint: ray.ml.checkpoint.Checkpoint, predictor_cls: Type[ray.ml.predictor.Predictor], **predictor_kwargs)[source]

Batch predictor class.

Takes a predictor class and a checkpoint and provides an interface to run batch scoring on Ray datasets.

This batch predictor wraps around a predictor class and executes it in a distributed way when calling predict().

checkpoint

Checkpoint loaded by the distributed predictor objects.

predictor_cls

Predictor class reference. When scoring, each scoring worker will create an instance of this class and call predict(batch) on it.

**predictor_kwargs

Keyword arguments passed to the predictor on initialization.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, predictor_cls: Type[ray.ml.predictor.Predictor], **kwargs) ray.ml.batch_predictor.BatchPredictor[source]

Create a specific predictor from a checkpoint.

Parameters
  • checkpoint – Checkpoint to load predictor data from.

  • kwargs – Arguments specific to predictor implementations.

Returns

Predictor object.

Return type

Predictor

predict(data: ray.data.dataset.Dataset, *, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: int = 1, num_gpus_per_worker: int = 0, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) ray.data.dataset.Dataset[source]

Run batch scoring on dataset.

Parameters
  • data – Ray dataset to run batch prediction on.

  • batch_size – Split dataset into batches of this size for prediction.

  • min_scoring_workers – Minimum number of scoring actors.

  • max_scoring_workers – If set, specify the maximum number of scoring actors.

  • num_cpus_per_worker – Number of CPUs to allocate per scoring worker.

  • num_gpus_per_worker – Number of GPUs to allocate per scoring worker.

  • ray_remote_args – Additional resource requirements to request from ray.

  • predict_kwargs – Keyword arguments passed to the predictor’s predict() method.

Returns

Dataset containing scoring results.
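A hedged end-to-end sketch, assuming checkpoint was produced by an XGBoostTrainer run and dataset is an existing Ray Dataset of feature columns:

from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
predictions = batch_predictor.predict(
    dataset,
    batch_size=1024,
    min_scoring_workers=1,
    num_cpus_per_worker=1,
)
predictions.show()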

class ray.ml.predictors.integrations.xgboost.XGBoostPredictor(model: xgboost.core.Booster, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for XGBoost models.

Parameters
  • model – The XGBoost booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.xgboost.xgboost_predictor.XGBoostPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of XGBoostTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a XGBoostTrainer run.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, dmatrix_kwargs: Optional[Dict[str, Any]] = None, **predict_kwargs) pandas.core.frame.DataFrame[source]

Run inference on data batch.

The data is converted into an XGBoost DMatrix before being inputted to the model.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • dmatrix_kwargs – Dict of keyword arguments passed to xgboost.DMatrix.

  • **predict_kwargs – Keyword arguments passed to xgboost.Booster.predict.

Examples:

import numpy as np
import xgboost as xgb
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = xgb.XGBClassifier().fit(train_X, train_y)
predictor = XGBoostPredictor(model=model.get_booster())

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])

import pandas as pd
import xgboost as xgb
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = xgb.XGBClassifier().fit(train_X, train_y)
predictor = XGBoostPredictor(model=model.get_booster())

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Return type

pd.DataFrame

class ray.ml.predictors.integrations.lightgbm.LightGBMPredictor(model: lightgbm.basic.Booster, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for LightGBM models.

Parameters
  • model – The LightGBM booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.lightgbm.lightgbm_predictor.LightGBMPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of LightGBMTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) pandas.core.frame.DataFrame[source]

Run inference on data batch.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • **predict_kwargs – Keyword arguments passed to lightgbm.Booster.predict.

Examples:

import numpy as np
import lightgbm as lgbm
from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = lgbm.LGBMClassifier().fit(train_X, train_y)
predictor = LightGBMPredictor(model=model.booster_)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])

import pandas as pd
import lightgbm as lgbm
from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = lgbm.LGBMClassifier().fit(train_X, train_y)
predictor = LightGBMPredictor(model=model.booster_)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Return type

pd.DataFrame

class ray.ml.predictors.integrations.tensorflow.TensorflowPredictor(model_definition: Union[Callable[[], tf.keras.Model], Type[tf.keras.Model]], preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None, model_weights: Optional[list] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for TensorFlow models.

Parameters
  • model_definition – A callable that returns a TensorFlow Keras model to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • model_weights – List of weights to use for the model.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model_definition: Union[Callable[[], tf.keras.Model], Type[tf.keras.Model]]) ray.ml.predictors.integrations.tensorflow.tensorflow_predictor.TensorflowPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TensorflowTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.

  • model_definition – A callable that returns a TensorFlow Keras model to use. Model weights will be loaded from the checkpoint.

predict(data: Union[pd.DataFrame, np.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, dtype: Optional[tf.dtypes.DType] = None) Union[pd.DataFrame, np.ndarray][source]

Run inference on data batch.

The data is converted into a TensorFlow Tensor before being inputted to the model.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • dtype – The TensorFlow dtype to use when creating the TensorFlow tensor. If set to None, then automatically infer the dtype.

Examples:

import numpy as np
import tensorflow as tf
from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor

def build_model():
    return tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(2,)),
            tf.keras.layers.Dense(1),
        ]
    )

predictor = TensorflowPredictor(model_definition=build_model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

import pandas as pd
import tensorflow as tf
from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor

def build_model():
    return tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(1,)),
            tf.keras.layers.Dense(1),
        ]
    )

predictor = TensorflowPredictor(model_definition=build_model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])

predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=["A"])
Returns

Prediction result.

Return type

DataBatchType

class ray.ml.predictors.integrations.torch.TorchPredictor(model: torch.nn.modules.module.Module, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for PyTorch models.

Parameters
  • model – The torch module to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None) ray.ml.predictors.integrations.torch.torch_predictor.TorchPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TorchTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.

  • model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[List[str]], List[int], List[List[int]]]] = None, dtype: Optional[torch.dtype] = None, unsqueeze: bool = True) Union[pandas.core.frame.DataFrame, numpy.ndarray][source]

Run inference on data batch.

The data is converted into a torch Tensor before being inputted to the model.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If this arg is a list of lists or a dict of string-list pairs, then the data batch will be converted into multiple tensors, which are then concatenated before being fed into the model. This is useful for multi-input models. If None, then use all columns in data.

  • dtype – The dtypes to use for the tensors. This should match the format of feature_columns, or be a single dtype, in which case it will be applied to all tensors. If None, then automatically infer the dtype.

  • unsqueeze – If set to True, the feature tensors will be unsqueezed (reshaped to (N, 1)) before being concatenated into the final features tensor. Otherwise, they will be left as is, that is, with shape (N,). Defaults to True.

Examples:

import numpy as np
import torch
from ray.ml.predictors.integrations.torch import TorchPredictor

model = torch.nn.Linear(2, 1)
predictor = TorchPredictor(model=model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

import pandas as pd
import torch
from ray.ml.predictors.integrations.torch import TorchPredictor

model = torch.nn.Linear(1, 1)
predictor = TorchPredictor(model=model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])

predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=["A"])
Returns

Prediction result.

Return type

DataBatchType

class ray.ml.predictors.integrations.sklearn.SklearnPredictor(estimator: sklearn.base.BaseEstimator, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for scikit-learn compatible estimators.

Parameters
  • estimator – The fitted scikit-learn compatible estimator to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.sklearn.sklearn_predictor.SklearnPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of SklearnTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a SklearnTrainer run.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, num_estimator_cpus: Optional[int] = 1, **predict_kwargs) pandas.core.frame.DataFrame[source]

Run inference on data batch.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • num_estimator_cpus – If set to a value other than None, will set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to the given value.

  • **predict_kwargs – Keyword arguments passed to estimator.predict.

Examples:

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from ray.ml.predictors.integrations.sklearn import SklearnPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = RandomForestClassifier().fit(train_X, train_y)
predictor = SklearnPredictor(estimator=model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from ray.ml.predictors.integrations.sklearn import SklearnPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = RandomForestClassifier().fit(train_X, train_y)
predictor = SklearnPredictor(estimator=model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Return type

pd.DataFrame

class ray.ml.predictors.integrations.huggingface.HuggingFacePredictor(pipeline: Optional[transformers.pipelines.Pipeline] = None, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]

Bases: ray.ml.predictor.Predictor

A predictor for HuggingFace Transformers PyTorch models.

This predictor uses Transformers Pipelines for inference.

Parameters
  • pipeline – The Transformers pipeline to use for inference.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, *, pipeline: Optional[Type[transformers.pipelines.Pipeline]] = None, **pipeline_kwargs) ray.ml.predictors.integrations.huggingface.huggingface_predictor.HuggingFacePredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of HuggingFaceTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a HuggingFaceTrainer run.

  • pipeline – A transformers.pipelines.Pipeline class to use. If not specified, will use the pipeline abstraction wrapper.

  • **pipeline_kwargs – Any kwargs to pass to the pipeline initialization. If pipeline is None, this must contain the ‘task’ argument. Cannot contain ‘model’.

predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[List[str]] = None, **pipeline_call_kwargs) Union[pandas.core.frame.DataFrame, numpy.ndarray][source]

Run inference on data batch.

The data is converted into a list (unless pipeline is a TableQuestionAnsweringPipeline) and passed to the pipeline object.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, use all columns.

  • **pipeline_call_kwargs – Additional kwargs to pass to the pipeline object.

Examples:

import pandas as pd
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
from transformers.pipelines import pipeline
from ray.ml.predictors.integrations.huggingface import HuggingFacePredictor

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)

model_config = AutoConfig.from_pretrained(model_checkpoint)
model = AutoModelForCausalLM.from_config(model_config)
predictor = HuggingFacePredictor(
    pipeline=pipeline(
        task="text-generation", model=model, tokenizer=tokenizer
    )
)

prompts = pd.DataFrame(
    ["Complete me", "And me", "Please complete"], columns=["sentences"]
)
predictions = predictor.predict(prompts)
Returns

Prediction result.

Return type

DataBatchType

Serving

ray.serve.model_wrappers.ModelWrapperDeployment

alias of Deployment(name=ModelWrapperDeployment,version=None,route_prefix=/ModelWrapperDeployment)

class ray.serve.model_wrappers.ModelWrapper(predictor_cls: Union[str, Type[ray.ml.predictor.Predictor]], checkpoint: Union[ray.ml.checkpoint.Checkpoint, Dict], http_adapter: Union[str, Callable[[Any], Any]] = 'ray.serve.http_adapters.json_to_ndarray', batching_params: Optional[Union[Dict[str, int], bool]] = None, **predictor_kwargs)[source]

Serve any Ray AIR predictor from an AIR checkpoint.

Parameters
  • predictor_cls (str, Type[Predictor]) – The predictor class, or an import path to it. The type must be a subclass of ray.ml.predictor.Predictor.

  • checkpoint (Checkpoint, dict) –

    The checkpoint object, or a dictionary describing the object.

    • The checkpoint object must be a subclass of ray.ml.checkpoint.Checkpoint.

    • The dictionary should be in the form of {"checkpoint_cls": "import.path.MyCheckpoint", "uri": "uri_to_load_from"}. Serve will then call MyCheckpoint.from_uri("uri_to_load_from") to instantiate the object.

  • http_adapter (str, HTTPAdapterFn, None) – The FastAPI input conversion function. By default, Serve will use the NdArray schema and convert it to a numpy array. You can pass in any FastAPI dependency resolver that returns an array. When you pass in a string, Serve will import it. Please refer to the Serve HTTP adapters documentation to learn more.

  • batching_params (dict, None, False) – Override the default parameters to ray.serve.batch(). Pass False to disable batching.

  • **predictor_kwargs – Additional keyword arguments passed to the Predictor.from_checkpoint() call.
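A hedged serving sketch, assuming a running Ray cluster and an existing AIR checkpoint from an XGBoostTrainer run; the deployment name is illustrative only:

from ray import serve
from ray.serve.model_wrappers import ModelWrapperDeployment
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

serve.start()
# Constructor arguments are forwarded to ModelWrapper(predictor_cls, checkpoint).
ModelWrapperDeployment.options(name="XGBoostService").deploy(
    XGBoostPredictor, checkpoint
)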

Outputs

class ray.ml.checkpoint.Checkpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Ray ML Checkpoint.

This implementation provides methods to translate between different checkpoint storage locations: Local storage, external storage (e.g. cloud storage), and data dict representations.

The constructor is a private API, instead the from_ methods should be used to create checkpoint objects (e.g. Checkpoint.from_directory()).

When converting between different checkpoint formats, it is guaranteed that a full round trip of conversions (e.g. directory –> dict –> obj ref –> directory) will recover the original checkpoint data. There are no guarantees made about compatibility of intermediate representations.

Examples

Example for an arbitrary data checkpoint:

from ray.ml.checkpoint import Checkpoint

# Create checkpoint data dict
checkpoint_data = {"data": 123}

# Create checkpoint object from data
checkpoint = Checkpoint.from_dict(checkpoint_data)

# Save checkpoint to temporary location
path = checkpoint.to_directory()

# This path can then be passed around, e.g. to a different function

# At some other location, recover Checkpoint object from path
checkpoint = Checkpoint.from_directory(path)

# Convert into dictionary again
recovered_data = checkpoint.to_dict()

# It is guaranteed that the original data has been recovered
assert recovered_data == checkpoint_data

Example using MLflow for saving and loading a classifier:

from ray.ml.checkpoint import Checkpoint
from sklearn.ensemble import RandomForestClassifier
import mlflow.sklearn

# Create an sklearn classifier
clf = RandomForestClassifier(max_depth=7, random_state=0)
# ... e.g. train model with clf.fit()
# Save model using MLflow
mlflow.sklearn.save_model(clf, "model_directory")

# Create checkpoint object from path
checkpoint = Checkpoint.from_directory("model_directory")

# Convert into dictionary
checkpoint_dict = checkpoint.to_dict()

# This dict can then be passed around, e.g. to a different function

# At some other location, recover checkpoint object from dict
checkpoint = Checkpoint.from_dict(checkpoint_dict)

# Convert into a directory again
checkpoint.to_directory("other_directory")

# We can now use MLflow to re-load the model
clf = mlflow.sklearn.load_model("other_directory")

# It is guaranteed that the original data was recovered
assert isinstance(clf, RandomForestClassifier)

Checkpoints can be pickled and sent to remote processes. Please note that checkpoints pointing to local directories will be pickled as data representations, so the full checkpoint data will be contained in the checkpoint object. If you want to avoid this, consider passing only the checkpoint directory to the remote task and re-construct your checkpoint object in that function. Note that this will only work if the “remote” task is scheduled on the same node or a node that also has access to the local data path (e.g. on a shared file system like NFS).

Checkpoints pointing to object store references will keep the object reference intact - this means that these checkpoints cannot be properly deserialized on other Ray clusters or outside a Ray cluster. If you need persistence across clusters, use the to_uri() or to_directory() methods to persist your checkpoints to disk.

PublicAPI: This API is stable across Ray releases.

classmethod from_bytes(data: bytes) ray.ml.checkpoint.Checkpoint[source]

Create a checkpoint from the given byte string.

Parameters

data – Data object containing pickled checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_bytes() bytes[source]

Return Checkpoint serialized as bytes object.

Returns

Bytes object containing checkpoint data.

Return type

bytes
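A brief round-trip sketch of the bytes conversion (illustrative data only):

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"weights": [1, 2, 3]})
blob = checkpoint.to_bytes()
restored = Checkpoint.from_bytes(blob)
assert restored.to_dict()["weights"] == [1, 2, 3]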

classmethod from_dict(data: dict) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from dictionary.

Parameters

data – Dictionary containing checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_dict() dict[source]

Return checkpoint data as dictionary.

Returns

Dictionary containing checkpoint data.

Return type

dict

classmethod from_object_ref(obj_ref: ray.ObjectRef) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from object reference.

Parameters

obj_ref – ObjectRef pointing to checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_object_ref() ray.ObjectRef[source]

Return checkpoint data as object reference.

Returns

ObjectRef pointing to checkpoint data.

Return type

ray.ObjectRef

classmethod from_directory(path: str) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from directory.

Parameters

path – Directory containing checkpoint data. The caller promises to not delete the directory (gifts ownership of the directory to this Checkpoint).

Returns

checkpoint object.

Return type

Checkpoint

to_directory(path: Optional[str] = None, dedup: bool = True) str[source]

Write checkpoint data to directory.

Parameters

path – Target directory to restore data in. If not specified, a temporary directory will be created.

Returns

Directory containing checkpoint data.

Return type

str

as_directory() Iterator[str][source]

Return checkpoint directory path in a context.

This function makes checkpoint data available as a directory while avoiding unnecessary copies and left-over temporary data.

If the checkpoint is already a directory checkpoint, it will return the existing path. If it is not, it will create a temporary directory, which will be deleted after the context is exited.

Users should treat the returned checkpoint directory as read-only and avoid changing any data within it, as it might get deleted when exiting the context.

Example

with checkpoint.as_directory() as checkpoint_dir:
    # Do some read-only processing of files within checkpoint_dir.
    pass

# At this point, if a temporary directory was created, it will have
# been deleted.

classmethod from_uri(uri: str) ray.ml.checkpoint.Checkpoint[source]

Create checkpoint object from location URI (e.g. cloud storage).

Valid locations currently include AWS S3 (s3://), Google cloud storage (gs://), HDFS (hdfs://), and local files (file://).

Parameters

uri – Source location URI to read data from.

Returns

checkpoint object.

Return type

Checkpoint

to_uri(uri: str) str[source]

Write checkpoint data to location URI (e.g. cloud storage).

Parameters

uri – Target location URI to write data to.

Returns

Cloud location containing checkpoint data.

Return type

str
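A hedged sketch of persisting to and restoring from a cloud URI; the bucket path is illustrative only and requires appropriate storage credentials:

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"data": 123})
uri = checkpoint.to_uri("s3://my-bucket/checkpoints/run-001")
restored = Checkpoint.from_uri(uri)
assert restored.to_dict() == {"data": 123}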

get_internal_representation() Tuple[str, Union[dict, str, ray.ObjectRef]][source]

Return tuple of (type, data) for the internal representation.

The internal representation can be used e.g. to compare checkpoint objects for equality or to access the underlying data storage.

The returned type is a string and one of ["local_path", "data_dict", "uri", "object_ref"].

The data is the respective data value.

Note that paths converted from file://... will be returned as local_path (without the file:// prefix) and not as uri.

Returns

Tuple of type and data.

DeveloperAPI: This API may change across minor Ray releases.
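For example (a hedged sketch; the exact type string depends on how the checkpoint was created):

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"data": 123})
rep_type, rep_data = checkpoint.get_internal_representation()
assert rep_type == "data_dict"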

class ray.ml.result.Result(metrics: Optional[Dict[str, Any]], checkpoint: Optional[ray.ml.checkpoint.Checkpoint], error: Optional[Exception])[source]

The final result of an ML training run or a Tune trial.

This is the class produced by Trainer.fit(). It contains a checkpoint, which can be used for resuming training and for creating a Predictor object. It also contains a metrics object describing the final training metrics. error is included so that unsuccessful runs and trials can be represented as well.

The constructor is a private API.

Parameters
  • metrics – The final metrics as reported by a Trainable.

  • checkpoint – The final checkpoint of the Trainable.

  • error – The execution error of the Trainable run, if the trial finishes in error.

property config: Optional[Dict[str, Any]]

The config associated with the result.
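A hedged sketch of consuming a Result returned by Trainer.fit(); trainer is assumed to be an already-configured Trainer:

result = trainer.fit()
print(result.metrics)      # final reported metrics
print(result.checkpoint)   # final checkpoint, usable to build a Predictor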

Configs

class ray.ml.config.ScalingConfigDataClass(trainer_resources: Optional[Dict] = None, num_workers: Optional[int] = None, use_gpu: bool = False, resources_per_worker: Optional[Dict] = None, placement_strategy: str = 'PACK')[source]

Configuration for scaling training.

This is the schema for the scaling_config dict, and after beta, this will be the actual representation for Scaling config objects.

trainer_resources: Resources to allocate for the trainer. If none is provided, will default to 1 CPU.

num_workers: The number of workers (Ray actors) to launch. Each worker will reserve 1 CPU by default. The number of CPUs reserved by each worker can be overridden with the resources_per_worker argument.

use_gpu: If True, training will be done on GPUs (1 per worker). Defaults to False. The number of GPUs reserved by each worker can be overridden with the resources_per_worker argument.

resources_per_worker: If specified, the resources defined in this Dict will be reserved for each worker. The CPU and GPU keys (case-sensitive) can be defined to override the number of CPUs/GPUs used by each worker.

placement_strategy: The placement strategy to use for the placement group of the Ray actors. See Placement Group Strategies for the possible options.
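A hedged sketch of a scaling_config dict that follows this schema (values are illustrative):

scaling_config = {
    "trainer_resources": {"CPU": 1},
    "num_workers": 2,
    "use_gpu": False,
    "resources_per_worker": {"CPU": 2},
    "placement_strategy": "SPREAD",
}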

property num_cpus_per_worker

The number of CPUs to set per worker.

property num_gpus_per_worker

The number of GPUs to set per worker.

property additional_resources_per_worker

Resources per worker, not including CPU or GPU resources.

as_placement_group_factory() PlacementGroupFactory[source]

Returns a PlacementGroupFactory to specify resources for Tune.

class ray.ml.config.FailureConfig[source]

Configuration related to failure handling of each run/trial.

Parameters

max_failures – Tries to recover a run at least this many times. Will recover from the latest checkpoint if present. Setting to -1 will lead to infinite recovery retries. Setting to 0 will disable retries. Defaults to 0.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

class ray.ml.config.RunConfig(name: Optional[str] = None, local_dir: Optional[str] = None, callbacks: Optional[List[Callback]] = None, stop: Optional[Union[Mapping, Stopper, Callable[[str, Mapping], bool]]] = None, failure: Optional[ray.ml.config.FailureConfig] = None, sync_config: Optional[ray.tune.syncer.SyncConfig] = None, verbose: Union[int, ray.tune.utils.log.Verbosity] = Verbosity.V3_TRIAL_DETAILS)[source]

Runtime configuration for individual trials that are run.

This contains information that applies to individual runs of Trainable classes. This includes both running a Trainable by itself or running a hyperparameter tuning job on top of a Trainable (applies to each trial).

Upon resume, Ray Tune automatically re-applies this run config so that the resumed run uses the same run config as the original run.

Parameters
  • name – Name of the trial or experiment. If not provided, will be deduced from the Trainable.

  • local_dir – Local dir to save training results to. Defaults to ~/ray_results.

  • stop – Stop conditions to consider. Refer to ray.tune.stopper.Stopper for more info. Stoppers should be serializable.

  • callbacks – Callbacks to invoke. Refer to ray.tune.callback.Callback for more info. Callbacks should be serializable. Currently only stateless callbacks are supported for resumed runs (any state of the callback will not be checkpointed by Tune and thus will not take effect in resumed runs).

  • failure – The failure mode configuration.

  • sync_config – Configuration object for syncing. See tune.SyncConfig.

  • verbose – 0, 1, 2, or 3. Verbosity mode. 0 = silent, 1 = only status updates, 2 = status and brief results, 3 = status and detailed results. Defaults to 3.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
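A hedged sketch of constructing a RunConfig with a FailureConfig (field names follow the parameters above; values are illustrative):

from ray.ml.config import FailureConfig, RunConfig

run_config = RunConfig(
    name="my_experiment",
    local_dir="~/ray_results",
    failure=FailureConfig(max_failures=2),
    verbose=2,
)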