Ray Train API

This page covers framework-specific integrations with Ray Train and the Ray Train developer APIs.

For core Ray AIR APIs, take a look at the AIR Trainer package reference.

Trainer and Predictor Integrations

XGBoost

class ray.train.xgboost.XGBoostTrainer(*args, **kwargs)[source]

Bases: ray.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel XGBoost training.

This Trainer runs the XGBoost training loop in a distributed manner using multiple Ray Actors.

Note

XGBoostTrainer does not modify or otherwise alter the working of the XGBoost distributed training algorithm. Ray only provides orchestration, data ingest and fault tolerance. For more information on XGBoost distributed training, refer to XGBoost documentation.

Example

import ray

from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – XGBoost training parameters. Refer to XGBoost documentation for a list of possible parameters.

  • dmatrix_params – Dict mapping each dataset name to a dict of kwargs passed to the respective xgboost_ray.RayDMatrix initialization, which in turn is passed to the xgboost.DMatrix objects created on each worker. For example, this can be used to add sample weights with the weights parameter; see the sketch after this parameter list.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to xgboost.train() function.

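For instance, here is a hedged sketch of forwarding per-dataset kwargs through dmatrix_params; the "w" weight column and the weight keyword are illustrative assumptions, so check the xgboost_ray.RayDMatrix documentation for the exact signature:

import ray

from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

# Hypothetical dataset with an extra sample-weight column "w".
train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1, "w": 1.0} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    # Assumption: kwargs under the "train" key are forwarded to the
    # RayDMatrix built for the training dataset.
    dmatrix_params={"train": {"weight": "w"}},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
)
result = trainer.fit()
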
PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)
class ray.train.xgboost.XGBoostCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with XGBoost-specific functionality.

Create this from a generic Checkpoint by calling XGBoostCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(booster: xgboost.core.Booster, *, preprocessor: Optional[Preprocessor] = None) XGBoostCheckpoint[source]

Create a Checkpoint that stores an XGBoost model.

Parameters
  • booster – The XGBoost model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

An XGBoostCheckpoint containing the specified booster.

Examples

>>> import numpy as np
>>> import ray
>>> from ray.train.xgboost import XGBoostCheckpoint
>>> import xgboost
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgboost.XGBClassifier().fit(train_X, train_y)
>>> checkpoint = XGBoostCheckpoint.from_model(model.get_booster())

You can use an XGBoostCheckpoint to create an XGBoostPredictor and perform inference.

>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> predictor = XGBoostPredictor.from_checkpoint(checkpoint)
get_model() xgboost.core.Booster[source]

Retrieve the XGBoost model stored in this checkpoint.
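
For example, continuing the from_model() example above, you can retrieve the stored booster directly:

>>> booster = checkpoint.get_model()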

class ray.train.xgboost.XGBoostPredictor(model: xgboost.core.Booster, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for XGBoost models.

Parameters
  • model – The XGBoost booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.xgboost.xgboost_predictor.XGBoostPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of XGBoostTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a XGBoostTrainer run.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, dmatrix_kwargs: Optional[Dict[str, Any]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

The data is converted into an XGBoost DMatrix before being passed to the model.

Parameters
  • data – A batch of input data.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • dmatrix_kwargs – Dict of keyword arguments passed to xgboost.DMatrix.

  • **predict_kwargs – Keyword arguments passed to xgboost.Booster.predict.

Examples

>>> import numpy as np
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictor.predict(data, feature_columns=[0, 1])
array([0.5, 0.5], dtype=float32)
>>> import pandas as pd
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictor.predict(data, feature_columns=["A", "B"])
   predictions
0          0.5
1          0.5
Returns

Prediction result.

LightGBM

class ray.train.lightgbm.LightGBMTrainer(*args, **kwargs)[source]

Bases: ray.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel LightGBM training.

This Trainer runs the LightGBM training loop in a distributed manner using multiple Ray Actors.

If you would like to take advantage of LightGBM’s built-in handling for features with the categorical data type, consider using the Categorizer preprocessor to set the dtypes in the dataset.

Note

LightGBMTrainer does not modify or otherwise alter the working of the LightGBM distributed training algorithm. Ray only provides orchestration, data ingest and fault tolerance. For more information on LightGBM distributed training, refer to LightGBM documentation.

Example

import ray

from ray.train.lightgbm import LightGBMTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – LightGBM training parameters passed to lightgbm.train(). Refer to LightGBM documentation for a list of possible parameters.

  • dmatrix_params – Dict mapping each dataset name to a dict of kwargs passed to the respective xgboost_ray.RayDMatrix initialization, which in turn is passed to the lightgbm.Dataset objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to lightgbm.train() function.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)
class ray.train.lightgbm.LightGBMCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with LightGBM-specific functionality.

Create this from a generic Checkpoint by calling LightGBMCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(booster: lightgbm.basic.Booster, *, preprocessor: Optional[Preprocessor] = None) LightGBMCheckpoint[source]

Create a Checkpoint that stores a LightGBM model.

Parameters
  • booster – The LightGBM model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A LightGBMCheckpoint containing the specified booster.

Examples

>>> import lightgbm
>>> import numpy as np
>>> from ray.train.lightgbm import LightGBMCheckpoint
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lightgbm.LGBMClassifier().fit(train_X, train_y)
>>> checkpoint = LightGBMCheckpoint.from_model(model.booster_)

You can use a LightGBMCheckpoint to create a LightGBMPredictor and perform inference.

>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> predictor = LightGBMPredictor.from_checkpoint(checkpoint)
get_model() lightgbm.basic.Booster[source]

Retrieve the LightGBM model stored in this checkpoint.

class ray.train.lightgbm.LightGBMPredictor(model: lightgbm.basic.Booster, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for LightGBM models.

Parameters
  • model – The LightGBM booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.lightgbm.lightgbm_predictor.LightGBMPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of LightGBMTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

Parameters
  • data – A batch of input data.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • **predict_kwargs – Keyword arguments passed to lightgbm.Booster.predict.

Examples

>>> import numpy as np
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictions = predictor.predict(data, feature_columns=[0, 1])
>>> import pandas as pd
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

TensorFlow

class ray.train.tensorflow.TensorflowTrainer(*args, **kwargs)[source]

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Tensorflow training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary TensorFlow process group configured for distributed TensorFlow training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

You can also use ray.train.tensorflow.prepare_dataset_shard() within your training code.

def train_loop_per_worker():
    # Turns off autosharding for a dataset.
    # You should use this if you are doing
    # `session.get_dataset_shard(...).iter_tf_batches(...)`
    # as the data will be already sharded.
    train.tensorflow.prepare_dataset_shard(...)

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

To save a model to use for the TensorflowPredictor, you must save it under the “model” kwarg in the Checkpoint passed to session.report().

Example:

import tensorflow as tf

import ray
from ray.air import session, Checkpoint
from ray.train.tensorflow import prepare_dataset_shard, TensorflowTrainer
from ray.air.config import ScalingConfig

input_size = 1

def build_model():
    # toy neural network : 1-layer
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(
            1, activation="linear", input_shape=(input_size,))]
    )

def train_loop_for_worker(config):
    dataset_shard = session.get_dataset_shard("train")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()
        model.compile(
            optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    def to_tf_dataset(dataset, batch_size):
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(
                batch_size=batch_size, dtypes=tf.float32
            ):
                yield tf.expand_dims(batch["x"], 1), batch["y"]

        output_signature = (
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
            tf.TensorSpec(shape=(None,), dtype=tf.float32),
        )
        tf_dataset = tf.data.Dataset.from_generator(
            to_tensor_iterator, output_signature=output_signature
        )
        return prepare_dataset_shard(tf_dataset)

    for epoch in range(config["num_epochs"]):
        tf_dataset = to_tf_dataset(dataset=dataset_shard, batch_size=1)
        model.fit(tf_dataset)
        # You can also use ray.air.callbacks.keras.Callback
        # for reporting and checkpointing instead of reporting manually.
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.get_weights())
            ),
        )

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_for_worker,
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
    train_loop_config={"num_epochs": 2})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • tensorflow_config – Configuration for setting up the TensorFlow backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, tensorflow_config: Optional[ray.train.tensorflow.config.TensorflowConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
class ray.train.tensorflow.TensorflowCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with TensorFlow-specific functionality.

Create this from a generic Checkpoint by calling TensorflowCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(model: keras.engine.training.Model, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint[source]

Create a Checkpoint that stores a Keras model.

Parameters
  • model – The Keras model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TensorflowCheckpoint containing the specified model.

Examples

>>> from ray.train.tensorflow import TensorflowCheckpoint
>>> import tensorflow as tf
>>>
>>> model = tf.keras.applications.resnet.ResNet101()  
>>> checkpoint = TensorflowCheckpoint.from_model(model)  
get_model_weights() keras.engine.training.Model[source]

Retrieve the model weights stored in this checkpoint.

class ray.train.tensorflow.TensorflowConfig[source]

Bases: ray.train.backend.BackendConfig

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.tensorflow.prepare_dataset_shard(tf_dataset_shard: tensorflow.python.data.ops.dataset_ops.DatasetV2)[source]

A utility function that overrides the default configuration of a TensorFlow Dataset.

This should be used on a TensorFlow Dataset created by calling iter_tf_batches() on a ray.data.Dataset returned by session.get_dataset_shard(), since that dataset has already been sharded across the workers.

Parameters

tf_dataset_shard (tf.data.Dataset) – A TensorFlow Dataset.

Returns

A TensorFlow Dataset with:

  • autosharding turned off

  • prefetching turned on with autotune enabled

PublicAPI (beta): This API is in beta and may change before becoming stable.

class ray.train.tensorflow.TensorflowPredictor(model_definition: Union[Callable[[], keras.engine.training.Model], Type[keras.engine.training.Model]], preprocessor: Optional[Preprocessor] = None, model_weights: Optional[list] = None, use_gpu: bool = False)[source]

Bases: ray.train._internal.dl_predictor.DLPredictor

A predictor for TensorFlow models.

Parameters
  • model_definition – A callable that returns a TensorFlow Keras model to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • model_weights – List of weights to use for the model.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model_definition: Union[Callable[[], keras.engine.training.Model], Type[keras.engine.training.Model]], use_gpu: bool = False) ray.train.tensorflow.tensorflow_predictor.TensorflowPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TensorflowTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.

  • model_definition – A callable that returns a TensorFlow Keras model to use. Model weights will be loaded from the checkpoint.

call_model(tensor: Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]]) Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]][source]

Runs inference on a single batch of tensor data.

This method is called by TensorflowPredictor.predict after converting the original data batch to TensorFlow tensors.

Override this method to add custom logic for processing the model input or output.

Example

# List outputs are not supported by default TensorflowPredictor.
def build_model() -> tf.keras.Model:
    input = tf.keras.layers.Input(shape=1)
    model = tf.keras.models.Model(inputs=input, outputs=[input, input])
    return model

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TensorflowPredictor):
    def call_model(self, tensor):
        model_output = super().call_model(tensor)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

predictor = CustomPredictor(model_definition=build_model)
predictions = predictor.predict(data_batch)
Parameters

tensor – A batch of data to predict on, represented as either a single TensorFlow tensor or, for multi-input models, a dictionary of tensors.

Returns

The model outputs, either as a single tensor or a dictionary of tensors.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], dtype: Optional[Union[tensorflow.python.framework.dtypes.DType, Dict[str, tensorflow.python.framework.dtypes.DType]]] = None) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single Tensorflow tensor before being passed to the model.

If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being passed to the model. This is useful for multi-modal inputs (for example, a model that accepts both image and text).

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • dtype – The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.

Examples

>>> import numpy as np
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     return tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(1),
...         ]
...     )
>>>
>>> weights = [np.array([[2.0]]), np.array([0.0])]
>>> predictor = TensorflowPredictor(
...     model_definition=build_model, model_weights=weights)
>>>
>>> data = np.asarray([1, 2, 3])
>>> predictions = predictor.predict(data) 
>>> import pandas as pd
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     input1 = tf.keras.layers.Input(shape=(1,), name="A")
...     input2 = tf.keras.layers.Input(shape=(1,), name="B")
...     merged = tf.keras.layers.Concatenate(axis=1)([input1, input2])
...     output = tf.keras.layers.Dense(2, input_dim=2)(merged)
...     return tf.keras.models.Model(
...         inputs=[input1, input2], outputs=output)
>>>
>>> predictor = TensorflowPredictor(model_definition=build_model)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>>
>>> predictions = predictor.predict(data) 
Returns

Prediction result. The return type will be the same as the input type.

Return type

DataBatchType

PyTorch

class ray.train.torch.TorchTrainer(*args, **kwargs)[source]

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel PyTorch training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

You can also use any of the Torch specific function utils, such as ray.train.torch.get_device() and ray.train.torch.prepare_model()

def train_loop_per_worker():
    # Prepares model for distributed training by wrapping in
    # `DistributedDataParallel` and moving to correct device.
    train.torch.prepare_model(...)

    # Configures the dataloader for distributed training by adding a
    # `DistributedSampler`.
    # You should NOT use this if you are doing
    # `session.get_dataset_shard(...).iter_torch_batches(...)`
    train.torch.prepare_data_loader(...)

    # Returns the current torch device.
    train.torch.get_device()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

To save a model to use for the TorchPredictor, you must save it under the “model” kwarg in the Checkpoint passed to session.report().

Example

import torch
import torch.nn as nn

import ray
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model = train.torch.prepare_model(model)

    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.state_dict())
            ),
        )

train_dataset = ray.data.from_items(
    [{"x": x, "y": 2 * x + 1} for x in range(200)]
)
scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
class ray.train.torch.TorchCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with Torch-specific functionality.

Create this from a generic Checkpoint by calling TorchCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_state_dict(state_dict: Dict[str, Any], *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint[source]

Create a Checkpoint that stores a model state dictionary.

Tip

This is the recommended method for creating TorchCheckpoints.

Parameters
  • state_dict – The model state dictionary to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TorchCheckpoint containing the specified state dictionary.

Examples

>>> from ray.train.torch import TorchCheckpoint
>>> import torch
>>>
>>> model = torch.nn.Linear(1, 1)
>>> checkpoint = TorchCheckpoint.from_state_dict(model.state_dict())

To load the state dictionary, call get_model().

>>> checkpoint.get_model(torch.nn.Linear(1, 1))
Linear(in_features=1, out_features=1, bias=True)
classmethod from_model(model: torch.nn.modules.module.Module, *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint[source]

Create a Checkpoint that stores a Torch model.

Note

PyTorch recommends storing state dictionaries. To create a TorchCheckpoint from a state dictionary, call from_state_dict(). To learn more about state dictionaries, read Saving and Loading Models.

Parameters
  • model – The Torch model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TorchCheckpoint containing the specified model.

Examples

>>> from ray.train.torch import TorchCheckpoint
>>> import torch
>>>
>>> model = torch.nn.Identity()
>>> checkpoint = TorchCheckpoint.from_model(model)

You can use a TorchCheckpoint to create an TorchPredictor and perform inference.

>>> from ray.train.torch import TorchPredictor
>>>
>>> predictor = TorchPredictor.from_checkpoint(checkpoint)
get_model(model: Optional[torch.nn.modules.module.Module] = None) torch.nn.modules.module.Module[source]

Retrieve the model stored in this checkpoint.

Parameters

model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model.

class ray.train.torch.TorchConfig(backend: Optional[str] = None, init_method: str = 'env', timeout_s: int = 1800)[source]

Bases: ray.train.backend.BackendConfig

Configuration for torch process group setup.

See https://pytorch.org/docs/stable/distributed.html for more info.

Parameters
  • backend – The backend to use for training. See torch.distributed.init_process_group for more info and valid values. If set to None, nccl will be used if GPUs are requested, else gloo will be used.

  • init_method – The initialization method to use. Either “env” for environment variable initialization or “tcp” for TCP initialization. Defaults to “env”.

  • timeout_s – Seconds for process group operations to timeout.
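
A minimal sketch of passing a customized TorchConfig to a TorchTrainer; the trivial training function below is illustrative:

from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer, TorchConfig

def train_loop_per_worker():
    # Report a single metric so the run completes.
    session.report({"done": 1})

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    # Force the gloo backend and a shorter collective timeout.
    torch_config=TorchConfig(backend="gloo", timeout_s=600),
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()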

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.accelerate(amp: bool = False) None[source]

Enables training optimizations.

Parameters

amp – If true, perform training with automatic mixed precision. Otherwise, use full precision.

Warning

train.torch.accelerate cannot be called more than once, and it must be called before any other train.torch utility function.
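
A hedged sketch of an AMP-enabled training function, to be passed as train_loop_per_worker to a TorchTrainer; the toy model, optimizer, and data are illustrative, and the other train.torch utilities used here are documented below:

import torch

import ray.train.torch
from ray import train

def train_loop_per_worker():
    # Must be called before any other train.torch utility function.
    train.torch.accelerate(amp=True)

    device = train.torch.get_device()
    model = train.torch.prepare_model(torch.nn.Linear(1, 1))
    optimizer = train.torch.prepare_optimizer(
        torch.optim.SGD(model.parameters(), lr=0.1))

    inputs = torch.randn(8, 1, device=device)
    labels = torch.randn(8, 1, device=device)
    loss = torch.nn.functional.mse_loss(model(inputs), labels)

    optimizer.zero_grad()
    # Use train.torch.backward() instead of loss.backward() so the
    # AMP setup enabled above is respected.
    train.torch.backward(loss)
    optimizer.step()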

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.get_device() torch.device[source]

Gets the correct torch device to use for training.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_model(model: torch.nn.modules.module.Module, move_to_device: bool = True, parallel_strategy: Optional[str] = 'ddp', parallel_strategy_kwargs: Optional[Dict[str, Any]] = None, wrap_ddp: bool = True, ddp_kwargs: Optional[Dict[str, Any]] = None) torch.nn.modules.module.Module[source]

Prepares the model for distributed execution.

This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).

Parameters
  • model (torch.nn.Module) – A torch model to prepare.

  • move_to_device – Whether to move the model to the correct device. If set to False, the model needs to manually be moved to the correct device.

  • parallel_strategy ("ddp", "fsdp", or None) – Whether to wrap models in DistributedDataParallel, FullyShardedDataParallel, or neither.

  • parallel_strategy_kwargs (Dict[str, Any]) – Args to pass into DistributedDataParallel or FullyShardedDataParallel initialization if parallel_strategy is set to “ddp” or “fsdp”, respectively.
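
A brief sketch of the non-default options, assumed to run inside a train_loop_per_worker function; the extra DistributedDataParallel kwarg is illustrative:

import torch

from ray import train

def train_loop_per_worker():
    model = torch.nn.Linear(1, 1)
    # Wrap in DistributedDataParallel, move to the right device, and
    # forward an extra kwarg to the DDP constructor.
    model = train.torch.prepare_model(
        model,
        parallel_strategy="ddp",
        parallel_strategy_kwargs={"find_unused_parameters": True},
    )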

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_optimizer(optimizer: torch.optim.optimizer.Optimizer) torch.optim.optimizer.Optimizer[source]

Wraps optimizer to support automatic mixed precision.

Parameters

optimizer (torch.optim.Optimizer) – The optimizer to prepare.

Returns

A wrapped optimizer.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_data_loader(data_loader: torch.utils.data.dataloader.DataLoader, add_dist_sampler: bool = True, move_to_device: bool = True, auto_transfer: bool = True) torch.utils.data.dataloader.DataLoader[source]

Prepares DataLoader for distributed execution.

This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).

Parameters
  • data_loader (torch.utils.data.DataLoader) – The DataLoader to prepare.

  • add_dist_sampler – Whether to add a DistributedSampler to the provided DataLoader.

  • move_to_device – If set, automatically move the data returned by the data loader to the correct device.

  • auto_transfer – If set and device is GPU, another CUDA stream is created to automatically copy data from host (CPU) memory to device (GPU) memory (the default CUDA stream still runs the training procedure). If device is CPU, it will be disabled regardless of the setting. This configuration will be ignored if move_to_device is False.
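
A short sketch of the typical pattern inside a train_loop_per_worker function; the toy TensorDataset is illustrative:

import torch
from torch.utils.data import DataLoader, TensorDataset

from ray import train

def train_loop_per_worker():
    dataset = TensorDataset(torch.randn(64, 1), torch.randn(64, 1))
    data_loader = DataLoader(dataset, batch_size=8, shuffle=True)
    # Adds a DistributedSampler and moves batches to the right device.
    data_loader = train.torch.prepare_data_loader(data_loader)

    for inputs, labels in data_loader:
        ...  # training step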

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.backward(tensor: torch.Tensor) None[source]

Computes the gradient of the specified tensor w.r.t. graph leaves.

Parameters

tensor (torch.Tensor) – Tensor of which the derivative will be computed.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.enable_reproducibility(seed: int = 0) None[source]

Limits sources of nondeterministic behavior.

This function:

  • Seeds PyTorch, Python, and NumPy.

  • Disables CUDA convolution benchmarking.

  • Configures PyTorch to use deterministic algorithms.

  • Seeds workers spawned for multi-process data loading.

Parameters

seed – The number to seed libraries and data workers with.

Warning

train.torch.enable_reproducibility() can’t guarantee completely reproducible results across executions. To learn more, read the PyTorch notes on randomness.
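
A minimal sketch, calling it at the top of the training function before building the model and data loaders:

import ray.train.torch
from ray import train

def train_loop_per_worker():
    # Seed libraries and enable deterministic algorithms first.
    train.torch.enable_reproducibility(seed=42)
    ...  # build the model, data loaders, and training loop as usual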

PublicAPI (beta): This API is in beta and may change before becoming stable.

class ray.train.torch.TorchWorkerProfiler(trace_dir: Optional[str] = None)[source]

Bases: object

Utility class for running PyTorch Profiler on a Train worker.

Parameters

trace_dir (Optional[str]) – The directory to store traces on the worker node. If None, this will use a default temporary dir.

Warning

DEPRECATED: This API is deprecated and may be removed in a future Ray release.

trace_handler(p: torch.profiler.profiler.profile)[source]

A stateful PyTorch Profiler trace handler.

This exports the Chrome trace to a file on disk.

These exported traces can then be fetched by calling get_and_clear_profile_traces.

Parameters

p – A PyTorch Profiler profile.

get_and_clear_profile_traces()[source]

Reads unread Profiler traces from this worker.

Returns

The traces in a format consumable by TorchTensorboardProfilerCallback.

class ray.train.torch.TorchPredictor(model: torch.nn.modules.module.Module, preprocessor: Optional[Preprocessor] = None, use_gpu: bool = False)[source]

Bases: ray.train._internal.dl_predictor.DLPredictor

A predictor for PyTorch models.

Parameters
  • model – The torch module to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None, use_gpu: bool = False) ray.train.torch.torch_predictor.TorchPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TorchTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.

  • model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

call_model(tensor: Union[torch.Tensor, Dict[str, torch.Tensor]]) Union[torch.Tensor, Dict[str, torch.Tensor]][source]

Runs inference on a single batch of tensor data.

This method is called by TorchPredictor.predict after converting the original data batch to torch tensors.

Override this method to add custom logic for processing the model input or output.

Example

# List outputs are not supported by default TorchPredictor.
class MyModel(torch.nn.Module):
    def forward(self, input_tensor):
        return [input_tensor, input_tensor]

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TorchPredictor):
    def call_model(self, tensor):
        model_output = super().call_model(tensor)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

predictor = CustomPredictor(model=MyModel())
predictions = predictor.predict(data_batch)
Parameters

tensor – A batch of data to predict on, represented as either a single PyTorch tensor or, for multi-input models, a dictionary of tensors.

Returns

The model outputs, either as a single tensor or a dictionary of tensors.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], dtype: Optional[Union[torch.dtype, Dict[str, torch.dtype]]] = None) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single PyTorch tensor before being passed to the model.

If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being passed to the model. This is useful for multi-modal inputs (for example, a model that accepts both image and text).

Parameters
  • data – A batch of input data of DataBatchType.

  • dtype – The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.

Examples

>>> import numpy as np
>>> import torch
>>> from ray.train.torch import TorchPredictor
>>>
>>> model = torch.nn.Linear(2, 1)
>>> predictor = TorchPredictor(model=model)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data, dtype=torch.float)
>>> import pandas as pd
>>> import torch
>>> from ray.train.torch import TorchPredictor
>>>
>>> class CustomModule(torch.nn.Module):
...     def __init__(self):
...         super().__init__()
...         self.linear1 = torch.nn.Linear(1, 1)
...         self.linear2 = torch.nn.Linear(1, 1)
...
...     def forward(self, input_dict: dict):
...         out1 = self.linear1(input_dict["A"].unsqueeze(1))
...         out2 = self.linear2(input_dict["B"].unsqueeze(1))
...         return out1 + out2
>>>
>>> predictor = TorchPredictor(model=CustomModule())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>>
>>> predictions = predictor.predict(data, dtype=torch.float)
Returns

Prediction result. The return type will be the same as the input type.

Return type

DataBatchType

Horovod

class ray.train.horovod.HorovodTrainer(*args, **kwargs)[source]

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Horovod training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary Horovod setup configured for distributed Horovod training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

You could use TensorflowPredictor or TorchPredictor in conjunction with HorovodTrainer. You must save the model under the “model” kwarg in the Checkpoint passed to session.report(), so that it can be used by corresponding predictors.

Example:

import ray
import ray.train as train
import ray.train.torch  # Need this to use `train.torch.get_device()`
import horovod.torch as hvd
import torch
import torch.nn as nn
from ray.air import session, Checkpoint
from ray.train.horovod import HorovodTrainer
from ray.air.config import ScalingConfig

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)
    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    hvd.init()
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    device = train.torch.get_device()
    model.to(device)
    loss_fn = nn.MSELoss()
    lr_scaler = 1
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        op=hvd.Average,
    )
    for epoch in range(num_epochs):
        model.train()
        for batch in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(model=model.state_dict())
            ),
        )
train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • horovod_config – Configuration for setting up the Horovod backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, horovod_config: Optional[ray.train.horovod.config.HorovodConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
class ray.train.horovod.HorovodConfig(nics: Optional[Set[str]] = None, verbose: int = 1, key: Optional[str] = None, ssh_port: Optional[int] = None, ssh_identity_file: Optional[str] = None, ssh_str: Optional[str] = None, timeout_s: int = 300, placement_group_timeout_s: int = 100)[source]

Bases: ray.train.backend.BackendConfig

Configurations for Horovod setup.

See https://github.com/horovod/horovod/blob/master/horovod/runner/common/util/settings.py

Parameters
  • nics (Optional[Set[str]]) – Network interfaces that can be used for communication.

  • verbose – Horovod logging verbosity.

  • key (Optional[str]) – Secret used for communication between workers.

  • ssh_port (Optional[int]) – Port for SSH server running on worker nodes.

  • ssh_identity_file (Optional[str]) – Path to the identity file to ssh into different hosts on the cluster.

  • ssh_str (Optional[str]) – CAUTION WHEN USING THIS. Private key file contents. Writes the private key to ssh_identity_file.

  • timeout_s – Timeout parameter for Gloo rendezvous.

  • placement_group_timeout_s – Timeout parameter for Ray Placement Group creation. Currently unused.
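
A minimal sketch of passing a customized HorovodConfig to a HorovodTrainer, assuming train_loop_per_worker and train_dataset are defined as in the HorovodTrainer example above:

from ray.air.config import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig

trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    # Restrict Horovod to a specific network interface and raise verbosity.
    horovod_config=HorovodConfig(nics={"eth0"}, verbose=2),
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
)
result = trainer.fit()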

PublicAPI (beta): This API is in beta and may change before becoming stable.

HuggingFace

class ray.train.huggingface.HuggingFaceTrainer(*args, **kwargs)[source]

Bases: ray.train.torch.torch_trainer.TorchTrainer

A Trainer for data parallel HuggingFace Transformers training on PyTorch.

This Trainer runs the transformers.Trainer.train() method on multiple Ray Actors. The training is carried out in a distributed fashion through PyTorch DDP. These actors already have the necessary torch process group configured for distributed PyTorch training. If you have PyTorch >= 1.12.0 installed, you can also run FSDP training by specifying the fsdp argument in TrainingArguments. For more information on configuring FSDP, refer to the Hugging Face documentation.

The training function run on every Actor will first run the specified trainer_init_per_worker function to obtain an instantiated transformers.Trainer object. The trainer_init_per_worker function will have access to the preprocessed train and evaluation datasets.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards, with each Actor training on a single shard. All the other datasets will not be split.

Please note that if you use a custom transformers.Trainer subclass, its get_train_dataloader method will be wrapped to disable sharding by transformers.IterableDatasetShard, as the dataset will already be sharded on the Ray AIR side.

HuggingFace loggers will be automatically disabled, and the local_rank argument in TrainingArguments will be automatically set. Please note that if you want to use CPU training, you will need to set the no_cuda argument in TrainingArguments manually; otherwise, the training process may crash (segfault).

This Trainer requires transformers>=4.19.0 package.

Example

# Based on
# huggingface/notebooks/examples/language_modeling_from_scratch.ipynb

# Hugging Face imports
from datasets import load_dataset
import transformers
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

import ray
from ray.train.huggingface import HuggingFaceTrainer
from ray.air.config import ScalingConfig

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
block_size = 128

datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)

def tokenize_function(examples):
    return tokenizer(examples["text"])

tokenized_datasets = datasets.map(
    tokenize_function, batched=True, num_proc=1, remove_columns=["text"]
)

def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {
        k: sum(examples[k], []) for k in examples.keys()
    }
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model
    # supported it.
    # instead of this drop, you can customize this part to your needs.
    total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len.
    result = {
        k: [
            t[i : i + block_size]
            for i in range(0, total_length, block_size)
        ]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result

lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    batch_size=1000,
    num_proc=1,
)
ray_train_ds = ray.data.from_huggingface(lm_datasets["train"])
ray_evaluation_ds = ray.data.from_huggingface(
    lm_datasets["validation"]
)

def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    model_config = AutoConfig.from_pretrained(model_checkpoint)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{model_checkpoint}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
trainer = HuggingFaceTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_evaluation_ds},
)
result = trainer.fit()
Parameters
  • trainer_init_per_worker – The function that returns an instantiated transformers.Trainer object and takes in the following arguments: a training Torch Dataset, an optional evaluation Torch Dataset, and config as kwargs. The Torch Datasets are automatically created by converting the Ray Datasets internally before they are passed into the function.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset and (optionally) key “evaluation” to denote the evaluation dataset. Can only contain a training dataset and up to one extra dataset to be used for evaluation. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • trainer_init_config – Configurations to pass into trainer_init_per_worker as kwargs.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer. Same as in TorchTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

__init__(trainer_init_per_worker: Callable[[torch.utils.data.dataset.Dataset, Optional[torch.utils.data.dataset.Dataset], Any], transformers.trainer.Trainer], *, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], trainer_init_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
setup() None[source]

Called during fit() to perform initial setup on the Trainer.

Note

This method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

as_trainable() Type[ray.tune.trainable.trainable.Trainable][source]

Convert self to a tune.Trainable class.

class ray.train.huggingface.HuggingFaceCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with HuggingFace-specific functionality.

Create this from a generic Checkpoint by calling HuggingFaceCheckpoint.from_checkpoint(ckpt)

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_model(model: Union[transformers.modeling_utils.PreTrainedModel, torch.nn.modules.module.Module], tokenizer: Optional[transformers.tokenization_utils.PreTrainedTokenizer] = None, *, path: os.PathLike, preprocessor: Optional[Preprocessor] = None) HuggingFaceCheckpoint[source]

Create a Checkpoint that stores a HuggingFace model.

Parameters
  • model – The pretrained transformer or Torch model to store in the checkpoint.

  • tokenizer – The Tokenizer to use in the Transformers pipeline for inference.

  • path – The directory where the checkpoint will be stored.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A HuggingFaceCheckpoint containing the specified model.
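For example, a minimal sketch of storing a model and tokenizer; the model and tokenizer names are illustrative and assume access to the Hugging Face Hub:

from transformers import AutoModelForCausalLM, AutoTokenizer

from ray.train.huggingface import HuggingFaceCheckpoint

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# Save the model and tokenizer into the given directory and wrap it
# as a checkpoint.
checkpoint = HuggingFaceCheckpoint.from_model(model, tokenizer, path=".")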

get_model(model: Union[Type[transformers.modeling_utils.PreTrainedModel], torch.nn.modules.module.Module], **pretrained_model_kwargs) Union[transformers.modeling_utils.PreTrainedModel, torch.nn.modules.module.Module][source]

Retrieve the model stored in this checkpoint.

get_tokenizer(tokenizer: Type[transformers.tokenization_utils.PreTrainedTokenizer], **kwargs) Optional[transformers.tokenization_utils.PreTrainedTokenizer][source]

Create a tokenizer using the data stored in this checkpoint.

get_training_arguments() transformers.training_args.TrainingArguments[source]

Retrieve the training arguments stored in this checkpoint.
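A short sketch of reading these objects back, continuing from the checkpoint created above; the Auto* classes are illustrative choices, and training arguments are typically only present in checkpoints produced by a HuggingFaceTrainer run:

from transformers import AutoModelForCausalLM, AutoTokenizer

# Recreate the model and tokenizer from the checkpoint contents.
model = checkpoint.get_model(AutoModelForCausalLM)
tokenizer = checkpoint.get_tokenizer(AutoTokenizer)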

class ray.train.huggingface.HuggingFacePredictor(pipeline: Optional[transformers.pipelines.base.Pipeline] = None, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for HuggingFace Transformers PyTorch models.

This predictor uses Transformers Pipelines for inference.

Parameters
  • pipeline – The Transformers pipeline to use for inference.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, *, pipeline_cls: Optional[Type[transformers.pipelines.base.Pipeline]] = None, **pipeline_kwargs) ray.train.huggingface.huggingface_predictor.HuggingFacePredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of HuggingFaceTrainer.

Parameters
  • checkpoint – The checkpoint to load the model, tokenizer and preprocessor from. It is expected to be from the result of a HuggingFaceTrainer run.

  • pipeline_cls – A transformers.pipelines.Pipeline class to use. If not specified, will use the pipeline abstraction wrapper.

  • **pipeline_kwargs – Any kwargs to pass to the pipeline initialization. If pipeline_cls is None, this must contain the ‘task’ argument. Cannot contain ‘model’. Can be used to override the tokenizer with ‘tokenizer’.
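A hedged usage sketch, assuming result comes from a HuggingFaceTrainer.fit() run whose model supports text generation:

from ray.train.huggingface import HuggingFacePredictor

# Because pipeline_cls is not given, the "task" kwarg is required.
predictor = HuggingFacePredictor.from_checkpoint(
    result.checkpoint, task="text-generation"
)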

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

The data is converted into a list (unless pipeline is a TableQuestionAnsweringPipeline) and passed to the pipeline object.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, use all columns.

  • **predict_kwargs – Additional kwargs to pass to the pipeline object.

Examples

>>> import pandas as pd
>>> from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
>>> from transformers.pipelines import pipeline
>>> from ray.train.huggingface import HuggingFacePredictor
>>>
>>> model_checkpoint = "gpt2"
>>> tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
>>> tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)
>>>
>>> model_config = AutoConfig.from_pretrained(model_checkpoint)
>>> model = AutoModelForCausalLM.from_config(model_config)
>>> predictor = HuggingFacePredictor(
...     pipeline=pipeline(
...         task="text-generation", model=model, tokenizer=tokenizer
...     )
... )
>>>
>>> prompts = pd.DataFrame(
...     ["Complete me", "And me", "Please complete"], columns=["sentences"]
... )
>>> predictions = predictor.predict(prompts)
Returns

Prediction result.

Scikit-Learn

class ray.train.sklearn.SklearnTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

A Trainer for scikit-learn estimator training.

This Trainer runs the fit method of the given estimator in a non-distributed manner on a single Ray Actor.

By default, the n_jobs (or thread_count) estimator parameters will be set to match the number of CPUs assigned to the Ray Actor. This behavior can be disabled by setting set_estimator_cpus=False.

If you wish to use GPU-enabled estimators (e.g., cuML), make sure to set "GPU": 1 in scaling_config.trainer_resources.

The results are reported all at once and not in an iterative fashion. No checkpointing is done during training. This may be changed in the future.

Example:

import ray

from ray.train.sklearn import SklearnTrainer
from sklearn.ensemble import RandomForestRegressor

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = SklearnTrainer(
    estimator=RandomForestRegressor(),
    label_column="y",
    scaling_config=ray.air.config.ScalingConfig(
        trainer_resources={"CPU": 4}
    ),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • estimator – A scikit-learn compatible estimator to use.

  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting separate metrics.

  • label_column – Name of the label column. A column with this name must be present in the training dataset. If None, no validation will be performed.

  • params – Optional dict of params to be set on the estimator before fitting. Useful for hyperparameter tuning.

  • scoring

    Strategy to evaluate the performance of the model on the validation sets and for cross-validation. Same as in sklearn.model_selection.cross_validate. If scoring represents a single score, one can use:

    • a single string;

    • a callable that returns a single value.

    If scoring represents multiple scores, one can use:

    • a list or tuple of unique strings;

    • a callable returning a dictionary where the keys are the metric names and the values are the metric scores;

    • a dictionary with metric names as keys and callables as values.

    A combined example of the scoring and cv options is shown below, after this parameter list.

  • cv

    Determines the cross-validation splitting strategy. If specified, cross-validation will be run on the train dataset, in addition to computing metrics for validation datasets. Same as in sklearn.model_selection.cross_validate, with the exception of None. Possible inputs for cv are:

    • None, to skip cross-validation.

    • int, to specify the number of folds in a (Stratified)KFold,

    • CV splitter,

    • An iterable yielding (train, test) splits as arrays of indices.

    For int/None inputs, if the estimator is a classifier and y is either binary or multiclass, StratifiedKFold is used. In all other cases, KFold is used. These splitters are instantiated with shuffle=False so the splits will be the same across calls.

    If you provide a “cv_groups” column in the train dataset, it will be used as group labels for the samples used while splitting the dataset into train/test set. Only used in conjunction with a “Group” cv instance (e.g., GroupKFold). This corresponds to the groups argument in sklearn.model_selection.cross_validate.

  • return_train_score_cv – Whether to also return train scores during cross-validation. Ignored if cv is None.

  • parallelize_cv – If set to True, will parallelize cross-validation instead of the estimator. If set to None, will detect if the estimator has any parallelism-related params (n_jobs or thread_count) and parallelize cross-validation if there are none. If False, will not parallelize cross-validation. Cannot be set to True if there are any GPUs assigned to the trainer. Ignored if cv is None.

  • set_estimator_cpus – If set to True, will automatically set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to match the number of available CPUs.

  • scaling_config – Configuration for how to scale training. Only the trainer_resources key can be provided, as the training is not distributed.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • **fit_params – Additional kwargs passed to estimator.fit() method.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
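The sketch below is illustrative only and combines the scoring and cv options described above; the dataset and estimator are placeholders.

import ray
from sklearn.ensemble import RandomForestClassifier

from ray.air.config import ScalingConfig
from ray.train.sklearn import SklearnTrainer

train_dataset = ray.data.from_items(
    [{"x": x, "y": x % 2} for x in range(32)])
trainer = SklearnTrainer(
    estimator=RandomForestClassifier(),
    label_column="y",
    scoring=["accuracy", "f1"],  # multiple metrics as a list of strings
    cv=5,  # 5-fold cross-validation on the "train" dataset
    return_train_score_cv=True,
    scaling_config=ScalingConfig(trainer_resources={"CPU": 2}),
    datasets={"train": train_dataset},
)
result = trainer.fit()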

__init__(*, estimator: sklearn.base.BaseEstimator, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: Optional[str] = None, params: Optional[Dict[str, Any]] = None, scoring: Optional[Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float], Iterable[Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float]]], Dict[str, Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float]]]]] = None, cv: Optional[Union[int, Iterable, sklearn.model_selection._split.BaseCrossValidator]] = None, return_train_score_cv: bool = False, parallelize_cv: Optional[bool] = None, set_estimator_cpus: bool = True, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, **fit_params)[source]
training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
class ray.train.sklearn.SklearnCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with sklearn-specific functionality.

Create this from a generic Checkpoint by calling SklearnCheckpoint.from_checkpoint(ckpt)

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_estimator(estimator: sklearn.base.BaseEstimator, *, path: os.PathLike, preprocessor: Optional[Preprocessor] = None) SklearnCheckpoint[source]

Create a Checkpoint that stores an sklearn Estimator.

Parameters
  • estimator – The Estimator to store in the checkpoint.

  • path – The directory where the checkpoint will be stored.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

An SklearnCheckpoint containing the specified Estimator.

Examples

>>> from ray.train.sklearn import SklearnCheckpoint
>>> from sklearn.ensemble import RandomForestClassifier
>>>
>>> estimator = RandomForestClassifier()
>>> checkpoint = SklearnCheckpoint.from_estimator(estimator, path=".")

You can use an SklearnCheckpoint to create an SklearnPredictor and perform inference.

>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> predictor = SklearnPredictor.from_checkpoint(checkpoint)
get_estimator() sklearn.base.BaseEstimator[source]

Retrieve the Estimator stored in this checkpoint.

class ray.train.sklearn.SklearnPredictor(estimator: sklearn.base.BaseEstimator, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for scikit-learn compatible estimators.

Parameters
  • estimator – The fitted scikit-learn compatible estimator to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.sklearn.sklearn_predictor.SklearnPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of SklearnTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a SklearnTrainer run.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, num_estimator_cpus: Optional[int] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • num_estimator_cpus – If set to a value other than None, will set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to the given value.

  • **predict_kwargs – Keyword arguments passed to estimator.predict.

Examples

>>> import numpy as np
>>> from sklearn.ensemble import RandomForestClassifier
>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = RandomForestClassifier().fit(train_X, train_y)
>>> predictor = SklearnPredictor(estimator=model)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictions = predictor.predict(data, feature_columns=[0, 1])
>>> import pandas as pd
>>> from sklearn.ensemble import RandomForestClassifier
>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = RandomForestClassifier().fit(train_X, train_y)
>>> predictor = SklearnPredictor(estimator=model)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Reinforcement Learning (RLlib)

class ray.train.rl.RLCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with RLlib-specific functionality.

Create this from a generic Checkpoint by calling RLCheckpoint.from_checkpoint(ckpt).

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

get_policy(env: Optional[Any] = None) ray.rllib.policy.policy.Policy[source]

Retrieve the policy stored in this checkpoint.

Parameters

env – Optional environment to instantiate the trainer with. If not given, it is parsed from the saved trainer configuration.

Returns

The policy stored in this checkpoint.
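A hedged usage sketch; result is assumed to come from an RLTrainer.fit() run, obs from your environment, and the call follows RLlib’s Policy API:

from ray.train.rl import RLCheckpoint

# Wrap a generic checkpoint and recover the trained policy.
checkpoint = RLCheckpoint.from_checkpoint(result.checkpoint)
policy = checkpoint.get_policy()

# Policy.compute_single_action returns (action, rnn_state, extra_info).
action, _, _ = policy.compute_single_action(obs)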

class ray.train.rl.RLPredictor(policy: ray.rllib.policy.policy.Policy, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for RLlib policies.

Parameters
  • policy – The RLlib policy on which to perform inference.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, env: Optional[Any] = None, **kwargs) ray.train.predictor.Predictor[source]

Create RLPredictor from checkpoint.

This method requires that the checkpoint was created with the Ray AIR RLTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from.

  • env – Optional environment to instantiate the trainer with. If not given, it is parsed from the saved trainer configuration instead.
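A hedged usage sketch, assuming result comes from an RLTrainer.fit() run on a CartPole-like environment with four-dimensional observations:

import numpy as np

from ray.train.rl import RLPredictor

predictor = RLPredictor.from_checkpoint(result.checkpoint)

# A batch of two observations; the shape depends on your environment.
obs_batch = np.zeros((2, 4), dtype=np.float32)
actions = predictor.predict(obs_batch)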

class ray.train.rl.RLTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

Reinforcement learning trainer.

This trainer provides an interface to RLlib trainables.

If datasets and preprocessors are used, they can be utilized for offline training, e.g. using behavior cloning. Otherwise, this trainer will use online training.

Parameters
  • algorithm – Algorithm to train on. Can be a string reference (e.g. "PPO") or an RLlib trainer class.

  • scaling_config – Configuration for how to scale training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. If specified, datasets will be used for offline training. Will be configured as an RLlib input config item.

  • preprocessor – A preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

Example

Online training:

from ray.air.config import RunConfig, ScalingConfig
from ray.train.rl import RLTrainer

trainer = RLTrainer(
    run_config=RunConfig(stop={"training_iteration": 5}),
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    algorithm="PPO",
    config={
        "env": "CartPole-v0",
        "framework": "tf",
        "evaluation_num_workers": 1,
        "evaluation_interval": 1,
        "evaluation_config": {"input": "sampler"},
    },
)
result = trainer.fit()

Example

Offline training (assumes data is stored in /tmp/data-dir):

import ray
from ray.air.config import RunConfig, ScalingConfig
from ray.train.rl import RLTrainer
from ray.rllib.algorithms.bc.bc import BC

dataset = ray.data.read_json(
    "/tmp/data-dir", parallelism=2, ray_remote_args={"num_cpus": 1}
)

trainer = RLTrainer(
    run_config=RunConfig(stop={"training_iteration": 5}),
    scaling_config=ScalingConfig(
        num_workers=2,
        use_gpu=False,
    ),
    datasets={"train": dataset},
    algorithm=BC,
    config={
        "env": "CartPole-v0",
        "framework": "tf",
        "evaluation_num_workers": 1,
        "evaluation_interval": 1,
        "evaluation_config": {"input": "sampler"},
    },
)
result = trainer.fit()

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
as_trainable() Type[ray.tune.trainable.trainable.Trainable][source]

Convert self to a tune.Trainable class.

Base Classes (Developer APIs)

class ray.train.trainer.BaseTrainer(*args, **kwargs)[source]

Defines interface for distributed training on Ray.

Note: The base BaseTrainer class cannot be instantiated directly. Only one of its subclasses can be used.

How does a trainer work?

  • First, initialize the Trainer. The initialization runs locally, so heavyweight setup should not be done in __init__.

  • Then, when you call trainer.fit(), the Trainer is serialized and copied to a remote Ray actor. The following methods are then called in sequence on the remote actor.

  • trainer.setup(): Any heavyweight Trainer setup should be specified here.

  • trainer.preprocess_datasets(): The provided ray.data.Dataset instances are preprocessed with the provided ray.data.Preprocessor.

  • trainer.training_loop(): Executes the main training logic.

  • Calling trainer.fit() will return a ray.air.result.Result object where you can access metrics from your training run, as well as any checkpoints that may have been saved.

How do I create a new Trainer?

Subclass ray.train.trainer.BaseTrainer, override the training_loop method, and optionally setup.

import torch

from ray.train.trainer import BaseTrainer
from ray import tune
from ray.air import session


class MyPytorchTrainer(BaseTrainer):
    def setup(self):
        self.model = torch.nn.Linear(1, 1)
        self.optimizer = torch.optim.SGD(
            self.model.parameters(), lr=0.1)

    def training_loop(self):
        # You can access any Trainer attributes directly in this method.
        # self.datasets["train"] has already been
        # preprocessed by self.preprocessor
        dataset = self.datasets["train"]

        torch_ds = dataset.iter_torch_batches(dtypes=torch.float)
        loss_fn = torch.nn.MSELoss()

        for epoch_idx in range(10):
            loss = 0
            num_batches = 0
            for batch in torch_ds:
                X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
                # Compute prediction error
                pred = self.model(X)
                batch_loss = loss_fn(pred, y)

                # Backpropagation
                self.optimizer.zero_grad()
                batch_loss.backward()
                self.optimizer.step()

                loss += batch_loss.item()
                num_batches += 1
            loss /= num_batches

            # Use Tune functions to report intermediate
            # results.
            session.report({"loss": loss, "epoch": epoch_idx})

How do I use an existing Trainer or one of my custom Trainers?

Initialize the Trainer, and call Trainer.fit()

import ray
train_dataset = ray.data.from_items(
    [{"x": i, "y": i} for i in range(3)])
my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = my_trainer.fit()
Parameters
  • scaling_config – Configuration for how to scale training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

__init__(*, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
setup() None[source]

Called during fit() to perform initial setup on the Trainer.

Note

This method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.
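As an illustrative sketch, reusing MyPytorchTrainer and train_dataset from the examples above and assuming a StandardScaler preprocessor fits your columns:

from ray.data.preprocessors import StandardScaler

# The preprocessor is fit on the "train" dataset and then applied to all
# datasets before training_loop runs.
my_trainer = MyPytorchTrainer(
    datasets={"train": train_dataset},
    preprocessor=StandardScaler(columns=["x"]),
)
result = my_trainer.fit()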

abstract training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
fit() ray.air.result.Result[source]

Runs training.

Returns

A Result object containing the training result.

Raises

TrainingFailedError – If any failures occur during the execution of self.as_trainable().

PublicAPI (beta): This API is in beta and may change before becoming stable.

as_trainable() Type[Trainable][source]

Convert self to a tune.Trainable class.
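In practice you rarely call as_trainable() directly; a hedged sketch of the typical path is to hand the Trainer to Ray Tune, which performs the conversion for you (my_trainer is the instance from the example above):

from ray import tune

# Tune converts the Trainer into a Trainable via as_trainable() internally.
tuner = tune.Tuner(my_trainer)
results = tuner.fit()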

class ray.train.data_parallel_trainer.DataParallelTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

A Trainer for data parallel training.

You should subclass this Trainer if your Trainer follows the SPMD (single program, multiple data) programming paradigm - you want multiple processes to run the same function, but on different data.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

How do I use ``DataParallelTrainer`` or any of its subclasses?

Example:

import ray
from ray.air import session
from ray.train.data_parallel_trainer import DataParallelTrainer

def train_loop_for_worker():
    dataset_shard_for_this_worker = session.get_dataset_shard("train")

    assert len(dataset_shard_for_this_worker) == 1

train_dataset = ray.data.from_items([1, 2, 3])
assert len(train_dataset) == 3
trainer = DataParallelTrainer(
    train_loop_for_worker,
    scaling_config=ray.air.config.ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
)
result = trainer.fit()

How do I develop on top of ``DataParallelTrainer``?

In many cases, using DataParallelTrainer directly is sufficient to execute functions on multiple actors.

However, you may want to subclass DataParallelTrainer and create a custom Trainer for the following 2 use cases:

  • Use Case 1: You want to do data parallel training, but want to have a predefined train_loop_per_worker.

  • Use Case 2: You want to implement a custom Backend that automatically handles additional setup or teardown logic on each actor, so that the users of this new trainer do not have to implement this logic. For example, a TensorflowTrainer can be built on top of DataParallelTrainer that automatically handles setting the proper environment variables for distributed Tensorflow on each actor.

For 1, you can set a predefined training loop in __init__

from ray.train.data_parallel_trainer import DataParallelTrainer

class MyDataParallelTrainer(DataParallelTrainer):
    def __init__(self, *args, **kwargs):
        predefined_train_loop_per_worker = lambda: 1
        super().__init__(predefined_train_loop_per_worker, *args, **kwargs)

For 2, you can implement the ray.train.Backend and ray.train.BackendConfig interfaces.

from dataclasses import dataclass
from ray.train.backend import Backend, BackendConfig

class MyBackend(Backend):
    def on_start(self, worker_group, backend_config):
        def set_env_var(env_var_value):
            import os
            os.environ["MY_ENV_VAR"] = env_var_value

        worker_group.execute(set_env_var, backend_config.env_var)

@dataclass
class MyBackendConfig(BackendConfig):
    env_var: str = "default_value"

    def backend_cls(self):
        return MyBackend

class MyTrainer(DataParallelTrainer):
    def __init__(
        self, train_loop_per_worker, my_backend_config: MyBackendConfig, **kwargs
    ):
        super().__init__(
            train_loop_per_worker,
            backend_config=my_backend_config,
            **kwargs,
        )
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • backend_config – Configuration for setting up a Backend (e.g. Torch, Tensorflow, Horovod) on each worker to enable distributed communication. If no Backend should be set up, then set this to None.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest. This is merged with the default dataset config for the given trainer (cls._dataset_config).

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, backend_config: Optional[ray.train.backend.BackendConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
get_dataset_config() Dict[str, ray.air.config.DatasetConfig][source]

Return a copy of this Trainer’s final dataset configs.

Returns

The merged default + user-supplied dataset config.
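For illustration, assuming trainer is the DataParallelTrainer instance constructed in the usage example above:

# Inspect the merged per-dataset ingest configuration.
dataset_config = trainer.get_dataset_config()
print(dataset_config["train"])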

class ray.train.gbdt_trainer.GBDTTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

Abstract class for scaling gradient-boosting decision tree (GBDT) frameworks.

Inherited by XGBoostTrainer and LightGBMTrainer.

Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – Framework specific training parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to framework train() function.

DeveloperAPI: This API may change across minor Ray releases.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)[source]
preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
as_trainable() Type[ray.tune.trainable.trainable.Trainable][source]

Convert self to a tune.Trainable class.

class ray.train.backend.Backend(*args, **kwargs)[source]

Singleton for distributed communication backend.

share_cuda_visible_devices

If True, each worker process will have CUDA_VISIBLE_DEVICES set as the visible device IDs of all workers on the same node for this training instance. If False, each worker will have CUDA_VISIBLE_DEVICES set to the device IDs allocated by Ray for that worker.

Type

bool

DeveloperAPI: This API may change across minor Ray releases.

on_start(worker_group: ray.train._internal.worker_group.WorkerGroup, backend_config: ray.train.backend.BackendConfig)[source]

Logic for starting this backend.

on_shutdown(worker_group: ray.train._internal.worker_group.WorkerGroup, backend_config: ray.train.backend.BackendConfig)[source]

Logic for shutting down the backend.

static encode_data(data_dict: Dict) ray.train.backend.EncodedData[source]

Logic to encode a data dict before sending to the driver.

This function will be called on the workers for any data that is sent to the driver via session.report().

static decode_data(encoded_data: ray.train.backend.EncodedData) Dict[source]

Logic to decode an encoded data dict.

This function will be called on the driver after receiving the encoded data dict from the worker.
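A hedged sketch of a custom Backend that compresses reported data dicts in transit; the pickle/zlib scheme is purely illustrative, and bytes stands in for ray.train.backend.EncodedData:

import pickle
import zlib

from ray.train.backend import Backend


class CompressingBackend(Backend):
    @staticmethod
    def encode_data(data_dict: dict) -> bytes:
        # Runs on the workers for any data passed to session.report().
        return zlib.compress(pickle.dumps(data_dict))

    @staticmethod
    def decode_data(encoded_data: bytes) -> dict:
        # Runs on the driver to recover the original dict.
        return pickle.loads(zlib.decompress(encoded_data))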

class ray.train.backend.BackendConfig[source]

Parent class for configurations of training backend.

DeveloperAPI: This API may change across minor Ray releases.

Deprecated APIs

These APIs are deprecated and will be removed in a future Ray release:

  • ray.train.Trainer

  • ray.train.callbacks.*