Ray Train API#

This page covers framework-specific integrations with Ray Train and the Ray Train Developer APIs.

For core Ray AIR APIs, take a look at the AIR Trainer package reference.

Trainer and Predictor Integrations#

XGBoost#

XGBoostTrainer#

class ray.train.xgboost.XGBoostTrainer(*args, **kwargs)[source]#

Bases: ray.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel XGBoost training.

This Trainer runs the XGBoost training loop in a distributed manner using multiple Ray Actors.

Note

XGBoostTrainer does not modify or otherwise alter the working of the XGBoost distributed training algorithm. Ray only provides orchestration, data ingest, and fault tolerance. For more information on distributed XGBoost training, refer to the XGBoost documentation.

Example

import ray

from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a "train" key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – XGBoost training parameters. Refer to XGBoost documentation for a list of possible parameters.

  • dmatrix_params – A dict mapping dataset names to dicts of kwargs that are passed to the respective xgboost_ray.RayDMatrix initializations, which in turn are passed to the xgboost.DMatrix objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to xgboost.train() function.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)#

XGBoostPredictor#

class ray.train.xgboost.XGBoostCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]#

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with XGBoost-specific functionality.

Create this from a generic Checkpoint by calling XGBoostCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(booster: xgboost.core.Booster, *, preprocessor: Optional[Preprocessor] = None) XGBoostCheckpoint[source]#

Create a Checkpoint that stores an XGBoost model.

Parameters
  • booster – The XGBoost model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

An XGBoostCheckpoint containing the specified booster.

Examples

>>> import numpy as np
>>> import ray
>>> from ray.train.xgboost import XGBoostCheckpoint
>>> import xgboost
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgboost.XGBClassifier().fit(train_X, train_y)
>>> checkpoint = XGBoostCheckpoint.from_model(model.get_booster())

You can use an XGBoostCheckpoint to create an XGBoostPredictor and perform inference.

>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> predictor = XGBoostPredictor.from_checkpoint(checkpoint)
get_model() xgboost.core.Booster[source]#

Retrieve the XGBoost model stored in this checkpoint.

class ray.train.xgboost.XGBoostPredictor(model: xgboost.core.Booster, preprocessor: Optional[Preprocessor] = None)[source]#

Bases: ray.train.predictor.Predictor

A predictor for XGBoost models.

Parameters
  • model – The XGBoost booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.xgboost.xgboost_predictor.XGBoostPredictor[source]#

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of XGBoostTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a XGBoostTrainer run.
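
For example, a minimal sketch that builds on the XGBoostTrainer example above (the trainer and its "train" dataset are assumed to be defined as shown there):

import pandas as pd
from ray.train.xgboost import XGBoostPredictor

# Assumes `trainer` is the XGBoostTrainer configured in the example above.
result = trainer.fit()

# Load the trained booster (and fitted preprocessor, if any) from the checkpoint.
predictor = XGBoostPredictor.from_checkpoint(result.checkpoint)

# Run inference on a batch without the label column.
batch = pd.DataFrame({"x": [0.0, 1.0, 2.0]})
predictions = predictor.predict(batch)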

predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, dmatrix_kwargs: Optional[Dict[str, Any]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]][source]#

Run inference on data batch.

The data is converted into an XGBoost DMatrix before being passed to the model.

Parameters
  • data – A batch of input data.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • dmatrix_kwargs – Dict of keyword arguments passed to xgboost.DMatrix.

  • **predict_kwargs – Keyword arguments passed to xgboost.Booster.predict.

Examples

>>> import numpy as np
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictor.predict(data, feature_columns=[0, 1])
array([0.5, 0.5], dtype=float32)
>>> import pandas as pd
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictor.predict(data, feature_columns=["A", "B"])
   predictions
0          0.5
1          0.5
Returns

Prediction result.

LightGBM#

LightGBMTrainer#

class ray.train.lightgbm.LightGBMTrainer(*args, **kwargs)[source]#

Bases: ray.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel LightGBM training.

This Trainer runs the LightGBM training loop in a distributed manner using multiple Ray Actors.

If you would like to take advantage of LightGBM's built-in handling for features with the categorical data type, consider using the Categorizer preprocessor to set the dtypes in the dataset.
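
For example, a minimal sketch of this pattern, assuming the ray.data.preprocessors.Categorizer import path and a string-valued column "cat" that should be treated as categorical:

import ray
from ray.air.config import ScalingConfig
from ray.data.preprocessors import Categorizer
from ray.train.lightgbm import LightGBMTrainer

train_dataset = ray.data.from_items(
    [{"cat": str(x % 3), "y": x} for x in range(32)]
)
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    # Cast "cat" to the pandas categorical dtype so LightGBM can apply
    # its native categorical handling.
    preprocessor=Categorizer(columns=["cat"]),
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataset},
)
result = trainer.fit()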

Note

LightGBMTrainer does not modify or otherwise alter the working of the LightGBM distributed training algorithm. Ray only provides orchestration, data ingest, and fault tolerance. For more information on distributed LightGBM training, refer to the LightGBM documentation.

Example

import ray

from ray.train.lightgbm import LightGBMTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a "train" key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – LightGBM training parameters passed to lightgbm.train(). Refer to LightGBM documentation for a list of possible parameters.

  • dmatrix_params – A dict mapping dataset names to dicts of kwargs that are passed to the respective xgboost_ray.RayDMatrix initializations, which in turn are passed to the lightgbm.Dataset objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to lightgbm.train() function.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)#

LightGBMPredictor#

class ray.train.lightgbm.LightGBMCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]#

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with LightGBM-specific functionality.

Create this from a generic Checkpoint by calling LightGBMCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(booster: lightgbm.basic.Booster, *, preprocessor: Optional[Preprocessor] = None) LightGBMCheckpoint[source]#

Create a Checkpoint that stores a LightGBM model.

Parameters
  • booster – The LightGBM model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A LightGBMCheckpoint containing the specified booster.

Examples

>>> import lightgbm
>>> import numpy as np
>>> from ray.train.lightgbm import LightGBMCheckpoint
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lightgbm.LGBMClassifier().fit(train_X, train_y)
>>> checkpoint = LightGBMCheckpoint.from_model(model.booster_)

You can use a LightGBMCheckpoint to create a LightGBMPredictor and perform inference.

>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> predictor = LightGBMPredictor.from_checkpoint(checkpoint)
get_model() lightgbm.basic.Booster[source]#

Retrieve the LightGBM model stored in this checkpoint.

class ray.train.lightgbm.LightGBMPredictor(model: lightgbm.basic.Booster, preprocessor: Optional[Preprocessor] = None)[source]#

Bases: ray.train.predictor.Predictor

A predictor for LightGBM models.

Parameters
  • model – The LightGBM booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.lightgbm.lightgbm_predictor.LightGBMPredictor[source]#

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of LightGBMTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.
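
A minimal sketch, assuming `result` is the output of the LightGBMTrainer.fit() call shown above:

from ray.train.lightgbm import LightGBMPredictor

# Load the trained booster (and fitted preprocessor, if any) from the checkpoint.
predictor = LightGBMPredictor.from_checkpoint(result.checkpoint)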

predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]][source]#

Run inference on data batch.

Parameters
  • data – A batch of input data.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • **predict_kwargs – Keyword arguments passed to lightgbm.Booster.predict.

Examples

>>> import numpy as np
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictions = predictor.predict(data, feature_columns=[0, 1])
>>> import pandas as pd
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

TensorFlow#

TensorflowTrainer#

class ray.train.tensorflow.TensorflowTrainer(*args, **kwargs)[source]#

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Tensorflow training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary TensorFlow process group configured for distributed TensorFlow training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the "train" key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

Warning

Ray will not automatically set any environment variables or configuration related to local parallelism / threading aside from "OMP_NUM_THREADS". If you desire greater control over TensorFlow threading, use the tf.config.threading module (e.g., tf.config.threading.set_inter_op_parallelism_threads(num_cpus)) at the beginning of your train_loop_per_worker function.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

To save a model to use for the TensorflowPredictor, you must save it under the "model" kwarg in the Checkpoint passed to session.report().

Example:

import tensorflow as tf

import ray
from ray.air import session, Checkpoint
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

# If using GPUs, set this to True.
use_gpu = False

def build_model():
    # toy neural network : 1-layer
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(
            1, activation="linear", input_shape=(1,))]
    )

def train_loop_per_worker(config):
    dataset_shard = session.get_dataset_shard("train")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()
        model.compile(
            optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    tf_dataset = dataset_shard.to_tf(
        feature_columns="x",
        label_columns="y",
        batch_size=1
    )
    for epoch in range(config["num_epochs"]):
        model.fit(tf_dataset)
        # You can also use ray.air.integrations.keras.Callback
        # for reporting and checkpointing instead of reporting manually.
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.get_weights())
            ),
        )

train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=3, use_gpu=use_gpu),
    datasets={"train": train_dataset},
    train_loop_config={"num_epochs": 2},
)
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • tensorflow_config – Configuration for setting up the TensorFlow backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key "train" to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, tensorflow_config: Optional[ray.train.tensorflow.config.TensorflowConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]#

TensorflowPredictor and TensorflowCheckpoint#

class ray.train.tensorflow.TensorflowCheckpoint(*args, **kwargs)[source]#

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with TensorFlow-specific functionality.

Create this from a generic Checkpoint by calling TensorflowCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

class Flavor(value)[source]#

Bases: enum.Enum

An enumeration.

classmethod from_model(model: keras.engine.training.Model, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint[source]#

Create a Checkpoint that stores a Keras model.

The checkpoint created with this method only stores the model weights, so it must be paired with the model when used.

Parameters
  • model – The Keras model, whose weights are stored in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TensorflowCheckpoint containing the specified model.

Examples

>>> from ray.train.tensorflow import TensorflowCheckpoint
>>> import tensorflow as tf
>>>
>>> model = tf.keras.applications.resnet.ResNet101()  
>>> checkpoint = TensorflowCheckpoint.from_model(model)  
classmethod from_h5(file_path: str, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint[source]#

Create a Checkpoint that stores a Keras model from H5 format.

The checkpoint generated by this method contains all the information needed, so no model needs to be supplied when using this checkpoint.

file_path must remain valid even after this function returns. As a side effect of this method, new files or directories may be added to the parent directory of file_path.

Parameters
  • file_path – The path to the .h5 file to load model from. This is the same path that is used for model.save(path).

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TensorflowCheckpoint converted from h5 format.

Examples

>>> import tensorflow as tf
>>> import ray
>>> from ray.train.batch_predictor import BatchPredictor
>>> from ray.train.tensorflow import (
...     TensorflowCheckpoint, TensorflowTrainer, TensorflowPredictor
... )
>>> from ray.air import session
>>> from ray.air.config import ScalingConfig
>>> def train_func():
...     model = tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(10),
...             tf.keras.layers.Dense(1),
...         ]
...     )
...     model.save("my_model.h5")
...     checkpoint = TensorflowCheckpoint.from_h5("my_model.h5")
...     session.report({"my_metric": 1}, checkpoint=checkpoint)
>>> trainer = TensorflowTrainer(
...     train_loop_per_worker=train_func,
...     scaling_config=ScalingConfig(num_workers=2))
>>> result_checkpoint = trainer.fit().checkpoint  
>>> batch_predictor = BatchPredictor.from_checkpoint(
...     result_checkpoint, TensorflowPredictor)  
>>> batch_predictor.predict(ray.data.range(3))  
classmethod from_saved_model(dir_path: str, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint[source]#

Create a Checkpoint that stores a Keras model from SavedModel format.

The checkpoint generated by this method contains all the information needed, so no model needs to be supplied when using this checkpoint.

dir_path must remain valid even after this function returns. As a side effect of this method, new files or directories may be added to dir_path.

Parameters
  • dir_path – The directory containing the saved model. This is the same directory as used by model.save(dir_path).

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TensorflowCheckpoint converted from SavedModel format.

Examples

>>> import tensorflow as tf
>>> import ray
>>> from ray.train.batch_predictor import BatchPredictor
>>> from ray.train.tensorflow import (
... TensorflowCheckpoint, TensorflowTrainer, TensorflowPredictor)
>>> from ray.air import session
>>> from ray.air.config import ScalingConfig
>>> def train_fn():
...     model = tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(10),
...             tf.keras.layers.Dense(1),
...         ])
...     model.save("my_model")
...     checkpoint = TensorflowCheckpoint.from_saved_model("my_model")
...     session.report({"my_metric": 1}, checkpoint=checkpoint)
>>> trainer = TensorflowTrainer(
...     train_loop_per_worker=train_fn,
...     scaling_config=ScalingConfig(num_workers=2))
>>> result_checkpoint = trainer.fit().checkpoint  
>>> batch_predictor = BatchPredictor.from_checkpoint(
...     result_checkpoint, TensorflowPredictor)  
>>> batch_predictor.predict(ray.data.range(3))  
get_model(model: Optional[Union[keras.engine.training.Model, Callable[[], keras.engine.training.Model]]] = None, model_definition: Optional[Callable[[], keras.engine.training.Model]] = None) keras.engine.training.Model[source]#

Retrieve the model stored in this checkpoint.

Parameters
  • model – This arg is expected only if the original checkpoint was created via TensorflowCheckpoint.from_model.

  • model_definition – This parameter is deprecated. Use model instead.

Returns

The Tensorflow Keras model stored in the checkpoint.
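
A minimal sketch, assuming the checkpoint was created with TensorflowCheckpoint.from_model (and therefore only stores the weights):

import tensorflow as tf
from ray.train.tensorflow import TensorflowCheckpoint

def build_model() -> tf.keras.Model:
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(1, input_shape=(1,))]
    )

checkpoint = TensorflowCheckpoint.from_model(build_model())

# The checkpoint only holds the weights, so supply the architecture again.
model = checkpoint.get_model(build_model())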

class ray.train.tensorflow.TensorflowConfig[source]#

Bases: ray.train.backend.BackendConfig

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.tensorflow.prepare_dataset_shard(tf_dataset_shard: tensorflow.python.data.ops.dataset_ops.DatasetV2)[source]#

A utility function that overrides the default configuration for a TensorFlow Dataset.

This should be used on a TensorFlow Dataset created by calling iter_tf_batches() on a ray.data.Dataset returned by ray.air.session.get_dataset_shard() since the dataset has already been sharded across the workers.

Parameters

tf_dataset_shard (tf.data.Dataset) – A TensorFlow Dataset.

Returns

A TensorFlow Dataset with autosharding turned off and prefetching turned on with autotune enabled.
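
A minimal sketch of the intended usage inside train_loop_per_worker, assuming a dataset shard with "x" and "y" columns as in the TensorflowTrainer example above:

from ray.air import session
from ray.train.tensorflow import prepare_dataset_shard

def train_loop_per_worker():
    dataset_shard = session.get_dataset_shard("train")
    # The shard is already split per worker, so disable autosharding and
    # enable prefetching on the resulting tf.data.Dataset.
    tf_dataset = prepare_dataset_shard(
        dataset_shard.to_tf(
            feature_columns="x",
            label_columns="y",
            batch_size=32,
        )
    )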

PublicAPI (beta): This API is in beta and may change before becoming stable.

class ray.train.tensorflow.TensorflowPredictor(*, model: Optional[keras.engine.training.Model] = None, preprocessor: Optional[Preprocessor] = None, use_gpu: bool = False)[source]#

Bases: ray.train._internal.dl_predictor.DLPredictor

A predictor for TensorFlow models.

Parameters
  • model – A Tensorflow Keras model to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model_definition: Optional[Union[Callable[[], keras.engine.training.Model], Type[keras.engine.training.Model]]] = None, use_gpu: Optional[bool] = False) ray.train.tensorflow.tensorflow_predictor.TensorflowPredictor[source]#

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TensorflowTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.

  • model_definition – A callable that returns a TensorFlow Keras model to use. Model weights will be loaded from the checkpoint. This is only needed if the checkpoint was created from TensorflowCheckpoint.from_model.

  • use_gpu – Whether GPU should be used during prediction.
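
A minimal sketch, assuming the checkpoint was created with TensorflowCheckpoint.from_model, so the model architecture must be passed back in via model_definition:

import tensorflow as tf
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowPredictor

def build_model() -> tf.keras.Model:
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(1, input_shape=(1,))]
    )

checkpoint = TensorflowCheckpoint.from_model(build_model())
predictor = TensorflowPredictor.from_checkpoint(
    checkpoint, model_definition=build_model
)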

call_model(inputs: Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]]) Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]][source]#

Runs inference on a single batch of tensor data.

This method is called by TensorflowPredictor.predict after converting the original data batch to TensorFlow tensors.

Override this method to add custom logic for processing the model input or output.

Example

# List outputs are not supported by default TensorflowPredictor.
def build_model() -> tf.keras.Model:
    input = tf.keras.layers.Input(shape=1)
    model = tf.keras.models.Model(inputs=input, outputs=[input, input])
    return model

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TensorflowPredictor):
    def call_model(self, inputs):
        model_output = super().call_model(inputs)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

predictor = CustomPredictor(model=build_model())
predictions = predictor.predict(data_batch)
Parameters

inputs – A batch of data to predict on, represented as either a single TensorFlow tensor or for multi-input models, a dictionary of tensors.

Returns

The model outputs, either as a single tensor or a dictionary of tensors.

DeveloperAPI: This API may change across minor Ray releases.

predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], dtype: Optional[Union[tensorflow.python.framework.dtypes.DType, Dict[str, tensorflow.python.framework.dtypes.DType]]] = None) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]][source]#

Run inference on data batch.

If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single TensorFlow tensor before being passed to the model.

If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being passed to the model. This is useful for multi-modal inputs (for example, a model that accepts both image and text).

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • dtype – The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.

Examples

>>> import numpy as np
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     return tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(1),
...         ]
...     )
>>>
>>> weights = [np.array([[2.0]]), np.array([0.0])]
>>> predictor = TensorflowPredictor(model=build_model())
>>>
>>> data = np.asarray([1, 2, 3])
>>> predictions = predictor.predict(data) 
>>> import pandas as pd
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     input1 = tf.keras.layers.Input(shape=(1,), name="A")
...     input2 = tf.keras.layers.Input(shape=(1,), name="B")
...     merged = tf.keras.layers.Concatenate(axis=1)([input1, input2])
...     output = tf.keras.layers.Dense(2, input_dim=2)(merged)
...     return tf.keras.models.Model(
...         inputs=[input1, input2], outputs=output)
>>>
>>> predictor = TensorflowPredictor(model=build_model())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>>
>>> predictions = predictor.predict(data) 
Returns

Prediction result. The return type will be the same as the input type.

Return type

DataBatchType

PyTorch#

TorchTrainer#

class ray.train.torch.TorchTrainer(*args, **kwargs)[source]#

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel PyTorch training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
from typing import Dict, Any
def train_loop_per_worker(config: Dict[str, Any]):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the "train" key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods. See full example code below.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Get dict of last saved checkpoint.
    session.get_checkpoint()

    # Session returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Get the total number of workers executing training.
    session.get_world_size()

    # Get the rank of this worker.
    session.get_world_rank()

    # Get the rank of the worker on the current node.
    session.get_local_rank()

You can also use any of the Torch-specific utility functions, such as ray.train.torch.get_device() and ray.train.torch.prepare_model().

def train_loop_per_worker():
    # Prepares the model for distributed training by wrapping it in
    # `DistributedDataParallel` and moving it to the correct device.
    train.torch.prepare_model(...)

    # Configures the dataloader for distributed training by adding a
    # `DistributedSampler`.
    # You should NOT use this if you are doing
    # `session.get_dataset_shard(...).iter_torch_batches(...)`
    train.torch.prepare_data_loader(...)

    # Get the current torch device.
    train.torch.get_device()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

To save a model to use for the TorchPredictor, you must save it under the "model" kwarg in the Checkpoint passed to session.report().

Note

When you wrap the model with prepare_model, the keys of its state_dict are prefixed by module.. For example, layer1.0.bn1.bias becomes module.layer1.0.bn1.bias. However, when saving the model through session.report(), all module. prefixes are stripped. As a result, when you load from a saved checkpoint, make sure that you first load the state_dict into the model before calling prepare_model. Otherwise, you will run into errors like Error(s) in loading state_dict for DistributedDataParallel: Missing key(s) in state_dict: "module.conv1.weight", .... See the snippet below.

from torchvision.models import resnet18
from ray.air import session
from ray.air.checkpoint import Checkpoint
import ray.train as train

def train_func():
    ...
    model = resnet18()
    model = train.torch.prepare_model(model)
    for epoch in range(3):
        ...
        ckpt = Checkpoint.from_dict({
            "epoch": epoch,
            "model": model.state_dict(),
            # "model": model.module.state_dict(),
            # ** The above two are equivalent **
        })
        session.report({"foo": "bar"}, ckpt)

Example

import torch
import torch.nn as nn

import ray
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig
from ray.air.config import CheckpointConfig

# If using GPUs, set this to True.
use_gpu = False

# Define the NN layer architecture, epochs, and number of workers
input_size = 1
layer_size = 32
output_size = 1
num_epochs = 200
num_workers = 3

# Define your network structure
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

# Define your train worker loop
def train_loop_per_worker():

    # Fetch training set from the session
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()

    # Loss function, optimizer, prepare model for training.
    # This moves the data and prepares model for distributed
    # execution
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(),
                lr=0.01,
                weight_decay=0.01)
    model = train.torch.prepare_model(model)

    # Iterate over epochs and batches
    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(batch_size=32,
                    dtypes=torch.float, device=train.torch.get_device()):

            # Add batch or unsqueeze as an additional dimension [32, x]
            inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
            output = model(inputs)

            # Make the output shape the same as the labels
            loss = loss_fn(output.squeeze(), labels)

            # Zero out grads, do backward, and update optimizer
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Print the loss every 20 epochs
            if epoch % 20 == 0:
                print(f"epoch: {epoch}/{num_epochs}, loss: {loss:.3f}")

        # Report and record metrics, checkpoint model at end of each
        # epoch
        session.report({"loss": loss.item(), "epoch": epoch},
                             checkpoint=Checkpoint.from_dict(
                             dict(epoch=epoch, model=model.state_dict()))
        )

torch.manual_seed(42)
train_dataset = ray.data.from_items(
    [{"x": x, "y": 2 * x + 1} for x in range(200)]
)

# Define scaling and run configs
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
run_config = RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=1))

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_dataset})

result = trainer.fit()

best_checkpoint_loss = result.metrics['loss']

# Assert loss is less than 0.09
assert best_checkpoint_loss <= 0.09
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key "train" to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]#

TorchPredictor#

class ray.train.torch.TorchCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]#

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with Torch-specific functionality.

Create this from a generic Checkpoint by calling TorchCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_state_dict(state_dict: Dict[str, Any], *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint[source]#

Create a Checkpoint that stores a model state dictionary.

Tip

This is the recommended method for creating TorchCheckpoints.

Parameters
  • state_dict – The model state dictionary to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TorchCheckpoint containing the specified state dictionary.

Examples

import torch
import torch.nn as nn
from ray.train.torch import TorchCheckpoint

# Set manual seed
torch.manual_seed(42)

# Function to create a NN model
def create_model() -> nn.Module:
    model = nn.Sequential(nn.Linear(1, 10),
            nn.ReLU(),
            nn.Linear(10,1))
    return model

# Create a TorchCheckpoint from our model's state_dict
model = create_model()
checkpoint = TorchCheckpoint.from_state_dict(model.state_dict())

# Now load the model from the TorchCheckpoint by providing the
# model architecture
model_from_chkpt = checkpoint.get_model(create_model())

# Assert they have the same state dict
assert str(model.state_dict()) == str(model_from_chkpt.state_dict())
print("worked")
classmethod from_model(model: torch.nn.modules.module.Module, *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint[source]#

Create a Checkpoint that stores a Torch model.

Note

PyTorch recommends storing state dictionaries. To create a TorchCheckpoint from a state dictionary, call from_state_dict(). To learn more about state dictionaries, read Saving and Loading Models.

Parameters
  • model – The Torch model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TorchCheckpoint containing the specified model.

Examples

from ray.train.torch import TorchCheckpoint
from ray.train.torch import TorchPredictor
import torch

# Set manual seed
torch.manual_seed(42)

# Create model identity and send a random tensor to it
model = torch.nn.Identity()
input = torch.randn(2, 2)
output = model(input)

# Create a checkpoint
checkpoint = TorchCheckpoint.from_model(model)

# You can use a TorchCheckpoint to create a TorchPredictor
# and perform inference.
predictor = TorchPredictor.from_checkpoint(checkpoint)
pred = predictor.predict(input.numpy())

# Convert prediction dictionary value into a tensor
pred = torch.tensor(pred['predictions'])

# Assert the outputs from the original and checkpoint model are the same
assert torch.equal(output, pred)
print("worked")
get_model(model: Optional[torch.nn.modules.module.Module] = None) torch.nn.modules.module.Module[source]#

Retrieve the model stored in this checkpoint.

Parameters

model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded into this model. Otherwise, this model argument will be discarded.

class ray.train.torch.TorchConfig(backend: Optional[str] = None, init_method: str = 'env', timeout_s: int = 1800)[source]#

Bases: ray.train.backend.BackendConfig

Configuration for torch process group setup.

See https://pytorch.org/docs/stable/distributed.html for more info.

Parameters
  • backend – The backend to use for training. See torch.distributed.init_process_group for more info and valid values. If set to None, nccl will be used if GPUs are requested, else gloo will be used.

  • init_method – The initialization method to use. Either "env" for environment variable initialization or "tcp" for TCP initialization. Defaults to "env".

  • timeout_s – Seconds for process group operations to timeout.
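
For example, a minimal sketch that forces the Gloo backend (assuming train_loop_per_worker is defined elsewhere):

from ray.air.config import ScalingConfig
from ray.train.torch import TorchConfig, TorchTrainer

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    # Use Gloo even if GPUs are requested, with a 10 minute timeout.
    torch_config=TorchConfig(backend="gloo", timeout_s=600),
    scaling_config=ScalingConfig(num_workers=2),
)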

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.accelerate(amp: bool = False) None[source]#

Enables training optimizations.

Parameters

amp – If true, perform training with automatic mixed precision. Otherwise, use full precision.

Warning

train.torch.accelerate cannot be called more than once, and it must be called before any other train.torch utility function.
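
A minimal sketch of the intended call order inside train_loop_per_worker (the toy data is illustrative; AMP only takes effect on GPU workers):

import torch
import torch.nn as nn
import ray.train as train
import ray.train.torch  # Needed to access the train.torch utilities.

def train_loop_per_worker():
    # accelerate() must be the first train.torch call in the training function.
    train.torch.accelerate(amp=True)

    model = train.torch.prepare_model(nn.Linear(1, 1))
    optimizer = train.torch.prepare_optimizer(
        torch.optim.SGD(model.parameters(), lr=0.01)
    )
    loss_fn = nn.MSELoss()

    device = train.torch.get_device()
    x = torch.randn(8, 1, device=device)
    y = 2 * x + 1
    for _ in range(3):
        loss = loss_fn(model(x), y)
        optimizer.zero_grad()
        # train.torch.backward scales the loss when AMP is enabled.
        train.torch.backward(loss)
        optimizer.step()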

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.get_device() torch.device[source]#

Gets the correct torch device to use for training.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_model(model: torch.nn.modules.module.Module, move_to_device: bool = True, parallel_strategy: Optional[str] = 'ddp', parallel_strategy_kwargs: Optional[Dict[str, Any]] = None) torch.nn.modules.module.Module[source]#

Prepares the model for distributed execution.

This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).

Parameters
  • model (torch.nn.Module) – A torch model to prepare.

  • move_to_device – Whether to move the model to the correct device. If set to False, the model needs to manually be moved to the correct device.

  • parallel_strategy ("ddp", "fsdp", or None) – Whether to wrap models in DistributedDataParallel, FullyShardedDataParallel, or neither.

  • parallel_strategy_kwargs (Dict[str, Any]) – Args to pass into DistributedDataParallel or FullyShardedDataParallel initialization if parallel_strategy is set to "ddp" or "fsdp", respectively.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_optimizer(optimizer: torch.optim.optimizer.Optimizer) torch.optim.optimizer.Optimizer[source]#

Wraps optimizer to support automatic mixed precision.

Parameters

optimizer (torch.optim.Optimizer) – The optimizer to prepare.

Returns

A wrapped optimizer.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_data_loader(data_loader: torch.utils.data.dataloader.DataLoader, add_dist_sampler: bool = True, move_to_device: bool = True, auto_transfer: bool = True) torch.utils.data.dataloader.DataLoader[source]#

Prepares DataLoader for distributed execution.

This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).

Parameters
  • data_loader (torch.utils.data.DataLoader) – The DataLoader to prepare.

  • add_dist_sampler – Whether to add a DistributedSampler to the provided DataLoader.

  • move_to_device – If set, automatically move the data returned by the data loader to the correct device.

  • auto_transfer – If set and device is GPU, another CUDA stream is created to automatically copy data from host (CPU) memory to device (GPU) memory (the default CUDA stream still runs the training procedure). If device is CPU, it will be disabled regardless of the setting. This configuration will be ignored if move_to_device is False.
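
A minimal sketch inside train_loop_per_worker, using an in-memory TensorDataset for illustration:

import torch
from torch.utils.data import DataLoader, TensorDataset
import ray.train as train
import ray.train.torch  # Needed to access the train.torch utilities.

def train_loop_per_worker():
    dataset = TensorDataset(torch.randn(32, 1), torch.randn(32, 1))
    data_loader = DataLoader(dataset, batch_size=8, shuffle=True)
    # Adds a DistributedSampler and moves each batch to the right device.
    data_loader = train.torch.prepare_data_loader(data_loader)

    model = train.torch.prepare_model(torch.nn.Linear(1, 1))
    for x, y in data_loader:
        pred = model(x)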

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.backward(tensor: torch.Tensor) None[source]#

Computes the gradient of the specified tensor w.r.t. graph leaves.

Parameters

tensor (torch.Tensor) – Tensor of which the derivative will be computed.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.enable_reproducibility(seed: int = 0) None[source]#

Limits sources of nondeterministic behavior.

This function:

  • Seeds PyTorch, Python, and NumPy.

  • Disables CUDA convolution benchmarking.

  • Configures PyTorch to use deterministic algorithms.

  • Seeds workers spawned for multi-process data loading.

Parameters

seed – The number to seed libraries and data workers with.

Warning

train.torch.enable_reproducibility() can't guarantee completely reproducible results across executions. To learn more, read the PyTorch notes on randomness.
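
A minimal sketch (call it at the start of the training function on every worker):

import ray.train as train
import ray.train.torch  # Needed to access the train.torch utilities.

def train_loop_per_worker():
    # Seed every worker before building the model or loading data.
    train.torch.enable_reproducibility(seed=42)
    # ... rest of the training loop ...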

PublicAPI (beta): This API is in beta and may change before becoming stable.

class ray.train.torch.TorchPredictor(model: torch.nn.modules.module.Module, preprocessor: Optional[Preprocessor] = None, use_gpu: bool = False)[source]#

Bases: ray.train._internal.dl_predictor.DLPredictor

A predictor for PyTorch models.

Parameters
  • model – The torch module to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None, use_gpu: bool = False) ray.train.torch.torch_predictor.TorchPredictor[source]#

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TorchTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.

  • model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model. If the checkpoint already contains the model itself, this model argument will be discarded.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.
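
A minimal sketch for a checkpoint that stores only a state dict, so a model of the same architecture must be supplied:

import torch
from ray.train.torch import TorchCheckpoint, TorchPredictor

model = torch.nn.Linear(1, 1)
checkpoint = TorchCheckpoint.from_state_dict(model.state_dict())

# The state dict is loaded into the provided model.
predictor = TorchPredictor.from_checkpoint(
    checkpoint, model=torch.nn.Linear(1, 1)
)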

call_model(inputs: Union[torch.Tensor, Dict[str, torch.Tensor]]) Union[torch.Tensor, Dict[str, torch.Tensor]][source]#

Runs inference on a single batch of tensor data.

This method is called by TorchPredictor.predict after converting the original data batch to torch tensors.

Override this method to add custom logic for processing the model input or output.

Parameters

inputs – A batch of data to predict on, represented as either a single PyTorch tensor or for multi-input models, a dictionary of tensors.

Returns

The model outputs, either as a single tensor or a dictionary of tensors.

Example

import numpy as np
import torch
from ray.train.torch import TorchPredictor

# List outputs are not supported by default TorchPredictor.
# So let's define a custom TorchPredictor and override call_model
class MyModel(torch.nn.Module):
    def forward(self, input_tensor):
        return [input_tensor, input_tensor]

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TorchPredictor):
    def call_model(self, inputs):
        model_output = super().call_model(inputs)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

# create our data batch
data_batch = np.array([1, 2])
# create custom predictor and predict
predictor = CustomPredictor(model=MyModel())
predictions = predictor.predict(data_batch)
print(f"Predictions: {predictions.get('0')}, {predictions.get('1')}")
Predictions: [1 2], [1 2]

DeveloperAPI: This API may change across minor Ray releases.

predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], dtype: Optional[Union[torch.dtype, Dict[str, torch.dtype]]] = None) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]][source]#

Run inference on data batch.

If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single PyTorch tensor before being passed to the model.

If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being passed to the model. This is useful for multi-modal inputs (for example, a model that accepts both image and text).

Parameters
  • data – A batch of input data of DataBatchType.

  • dtype – The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.

Returns

Prediction result. The return type will be the same as the input type.

Return type

DataBatchType

Example

import numpy as np
import pandas as pd
import torch
import ray
from ray.train.torch import TorchPredictor

# Define a custom PyTorch module
class CustomModule(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = torch.nn.Linear(1, 1)
        self.linear2 = torch.nn.Linear(1, 1)

    def forward(self, input_dict: dict):
        out1 = self.linear1(input_dict["A"].unsqueeze(1))
        out2 = self.linear2(input_dict["B"].unsqueeze(1))
        return out1 + out2

# Set manual seed so we get consistent output
torch.manual_seed(42)

# Use Standard PyTorch model
model = torch.nn.Linear(2, 1)
predictor = TorchPredictor(model=model)
# Define our data
data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data, dtype=torch.float)
print(f"Standard model predictions: {predictions}")
print("---")

# Use Custom PyTorch model with TorchPredictor
predictor = TorchPredictor(model=CustomModule())
# Define our data and predict with the custom model using TorchPredictor
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data, dtype=torch.float)
print(f"Custom model predictions: {predictions}")
Standard model predictions: {'predictions': array([[1.5487633],
       [3.8037925]], dtype=float32)}
---
Custom model predictions:     predictions
0  [0.61623406]
1    [2.857038]

Horovod#

HorovodTrainer#

class ray.train.horovod.HorovodTrainer(*args, **kwargs)[source]#

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Horovod training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary Horovod setup configured for distributed Horovod training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the "train" key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

You could use TensorflowPredictor or TorchPredictor in conjunction with HorovodTrainer. You must save the model under the "model" kwarg in the Checkpoint passed to session.report(), so that it can be used by corresponding predictors.

Example:

import ray
import ray.train as train
import ray.train.torch  # Need this to use `train.torch.get_device()`
import horovod.torch as hvd
import torch
import torch.nn as nn
from ray.air import session
from ray.train.horovod import HorovodTrainer
from ray.train.torch import TorchCheckpoint
from ray.air.config import ScalingConfig

# If using GPUs, set this to True.
use_gpu = False

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)
    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    hvd.init()
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    device = train.torch.get_device()
    model.to(device)
    loss_fn = nn.MSELoss()
    lr_scaler = 1
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        op=hvd.Average,
    )
    for epoch in range(num_epochs):
        model.train()
        for batch in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float, device=train.torch.get_device()
        ):
            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")
        session.report(
            {},
            checkpoint=TorchCheckpoint.from_state_dict(
                model.state_dict()
            ),
        )
train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • horovod_config – Configuration for setting up the Horovod backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, horovod_config: Optional[ray.train.horovod.config.HorovodConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]#

HorovodConfig#

class ray.train.horovod.HorovodConfig(nics: Optional[Set[str]] = None, verbose: int = 1, key: Optional[str] = None, ssh_port: Optional[int] = None, ssh_identity_file: Optional[str] = None, ssh_str: Optional[str] = None, timeout_s: int = 300, placement_group_timeout_s: int = 100)[source]#

Bases: ray.train.backend.BackendConfig

Configurations for Horovod setup.

See https://github.com/horovod/horovod/blob/master/horovod/runner/common/util/settings.py

Parameters
  • nics (Optional[Set[str]]) – Network interfaces that can be used for communication.

  • verbose – Horovod logging verbosity.

  • key (Optional[str]) – Secret used for communication between workers.

  • ssh_port (Optional[int]) – Port for SSH server running on worker nodes.

  • ssh_identity_file (Optional[str]) – Path to the identity file to ssh into different hosts on the cluster.

  • ssh_str (Optional[str]) – CAUTION WHEN USING THIS. Private key file contents. Writes the private key to ssh_identity_file.

  • timeout_s – Timeout parameter for Gloo rendezvous.

  • placement_group_timeout_s – Timeout parameter for Ray Placement Group creation. Currently unused.

PublicAPI (beta): This API is in beta and may change before becoming stable.
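
A minimal usage sketch (the timeout value and network interface name are assumptions for illustration):

from ray.air.config import ScalingConfig
from ray.train.horovod import HorovodConfig, HorovodTrainer

# Lengthen the Gloo rendezvous timeout and pin communication to one NIC.
horovod_config = HorovodConfig(timeout_s=600, nics={"eth0"})

trainer = HorovodTrainer(
    train_loop_per_worker=lambda: None,  # placeholder training function
    horovod_config=horovod_config,
    scaling_config=ScalingConfig(num_workers=2),
)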

HuggingFace#

HuggingFaceTrainer#

class ray.train.huggingface.HuggingFaceTrainer(*args, **kwargs)[source]#

Bases: ray.train.torch.torch_trainer.TorchTrainer

A Trainer for data parallel HuggingFace Transformers training on PyTorch.

This Trainer runs the transformers.Trainer.train() method on multiple Ray Actors. The training is carried out in a distributed fashion through PyTorch DDP. These actors already have the necessary torch process group configured for distributed PyTorch training. If you have PyTorch >= 1.12.0 installed, you can also run FSDP training by specifying the fsdp argument in TrainingArguments. For more information on configuring FSDP, refer to Hugging Face documentation.

The training function executed on every Actor will first run the specified trainer_init_per_worker function to obtain an instantiated transformers.Trainer object. The trainer_init_per_worker function will have access to preprocessed train and evaluation datasets.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards, with each Actor training on a single shard. All the other datasets will not be split.

Please note that if you use a custom transformers.Trainer subclass, the get_train_dataloader method will be wrapped to disable sharding by transformers.IterableDatasetShard, as the dataset will already be sharded on the Ray AIR side.

HuggingFace loggers will be automatically disabled, and the local_rank argument in TrainingArguments will be automatically set. Please note that if you want to use CPU training, you will need to set the no_cuda argument in TrainingArguments manually - otherwise, an exception (segfault) may be thrown.

This Trainer requires transformers>=4.19.0 package.

Example

# Based on
# huggingface/notebooks/examples/language_modeling_from_scratch.ipynb

# Hugging Face imports
from datasets import load_dataset
import transformers
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

import ray
from ray.train.huggingface import HuggingFaceTrainer
from ray.air.config import ScalingConfig

# If using GPUs, set this to True.
use_gpu = False

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
block_size = 128

datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)

def tokenize_function(examples):
    return tokenizer(examples["text"])

tokenized_datasets = datasets.map(
    tokenize_function, batched=True, num_proc=1, remove_columns=["text"]
)

def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {
        k: sum(examples[k], []) for k in examples.keys()
    }
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model
    # supported it.
    # instead of this drop, you can customize this part to your needs.
    total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len.
    result = {
        k: [
            t[i : i + block_size]
            for i in range(0, total_length, block_size)
        ]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result

lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    batch_size=1000,
    num_proc=1,
)
ray_train_ds = ray.data.from_huggingface(lm_datasets["train"])
ray_evaluation_ds = ray.data.from_huggingface(
    lm_datasets["validation"]
)

def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    model_config = AutoConfig.from_pretrained(model_checkpoint)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{model_checkpoint}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        no_cuda=(not use_gpu),
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HuggingFaceTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_evaluation_ds},
)
result = trainer.fit()
Parameters
  • trainer_init_per_worker – The function that returns an instantiated transformers.Trainer object and takes in the following arguments: train Torch.Dataset, optional evaluation Torch.Dataset and config as kwargs. The Torch Datasets are automatically created by converting the Ray Datasets internally before they are passed into the function.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset and (optionally) key “evaluation” to denote the evaluation dataset. Can only contain a training dataset and up to one extra dataset to be used for evaluation. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • trainer_init_config – Configurations to pass into trainer_init_per_worker as kwargs.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer. Same as in TorchTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

__init__(trainer_init_per_worker: Callable[[torch.utils.data.dataset.Dataset, Optional[torch.utils.data.dataset.Dataset], Any], transformers.trainer.Trainer], *, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], trainer_init_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]#

HuggingFacePredictor and HuggingFaceCheckpoint#

class ray.train.huggingface.HuggingFaceCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]#

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with HuggingFace-specific functionality.

Use HuggingFaceCheckpoint.from_model to create this type of checkpoint.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_model(model: Union[transformers.modeling_utils.PreTrainedModel, torch.nn.modules.module.Module], tokenizer: Optional[transformers.tokenization_utils.PreTrainedTokenizer] = None, *, path: os.PathLike, preprocessor: Optional[Preprocessor] = None) HuggingFaceCheckpoint[source]#

Create a Checkpoint that stores a HuggingFace model.

Parameters
  • model – The pretrained transformer or Torch model to store in the checkpoint.

  • tokenizer – The Tokenizer to use in the Transformers pipeline for inference.

  • path – The directory where the checkpoint will be stored.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A HuggingFaceCheckpoint containing the specified model.
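
A hedged sketch, reusing the GPT-2 objects from the trainer example above; the "./hf_checkpoint" directory is illustrative:

from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
from ray.train.huggingface import HuggingFaceCheckpoint

model_config = AutoConfig.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_config(model_config)
tokenizer = AutoTokenizer.from_pretrained("sgugger/gpt2-like-tokenizer")

checkpoint = HuggingFaceCheckpoint.from_model(
    model, tokenizer, path="./hf_checkpoint"
)

# The stored objects can be recovered later:
restored_model = checkpoint.get_model(AutoModelForCausalLM)
restored_tokenizer = checkpoint.get_tokenizer(AutoTokenizer)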

get_model(model: Union[Type[transformers.modeling_utils.PreTrainedModel], torch.nn.modules.module.Module], **pretrained_model_kwargs) Union[transformers.modeling_utils.PreTrainedModel, torch.nn.modules.module.Module][source]#

Retrieve the model stored in this checkpoint.

get_tokenizer(tokenizer: Type[transformers.tokenization_utils.PreTrainedTokenizer], **kwargs) Optional[transformers.tokenization_utils.PreTrainedTokenizer][source]#

Create a tokenizer using the data stored in this checkpoint.

get_training_arguments() transformers.training_args.TrainingArguments[source]#

Retrieve the training arguments stored in this checkpoint.

class ray.train.huggingface.HuggingFacePredictor(pipeline: Optional[transformers.pipelines.base.Pipeline] = None, preprocessor: Optional[Preprocessor] = None, use_gpu: bool = False)[source]#

Bases: ray.train.predictor.Predictor

A predictor for HuggingFace Transformers PyTorch models.

This predictor uses Transformers Pipelines for inference.

Parameters
  • pipeline – The Transformers pipeline to use for inference.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, *, pipeline_cls: Optional[Type[transformers.pipelines.base.Pipeline]] = None, **pipeline_kwargs) ray.train.huggingface.huggingface_predictor.HuggingFacePredictor[source]#

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of HuggingFaceTrainer.

Parameters
  • checkpoint – The checkpoint to load the model, tokenizer and preprocessor from. It is expected to be from the result of a HuggingFaceTrainer run.

  • pipeline_cls – A transformers.pipelines.Pipeline class to use. If not specified, will use the pipeline abstraction wrapper.

  • **pipeline_kwargs – Any kwargs to pass to the pipeline initialization. If pipeline is None, this must contain the ‘task’ argument. Cannot contain ‘model’. Can be used to override the tokenizer with ‘tokenizer’. If use_gpu is True, ‘device’ will be set to 0 by default.
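
A minimal sketch, assuming result is the output of a HuggingFaceTrainer.fit() call such as the one shown earlier:

from ray.train.huggingface import HuggingFacePredictor

predictor = HuggingFacePredictor.from_checkpoint(
    result.checkpoint,
    task="text-generation",  # forwarded to the pipeline factory
)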

predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]][source]#

Run inference on data batch.

The data is converted into a list (unless pipeline is a TableQuestionAnsweringPipeline) and passed to the pipeline object.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, use all columns.

  • **predict_kwargs – Additional kwargs to pass to the pipeline object. If use_gpu is True, ‘device’ will be set to 0 by default.

Examples

>>> import pandas as pd
>>> from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
>>> from transformers.pipelines import pipeline
>>> from ray.train.huggingface import HuggingFacePredictor
>>>
>>> model_checkpoint = "gpt2"
>>> tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
>>> tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)
>>>
>>> model_config = AutoConfig.from_pretrained(model_checkpoint)
>>> model = AutoModelForCausalLM.from_config(model_config)
>>> predictor = HuggingFacePredictor(
...     pipeline=pipeline(
...         task="text-generation", model=model, tokenizer=tokenizer
...     )
... )
>>>
>>> prompts = pd.DataFrame(
...     ["Complete me", "And me", "Please complete"], columns=["sentences"]
... )
>>> predictions = predictor.predict(prompts)
Returns

Prediction result.

Scikit-Learn#

SklearnTrainer#

class ray.train.sklearn.SklearnTrainer(*args, **kwargs)[source]#

Bases: ray.train.base_trainer.BaseTrainer

A Trainer for scikit-learn estimator training.

This Trainer runs the fit method of the given estimator in a non-distributed manner on a single Ray Actor.

By default, the n_jobs (or thread_count) estimator parameters will be set to match the number of CPUs assigned to the Ray Actor. This behavior can be disabled by setting set_estimator_cpus=False.

If you wish to use GPU-enabled estimators (e.g., cuML), make sure to set "GPU": 1 in scaling_config.trainer_resources.
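
A minimal sketch of that configuration (the CPU count is illustrative):

from ray.air.config import ScalingConfig

# Reserve one GPU (and four CPUs) for the single training Actor.
scaling_config = ScalingConfig(trainer_resources={"CPU": 4, "GPU": 1})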

The results are reported all at once and not in an iterative fashion. No checkpointing is done during training. This may be changed in the future.

Example:

import ray

from ray.train.sklearn import SklearnTrainer
from sklearn.ensemble import RandomForestRegressor

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = SklearnTrainer(
    estimator=RandomForestRegressor(),
    label_column="y",
    scaling_config=ray.air.config.ScalingConfig(
        trainer_resources={"CPU": 4}
    ),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • estimator – A scikit-learn compatible estimator to use.

  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting separate metrics.

  • label_column – Name of the label column. A column with this name must be present in the training dataset. If None, no validation will be performed.

  • params – Optional dict of params to be set on the estimator before fitting. Useful for hyperparameter tuning.

  • scoring –

    Strategy to evaluate the performance of the model on the validation sets and for cross-validation. Same as in sklearn.model_selection.cross_validate. If scoring represents a single score, one can use:

    • a single string;

    • a callable that returns a single value.

    If scoring represents multiple scores, one can use:

    • a list or tuple of unique strings;

    • a callable returning a dictionary where the keys are the metric names and the values are the metric scores;

    • a dictionary with metric names as keys and callables as values.

  • cv –

    Determines the cross-validation splitting strategy. If specified, cross-validation will be run on the train dataset, in addition to computing metrics for validation datasets. Same as in sklearn.model_selection.cross_validate, with the exception of None. Possible inputs for cv are:

    • None, to skip cross-validation.

    • int, to specify the number of folds in a (Stratified)KFold,

    • CV splitter,

    • An iterable yielding (train, test) splits as arrays of indices.

    For int/None inputs, if the estimator is a classifier and y is either binary or multiclass, StratifiedKFold is used. In all other cases, KFold is used. These splitters are instantiated with shuffle=False so the splits will be the same across calls.

    If you provide a “cv_groups” column in the train dataset, it will be used as group labels for the samples used while splitting the dataset into train/test set. Only used in conjunction with a “Group” cv instance (e.g., GroupKFold). This corresponds to the groups argument in sklearn.model_selection.cross_validate. A sketch combining scoring and cv appears after this parameter list.

  • return_train_score_cv – Whether to also return train scores during cross-validation. Ignored if cv is None.

  • parallelize_cv – If set to True, will parallelize cross-validation instead of the estimator. If set to None, will detect if the estimator has any parallelism-related params (n_jobs or thread_count) and parallelize cross-validation if there are none. If False, will not parallelize cross-validation. Cannot be set to True if there are any GPUs assigned to the trainer. Ignored if cv is None.

  • set_estimator_cpus – If set to True, will automatically set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to match the number of available CPUs.

  • scaling_config – Configuration for how to scale training. Only the trainer_resources key can be provided, as the training is not distributed.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • **fit_params – Additional kwargs passed to estimator.fit() method.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
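
A sketch combining scoring and cv (the metric names, fold count, and CPU count are illustrative):

import ray

from ray.train.sklearn import SklearnTrainer
from sklearn.ensemble import RandomForestRegressor

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = SklearnTrainer(
    estimator=RandomForestRegressor(),
    label_column="y",
    # Two metrics, reported for validation sets and cross-validation folds.
    scoring=["neg_mean_absolute_error", "neg_mean_squared_error"],
    # 5-fold cross-validation on the train dataset.
    cv=5,
    return_train_score_cv=True,
    scaling_config=ray.air.config.ScalingConfig(
        trainer_resources={"CPU": 4}
    ),
    datasets={"train": train_dataset}
)
result = trainer.fit()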

__init__(*, estimator: sklearn.base.BaseEstimator, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: Optional[str] = None, params: Optional[Dict[str, Any]] = None, scoring: Optional[Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float], Iterable[Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float]]], Dict[str, Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float]]]]] = None, cv: Optional[Union[int, Iterable, sklearn.model_selection._split.BaseCrossValidator]] = None, return_train_score_cv: bool = False, parallelize_cv: Optional[bool] = None, set_estimator_cpus: bool = True, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, **fit_params)[source]#
training_loop() None[source]#

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})

SklearnPredictor and SklearnCheckpoint#

class ray.train.sklearn.SklearnCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]#

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with sklearn-specific functionality.

Create this from a generic Checkpoint by calling SklearnCheckpoint.from_checkpoint(ckpt)

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_estimator(estimator: sklearn.base.BaseEstimator, *, path: os.PathLike, preprocessor: Optional[Preprocessor] = None) SklearnCheckpoint[source]#

Create a Checkpoint that stores an sklearn Estimator.

Parameters
  • estimator – The Estimator to store in the checkpoint.

  • path – The directory where the checkpoint will be stored.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

An SklearnCheckpoint containing the specified Estimator.

Examples

>>> from ray.train.sklearn import SklearnCheckpoint
>>> from sklearn.ensemble import RandomForestClassifier
>>>
>>> estimator = RandomForestClassifier()
>>> checkpoint = SklearnCheckpoint.from_estimator(estimator, path=".")

You can use an SklearnCheckpoint to create an SklearnPredictor and perform inference.

>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> predictor = SklearnPredictor.from_checkpoint(checkpoint)
get_estimator() sklearn.base.BaseEstimator[source]#

Retrieve the Estimator stored in this checkpoint.

class ray.train.sklearn.SklearnPredictor(estimator: sklearn.base.BaseEstimator, preprocessor: Optional[Preprocessor] = None)[source]#

Bases: ray.train.predictor.Predictor

A predictor for scikit-learn compatible estimators.

Parameters
  • estimator – The fitted scikit-learn compatible estimator to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.sklearn.sklearn_predictor.SklearnPredictor[source]#

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of SklearnTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a SklearnTrainer run.

predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, num_estimator_cpus: Optional[int] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]][source]#

Run inference on data batch.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • num_estimator_cpus – If set to a value other than None, will set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to the given value.

  • **predict_kwargs – Keyword arguments passed to estimator.predict.

Examples

>>> import numpy as np
>>> from sklearn.ensemble import RandomForestClassifier
>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = RandomForestClassifier().fit(train_X, train_y)
>>> predictor = SklearnPredictor(estimator=model)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictions = predictor.predict(data, feature_columns=[0, 1])
>>> import pandas as pd
>>> from sklearn.ensemble import RandomForestClassifier
>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = RandomForestClassifier().fit(train_X, train_y)
>>> predictor = SklearnPredictor(estimator=model)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Mosaic#

MosaicTrainer#

class ray.train.mosaic.MosaicTrainer(*args, **kwargs)[source]#

Bases: ray.train.torch.torch_trainer.TorchTrainer

A Trainer for data parallel Mosaic Composer training on PyTorch.

This Trainer runs the composer.trainer.Trainer.fit() method on multiple Ray Actors. The training is carried out in a distributed fashion through PyTorch DDP. These actors already have the necessary torch process group configured for distributed PyTorch training.

The training function executed on every Actor will first run the specified trainer_init_per_worker function to obtain an instantiated composer.Trainer object. The trainer_init_per_worker function will have access to preprocessed train and evaluation datasets.

Example

>>> import torch.utils.data  
>>> import torchvision  
>>> from torchvision import transforms, datasets  
>>>
>>> from composer.models.tasks import ComposerClassifier 
>>> import composer.optim 
>>> from composer.algorithms import LabelSmoothing 
>>>
>>> import ray
>>> from ray.air.config import ScalingConfig
>>> import ray.train as train
>>> from ray.air import session
>>> from ray.train.mosaic import MosaicTrainer 
>>>
>>> def trainer_init_per_worker(config):
...     # prepare the model for distributed training and wrap with
...     # ComposerClassifier for Composer Trainer compatibility
...     model = torchvision.models.resnet18(num_classes=10)
...     model = ComposerClassifier(ray.train.torch.prepare_model(model))
...
...     # prepare train/test dataset
...     mean = (0.507, 0.487, 0.441)
...     std = (0.267, 0.256, 0.276)
...     cifar10_transforms = transforms.Compose(
...         [transforms.ToTensor(), transforms.Normalize(mean, std)]
...     )
...     data_directory = "~/data"
...     train_dataset = datasets.CIFAR10(
...         data_directory,
...         train=True,
...         download=True,
...         transform=cifar10_transforms
...     )
...
...     # prepare train dataloader
...     BATCH_SIZE = 64  # illustrative global batch size across all workers
...     batch_size_per_worker = BATCH_SIZE // session.get_world_size()
...     train_dataloader = torch.utils.data.DataLoader(
...         train_dataset,
...         batch_size=batch_size_per_worker
...     )
...     train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
...
...     # prepare optimizer
...     optimizer = composer.optim.DecoupledSGDW(
...         model.parameters(),
...         lr=0.05,
...         momentum=0.9,
...         weight_decay=2.0e-3,
...     )
...
...     return composer.trainer.Trainer(
...         model=model,
...         train_dataloader=train_dataloader,
...         optimizers=optimizer,
...         **config
...     )
...
>>> scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
>>> trainer_init_config = {
...     "max_duration": "1ba",
...     "algorithms": [LabelSmoothing()],
... } 
...
>>> trainer = MosaicTrainer(
...     trainer_init_per_worker=trainer_init_per_worker,
...     trainer_init_config=trainer_init_config,
...     scaling_config=scaling_config,
... ) 
...
>>> trainer.fit() 
Parameters
  • trainer_init_per_worker – The function that returns an instantiated composer.Trainer object and takes in configuration dictionary (config) as an argument. This dictionary is based on trainer_init_config and is modified for Ray - Composer integration.

  • datasets – Any Ray Datasets to use for training. At the moment, we do not support passing datasets to the trainer and using the dataset shards in the trainer loop. Instead, configure and load the datasets inside the trainer_init_per_worker function.

  • trainer_init_config – Configurations to pass into trainer_init_per_worker as kwargs. Although the kwargs can be hard-coded in the trainer_init_per_worker, using the config allows the flexibility of reusing the same worker init function while changing the trainer arguments. For example, when hyperparameter tuning you can reuse the same trainer_init_per_worker function with different hyperparameter values rather than having multiple trainer_init_per_worker functions with different hard-coded hyperparameter values.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer. Same as in TorchTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

__init__(trainer_init_per_worker: Callable[[Optional[Dict]], composer.trainer.Trainer], *, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, trainer_init_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]#

Reinforcement Learning with RLlib#

RLTrainer#

RLPredictor and RLCheckpoint#

Base Classes (Developer APIs)#

class ray.train.trainer.BaseTrainer(*args, **kwargs)[source]

Defines interface for distributed training on Ray.

Note: The base BaseTrainer class cannot be instantiated directly. Only one of its subclasses can be used.

How does a trainer work?

  • First, initialize the Trainer. The initialization runs locally, so heavyweight setup should not be done in __init__.

  • Then, when you call trainer.fit(), the Trainer is serialized and copied to a remote Ray actor. The following methods are then called in sequence on the remote actor.

  • trainer.setup(): Any heavyweight Trainer setup should be specified here.

  • trainer.preprocess_datasets(): The provided ray.data.Dataset are preprocessed with the provided ray.data.Preprocessor.

  • trainer.train_loop(): Executes the main training logic.

  • Calling trainer.fit() will return a ray.air.result.Result object where you can access metrics from your training run, as well as any checkpoints that may have been saved.

How do I create a new Trainer?

Subclass ray.train.trainer.BaseTrainer, and override the training_loop method, and optionally setup.

import torch

from ray.train.trainer import BaseTrainer
from ray import tune
from ray.air import session


class MyPytorchTrainer(BaseTrainer):
    def setup(self):
        self.model = torch.nn.Linear(1, 1)
        self.optimizer = torch.optim.SGD(
            self.model.parameters(), lr=0.1)

    def training_loop(self):
        # You can access any Trainer attributes directly in this method.
        # self.datasets["train"] has already been
        # preprocessed by self.preprocessor
        dataset = self.datasets["train"]

        torch_ds = dataset.iter_torch_batches(dtypes=torch.float)
        loss_fn = torch.nn.MSELoss()

        for epoch_idx in range(10):
            loss = 0
            num_batches = 0
            for batch in torch_ds:
                X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
                # Compute prediction error
                pred = self.model(X)
                batch_loss = loss_fn(pred, y)

                # Backpropagation
                self.optimizer.zero_grad()
                batch_loss.backward()
                self.optimizer.step()

                loss += batch_loss.item()
                num_batches += 1
            loss /= num_batches

            # Use Tune functions to report intermediate
            # results.
            session.report({"loss": loss, "epoch": epoch_idx})

How do I use an existing Trainer or one of my custom Trainers?

Initialize the Trainer, and call Trainer.fit()

import ray
train_dataset = ray.data.from_items(
    [{"x": i, "y": i} for i in range(3)])
my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = my_trainer.fit()
Parameters
  • scaling_config – Configuration for how to scale training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

__init__(*, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
setup() None[source]

Called during fit() to perform initial setup on the Trainer.

Note

This method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

abstract training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
fit() ray.air.result.Result[source]

Runs training.

Returns

A Result object containing the training result.

Raises

TrainingFailedError – If any failures occur during the execution of self.as_trainable().

PublicAPI (beta): This API is in beta and may change before becoming stable.

as_trainable() Type[Trainable][source]

Convert self to a tune.Trainable class.
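
A hedged sketch: the Trainable produced by as_trainable() can be handed to Ray Tune (passing the Trainer to tune.Tuner directly performs the same conversion under the hood). my_trainer is the MyPytorchTrainer instance created above.

from ray import tune

trainable_cls = my_trainer.as_trainable()
tuner = tune.Tuner(trainable_cls)
results = tuner.fit()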

class ray.train.data_parallel_trainer.DataParallelTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

A Trainer for data parallel training.

You should subclass this Trainer if your Trainer follows the SPMD (single program, multiple data) programming paradigm: you want multiple processes to run the same function, but on different data.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

How do I use DataParallelTrainer or any of its subclasses?

Example:

import ray
from ray.air import session

def train_loop_for_worker():
    dataset_shard_for_this_worker = session.get_dataset_shard("train")

    assert len(dataset_shard_for_this_worker) == 1

train_dataset = ray.data.from_items([1, 2, 3])
assert len(train_dataset) == 3
trainer = DataParallelTrainer(
    train_loop_for_worker,
    scaling_config=ray.air.config.ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
)
result = trainer.fit()

How do I develop on top of DataParallelTrainer?

In many cases, using DataParallelTrainer directly is sufficient to execute functions on multiple actors.

However, you may want to subclass DataParallelTrainer and create a custom Trainer for the following 2 use cases:

  • Use Case 1: You want to do data parallel training, but want to have a predefined train_loop_per_worker.

  • Use Case 2: You want to implement a custom Backend that automatically handles additional setup or teardown logic on each actor, so that the users of this new trainer do not have to implement this logic. For example, a TensorflowTrainer can be built on top of DataParallelTrainer that automatically handles setting the proper environment variables for distributed Tensorflow on each actor.

For 1, you can set a predefined training loop in __init__

from ray.train.data_parallel_trainer import DataParallelTrainer

class MyDataParallelTrainer(DataParallelTrainer):
    def __init__(self, *args, **kwargs):
        predefined_train_loop_per_worker = lambda: 1
        super().__init__(predefined_train_loop_per_worker, *args, **kwargs)

For 2, you can implement the ray.train.Backend and ray.train.BackendConfig interfaces.

from dataclasses import dataclass
from ray.train.backend import Backend, BackendConfig

class MyBackend(Backend):
    def on_start(self, worker_group, backend_config):
        def set_env_var(env_var_value):
            import os
            os.environ["MY_ENV_VAR"] = env_var_value

        worker_group.execute(set_env_var, backend_config.env_var)

@dataclass
class MyBackendConfig(BackendConfig):
    env_var: str = "default_value"

    def backend_cls(self):
        return MyBackend

class MyTrainer(DataParallelTrainer):
    def __init__(
        self, train_loop_per_worker, my_backend_config: MyBackendConfig, **kwargs
    ):
        super().__init__(
            train_loop_per_worker,
            backend_config=my_backend_config,
            **kwargs,
        )
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • backend_config – Configuration for setting up a Backend (e.g. Torch, Tensorflow, Horovod) on each worker to enable distributed communication. If no Backend should be set up, then set this to None.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest. This is merged with the default dataset config for the given trainer (cls._dataset_config).

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, backend_config: Optional[ray.train.backend.BackendConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
get_dataset_config() Dict[str, ray.air.config.DatasetConfig][source]

Return a copy of this Trainer’s final dataset configs.

Returns

The merged default + user-supplied dataset config.

class ray.train.gbdt_trainer.GBDTTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

Abstract class for scaling gradient-boosting decision tree (GBDT) frameworks.

Inherited by XGBoostTrainer and LightGBMTrainer.

Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – Framework specific training parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to framework train() function.

DeveloperAPI: This API may change across minor Ray releases.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)[source]
preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
class ray.train.backend.Backend(*args, **kwargs)[source]#

Singleton for distributed communication backend.

share_cuda_visible_devices#

If True, each worker process will have CUDA_VISIBLE_DEVICES set as the visible device IDs of all workers on the same node for this training instance. If False, each worker will have CUDA_VISIBLE_DEVICES set to the device IDs allocated by Ray for that worker.

Type

bool

DeveloperAPI: This API may change across minor Ray releases.

on_start(worker_group: ray.train._internal.worker_group.WorkerGroup, backend_config: ray.train.backend.BackendConfig)[source]#

Logic for starting this backend.

on_shutdown(worker_group: ray.train._internal.worker_group.WorkerGroup, backend_config: ray.train.backend.BackendConfig)[source]#

Logic for shutting down the backend.

static encode_data(data_dict: Dict) ray.train.backend.EncodedData[source]#

Logic to encode a data dict before sending to the driver.

This function will be called on the workers for any data that is sent to the driver via session.report().

static decode_data(encoded_data: ray.train.backend.EncodedData) Dict[source]#

Logic to decode an encoded data dict.

This function will be called on the driver after receiving the encoded data dict from the worker.
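
A hedged sketch of a custom Backend that compresses report/checkpoint payloads before they travel from the workers to the driver; the compression scheme is illustrative only:

import pickle
import zlib

from ray.train.backend import Backend

class CompressingBackend(Backend):
    @staticmethod
    def encode_data(data_dict):
        # Called on the workers before data is sent to the driver.
        return zlib.compress(pickle.dumps(data_dict))

    @staticmethod
    def decode_data(encoded_data):
        # Called on the driver to reconstruct the original dict.
        return pickle.loads(zlib.decompress(encoded_data))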

class ray.train.backend.BackendConfig[source]#

Parent class for configurations of training backend.

DeveloperAPI: This API may change across minor Ray releases.