Ray AIR API

Components

Preprocessor

class ray.data.preprocessor.Preprocessor[source]

Implements an ML preprocessing operation.

Preprocessors are stateful objects that can be fitted against a Dataset and used to transform both local data batches and distributed datasets. For example, a Normalization preprocessor may calculate the mean and stdev of a field during fitting, and then use these attributes to implement its normalization transform.

Preprocessors can also be stateless and transform data without needing to be fitted. For example, a preprocessor may simply remove a column, which does not require any state to be fitted.

If you are implementing your own Preprocessor subclass, you should override the following (see the sketch after the list below):

  • _fit if your preprocessor is stateful. Otherwise, set _is_fittable=False.

  • _transform_pandas and/or _transform_numpy. For best performance, implement both. Otherwise, the data will be converted to match the implemented method.
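
For example, a minimal stateless subclass might look like the following sketch. The DropColumn class and its column argument are hypothetical names used only for illustration.

import pandas as pd

from ray.data.preprocessor import Preprocessor


class DropColumn(Preprocessor):
    """Hypothetical stateless preprocessor that drops a single column."""

    _is_fittable = False  # No state to fit, so transform works without fit().

    def __init__(self, column: str):
        self.column = column

    def _transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        # Called on each pandas batch of the Dataset.
        return df.drop(columns=[self.column])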

PublicAPI (beta): This API is in beta and may change before becoming stable.

class FitStatus(value)[source]

The fit status of the preprocessor.

transform_stats() → Optional[str][source]

Return Dataset stats for the most recent transform call, if any.

fit(dataset: ray.data.dataset.Dataset) → ray.data.preprocessor.Preprocessor[source]

Fit this Preprocessor to the Dataset.

Fitted state attributes will be directly set in the Preprocessor.

Calling it more than once will overwrite all previously fitted state: preprocessor.fit(A).fit(B) is equivalent to preprocessor.fit(B).

Parameters

dataset – Input dataset.

Returns

The fitted Preprocessor with state attributes.

Return type

Preprocessor

fit_transform(dataset: ray.data.dataset.Dataset) → ray.data.dataset.Dataset[source]

Fit this Preprocessor to the Dataset and then transform the Dataset.

Calling it more than once will overwrite all previously fitted state: preprocessor.fit_transform(A).fit_transform(B) is equivalent to preprocessor.fit_transform(B).

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

transform(dataset: ray.data.dataset.Dataset) → ray.data.dataset.Dataset[source]

Transform the given dataset.

Parameters

dataset – Input Dataset.

Returns

The transformed Dataset.

Return type

ray.data.Dataset

Raises

PreprocessorNotFittedException – if fit is not called yet.

transform_batch(data: DataBatchType) → DataBatchType[source]

Transform a single batch of data.

The data will be converted to the format supported by the Preprocessor, based on which _transform_* method(s) are implemented.

Parameters

data – Input data batch.

Returns

The transformed data batch. This may differ from the input type depending on which _transform_* method(s) are implemented.

Return type

DataBatchType
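
For example, a fitted preprocessor can be applied to a small in-memory batch. The following is a brief sketch; the StandardScaler and the column values are illustrative.

import pandas as pd
import ray
from ray.data.preprocessors import StandardScaler

ds = ray.data.from_pandas(pd.DataFrame({"value": [0.0, 1.0, 2.0, 3.0]}))

scaler = StandardScaler(columns=["value"])
scaler.fit(ds)  # Computes the mean and standard deviation of "value".

# After fitting, transform_batch applies the same scaling to a local batch.
local_batch = pd.DataFrame({"value": [1.0, 5.0]})
print(scaler.transform_batch(local_batch))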

Generic Preprocessors

class ray.data.preprocessors.BatchMapper(fn: Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[Union[numpy.ndarray, Dict[str, numpy.ndarray]]], Union[numpy.ndarray, Dict[str, numpy.ndarray]]]], batch_format: Optional[str] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

Apply an arbitrary operation to a dataset.

BatchMapper applies a user-defined function to batches of a dataset. A batch is a Pandas DataFrame or a dict of NumPy ndarrays that represents a small amount of data. By modifying batches instead of individual records, this class can efficiently transform a dataset with vectorized operations.

Use this preprocessor to apply stateless operations that aren’t already built-in.

Tip

BatchMapper doesn’t need to be fit. You can call transform without calling fit.

Examples

Use BatchMapper to apply arbitrary operations like dropping a column.

>>> import pandas as pd
>>> import numpy as np
>>> from typing import Dict
>>> import ray
>>> from ray.data.preprocessors import BatchMapper
>>>
>>> df = pd.DataFrame({"X": [0, 1, 2], "Y": [3, 4, 5]})
>>> ds = ray.data.from_pandas(df)  
>>>
>>> def fn(batch: pd.DataFrame) -> pd.DataFrame:
...     return batch.drop("Y", axis="columns")
>>>
>>> preprocessor = BatchMapper(fn)
>>> preprocessor.transform(ds)  
Dataset(num_blocks=1, num_rows=3, schema={X: int64})
>>>
>>> def fn_numpy(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
...     return {"X": batch["X"]}
>>> preprocessor = BatchMapper(fn_numpy, batch_format="numpy")
>>> preprocessor.transform(ds)  
Dataset(num_blocks=1, num_rows=3, schema={X: int64})
Parameters
  • fn – The function to apply to data batches.

  • batch_format – The preferred batch format to use in the UDF. If not given, the batch format is inferred from the input dataset’s data format.

class ray.data.preprocessors.Chain(*preprocessors: ray.data.preprocessor.Preprocessor)[source]

Bases: ray.data.preprocessor.Preprocessor

Combine multiple preprocessors into a single Preprocessor.

When you call fit, each preprocessor is fit on the dataset produced by the preceding preprocessor’s fit_transform.

Example

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import *
>>>
>>> df = pd.DataFrame({
...     "X0": [0, 1, 2],
...     "X1": [3, 4, 5],
...     "Y": ["orange", "blue", "orange"],
... })
>>> ds = ray.data.from_pandas(df)  
>>>
>>> preprocessor = Chain(
...     StandardScaler(columns=["X0", "X1"]),
...     Concatenator(include=["X0", "X1"], output_column_name="X"),
...     LabelEncoder(label_column="Y")
... )
>>> preprocessor.fit_transform(ds).to_pandas()  
   Y                                         X
0  1  [-1.224744871391589, -1.224744871391589]
1  0                                [0.0, 0.0]
2  1    [1.224744871391589, 1.224744871391589]
Parameters

preprocessors – The preprocessors to sequentially compose.

class ray.data.preprocessors.Concatenator(output_column_name: str = 'concat_out', include: Optional[List[str]] = None, exclude: Optional[List[str]] = None, dtype: Optional[numpy.dtype] = None, raise_if_missing: bool = False)[source]

Bases: ray.data.preprocessor.Preprocessor

Combine numeric columns into a column of type TensorDtype.

This preprocessor concatenates numeric columns and stores the result in a new column. The new column contains TensorArrayElement objects of shape \((m,)\), where \(m\) is the number of columns concatenated. The \(m\) concatenated columns are dropped after concatenation.

Examples

>>> import numpy as np
>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import Concatenator

Concatenator combines numeric columns into a column of TensorDtype.

>>> df = pd.DataFrame({"X0": [0, 3, 1], "X1": [0.5, 0.2, 0.9]})
>>> ds = ray.data.from_pandas(df)  
>>> concatenator = Concatenator()
>>> concatenator.fit_transform(ds).to_pandas()  
   concat_out
0  [0.0, 0.5]
1  [3.0, 0.2]
2  [1.0, 0.9]

By default, the created column is called "concat_out", but you can specify a different name.

>>> concatenator = Concatenator(output_column_name="tensor")
>>> concatenator.fit_transform(ds).to_pandas()  
       tensor
0  [0.0, 0.5]
1  [3.0, 0.2]
2  [1.0, 0.9]

Sometimes, you might not want to concatenate all of the columns in your dataset. In this case, you can exclude columns with the exclude parameter.

>>> df = pd.DataFrame({"X0": [0, 3, 1], "X1": [0.5, 0.2, 0.9], "Y": ["blue", "orange", "blue"]})
>>> ds = ray.data.from_pandas(df)  
>>> concatenator = Concatenator(exclude=["Y"])
>>> concatenator.fit_transform(ds).to_pandas()  
        Y  concat_out
0    blue  [0.0, 0.5]
1  orange  [3.0, 0.2]
2    blue  [1.0, 0.9]

Alternatively, you can specify which columns to concatenate with the include parameter.

>>> concatenator = Concatenator(include=["X0", "X1"])
>>> concatenator.fit_transform(ds).to_pandas()  
        Y  concat_out
0    blue  [0.0, 0.5]
1  orange  [3.0, 0.2]
2    blue  [1.0, 0.9]

Note that if a column is in both include and exclude, the column is excluded.

>>> concatenator = Concatenator(include=["X0", "X1", "Y"], exclude=["Y"])
>>> concatenator.fit_transform(ds).to_pandas()  
        Y  concat_out
0    blue  [0.0, 0.5]
1  orange  [3.0, 0.2]
2    blue  [1.0, 0.9]

By default, the concatenated tensor is a dtype common to the input columns. However, you can also explicitly set the dtype with the dtype parameter.

>>> concatenator = Concatenator(include=["X0", "X1"], dtype=np.float32)
>>> concatenator.fit_transform(ds)  
Dataset(num_blocks=1, num_rows=3, schema={Y: object, concat_out: TensorDtype(shape=(2,), dtype=float32)})
Parameters
  • output_column_name – The desired name for the new column. Defaults to "concat_out".

  • include – A list of columns to concatenate. If None, all columns are concatenated.

  • exclude – A list of columns to exclude from concatenation. If a column is in both include and exclude, the column is excluded from concatenation.

  • dtype – The dtype to convert the output tensors to. If unspecified, the dtype is determined by standard coercion rules.

  • raise_if_missing – If True, an error is raised if any of the columns in include or exclude don’t exist. Defaults to False.

Raises

ValueError – if raise_if_missing is True and a column in include or exclude doesn’t exist in the dataset.

class ray.data.preprocessors.SimpleImputer(columns: List[str], strategy: str = 'mean', fill_value: Optional[Union[str, numbers.Number]] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

Replace missing values with imputed values.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import SimpleImputer
>>> df = pd.DataFrame({"X": [0, None, 3, 3], "Y": [None, "b", "c", "c"]})
>>> ds = ray.data.from_pandas(df)  
>>> ds.to_pandas()  
     X     Y
0  0.0  None
1  NaN     b
2  3.0     c
3  3.0     c

The "mean" strategy imputes missing values with the mean of non-missing values. This strategy doesn’t work with categorical data.

>>> preprocessor = SimpleImputer(columns=["X"], strategy="mean")
>>> preprocessor.fit_transform(ds).to_pandas()  
     X     Y
0  0.0  None
1  2.0     b
2  3.0     c
3  3.0     c

The "most_frequent" strategy imputes missing values with the most frequent value in each column.

>>> preprocessor = SimpleImputer(columns=["X", "Y"], strategy="most_frequent")
>>> preprocessor.fit_transform(ds).to_pandas()  
     X  Y
0  0.0  c
1  3.0  b
2  3.0  c
3  3.0  c

The "constant" strategy imputes missing values with the value specified by fill_value.

>>> preprocessor = SimpleImputer(
...     columns=["Y"],
...     strategy="constant",
...     fill_value="?",
... )
>>> preprocessor.fit_transform(ds).to_pandas()  
     X  Y
0  0.0  ?
1  NaN  b
2  3.0  c
3  3.0  c
Parameters
  • columns – The columns to apply imputation to.

  • strategy

    How imputed values are chosen.

    • "mean": The mean of non-missing values. This strategy only works with numeric columns.

    • "most_frequent": The most common value.

    • "constant": The value passed to fill_value.

  • fill_value – The value to use when strategy is "constant".

Raises

ValueError – if strategy is not "mean", "most_frequent", or "constant".

Dataset.train_test_split(test_size: Union[int, float], *, shuffle: bool = False, seed: Optional[int] = None) → Tuple[ray.data.dataset.Dataset[ray.data.block.T], ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset into train and test subsets.

Examples

>>> import ray
>>> ds = ray.data.range(8)
>>> train, test = ds.train_test_split(test_size=0.25)
>>> train.take()
[0, 1, 2, 3, 4, 5]
>>> test.take()
[6, 7]
Parameters
  • test_size – If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. The train split will always be the complement of the test split.

  • shuffle – Whether or not to globally shuffle the dataset before splitting. Defaults to False. This may be a very expensive operation with large datasets.

  • seed – Fix the random seed to use for shuffle, otherwise one will be chosen based on system randomness. Ignored if shuffle=False.

Returns

Train and test subsets as two Datasets.

Categorical Encoders

class ray.data.preprocessors.Categorizer(columns: List[str], dtypes: Optional[Dict[str, pandas.core.dtypes.dtypes.CategoricalDtype]] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

Convert columns to pd.CategoricalDtype.

Use this preprocessor with frameworks that have built-in support for pd.CategoricalDtype like LightGBM.

Warning

If you don’t specify dtypes, fit this preprocessor before splitting your dataset into train and test splits. This ensures categories are consistent across splits.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import Categorizer
>>>
>>> df = pd.DataFrame(
... {
...     "sex": ["male", "female", "male", "female"],
...     "level": ["L4", "L5", "L3", "L4"],
... })
>>> ds = ray.data.from_pandas(df)  
>>> categorizer = Categorizer(columns=["sex", "level"])
>>> categorizer.fit_transform(ds).schema().types  
[CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5'], ordered=False)]

If you know the categories in advance, you can specify the categories with the dtypes parameter.

>>> categorizer = Categorizer(
...     columns=["sex", "level"],
...     dtypes={"level": pd.CategoricalDtype(["L3", "L4", "L5", "L6"], ordered=True)},
... )
>>> categorizer.fit_transform(ds).schema().types  
[CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5', 'L6'], ordered=True)]
Parameters
  • columns – The columns to convert to pd.CategoricalDtype.

  • dtypes – An optional dictionary that maps columns to pd.CategoricalDtype objects. If you don’t include a column in dtypes, the categories are inferred.

class ray.data.preprocessors.LabelEncoder(label_column: str)[source]

Bases: ray.data.preprocessor.Preprocessor

Encode labels as integer targets.

LabelEncoder encodes labels as integer targets that range from \(0\) to \(n - 1\), where \(n\) is the number of unique labels.

If you transform a label that isn’t in the fitted dataset, then the label is encoded as float("nan").

Examples

>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({
...     "sepal_width": [5.1, 7, 4.9, 6.2],
...     "sepal_height": [3.5, 3.2, 3, 3.4],
...     "species": ["setosa", "versicolor", "setosa", "virginica"]
... })
>>> ds = ray.data.from_pandas(df)  
>>>
>>> from ray.data.preprocessors import LabelEncoder
>>> encoder = LabelEncoder(label_column="species")
>>> encoder.fit_transform(ds).to_pandas()  
   sepal_width  sepal_height  species
0          5.1           3.5        0
1          7.0           3.2        1
2          4.9           3.0        0
3          6.2           3.4        2

If you transform a label not present in the original dataset, then the new label is encoded as float("nan").

>>> df = pd.DataFrame({
...     "sepal_width": [4.2],
...     "sepal_height": [2.7],
...     "species": ["bracteata"]
... })
>>> ds = ray.data.from_pandas(df)  
>>> encoder.transform(ds).to_pandas()  
   sepal_width  sepal_height  species
0          4.2           2.7      NaN
Parameters

label_column – A column containing labels that you want to encode.

See also

OrdinalEncoder

If you’re encoding ordered features, use OrdinalEncoder instead of LabelEncoder.

class ray.data.preprocessors.MultiHotEncoder(columns: List[str], *, max_categories: Optional[Dict[str, int]] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

Multi-hot encode categorical data.

This preprocessor replaces each list of categories with an \(m\)-length binary list, where \(m\) is the number of unique categories in the column or the value specified in max_categories. The \(i\text{-th}\) element of the binary list is \(1\) if category \(i\) is in the input list and \(0\) otherwise.

Columns must contain hashable objects or lists of hashable objects. Also, you can’t have both types in the same column.

Note

The logic is similar to scikit-learn’s MultiLabelBinarizer.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import MultiHotEncoder
>>>
>>> df = pd.DataFrame({
...     "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
...     "genre": [
...         ["comedy", "action", "sports"],
...         ["animation", "comedy",  "action"],
...         ["documentary"],
...     ],
... })
>>> ds = ray.data.from_pandas(df)  
>>>
>>> encoder = MultiHotEncoder(columns=["genre"])
>>> encoder.fit_transform(ds).to_pandas()  
                            name            genre
0                 Shaolin Soccer  [1, 0, 1, 0, 1]
1                          Moana  [1, 1, 1, 0, 0]
2  The Smartest Guys in the Room  [0, 0, 0, 1, 0]

If you specify max_categories, then MultiHotEncoder creates features for only the most frequent categories.

>>> encoder = MultiHotEncoder(columns=["genre"], max_categories={"genre": 3})
>>> encoder.fit_transform(ds).to_pandas()  
                            name      genre
0                 Shaolin Soccer  [1, 1, 1]
1                          Moana  [1, 1, 0]
2  The Smartest Guys in the Room  [0, 0, 0]
>>> encoder.stats_  
OrderedDict([('unique_values(genre)', {'comedy': 0, 'action': 1, 'sports': 2})])
Parameters
  • columns – The columns to separately encode.

  • max_categories – The maximum number of features to create for each column. If a value isn’t specified for a column, then a feature is created for every unique category in that column.

See also

OneHotEncoder

If you’re encoding individual categories instead of lists of categories, use OneHotEncoder.

OrdinalEncoder

If your categories are ordered, you may want to use OrdinalEncoder.

class ray.data.preprocessors.OneHotEncoder(columns: List[str], *, max_categories: Optional[Dict[str, int]] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

One-hot encode categorical data.

This preprocessor creates a column named {column}_{category} for each unique {category} in {column}. The value of a column is 1 if the category matches and 0 otherwise.

If you encode an infrequent category (see max_categories) or a category that isn’t in the fitted dataset, then the category is encoded as all 0s.

Columns must contain hashable objects or lists of hashable objects.

Note

Lists are treated as categories. If you want to encode individual list elements, use MultiHotEncoder.

Example

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import OneHotEncoder
>>>
>>> df = pd.DataFrame({"color": ["red", "green", "red", "red", "blue", "green"]})
>>> ds = ray.data.from_pandas(df)  
>>> encoder = OneHotEncoder(columns=["color"])
>>> encoder.fit_transform(ds).to_pandas()  
   color_blue  color_green  color_red
0           0            0          1
1           0            1          0
2           0            0          1
3           0            0          1
4           1            0          0
5           0            1          0

If you one-hot encode a value that isn’t in the fitted dataset, then the value is encoded with zeros.

>>> df = pd.DataFrame({"color": ["yellow"]})
>>> batch = ray.data.from_pandas(df)  
>>> encoder.transform(batch).to_pandas()  
   color_blue  color_green  color_red
0           0            0          0

Likewise, if you one-hot encode an infrequent value, then the value is encoded with zeros.

>>> encoder = OneHotEncoder(columns=["color"], max_categories={"color": 2})
>>> encoder.fit_transform(ds).to_pandas()  
   color_red  color_green
0          1            0
1          0            1
2          1            0
3          1            0
4          0            0
5          0            1
Parameters
  • columns – The columns to separately encode.

  • max_categories – The maximum number of features to create for each column. If a value isn’t specified for a column, then a feature is created for every category in that column.

See also

MultiHotEncoder

If you want to encode individual list elements, use MultiHotEncoder.

OrdinalEncoder

If your categories are ordered, you may want to use OrdinalEncoder.

class ray.data.preprocessors.OrdinalEncoder(columns: List[str], *, encode_lists: bool = True)[source]

Bases: ray.data.preprocessor.Preprocessor

Encode values within columns as ordered integer values.

OrdinalEncoder encodes categorical features as integers that range from \(0\) to \(n - 1\), where \(n\) is the number of categories.

If you transform a value that isn’t in the fitted dataset, then the value is encoded as float("nan").

Columns must contain either hashable values or lists of hashable values. Also, you can’t have both scalars and lists in the same column.

Examples

Use OrdinalEncoder to encode categorical features as integers.

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import OrdinalEncoder
>>> df = pd.DataFrame({
...     "sex": ["male", "female", "male", "female"],
...     "level": ["L4", "L5", "L3", "L4"],
... })
>>> ds = ray.data.from_pandas(df)  
>>> encoder = OrdinalEncoder(columns=["sex", "level"])
>>> encoder.fit_transform(ds).to_pandas()  
   sex  level
0    1      1
1    0      2
2    1      0
3    0      1

If you transform a value not present in the original dataset, then the value is encoded as float("nan").

>>> df = pd.DataFrame({"sex": ["female"], "level": ["L6"]})
>>> ds = ray.data.from_pandas(df)  
>>> encoder.transform(ds).to_pandas()  
   sex  level
0    0    NaN

OrdinalEncoder can also encode categories in a list.

>>> df = pd.DataFrame({
...     "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
...     "genre": [
...         ["comedy", "action", "sports"],
...         ["animation", "comedy",  "action"],
...         ["documentary"],
...     ],
... })
>>> ds = ray.data.from_pandas(df)  
>>> encoder = OrdinalEncoder(columns=["genre"])
>>> encoder.fit_transform(ds).to_pandas()  
                            name      genre
0                 Shaolin Soccer  [2, 0, 4]
1                          Moana  [1, 2, 0]
2  The Smartest Guys in the Room        [3]
Parameters
  • columns – The columns to separately encode.

  • encode_lists – If True, encode list elements. If False, encode whole lists (i.e., replace each list with an integer). True by default.

See also

OneHotEncoder

Another preprocessor that encodes categorical data.

Feature Scalers

class ray.data.preprocessors.MaxAbsScaler(columns: List[str])[source]

Bases: ray.data.preprocessor.Preprocessor

Scale each column by its absolute max value.

The general formula is given by

\[x' = \frac{x}{\max{\vert x \vert}}\]

where \(x\) is the column and \(x'\) is the transformed column. If \(\max{\vert x \vert} = 0\) (i.e., the column contains all zeros), then the column is unmodified.

Tip

This is the recommended way to scale sparse data. If your data isn’t sparse, you can use MinMaxScaler or StandardScaler instead.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import MaxAbsScaler
>>>
>>> df = pd.DataFrame({"X1": [-6, 3], "X2": [2, -4], "X3": [0, 0]})   # noqa: E501
>>> ds = ray.data.from_pandas(df)  
>>> ds.to_pandas()  
   X1  X2  X3
0  -6   2   0
1   3  -4   0

Columns are scaled separately.

>>> preprocessor = MaxAbsScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas()  
    X1   X2  X3
0 -1.0  0.5   0
1  0.5 -1.0   0

Zero-valued columns aren’t scaled.

>>> preprocessor = MaxAbsScaler(columns=["X3"])
>>> preprocessor.fit_transform(ds).to_pandas()  
   X1  X2   X3
0  -6   2  0.0
1   3  -4  0.0
Parameters

columns – The columns to separately scale.

class ray.data.preprocessors.MinMaxScaler(columns: List[str])[source]

Bases: ray.data.preprocessor.Preprocessor

Scale each column by its range.

The general formula is given by

\[x' = \frac{x - \min(x)}{\max(x) - \min(x)}\]

where \(x\) is the column and \(x'\) is the transformed column. If \(\max(x) - \min(x) = 0\) (i.e., the column is constant-valued), then the transformed column will get filled with zeros.

Transformed values are always in the range \([0, 1]\).

Tip

This can be used as an alternative to StandardScaler.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import MinMaxScaler
>>>
>>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]})   # noqa: E501
>>> ds = ray.data.from_pandas(df)  
>>> ds.to_pandas()  
   X1  X2  X3
0  -2  -3   1
1   0  -3   1
2   2   3   1

Columns are scaled separately.

>>> preprocessor = MinMaxScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas()  
    X1   X2  X3
0  0.0  0.0   1
1  0.5  0.0   1
2  1.0  1.0   1

Constant-valued columns get filled with zeros.

>>> preprocessor = MinMaxScaler(columns=["X3"])
>>> preprocessor.fit_transform(ds).to_pandas()  
   X1  X2   X3
0  -2  -3  0.0
1   0  -3  0.0
2   2   3  0.0
Parameters

columns – The columns to separately scale.

class ray.data.preprocessors.Normalizer(columns: List[str], norm='l2')[source]

Bases: ray.data.preprocessor.Preprocessor

Scales each sample to have unit norm.

This preprocessor works by dividing each sample (i.e., row) by the sample’s norm. The general formula is given by

\[s' = \frac{s}{\lVert s \rVert_p}\]

where \(s\) is the sample, \(s'\) is the transformed sample, \(\lVert s \rVert_p\) is the \(p\)-norm of the sample, and \(p\) is the norm type.

The following norms are supported:

  • "l1" (\(L^1\)): Sum of the absolute values.

  • "l2" (\(L^2\)): Square root of the sum of the squared values.

  • "max" (\(L^\infty\)): Maximum value.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import Normalizer
>>>
>>> df = pd.DataFrame({"X1": [1, 1], "X2": [1, 0], "X3": [0, 1]})
>>> ds = ray.data.from_pandas(df)  
>>> ds.to_pandas()  
   X1  X2  X3
0   1   1   0
1   1   0   1

The \(L^2\)-norm of the first sample is \(\sqrt{2}\), and the \(L^2\)-norm of the second sample is \(1\).

>>> preprocessor = Normalizer(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas()  
         X1        X2  X3
0  0.707107  0.707107   0
1  1.000000  0.000000   1

The \(L^1\)-norm of the first sample is \(2\), and the \(L^1\)-norm of the second sample is \(1\).

>>> preprocessor = Normalizer(columns=["X1", "X2"], norm="l1")
>>> preprocessor.fit_transform(ds).to_pandas()  
    X1   X2  X3
0  0.5  0.5   0
1  1.0  0.0   1

The \(L^\infty\)-norm of both samples is \(1\).

>>> preprocessor = Normalizer(columns=["X1", "X2"], norm="max")
>>> preprocessor.fit_transform(ds).to_pandas()  
    X1   X2  X3
0  1.0  1.0   0
1  1.0  0.0   1
Parameters
  • columns – The columns to scale. For each row, these columns are scaled to unit-norm.

  • norm – The norm to use. The supported values are "l1", "l2", or "max". Defaults to "l2".

Raises

ValueError – if norm is not "l1", "l2", or "max".

class ray.data.preprocessors.PowerTransformer(columns: List[str], power: float, method: str = 'yeo-johnson')[source]

Bases: ray.data.preprocessor.Preprocessor

Apply a power transform to make your data more normally distributed.

Some models expect data to be normally distributed. By making your data more Gaussian-like, you might be able to improve your model’s performance.

This preprocessor supports the following transformations:

  • "yeo-johnson"

  • "box-cox"

Box-Cox requires all data to be positive.

Warning

You need to manually specify the transform’s power parameter. If you choose a bad value, the transformation might not work well.
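
Examples

A minimal sketch, assuming illustrative data and an arbitrarily chosen power (in practice, you pick a power that works well for your own data).

import pandas as pd
import ray
from ray.data.preprocessors import PowerTransformer

# Hypothetical right-skewed column; power=0.5 is only an illustrative choice.
df = pd.DataFrame({"X": [1.0, 2.0, 10.0, 100.0]})
ds = ray.data.from_pandas(df)

transformer = PowerTransformer(columns=["X"], power=0.5, method="yeo-johnson")
print(transformer.fit_transform(ds).to_pandas())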

Parameters
  • columns – The columns to separately transform.

  • power – A parameter that determines how your data is transformed. Practitioners typically set power between \(-2.5\) and \(2.5\), although you may need to try different values to find one that works well.

  • method – A string representing which transformation to apply. Supports "yeo-johnson" and "box-cox". If you choose "box-cox", your data needs to be positive. Defaults to "yeo-johnson".

class ray.data.preprocessors.RobustScaler(columns: List[str], quantile_range: Tuple[float, float] = (0.25, 0.75))[source]

Bases: ray.data.preprocessor.Preprocessor

Scale and translate each column using quantiles.

The general formula is given by

\[x' = \frac{x - \mu_{1/2}}{\mu_h - \mu_l}\]

where \(x\) is the column, \(x'\) is the transformed column, \(\mu_{1/2}\) is the column median. \(\mu_{h}\) and \(\mu_{l}\) are the high and low quantiles, respectively. By default, \(\mu_{h}\) is the third quartile and \(\mu_{l}\) is the first quartile.

Tip

This scaler works well when your data contains many outliers.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import RobustScaler
>>>
>>> df = pd.DataFrame({
...     "X1": [1, 2, 3, 4, 5],
...     "X2": [13, 5, 14, 2, 8],
...     "X3": [1, 2, 2, 2, 3],
... })
>>> ds = ray.data.from_pandas(df)  
>>> ds.to_pandas()  
   X1  X2  X3
0   1  13   1
1   2   5   2
2   3  14   2
3   4   2   2
4   5   8   3

RobustScaler separately scales each column.

>>> preprocessor = RobustScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas()  
    X1     X2  X3
0 -1.0  0.625   1
1 -0.5 -0.375   2
2  0.0  0.750   2
3  0.5 -0.750   2
4  1.0  0.000   3
Parameters
  • columns – The columns to separately scale.

  • quantile_range – A tuple that defines the lower and upper quantiles. Values must be between 0 and 1. Defaults to the 1st and 3rd quartiles: (0.25, 0.75).

class ray.data.preprocessors.StandardScaler(columns: List[str])[source]

Bases: ray.data.preprocessor.Preprocessor

Translate and scale each column by its mean and standard deviation, respectively.

The general formula is given by

\[x' = \frac{x - \bar{x}}{s}\]

where \(x\) is the column, \(x'\) is the transformed column, \(\bar{x}\) is the column average, and \(s\) is the column’s sample standard deviation. If \(s = 0\) (i.e., the column is constant-valued), then the transformed column will contain zeros.

Warning

StandardScaler works best when your data is normal. If your data isn’t approximately normal, then the transformed features won’t be meaningful.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import StandardScaler
>>>
>>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]})
>>> ds = ray.data.from_pandas(df)  
>>> ds.to_pandas()  
   X1  X2  X3
0  -2  -3   1
1   0  -3   1
2   2   3   1

Columns are scaled separately.

>>> preprocessor = StandardScaler(columns=["X1", "X2"])
>>> preprocessor.fit_transform(ds).to_pandas()  
         X1        X2  X3
0 -1.224745 -0.707107   1
1  0.000000 -0.707107   1
2  1.224745  1.414214   1

Constant-valued columns get filled with zeros.

>>> preprocessor = StandardScaler(columns=["X3"])
>>> preprocessor.fit_transform(ds).to_pandas()  
   X1  X2   X3
0  -2  -3  0.0
1   0  -3  0.0
2   2   3  0.0
Parameters

columns – The columns to separately scale.

K-Bins Discretizers

class ray.data.preprocessors.CustomKBinsDiscretizer(columns: List[str], bins: Union[Iterable[float], pandas.core.indexes.interval.IntervalIndex, Dict[str, Union[Iterable[float], pandas.core.indexes.interval.IntervalIndex]]], *, right: bool = True, include_lowest: bool = False, duplicates: str = 'raise', dtypes: Optional[Dict[str, Union[pandas.core.dtypes.dtypes.CategoricalDtype, Type[numpy.integer]]]] = None)[source]

Bases: ray.data.preprocessors.discretizer._AbstractKBinsDiscretizer

Bin values into discrete intervals using custom bin edges.

Columns must contain numerical values.

Examples

Use CustomKBinsDiscretizer to bin continuous features.

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import CustomKBinsDiscretizer
>>> df = pd.DataFrame({
...     "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1],
...     "value_2": [10, 15, 13, 12, 23, 25],
... })
>>> ds = ray.data.from_pandas(df)
>>> discretizer = CustomKBinsDiscretizer(
...     columns=["value_1", "value_2"],
...     bins=[0, 1, 4, 10, 25]
... )
>>> discretizer.transform(ds).to_pandas()
   value_1  value_2
0        0        2
1        1        3
2        1        3
3        2        3
4        2        3
5        1        3

You can also specify different bin edges per column.

>>> discretizer = CustomKBinsDiscretizer(
...     columns=["value_1", "value_2"],
...     bins={"value_1": [0, 1, 4], "value_2": [0, 18, 35, 70]},
... )
>>> discretizer.transform(ds).to_pandas()
   value_1  value_2
0      0.0        0
1      1.0        0
2      1.0        0
3      NaN        0
4      NaN        1
5      1.0        1
Parameters
  • columns – The columns to discretize.

  • bins – Defines custom bin edges. Can be an iterable of numbers, a pd.IntervalIndex, or a dict mapping columns to either of them. Note that pd.IntervalIndex for bins must be non-overlapping.

  • right – Indicates whether bins include the rightmost edge.

  • include_lowest – Indicates whether the first interval should be left-inclusive.

  • duplicates – Can be either ‘raise’ or ‘drop’. If bin edges are not unique, raise ValueError or drop non-uniques.

  • dtypes – An optional dictionary that maps columns to pd.CategoricalDtype objects or np.integer types. If you don’t include a column in dtypes or specify it as an integer dtype, the outputted column will consist of ordered integers corresponding to bins. If you use a pd.CategoricalDtype, the outputted column will be a pd.CategoricalDtype with the categories being mapped to bins. You can use pd.CategoricalDtype(categories, ordered=True) to preserve information about bin order.

See also

UniformKBinsDiscretizer

If you want to bin data into uniform width bins.

class ray.data.preprocessors.UniformKBinsDiscretizer(columns: List[str], bins: Union[int, Dict[str, int]], *, right: bool = True, include_lowest: bool = False, duplicates: str = 'raise', dtypes: Optional[Dict[str, Union[pandas.core.dtypes.dtypes.CategoricalDtype, Type[numpy.integer]]]] = None)[source]

Bases: ray.data.preprocessors.discretizer._AbstractKBinsDiscretizer

Bin values into discrete intervals (bins) of uniform width.

Columns must contain numerical values.

Examples

Use UniformKBinsDiscretizer to bin continuous features.

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import UniformKBinsDiscretizer
>>> df = pd.DataFrame({
...     "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1],
...     "value_2": [10, 15, 13, 12, 23, 25],
... })
>>> ds = ray.data.from_pandas(df)
>>> discretizer = UniformKBinsDiscretizer(
...     columns=["value_1", "value_2"], bins=4
... )
>>> discretizer.fit_transform(ds).to_pandas()
   value_1  value_2
0        0        0
1        0        1
2        0        0
3        2        0
4        3        3
5        0        3

You can also specify a different number of bins per column.

>>> discretizer = UniformKBinsDiscretizer(
...     columns=["value_1", "value_2"], bins={"value_1": 4, "value_2": 3}
... )
>>> discretizer.fit_transform(ds).to_pandas()
   value_1  value_2
0        0        0
1        0        0
2        0        0
3        2        0
4        3        2
5        0        2
Parameters
  • columns – The columns to discretize.

  • bins – Defines the number of equal-width bins. Can be either an integer (which will be applied to all columns), or a dict that maps columns to integers. The range is extended by .1% on each side to include the minimum and maximum values.

  • right – Indicates whether bins include the rightmost edge.

  • include_lowest – Indicates whether the first interval should be left-inclusive.

  • duplicates – Can be either ‘raise’ or ‘drop’. If bin edges are not unique, raise ValueError or drop non-uniques.

  • dtypes – An optional dictionary that maps columns to pd.CategoricalDtype objects or np.integer types. If you don’t include a column in dtypes or specify it as an integer dtype, the outputted column will consist of ordered integers corresponding to bins. If you use a pd.CategoricalDtype, the outputted column will be a pd.CategoricalDtype with the categories being mapped to bins. You can use pd.CategoricalDtype(categories, ordered=True) to preserve information about bin order.

See also

CustomKBinsDiscretizer

If you want to specify your own bin edges.

Text Encoders

class ray.data.preprocessors.CountVectorizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None, max_features: Optional[int] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

Count the frequency of tokens in a column of strings.

CountVectorizer operates on columns that contain strings. For example:

                corpus
0    I dislike Python
1       I like Python

This preprocessor creates a column named {column}_{token} for each unique token. These columns represent the frequency of token {token} in column {column}. For example:

    corpus_I  corpus_Python  corpus_dislike  corpus_like
0         1              1               1            0
1         1              1               0            1

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import CountVectorizer
>>>
>>> df = pd.DataFrame({
...     "corpus": [
...         "Jimmy likes volleyball",
...         "Bob likes volleyball too",
...         "Bob also likes fruit jerky"
...     ]
... })
>>> ds = ray.data.from_pandas(df)  
>>>
>>> vectorizer = CountVectorizer(["corpus"])
>>> vectorizer.fit_transform(ds).to_pandas()  
   corpus_likes  corpus_volleyball  corpus_Bob  corpus_Jimmy  corpus_too  corpus_also  corpus_fruit  corpus_jerky
0             1                  1           0             1           0            0             0             0
1             1                  1           1             0           1            0             0             0
2             1                  0           1             0           0            1             1             1

You can limit the number of tokens in the vocabulary with max_features.

>>> vectorizer = CountVectorizer(["corpus"], max_features=3)
>>> vectorizer.fit_transform(ds).to_pandas()  
   corpus_likes  corpus_volleyball  corpus_Bob
0             1                  1           0
1             1                  1           1
2             1                  0           1
Parameters
  • columns – The columns to separately tokenize and count.

  • tokenization_fn – The function used to generate tokens. This function should accept a string as input and return a list of tokens as output. If unspecified, the tokenizer uses a function equivalent to lambda s: s.split(" ").

  • max_features – The maximum number of tokens to encode in the transformed dataset. If specified, only the most frequent tokens are encoded.

class ray.data.preprocessors.FeatureHasher(columns: List[str], num_features: int)[source]

Bases: ray.data.preprocessor.Preprocessor

Apply the hashing trick to a table that describes token frequencies.

FeatureHasher creates num_features columns named hash_{index}, where index ranges from \(0\) to num_features\(- 1\). The column hash_{index} describes the frequency of tokens that hash to index.

Distinct tokens can correspond to the same index. However, if num_features is large enough, then each column probably corresponds to a unique token.

This preprocessor is memory efficient and quick to pickle. However, given a transformed column, you can’t know which tokens correspond to it. This might make it hard to determine which tokens are important to your model.

Warning

Sparse matrices aren’t supported. If you use a large num_features, this preprocessor might behave poorly.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import FeatureHasher

The data below describes the frequencies of tokens in "I like Python" and "I dislike Python".

>>> df = pd.DataFrame({
...     "I": [1, 1],
...     "like": [1, 0],
...     "dislike": [0, 1],
...     "Python": [1, 1]
... })
>>> ds = ray.data.from_pandas(df)  

FeatureHasher hashes each token to determine its index. For example, the index of "I" is \(hash(\texttt{"I"}) \pmod 8 = 5\).

>>> hasher = FeatureHasher(columns=["I", "like", "dislike", "Python"], num_features=8)
>>> hasher.fit_transform(ds).to_pandas().to_numpy()  
array([[0, 0, 0, 2, 0, 1, 0, 0],
       [0, 0, 0, 1, 0, 1, 1, 0]])

Notice the hash collision: both "like" and "Python" correspond to index \(3\). You can avoid hash collisions like these by increasing num_features.

Parameters
  • columns – The columns to apply the hashing trick to. Each column should describe the frequency of a token.

  • num_features – The number of features used to represent the vocabulary. You should choose a value large enough to prevent hash collisions between distinct tokens.

See also

CountVectorizer

Use this preprocessor to generate inputs for FeatureHasher.

HashingVectorizer

If your input data describes documents rather than token frequencies, use HashingVectorizer.

class ray.data.preprocessors.HashingVectorizer(columns: List[str], num_features: int, tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

Count the frequency of tokens using the hashing trick.

This preprocessor creates num_features columns named like hash_{column_name}_{index}. If num_features is large enough relative to the size of your vocabulary, then each column approximately corresponds to the frequency of a unique token.

HashingVectorizer is memory efficient and quick to pickle. However, given a transformed column, you can’t know which tokens correspond to it. This might make it hard to determine which tokens are important to your model.

Note

This preprocessor transforms each input column to a document-term matrix.

A document-term matrix is a table that describes the frequency of tokens in a collection of documents. For example, the strings "I like Python" and "I dislike Python" might have the document-term matrix below:

    corpus_I  corpus_Python  corpus_dislike  corpus_like
0         1              1               1            0
1         1              1               0            1

To generate the matrix, you typically map each token to a unique index. For example:

        token  index
0        I      0
1   Python      1
2  dislike      2
3     like      3

The problem with this approach is that memory use scales linearly with the size of your vocabulary. HashingVectorizer circumvents this problem by computing indices with a hash function: \(\texttt{index} = hash(\texttt{token})\).

Warning

Sparse matrices aren’t currently supported. If you use a large num_features, this preprocessor might behave poorly.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import HashingVectorizer
>>>
>>> df = pd.DataFrame({
...     "corpus": [
...         "Jimmy likes volleyball",
...         "Bob likes volleyball too",
...         "Bob also likes fruit jerky"
...     ]
... })
>>> ds = ray.data.from_pandas(df)  
>>>
>>> vectorizer = HashingVectorizer(["corpus"], num_features=8)
>>> vectorizer.fit_transform(ds).to_pandas()  
   hash_corpus_0  hash_corpus_1  hash_corpus_2  hash_corpus_3  hash_corpus_4  hash_corpus_5  hash_corpus_6  hash_corpus_7
0              1              0              1              0              0              0              0              1
1              1              0              1              0              0              0              1              1
2              0              0              1              1              0              2              1              0
Parameters
  • columns – The columns to separately tokenize and count.

  • num_features – The number of features used to represent the vocabulary. You should choose a value large enough to prevent hash collisions between distinct tokens.

  • tokenization_fn – The function used to generate tokens. This function should accept a string as input and return a list of tokens as output. If unspecified, the tokenizer uses a function equivalent to lambda s: s.split(" ").

See also

CountVectorizer

Another method for counting token frequencies. Unlike HashingVectorizer, CountVectorizer creates a feature for each unique token. This enables you to compute the inverse transformation.

FeatureHasher

This preprocessor is similar to HashingVectorizer, except it expects a table describing token frequencies. In contrast, HashingVectorizer expects a column containing documents.

class ray.data.preprocessors.Tokenizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]

Bases: ray.data.preprocessor.Preprocessor

Replace each string with a list of tokens.

Examples

>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({"text": ["Hello, world!", "foo bar\nbaz"]})
>>> ds = ray.data.from_pandas(df)  

The default tokenization_fn delimits strings using the space character.

>>> from ray.data.preprocessors import Tokenizer
>>> tokenizer = Tokenizer(columns=["text"])
>>> tokenizer.transform(ds).to_pandas()  
               text
0  [Hello,, world!]
1   [foo, bar\nbaz]

If the default logic isn’t adequate for your use case, you can specify a custom tokenization_fn.

>>> import string
>>> def tokenization_fn(s):
...     for character in string.punctuation:
...         s = s.replace(character, "")
...     return s.split()
>>> tokenizer = Tokenizer(columns=["text"], tokenization_fn=tokenization_fn)
>>> tokenizer.transform(ds).to_pandas()  
              text
0   [Hello, world]
1  [foo, bar, baz]
Parameters
  • columns – The columns to tokenize.

  • tokenization_fn – The function used to generate tokens. This function should accept a string as input and return a list of tokens as output. If unspecified, the tokenizer uses a function equivalent to lambda s: s.split(" ").

Trainer

class ray.train.trainer.BaseTrainer(*args, **kwargs)[source]

Defines interface for distributed training on Ray.

Note: The base BaseTrainer class cannot be instantiated directly. Only one of its subclasses can be used.

How does a trainer work?

  • First, initialize the Trainer. The initialization runs locally, so heavyweight setup should not be done in __init__.

  • Then, when you call trainer.fit(), the Trainer is serialized and copied to a remote Ray actor. The following methods are then called in sequence on the remote actor.

  • trainer.setup(): Any heavyweight Trainer setup should be specified here.

  • trainer.preprocess_datasets(): The provided ray.data.Dataset are preprocessed with the provided ray.data.Preprocessor.

  • trainer.train_loop(): Executes the main training logic.

  • Calling trainer.fit() will return a ray.air.result.Result object where you can access metrics from your training run, as well as any checkpoints that may have been saved.

How do I create a new Trainer?

Subclass ray.train.trainer.BaseTrainer, override the training_loop method, and optionally override setup.

import torch

from ray.train.trainer import BaseTrainer
from ray import tune
from ray.air import session


class MyPytorchTrainer(BaseTrainer):
    def setup(self):
        self.model = torch.nn.Linear(1, 1)
        self.optimizer = torch.optim.SGD(
            self.model.parameters(), lr=0.1)

    def training_loop(self):
        # You can access any Trainer attributes directly in this method.
        # self.datasets["train"] has already been
        # preprocessed by self.preprocessor
        dataset = self.datasets["train"]

        torch_ds = dataset.iter_torch_batches(dtypes=torch.float)
        loss_fn = torch.nn.MSELoss()

        for epoch_idx in range(10):
            loss = 0
            num_batches = 0
            for batch in torch_ds:
                X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
                # Compute prediction error
                pred = self.model(X)
                batch_loss = loss_fn(pred, y)

                # Backpropagation
                self.optimizer.zero_grad()
                batch_loss.backward()
                self.optimizer.step()

                loss += batch_loss.item()
                num_batches += 1
            loss /= num_batches

            # Use Tune functions to report intermediate
            # results.
            session.report({"loss": loss, "epoch": epoch_idx})

How do I use an existing Trainer or one of my custom Trainers?

Initialize the Trainer, and call Trainer.fit()

import ray
train_dataset = ray.data.from_items(
    [{"x": i, "y": i} for i in range(3)])
my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = my_trainer.fit()
Parameters
  • scaling_config – Configuration for how to scale training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

__init__(*, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
setup() → None[source]

Called during fit() to perform initial setup on the Trainer.

Note

This method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

preprocess_datasets() → None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

abstract training_loop() → None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.air import session
from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
fit() → ray.air.result.Result[source]

Runs training.

Returns

A Result object containing the training result.

Raises
TrainingFailedError – If any failures occur during the execution of self.as_trainable().

PublicAPI (beta): This API is in beta and may change before becoming stable.

as_trainable() → Type[Trainable][source]

Convert self to a tune.Trainable class.

Abstract Classes

class ray.train.data_parallel_trainer.DataParallelTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

A Trainer for data parallel training.

You should subclass this Trainer if your Trainer follows the SPMD (single program, multiple data) programming paradigm: you want multiple processes to run the same function, but on different data.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

How do I use ``DataParallelTrainer`` or any of its subclasses?

Example:

import ray
from ray.air import session
from ray.train.data_parallel_trainer import DataParallelTrainer

def train_loop_for_worker():
    dataset_shard_for_this_worker = session.get_dataset_shard("train")

    assert len(dataset_shard_for_this_worker) == 1

train_dataset = ray.data.from_items([1, 2, 3])
assert len(train_dataset) == 3
trainer = DataParallelTrainer(
    ray.air.config.ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
)
result = trainer.fit()

How do I develop on top of ``DataParallelTrainer``?

In many cases, using DataParallelTrainer directly is sufficient to execute functions on multiple actors.

However, you may want to subclass DataParallelTrainer and create a custom Trainer for the following 2 use cases:

  • Use Case 1: You want to do data parallel training, but want to have a predefined train_loop_per_worker.

  • Use Case 2: You want to implement a custom Backend that automatically handles additional setup or teardown logic on each actor, so that the users of this new trainer do not have to implement this logic. For example, a TensorflowTrainer can be built on top of DataParallelTrainer to automatically handle setting the proper environment variables for distributed Tensorflow on each actor.

For 1, you can set a predefined training loop in __init__

from ray.train.data_parallel_trainer import DataParallelTrainer

class MyDataParallelTrainer(DataParallelTrainer):
    def __init__(self, *args, **kwargs):
        predefined_train_loop_per_worker = lambda: 1
        super().__init__(predefined_train_loop_per_worker, *args, **kwargs)

For 2, you can implement the ray.train.Backend and ray.train.BackendConfig interfaces.

from dataclasses import dataclass
from ray.train.backend import Backend, BackendConfig

class MyBackend(Backend):
    def on_start(self, worker_group, backend_config):
        def set_env_var(env_var_value):
            import os
            os.environ["MY_ENV_VAR"] = env_var_value

        worker_group.execute(set_env_var, backend_config.env_var)

@dataclass
class MyBackendConfig(BackendConfig):
    env_var: str = "default_value"

    def backend_cls(self):
        return MyBackend

class MyTrainer(DataParallelTrainer):
    def __init__(
        self, train_loop_per_worker, my_backend_config: MyBackendConfig, **kwargs
    ):
        super().__init__(
            train_loop_per_worker,
            backend_config=my_backend_config,
            **kwargs,
        )
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • backend_config – Configuration for setting up a Backend (e.g. Torch, Tensorflow, Horovod) on each worker to enable distributed communication. If no Backend should be set up, then set this to None.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest. This is merged with the default dataset config for the given trainer (cls._dataset_config).

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

DeveloperAPI: This API may change across minor Ray releases.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, backend_config: Optional[ray.train.backend.BackendConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then the preprocessor will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.air import session
from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
get_dataset_config() Dict[str, ray.air.config.DatasetConfig][source]

Return a copy of this Trainer’s final dataset configs.

Returns

The merged default + user-supplied dataset config.

class ray.train.gbdt_trainer.GBDTTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

Abstract class for scaling gradient-boosting decision tree (GBDT) frameworks.

Inherited by XGBoostTrainer and LightGBMTrainer.

Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – Framework specific training parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to framework train() function.

DeveloperAPI: This API may change across minor Ray releases.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)[source]
preprocess_datasets() None[source]

Called during fit() to preprocess dataset attributes with preprocessor.

Note

This method is run on a remote process.

This method is called prior to entering the training_loop.

If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then the preprocessor will be fit on the train dataset.

Then, all Trainer’s datasets will be transformed by the preprocessor.

The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.air import session
from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})

Training Result

class ray.air.result.Result(metrics: Optional[Dict[str, Any]], checkpoint: Optional[ray.air.checkpoint.Checkpoint], error: Optional[Exception], log_dir: Optional[pathlib.Path], metrics_dataframe: Optional[pd.DataFrame], best_checkpoints: Optional[List[Tuple[ray.air.checkpoint.Checkpoint, Dict[str, Any]]]])[source]

The final result of an ML training run or a Tune trial.

This is the class produced by Trainer.fit(). It contains a checkpoint, which can be used for resuming training and for creating a Predictor object. It also contains a metrics object describing training metrics. error is included so that unsuccessful runs and trials can be represented as well.

The constructor is a private API.

Parameters
  • metrics – The final metrics as reported by a Trainable.

  • checkpoint – The final checkpoint of the Trainable.

  • error – The execution error of the Trainable run, if the trial finishes in error.

  • log_dir – Directory where the trial logs are saved.

  • metrics_dataframe – The full result dataframe of the Trainable. The dataframe is indexed by iterations and contains reported metrics.

  • best_checkpoints – A list of tuples of the best checkpoints saved by the Trainable and their associated metrics. The number of saved checkpoints is determined by the checkpoint_config argument of run_config (by default, all checkpoints will be saved).

PublicAPI (beta): This API is in beta and may change before becoming stable.

property config: Optional[Dict[str, Any]]

The config associated with the result.

Training Session

ray.air.session.report(metrics: Dict, *, checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None) None[source]

Report metrics and optionally save a checkpoint.

Each invocation of this method will automatically increment the underlying iteration number. The physical meaning of this “iteration” is defined by the user (or more specifically, by the way they call report). It does not necessarily map to one epoch.

This API is the canonical way to report metrics from Tune and Train, and replaces the legacy tune.report (with tune.checkpoint_dir), train.report, and train.save_checkpoint calls.

Note on directory checkpoints: AIR will take ownership of checkpoints passed to report() by moving them to a new path. The original directory will no longer be accessible to the caller after the report call.

Example
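
A minimal sketch of a training function that reports a metric and a checkpoint each epoch (the metric values are placeholders); it would be passed to a trainer as train_loop_per_worker:

from ray.air import session
from ray.air.checkpoint import Checkpoint

def train_loop_per_worker():
    for epoch in range(3):
        loss = 1.0 / (epoch + 1)  # placeholder metric
        session.report(
            {"loss": loss, "epoch": epoch},
            checkpoint=Checkpoint.from_dict({"epoch": epoch, "loss": loss}),
        )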

Parameters
  • metrics – The metrics you want to report.

  • checkpoint – The optional checkpoint you want to report.

ray.air.session.get_checkpoint() Optional[ray.air.checkpoint.Checkpoint][source]

Access the session’s last checkpoint to resume from if applicable.

Returns

Checkpoint object if the session is currently being resumed.

Otherwise, return None.

Example
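
A sketch of resuming from the last reported checkpoint inside a training function (assuming checkpoints were reported as dictionaries containing an "epoch" key, as in the report() example above):

from ray.air import session
from ray.air.checkpoint import Checkpoint

def train_loop_per_worker():
    start_epoch = 0
    checkpoint = session.get_checkpoint()
    if checkpoint:
        # Resume from the last reported state, if any.
        start_epoch = checkpoint.to_dict()["epoch"] + 1
    for epoch in range(start_epoch, 5):
        session.report({"epoch": epoch},
                       checkpoint=Checkpoint.from_dict({"epoch": epoch}))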

ray.air.session.get_trial_name() str[source]

Trial name for the corresponding trial.

ray.air.session.get_trial_id() str[source]

Trial id for the corresponding trial.

ray.air.session.get_trial_resources() PlacementGroupFactory[source]

Trial resources for the corresponding trial.

ray.air.session.get_world_size() int[source]

Get the current world size (i.e. total number of workers) for this run.

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_loop_per_worker(config):
    # World size equals num_workers in the ScalingConfig below.
    assert session.get_world_size() == 4

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=4),
    datasets={"train": train_dataset})
trainer.fit()
ray.air.session.get_world_rank() int[source]

Get the world rank of this worker.

import time

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_loop_per_worker():
    for iter in range(100):
        time.sleep(1)
        if session.get_world_rank() == 0:
            print("Worker 0")

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={"train": train_dataset})
trainer.fit()
ray.air.session.get_local_rank() int[source]

Get the local rank of this worker (rank of the worker on its node).

import ray
import torch
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_loop_per_worker():
    # Pin this worker to the GPU matching its local rank on the node.
    if torch.cuda.is_available():
        torch.cuda.set_device(session.get_local_rank())
    ...

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={"train": train_dataset})
trainer.fit()
ray.air.session.get_dataset_shard(dataset_name: Optional[str] = None) Optional[Union[Dataset, DatasetPipeline]][source]

Returns the Ray Dataset or DatasetPipeline shard for this worker.

You should call iter_torch_batches() or iter_tf_batches() on this shard to convert it to the appropriate framework-specific data type.

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker():
    model = Net()  # Net is a user-defined model class (not shown here).
    for iter in range(100):
        # Trainer will automatically handle sharding.
        data_shard = session.get_dataset_shard("train")
        for batch in data_shard.iter_torch_batches():
            ...  # train on the batch
    return model

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TorchTrainer(train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataset})
trainer.fit()
Parameters

dataset_name – If a Dictionary of Datasets was passed to Trainer, then specifies which dataset shard to return.

Returns

The Dataset or DatasetPipeline shard to use for this worker. If no dataset is passed into Trainer, then return None.

Trainer Configs

class ray.air.config.ScalingConfig(trainer_resources: Optional[Union[Dict, Domain, Dict[str, List]]] = None, num_workers: Optional[Union[int, Domain, Dict[str, List]]] = None, use_gpu: Union[bool, Domain, Dict[str, List]] = False, resources_per_worker: Optional[Union[Dict, Domain, Dict[str, List]]] = None, placement_strategy: Union[str, Domain, Dict[str, List]] = 'PACK', _max_cpu_fraction_per_node: Optional[Union[float, Domain, Dict[str, List]]] = None)[source]

Configuration for scaling training.

Parameters
  • trainer_resources – Resources to allocate for the trainer. If None is provided, will default to 1 CPU.

  • num_workers – The number of workers (Ray actors) to launch. Each worker will reserve 1 CPU by default. The number of CPUs reserved by each worker can be overridden with the resources_per_worker argument.

  • use_gpu – If True, training will be done on GPUs (1 per worker). Defaults to False. The number of GPUs reserved by each worker can be overridden with the resources_per_worker argument.

  • resources_per_worker – If specified, the resources defined in this Dict will be reserved for each worker. The CPU and GPU keys (case-sensitive) can be defined to override the number of CPU/GPUs used by each worker.

  • placement_strategy – The placement strategy to use for the placement group of the Ray actors. See Placement Group Strategies for the possible options.

  • _max_cpu_fraction_per_node – (Experimental) The max fraction of CPUs per node that Train will use for scheduling training actors. The remaining CPUs can be used for dataset tasks. It is highly recommended that you set this to less than 1.0 (e.g., 0.8) when passing datasets to trainers, to avoid hangs / CPU starvation of dataset tasks. Warning: this feature is experimental and is not recommended for use with autoscaling (scale-up will not trigger properly).

PublicAPI (beta): This API is in beta and may change before becoming stable.
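
For example, an illustrative config that requests 8 data parallel workers, each with 4 CPUs and 1 GPU, plus 1 CPU for the trainer itself:

from ray.air.config import ScalingConfig

scaling_config = ScalingConfig(
    trainer_resources={"CPU": 1},
    num_workers=8,
    use_gpu=True,
    resources_per_worker={"CPU": 4, "GPU": 1},
)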

property total_resources

Map of total resources required for the trainer.

property num_cpus_per_worker

The number of CPUs to set per worker.

property num_gpus_per_worker

The number of GPUs to set per worker.

property additional_resources_per_worker

Resources per worker, not including CPU or GPU resources.

as_placement_group_factory() PlacementGroupFactory[source]

Returns a PlacementGroupFactory to specify resources for Tune.

classmethod from_placement_group_factory(pgf: PlacementGroupFactory) ScalingConfig[source]

Create a ScalingConfig from a Tune PlacementGroupFactory.

class ray.air.config.DatasetConfig(fit: Optional[bool] = None, split: Optional[bool] = None, required: Optional[bool] = None, transform: Optional[bool] = None, use_stream_api: Optional[bool] = None, stream_window_size: Optional[float] = None, global_shuffle: Optional[bool] = None, randomize_block_order: Optional[bool] = None)[source]

Configuration for ingest of a single Dataset.

See the AIR Dataset configuration guide for usage examples.

This config defines how the Dataset should be read into the DataParallelTrainer. It configures the preprocessing, splitting, and ingest strategy per-dataset.

DataParallelTrainers declare default DatasetConfigs for each dataset passed in the datasets argument. Users have the opportunity to selectively override these configs by passing the dataset_config argument. Trainers can also define user customizable values (e.g., XGBoostTrainer doesn’t support streaming ingest).

Parameters
  • fit – Whether to fit preprocessors on this dataset. This can be set on at most one dataset at a time. True by default for the “train” dataset only.

  • split – Whether the dataset should be split across multiple workers. True by default for the “train” dataset only.

  • required – Whether to raise an error if the Dataset isn’t provided by the user. False by default.

  • transform – Whether to transform the dataset with the fitted preprocessor. This must be enabled at least for the dataset that is fit. True by default.

  • use_stream_api – Whether the dataset should be streamed into memory using pipelined reads. When enabled, get_dataset_shard() returns DatasetPipeline instead of Dataset. The amount of memory to use is controlled by stream_window_size. False by default.

  • stream_window_size – Configure the streaming window size in bytes. A good value is something like 20% of object store memory. If set to -1, then an infinite window size will be used (similar to bulk ingest). This only has an effect if use_stream_api is set. Set to 1.0 GiB by default.

  • global_shuffle – Whether to enable global shuffle (per pipeline window in streaming mode). Note that this is an expensive all-to-all operation, and most likely you want to use local shuffle instead. See https://docs.ray.io/en/master/data/faq.html and https://docs.ray.io/en/master/ray-air/check-ingest.html. False by default.

  • randomize_block_order – Whether to randomize the iteration order over blocks. The main purpose of this is to prevent data fetching hotspots in the cluster when running many parallel workers / trials on the same data. We recommend enabling it always. True by default.

PublicAPI (beta): This API is in beta and may change before becoming stable.
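
A sketch of selectively overriding the per-dataset defaults of a DataParallelTrainer (the "side" dataset name is illustrative); the dict would be passed as the trainer's dataset_config argument:

from ray.air.config import DatasetConfig

dataset_config = {
    # Fit the preprocessor on and split the "train" dataset (the defaults).
    "train": DatasetConfig(fit=True, split=True),
    # Pass the auxiliary "side" dataset through unsplit and untransformed.
    "side": DatasetConfig(split=False, transform=False),
}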

fill_defaults() ray.air.config.DatasetConfig[source]

Return a copy of this config with all default values filled in.

static merge(a: Dict[str, ray.air.config.DatasetConfig], b: Optional[Dict[str, ray.air.config.DatasetConfig]]) Dict[str, ray.air.config.DatasetConfig][source]

Merge two given DatasetConfigs, the second taking precedence.

Raises

ValueError – if validation fails on the merged configs.

static validated(config: Dict[str, DatasetConfig], datasets: Dict[str, Dataset]) Dict[str, DatasetConfig][source]

Validate the given config and datasets are usable.

Returns dict of validated configs with defaults filled out.

class ray.air.config.FailureConfig(max_failures: int = 0, fail_fast: Union[bool, str] = False)[source]

Configuration related to failure handling of each training/tuning run.

Parameters
  • max_failures – Tries to recover a run at least this many times. Will recover from the latest checkpoint if present. Setting to -1 will lead to infinite recovery retries. Setting to 0 will disable retries. Defaults to 0.

  • fail_fast – Whether to fail upon the first error. Only used for Ray Tune - this does not apply to single training runs (e.g. with Trainer.fit()). If fail_fast='raise' is provided, Ray Tune will automatically raise the exception received by the Trainable. fail_fast='raise' can easily leak resources and should be used with caution (it is best used with ray.init(local_mode=True)).

PublicAPI (beta): This API is in beta and may change before becoming stable.

class ray.air.config.CheckpointConfig(num_to_keep: Optional[int] = None, checkpoint_score_attribute: Optional[str] = None, checkpoint_score_order: str = 'max', checkpoint_frequency: int = 0, checkpoint_at_end: Optional[bool] = None)[source]

Configurable parameters for defining the checkpointing strategy.

Default behavior is to persist all checkpoints to disk. If num_to_keep is set, the default retention policy is to keep the checkpoints with maximum timestamp, i.e. the most recent checkpoints.

Parameters
  • num_to_keep – The number of checkpoints to keep on disk for this run. If a checkpoint is persisted to disk after there are already this many checkpoints, then an existing checkpoint will be deleted. If this is None then checkpoints will not be deleted. If this is 0 then no checkpoints will be persisted to disk.

  • checkpoint_score_attribute – The attribute that will be used to score checkpoints to determine which checkpoints should be kept on disk when there are more than num_to_keep checkpoints. This attribute must be a key from the checkpoint dictionary which has a numerical value. By default, the most recent checkpoints will be kept.

  • checkpoint_score_order – Either “max” or “min”. If “max”, then checkpoints with highest values of checkpoint_score_attribute will be kept. If “min”, then checkpoints with lowest values of checkpoint_score_attribute will be kept.

  • checkpoint_frequency – Number of iterations between checkpoints. If 0 this will disable checkpointing. Please note that most trainers will still save one checkpoint at the end of training. This attribute is only supported by trainers that don’t take in custom training loops.

  • checkpoint_at_end – If True, will save a checkpoint at the end of training. This attribute is only supported by trainers that don’t take in custom training loops. Defaults to True for trainers that support it and False for generic function trainables.

PublicAPI (beta): This API is in beta and may change before becoming stable.
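
For example, keeping only the two checkpoints with the lowest reported loss might look like this (assuming the training code reports a "loss" metric alongside its checkpoints):

from ray.air.config import CheckpointConfig

checkpoint_config = CheckpointConfig(
    num_to_keep=2,
    checkpoint_score_attribute="loss",
    checkpoint_score_order="min",
)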

class ray.air.config.RunConfig(name: Optional[str] = None, local_dir: Optional[str] = None, callbacks: Optional[List[Callback]] = None, stop: Optional[Union[Mapping, Stopper, Callable[[str, Mapping], bool]]] = None, failure_config: Optional[ray.air.config.FailureConfig] = None, sync_config: Optional[SyncConfig] = None, checkpoint_config: Optional[ray.air.config.CheckpointConfig] = None, progress_reporter: Optional[ProgressReporter] = None, verbose: Union[int, Verbosity] = 3, log_to_file: Union[bool, str, Tuple[str, str]] = False)[source]

Runtime configuration for training and tuning runs.

Upon resuming from a training or tuning run checkpoint, Ray Train/Tune will automatically apply the RunConfig from the previously checkpointed run.

Parameters
  • name – Name of the trial or experiment. If not provided, will be deduced from the Trainable.

  • local_dir – Local dir to save training results to. Defaults to ~/ray_results.

  • stop – Stop conditions to consider. Refer to ray.tune.stopper.Stopper for more info. Stoppers should be serializable.

  • callbacks – Callbacks to invoke. Refer to ray.tune.callback.Callback for more info. Callbacks should be serializable. Currently only stateless callbacks are supported for resumed runs. (any state of the callback will not be checkpointed by Tune and thus will not take effect in resumed runs).

  • failure_config – Failure mode configuration.

  • sync_config – Configuration object for syncing. See tune.SyncConfig.

  • checkpoint_config – Checkpointing configuration.

  • progress_reporter – Progress reporter for reporting intermediate experiment progress. Defaults to CLIReporter if running in command-line, or JupyterNotebookReporter if running in a Jupyter notebook.

  • verbose – 0, 1, 2, or 3. Verbosity mode. 0 = silent, 1 = only status updates, 2 = status and brief results, 3 = status and detailed results. Defaults to 3.

  • log_to_file – Log stdout and stderr to files in trial directories. If this is False (default), no files are written. If true, outputs are written to trialdir/stdout and trialdir/stderr, respectively. If this is a single string, this is interpreted as a file relative to the trialdir, to which both streams are written. If this is a Sequence (e.g. a Tuple), it has to have length 2 and the elements indicate the files to which stdout and stderr are written, respectively.

PublicAPI (beta): This API is in beta and may change before becoming stable.
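
A sketch of a RunConfig combining the configs above (all values are illustrative):

from ray.air.config import CheckpointConfig, FailureConfig, RunConfig

run_config = RunConfig(
    name="my_experiment",
    local_dir="~/ray_results",
    stop={"training_iteration": 10},
    failure_config=FailureConfig(max_failures=2),
    checkpoint_config=CheckpointConfig(num_to_keep=2),
    verbose=1,
)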

Checkpoint

class ray.air.checkpoint.Checkpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Ray AIR Checkpoint.

An AIR Checkpoint is a common interface for accessing models across different AIR components and libraries. A Checkpoint can have its data represented in one of three ways:

  • as a directory on local (on-disk) storage

  • as a directory on an external storage (e.g., cloud storage)

  • as an in-memory dictionary

The Checkpoint object also has methods to translate between different checkpoint storage locations. These storage representations provide flexibility in distributed environments, where you may want to recreate an instance of the same model on multiple nodes or across different Ray clusters.

Example:

from ray.air.checkpoint import Checkpoint

# Create checkpoint data dict
checkpoint_data = {"data": 123}

# Create checkpoint object from data
checkpoint = Checkpoint.from_dict(checkpoint_data)

# Save checkpoint to a directory on the file system.
path = checkpoint.to_directory()

# This path can then be passed around,
# e.g. to a different function or a different script.
# You can also use `checkpoint.to_uri/from_uri` to
# read from/write to cloud storage.

# In another function or script, recover Checkpoint object from path
checkpoint = Checkpoint.from_directory(path)

# Convert into dictionary again
recovered_data = checkpoint.to_dict()

# It is guaranteed that the original data has been recovered
assert recovered_data == checkpoint_data

Checkpoints can be used to instantiate a Predictor, BatchPredictor, or PredictorDeployment class.

The constructor is a private API; instead, the from_ methods should be used to create checkpoint objects (e.g. Checkpoint.from_directory()).

Other implementation notes:

When converting between different checkpoint formats, it is guaranteed that a full round trip of conversions (e.g. directory -> dict -> obj ref -> directory) will recover the original checkpoint data. There are no guarantees made about compatibility of intermediate representations.

New data can be added to a Checkpoint during conversion. Consider the following conversion: directory -> dict (adding dict["foo"] = "bar") -> directory -> dict (expect to see dict["foo"] = "bar"). Note that the second directory will contain pickle files with the serialized additional field data in them.

Similarly with a dict as a source: dict -> directory (add file "foo.txt") -> dict -> directory (will have "foo.txt" in it again). Note that the second dict representation will contain an extra field with the serialized additional files in it.

Checkpoints can be pickled and sent to remote processes. Please note that checkpoints pointing to local directories will be pickled as data representations, so the full checkpoint data will be contained in the checkpoint object. If you want to avoid this, consider passing only the checkpoint directory to the remote task and re-construct your checkpoint object in that function. Note that this will only work if the “remote” task is scheduled on the same node or a node that also has access to the local data path (e.g. on a shared file system like NFS).

Checkpoints pointing to object store references will keep the object reference intact - this means that these checkpoints cannot be properly deserialized on other Ray clusters or outside a Ray cluster. If you need persistence across clusters, use the to_uri() or to_directory() methods to persist your checkpoints to disk.

PublicAPI (beta): This API is in beta and may change before becoming stable.

property uri: Optional[str]

Return checkpoint URI, if available.

This will return a URI to cloud storage if this checkpoint is persisted on cloud, or a local file:// URI if this checkpoint is persisted on local disk and available on the current node.

In all other cases, this will return None. Users can then choose to persist to cloud with Checkpoint.to_uri().

Example

>>> from ray.air import Checkpoint
>>> checkpoint = Checkpoint.from_uri("s3://some-bucket/some-location")
>>> assert checkpoint.uri == "s3://some-bucket/some-location"
>>> checkpoint = Checkpoint.from_dict({"data": 1})
>>> assert checkpoint.uri == None
Returns

Checkpoint URI if this URI is reachable from the current node (e.g. cloud storage or locally available file URI).

classmethod from_bytes(data: bytes) ray.air.checkpoint.Checkpoint[source]

Create a checkpoint from the given byte string.

Parameters

data – Data object containing pickled checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_bytes() bytes[source]

Return Checkpoint serialized as bytes object.

Returns

Bytes object containing checkpoint data.

Return type

bytes
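
A round-trip sketch between the dictionary and bytes representations:

from ray.air.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"weights": [1, 2, 3]})
blob = checkpoint.to_bytes()            # serialize to a bytes object
restored = Checkpoint.from_bytes(blob)  # reconstruct the checkpoint
assert restored.to_dict()["weights"] == [1, 2, 3]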

classmethod from_dict(data: dict) ray.air.checkpoint.Checkpoint[source]

Create checkpoint object from dictionary.

Parameters

data – Dictionary containing checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

to_dict() dict[source]

Return checkpoint data as dictionary.

Returns

Dictionary containing checkpoint data.

Return type

dict

classmethod from_object_ref(obj_ref: ray.ObjectRef) ray.air.checkpoint.Checkpoint[source]

Create checkpoint object from object reference.

Parameters

obj_ref – ObjectRef pointing to checkpoint data.

Returns

checkpoint object.

Return type

Checkpoint

Warning

DEPRECATED: This API is deprecated and may be removed in a future Ray release. To restore a checkpoint from a remote object ref, call ray.get(obj_ref) instead.

to_object_ref() ray.ObjectRef[source]

Return checkpoint data as object reference.

Returns

ObjectRef pointing to checkpoint data.

Return type

ray.ObjectRef

Warning

DEPRECATED: This API is deprecated and may be removed in a future Ray release. To store the checkpoint in the Ray object store, call ray.put(ckpt) instead of ckpt.to_object_ref().

classmethod from_directory(path: str) ray.air.checkpoint.Checkpoint[source]

Create checkpoint object from directory.

Parameters

path – Directory containing checkpoint data. The caller promises not to delete the directory (ownership of the directory is given to this Checkpoint).

Returns

checkpoint object.

Return type

Checkpoint

classmethod from_checkpoint(other: ray.air.checkpoint.Checkpoint) ray.air.checkpoint.Checkpoint[source]

Create a checkpoint from a generic Checkpoint.

This method can be used to create a framework-specific checkpoint from a generic Checkpoint object.

Examples

>>> result = TorchTrainer.fit(...)  
>>> checkpoint = TorchCheckpoint.from_checkpoint(result.checkpoint)  
>>> model = checkpoint.get_model()  
Linear(in_features=1, out_features=1, bias=True)
to_directory(path: Optional[str] = None) str[source]

Write checkpoint data to directory.

Parameters

path – Target directory to restore data in. If not specified, will create a temporary directory.

Returns

Directory containing checkpoint data.

Return type

str

as_directory() Iterator[str][source]

Return checkpoint directory path in a context.

This function makes checkpoint data available as a directory while avoiding unnecessary copies and left-over temporary data.

If the checkpoint is already a directory checkpoint, it will return the existing path. If it is not, it will create a temporary directory, which will be deleted after the context is exited.

Users should treat the returned checkpoint directory as read-only and avoid changing any data within it, as it might get deleted when exiting the context.

Example:

with checkpoint.as_directory() as checkpoint_dir:
    # Do some read-only processing of files within checkpoint_dir
    pass

# At this point, if a temporary directory was created, it will have
# been deleted.
classmethod from_uri(uri: str) ray.air.checkpoint.Checkpoint[source]

Create checkpoint object from location URI (e.g. cloud storage).

Valid locations currently include AWS S3 (s3://), Google cloud storage (gs://), HDFS (hdfs://), and local files (file://).

Parameters

uri – Source location URI to read data from.

Returns

checkpoint object.

Return type

Checkpoint

to_uri(uri: str) str[source]

Write checkpoint data to location URI (e.g. cloud storage).

Parameters

uri – Target location URI to write data to.

Returns

Cloud location containing checkpoint data.

Return type

str

get_internal_representation() Tuple[str, Union[dict, str, ray.ObjectRef]][source]

Return tuple of (type, data) for the internal representation.

The internal representation can be used e.g. to compare checkpoint objects for equality or to access the underlying data storage.

The returned type is a string and one of ["local_path", "data_dict", "uri", "object_ref"].

The data is the respective data value.

Note that paths converted from file://... will be returned as local_path (without the file:// prefix) and not as uri.

Returns

Tuple of type and data.

DeveloperAPI: This API may change across minor Ray releases.

get_preprocessor() Optional[Preprocessor][source]

Return the saved preprocessor, if one exists.

Predictor

class ray.train.predictor.Predictor(preprocessor: Optional[ray.data.preprocessor.Preprocessor] = None)[source]

Predictors load models from checkpoints to perform inference.

Note

The base Predictor class cannot be instantiated directly. Only one of its subclasses can be used.

How does a Predictor work?

Predictors expose a predict method that accepts an input batch of type DataBatchType and outputs predictions of the same type as the input batch.

When the predict method is called, the following occurs:

  • The input batch is converted into a pandas DataFrame. Tensor input (like an np.ndarray) will be converted into a single-column pandas DataFrame.

  • If there is a Preprocessor saved in the provided Checkpoint, the preprocessor will be used to transform the DataFrame.

  • The transformed DataFrame will be passed to the model for inference (via the predictor._predict_pandas method).

  • The predictions will be returned by predict in the same type as the original input.

How do I create a new Predictor?

To implement a new Predictor for your particular framework, you should subclass the base Predictor and implement the following methods (a minimal sketch follows the list below):

  1. _predict_pandas: Given a pandas.DataFrame input, return a pandas.DataFrame containing predictions.

  2. from_checkpoint: Logic for creating a Predictor from an AIR Checkpoint.

  3. Optionally _predict_arrow for better performance when working with tensor data to avoid extra copies from Pandas conversions.

PublicAPI (beta): This API is in beta and may change before becoming stable.
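
A minimal sketch of such a subclass; the "model" here is just a scaling factor stored in a dictionary checkpoint, and the "feature_1" column name is purely illustrative:

import pandas as pd

from ray.air.checkpoint import Checkpoint
from ray.train.predictor import Predictor

class MyPredictor(Predictor):
    def __init__(self, factor: float, preprocessor=None):
        super().__init__(preprocessor)
        self.factor = factor

    @classmethod
    def from_checkpoint(cls, checkpoint: Checkpoint, **kwargs) -> "MyPredictor":
        # Assume the checkpoint was created with Checkpoint.from_dict(...).
        data = checkpoint.to_dict()
        return cls(factor=data["factor"],
                   preprocessor=checkpoint.get_preprocessor())

    def _predict_pandas(self, data: pd.DataFrame, **kwargs) -> pd.DataFrame:
        # Scale a single feature column; a real predictor would run a model.
        return pd.DataFrame({"predictions": data["feature_1"] * self.factor})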

abstract classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, **kwargs) ray.train.predictor.Predictor[source]

Create a specific predictor from a checkpoint.

Parameters
  • checkpoint – Checkpoint to load predictor data from.

  • kwargs – Arguments specific to predictor implementations.

Returns

Predictor object.

Return type

Predictor

classmethod from_pandas_udf(pandas_udf: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame]) ray.train.predictor.Predictor[source]

Create a Predictor from a Pandas UDF.

Parameters

pandas_udf – A function that takes a pandas.DataFrame and other optional kwargs and returns a pandas.DataFrame.

get_preprocessor() Optional[ray.data.preprocessor.Preprocessor][source]

Get the preprocessor to use prior to executing predictions.

set_preprocessor(preprocessor: Optional[ray.data.preprocessor.Preprocessor]) None[source]

Set the preprocessor to use prior to executing predictions.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], **kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Perform inference on a batch of data.

Parameters
  • data – A batch of input data of type DataBatchType.

  • kwargs – Arguments specific to predictor implementations. These are passed directly to _predict_pandas.

Returns

Prediction result. The return type will be the same as the input type.

Return type

DataBatchType

Data Types

ray.train.predictor.DataBatchType

alias of Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]]

Batch Predictor

class ray.train.batch_predictor.BatchPredictor(checkpoint: ray.air.checkpoint.Checkpoint, predictor_cls: Type[ray.train.predictor.Predictor], **predictor_kwargs)[source]

Batch predictor class.

Takes a predictor class and a checkpoint and provides an interface to run batch scoring on Ray datasets.

This batch predictor wraps around a predictor class and executes it in a distributed way when calling predict().

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_pandas_udf(pandas_udf: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame]) ray.train.batch_predictor.BatchPredictor[source]

Create a Predictor from a Pandas UDF.

Parameters

pandas_udf – A function that takes a pandas.DataFrame and other optional kwargs and returns a pandas.DataFrame.

get_preprocessor() ray.data.preprocessor.Preprocessor[source]

Get the preprocessor to use prior to executing predictions.

set_preprocessor(preprocessor: ray.data.preprocessor.Preprocessor) None[source]

Set the preprocessor to use prior to executing predictions.

predict(data: Union[ray.data.dataset.Dataset, ray.data.dataset_pipeline.DatasetPipeline], *, feature_columns: Optional[List[str]] = None, keep_columns: Optional[List[str]] = None, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: Optional[int] = None, num_gpus_per_worker: Optional[int] = None, separate_gpu_stage: bool = True, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) ray.data.dataset.Dataset[source]

Run batch scoring on a Dataset.

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.air import Checkpoint
>>> from ray.train.predictor import Predictor
>>> from ray.train.batch_predictor import BatchPredictor
>>> # Create a batch predictor that returns identity as the predictions.
>>> batch_pred = BatchPredictor.from_pandas_udf(
...     lambda data: pd.DataFrame({"predictions": data["feature_1"]}))
>>> # Create a dummy dataset.
>>> ds = ray.data.from_pandas(pd.DataFrame({
...     "feature_1": [1, 2, 3], "label": [1, 2, 3]}))
>>> # Execute batch prediction using this predictor.
>>> predictions = batch_pred.predict(ds,
...     feature_columns=["feature_1"], keep_columns=["label"])
>>> print(predictions)
Dataset(num_blocks=1, num_rows=3, schema={predictions: int64, label: int64})
>>> # Calculate final accuracy.
>>> def calculate_accuracy(df):
...    return pd.DataFrame({"correct": df["predictions"] == df["label"]})
>>> correct = predictions.map_batches(calculate_accuracy)
>>> print("Final accuracy: ",
...    correct.sum(on="correct") / correct.count())
Final accuracy:  1.0
Parameters
  • data – Ray dataset or pipeline to run batch prediction on.

  • feature_columns – List of columns in data to use for prediction. Columns not specified will be dropped from data before being passed to the predictor. If None, use all columns.

  • keep_columns – List of columns in data to include in the prediction result. This is useful for calculating final accuracies/metrics on the result dataset. If None, the columns in the output dataset will contain just the prediction results.

  • batch_size – Split dataset into batches of this size for prediction.

  • min_scoring_workers – Minimum number of scoring actors.

  • max_scoring_workers – If set, specify the maximum number of scoring actors.

  • num_cpus_per_worker – Number of CPUs to allocate per scoring worker.

  • num_gpus_per_worker – Number of GPUs to allocate per scoring worker.

  • separate_gpu_stage – If using GPUs, specifies whether to execute GPU processing in a separate stage (enabled by default). This avoids running expensive preprocessing steps on GPU workers.

  • ray_remote_args – Additional resource requirements to request from ray.

  • predict_kwargs – Keyword arguments passed to the predictor’s predict() method.

Returns

Dataset containing scoring results.

predict_pipelined(data: ray.data.dataset.Dataset, *, blocks_per_window: Optional[int] = None, bytes_per_window: Optional[int] = None, feature_columns: Optional[List[str]] = None, keep_columns: Optional[List[str]] = None, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: Optional[int] = None, num_gpus_per_worker: Optional[int] = None, separate_gpu_stage: bool = True, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) ray.data.dataset_pipeline.DatasetPipeline[source]

Set up a prediction pipeline for batch scoring.

Unlike predict(), this generates a DatasetPipeline object and does not perform execution. Execution can be triggered by pulling from the pipeline.

This is a convenience wrapper around calling window() on the Dataset prior to passing it to BatchPredictor.predict().

Examples

>>> import pandas as pd
>>> import ray
>>> from ray.air import Checkpoint
>>> from ray.train.predictor import Predictor
>>> from ray.train.batch_predictor import BatchPredictor
>>> # Create a batch predictor that always returns `42` for each input.
>>> batch_pred = BatchPredictor.from_pandas_udf(
...     lambda data: pd.DataFrame({"a": [42] * len(data)}))
>>> # Create a dummy dataset.
>>> ds = ray.data.range_tensor(1000, parallelism=4)
>>> # Setup a prediction pipeline.
>>> print(batch_pred.predict_pipelined(
...     ds, blocks_per_window=1))
DatasetPipeline(num_windows=4, num_stages=3)
Parameters
  • data – Ray dataset to run batch prediction on.

  • blocks_per_window – The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.

  • bytes_per_window – Specify the window size in bytes instead of blocks. This will be treated as an upper bound for the window size, but each window will still include at least one block. This is mutually exclusive with blocks_per_window.

  • feature_columns – List of columns in data to use for prediction. Columns not specified will be dropped from data before being passed to the predictor. If None, use all columns.

  • keep_columns – List of columns in data to include in the prediction result. This is useful for calculating final accuracies/metrics on the result dataset. If None, the columns in the output dataset will contain just the prediction results.

  • batch_size – Split dataset into batches of this size for prediction.

  • min_scoring_workers – Minimum number of scoring actors.

  • max_scoring_workers – If set, specify the maximum number of scoring actors.

  • num_cpus_per_worker – Number of CPUs to allocate per scoring worker.

  • num_gpus_per_worker – Number of GPUs to allocate per scoring worker.

  • separate_gpu_stage – If using GPUs, specifies whether to execute GPU processing in a separate stage (enabled by default). This avoids running expensive preprocessing steps on GPU workers.

  • ray_remote_args – Additional resource requirements to request from ray.

  • predict_kwargs – Keyword arguments passed to the predictor’s predict() method.

Returns

DatasetPipeline that generates scoring results.

Tuner

class ray.tune.tuner.Tuner(trainable: Optional[Union[str, Callable, Type[ray.tune.trainable.trainable.Trainable], BaseTrainer]] = None, *, param_space: Optional[Dict[str, Any]] = None, tune_config: Optional[ray.tune.tune_config.TuneConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, _tuner_kwargs: Optional[Dict] = None, _tuner_internal: Optional[ray.tune.impl.tuner_internal.TunerInternal] = None)[source]

Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune.

Parameters
  • trainable – The trainable to be tuned.

  • param_space – Search space of the tuning job. One thing to note is that both preprocessor and dataset can be tuned here.

  • tune_config – Tuning algorithm specific configs. Refer to ray.tune.tune_config.TuneConfig for more info.

  • run_config – Runtime configuration that is specific to individual trials. If passed, this will overwrite the run config passed to the Trainer, if applicable. Refer to ray.air.config.RunConfig for more info.

Usage pattern:

from sklearn.datasets import load_breast_cancer

from ray import tune
from ray.data import from_pandas
from ray.air.config import RunConfig, ScalingConfig
from ray.train.xgboost import XGBoostTrainer
from ray.tune.tuner import Tuner

def get_dataset():
    data_raw = load_breast_cancer(as_frame=True)
    dataset_df = data_raw["data"]
    dataset_df["target"] = data_raw["target"]
    dataset = from_pandas(dataset_df)
    return dataset

trainer = XGBoostTrainer(
    label_column="target",
    params={},
    datasets={"train": get_dataset()},
)

param_space = {
    "scaling_config": ScalingConfig(
        num_workers=tune.grid_search([2, 4]),
        resources_per_worker={
            "CPU": tune.grid_search([1, 2]),
        },
    ),
    # You can even grid search various datasets in Tune.
    # "datasets": {
    #     "train": tune.grid_search(
    #         [ds1, ds2]
    #     ),
    # },
    "params": {
        "objective": "binary:logistic",
        "tree_method": "approx",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}
tuner = Tuner(trainable=trainer, param_space=param_space,
    run_config=RunConfig(name="my_tune_run"))
analysis = tuner.fit()

To retry a failed tune run, you can then do:

tuner = Tuner.restore(experiment_checkpoint_dir)
tuner.fit()

experiment_checkpoint_dir can be easily located near the end of the console output of your first failed run.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod restore(path: str, resume_unfinished: bool = True, resume_errored: bool = False, restart_errored: bool = False) ray.tune.tuner.Tuner[source]

Restores Tuner after a previously failed run.

All trials from the existing run will be added to the result table. The argument flags control how existing but unfinished or errored trials are resumed.

Finished trials are always added to the overview table. They will not be resumed.

Unfinished trials can be controlled with the resume_unfinished flag. If True (default), they will be continued. If False, they will be added as terminated trials (even if they were only created and never trained).

Errored trials can be controlled with the resume_errored and restart_errored flags. The former will resume errored trials from their latest checkpoints. The latter will restart errored trials from scratch and prevent loading their last checkpoints.

Parameters
  • path – The path where the previous failed run is checkpointed. This information could be easily located near the end of the console output of previous run. Note: depending on whether ray client mode is used or not, this path may or may not exist on your local machine.

  • resume_unfinished – If True, will continue to run unfinished trials.

  • resume_errored – If True, will re-schedule errored trials and try to restore from their latest checkpoints.

  • restart_errored – If True, will re-schedule errored trials but force restarting them from scratch (no checkpoint will be loaded).
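
For example, to resume unfinished trials and also retry errored trials from their latest checkpoints (the path is illustrative):

from ray.tune.tuner import Tuner

tuner = Tuner.restore(
    "~/ray_results/my_tune_run",
    resume_unfinished=True,
    resume_errored=True,
)
result_grid = tuner.fit()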

fit() ray.tune.result_grid.ResultGrid[source]

Executes hyperparameter tuning job as configured and returns result.

Failure handling: For the kind of exception that happens during the execution of a trial, one may inspect it together with the stacktrace through the returned result grid. See ResultGrid for reference. Each trial may fail up to a certain number of times. This is configured by RunConfig.FailureConfig.max_failures.

Exceptions that happen beyond trials will be thrown by this method as well. In such cases, an instruction like the following will be printed at the end of the console output to inform users how to resume:

Please use tuner = Tuner.restore("~/ray_results/tuner_resume") to resume.

Raises
  • RayTaskError – If user-provided trainable raises an exception

  • TuneError – General Ray Tune error.

TuneConfig

class ray.tune.tune_config.TuneConfig(mode: Optional[str] = None, metric: Optional[str] = None, search_alg: Optional[Union[ray.tune.search.searcher.Searcher, ray.tune.search.search_algorithm.SearchAlgorithm]] = None, scheduler: Optional[ray.tune.schedulers.trial_scheduler.TrialScheduler] = None, num_samples: int = 1, max_concurrent_trials: Optional[int] = None, time_budget_s: Optional[Union[int, float, datetime.timedelta]] = None, reuse_actors: Optional[bool] = None)[source]

Tune specific configs.

Parameters
  • metric – Metric to optimize. This metric should be reported with tune.report(). If set, will be passed to the search algorithm and scheduler.

  • mode – Must be one of [min, max]. Determines whether objective is minimizing or maximizing the metric attribute. If set, will be passed to the search algorithm and scheduler.

  • search_alg – Search algorithm for optimization. Defaults to random search.

  • scheduler – Scheduler for executing the experiment. Choose among FIFO (default), MedianStopping, AsyncHyperBand, HyperBand and PopulationBasedTraining. Refer to ray.tune.schedulers for more options.

  • num_samples – Number of times to sample from the hyperparameter space. Defaults to 1. If grid_search is provided as an argument, the grid will be repeated num_samples times. If this is -1, (virtually) infinite samples are generated until a stopping condition is met.

  • max_concurrent_trials – Maximum number of trials to run concurrently. Must be non-negative. If None or 0, no limit will be applied. This is achieved by wrapping the search_alg in a ConcurrencyLimiter, and thus setting this argument will raise an exception if the search_alg is already a ConcurrencyLimiter. Defaults to None.

  • time_budget_s – Global time budget in seconds after which all trials are stopped. Can also be a datetime.timedelta object.

  • reuse_actors – Whether to reuse actors between different trials when possible. This can drastically speed up experiments that start and stop actors often (e.g., PBT in time-multiplexing mode). This requires trials to have the same resource requirements. Defaults to True for function trainables (including most Ray AIR trainers) and False for class and registered trainables (e.g. RLlib).

PublicAPI (beta): This API is in beta and may change before becoming stable.
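
An illustrative TuneConfig that minimizes a reported "loss" metric with an ASHA scheduler and a bounded number of concurrent trials:

from ray.tune.schedulers import ASHAScheduler
from ray.tune.tune_config import TuneConfig

tune_config = TuneConfig(
    metric="loss",
    mode="min",
    num_samples=20,
    max_concurrent_trials=4,
    scheduler=ASHAScheduler(),
)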

Tuner Results

class ray.tune.result_grid.ResultGrid(experiment_analysis: ray.tune.analysis.experiment_analysis.ExperimentAnalysis)[source]

A set of Result objects for interacting with Ray Tune results.

You can use it to inspect the trials and obtain the best result.

The constructor is a private API. This object can only be created as a result of Tuner.fit().

Example:

import random
from ray import air, tune

def random_error_trainable(config):
    if random.random() < 0.5:
        return {"loss": 0.0}
    else:
        raise ValueError("This is an error")

tuner = tune.Tuner(
    random_error_trainable,
    run_config=air.RunConfig(name="example-experiment"),
    tune_config=tune.TuneConfig(num_samples=10),
)
result_grid = tuner.fit()

for i in range(len(result_grid)):
    result = result_grid[i]
    if not result.error:
        print(f"Trial finishes successfully with metric {result.metric}.")
    else:
        print(f"Trial failed with error {result.error}.")

You can also use result_grid for more advanced analysis.

# Get the best result based on a particular metric.
best_result = result_grid.get_best_result(metric="loss", mode="min")

# Get the best checkpoint corresponding to the best result.
best_checkpoint = best_result.checkpoint

# Get a dataframe for the last reported results of all of the trials
df = result_grid.get_dataframe()

# Get a dataframe for the minimum loss seen for each trial
df = result_grid.get_dataframe(filter_metric="loss", filter_mode="min")

Note that trials of all statuses are included in the final result grid. If a trial is not in terminated state, its latest result and checkpoint as seen by Tune will be provided.

ResultGrid will be the successor of the ExperimentAnalysis object but is not yet at feature parity. For interacting with an existing experiment, located at local_dir, do the following:

from ray.tune import ExperimentAnalysis
analysis = ExperimentAnalysis("~/ray_results/example-experiment")

PublicAPI (beta): This API is in beta and may change before becoming stable.

get_best_result(metric: Optional[str] = None, mode: Optional[str] = None, scope: str = 'last', filter_nan_and_inf: bool = True) ray.air.result.Result[source]

Get the best result from all the trials run.

Parameters
  • metric – Key for trial info to order on. Defaults to the metric specified in your Tuner’s TuneConfig.

  • mode – One of [min, max]. Defaults to the mode specified in your Tuner’s TuneConfig.

  • scope – One of [all, last, avg, last-5-avg, last-10-avg]. If scope=last, only look at each trial’s final step for metric, and compare across trials based on mode=[min,max]. If scope=avg, consider the simple average over all steps for metric and compare across trials based on mode=[min,max]. If scope=last-5-avg or scope=last-10-avg, consider the simple average over the last 5 or 10 steps for metric and compare across trials based on mode=[min,max]. If scope=all, find each trial’s min/max score for metric based on mode, and compare trials based on mode=[min,max].

  • filter_nan_and_inf – If True (default), NaN or infinite values are disregarded and these trials are never selected as the best trial.

get_dataframe(filter_metric: Optional[str] = None, filter_mode: Optional[str] = None) pandas.core.frame.DataFrame[source]

Return dataframe of all trials with their configs and reported results.

Per default, this returns the last reported results for each trial.

If filter_metric and filter_mode are set, the results from each trial are filtered for this metric and mode. For example, if filter_metric="some_metric" and filter_mode="max", for each trial, every received result is checked, and the one where some_metric is maximal is returned.

Example

result_grid = Tuner.fit(...)

# Get last reported results per trial
df = result_grid.get_dataframe()

# Get best ever reported accuracy per trial
df = result_grid.get_dataframe(filter_metric="accuracy", filter_mode="max")
Parameters
  • filter_metric – Metric to filter best result for.

  • filter_mode – If filter_metric is given, one of ["min", "max"] to specify if we should find the minimum or maximum result.

Returns

Pandas DataFrame with each trial as a row and their results as columns.

property errors

Returns the exceptions of errored trials.

property num_errors

Returns the number of errored trials.

property num_terminated

Returns the number of terminated (but not errored) trials.

Serving

ray.serve.air_integrations.PredictorDeployment

alias of Deployment(name=PredictorDeployment,version=None,route_prefix=/PredictorDeployment)

class ray.serve.air_integrations.PredictorWrapper(predictor_cls: Union[str, Type[Predictor]], checkpoint: Union[Checkpoint, str], http_adapter: Union[str, Callable[[Any], Any]] = 'ray.serve.http_adapters.json_to_ndarray', batching_params: Optional[Union[Dict[str, int], bool]] = None, predict_kwargs: Optional[Dict[str, Any]] = None, **predictor_from_checkpoint_kwargs)[source]

Serve any Ray AIR predictor from an AIR checkpoint.

Parameters
  • predictor_cls – The class or path for predictor class. The type must be a subclass of ray.train.predictor.Predictor.

  • checkpoint

    The checkpoint object or a uri to load checkpoint from

    • The checkpoint object must be an instance of ray.air.checkpoint.Checkpoint.

    • The uri string will be called to construct a checkpoint object using Checkpoint.from_uri("uri_to_load_from").

  • http_adapter – The FastAPI input conversion function. By default, Serve will use the NdArray schema and convert to a numpy array. You can pass in any FastAPI dependency resolver that returns an array. When you pass in a string, Serve will import it. Please refer to the Serve HTTP adapters documentation to learn more.

  • batching_params – Override the default parameters to ray.serve.batch(). Pass False to disable batching.

  • predict_kwargs – Optional keyword arguments passed to the Predictor.predict method upon each call.

  • **predictor_from_checkpoint_kwargs – Additional keyword arguments passed to the Predictor.from_checkpoint() call.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
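
A serving sketch, assuming result.checkpoint is a Checkpoint produced by a previous XGBoostTrainer.fit() run (the deployment name is illustrative):

from ray import serve
from ray.serve.air_integrations import PredictorDeployment
from ray.train.xgboost import XGBoostPredictor

serve.run(
    PredictorDeployment.options(name="XGBoostService").bind(
        XGBoostPredictor,
        result.checkpoint,  # assumed to come from an earlier training run
    )
)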

Trainer and Predictor Integrations

XGBoost

class ray.train.xgboost.XGBoostTrainer(*args, **kwargs)[source]

Bases: ray.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel XGBoost training.

This Trainer runs the XGBoost training loop in a distributed manner using multiple Ray Actors.

Note

XGBoostTrainer does not modify or otherwise alter the working of the XGBoost distributed training algorithm. Ray only provides orchestration, data ingest and fault tolerance. For more information on XGBoost distributed training, refer to XGBoost documentation.

Example

import ray

from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – XGBoost training parameters. Refer to XGBoost documentation for a list of possible parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to xgboost.DMatrix objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to xgboost.train() function.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)
class ray.train.xgboost.XGBoostCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with XGBoost-specific functionality.

Create this from a generic Checkpoint by calling XGBoostCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(booster: xgboost.core.Booster, *, preprocessor: Optional[Preprocessor] = None) XGBoostCheckpoint[source]

Create a Checkpoint that stores an XGBoost model.

Parameters
  • booster – The XGBoost model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

An XGBoostCheckpoint containing the specified booster.

Examples

>>> import numpy as np
>>> import ray
>>> from ray.train.xgboost import XGBoostCheckpoint
>>> import xgboost
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgboost.XGBClassifier().fit(train_X, train_y)
>>> checkpoint = XGBoostCheckpoint.from_model(model.get_booster())

You can use an XGBoostCheckpoint to create an XGBoostPredictor and perform inference.

>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> predictor = XGBoostPredictor.from_checkpoint(checkpoint)
get_model() xgboost.core.Booster[source]

Retrieve the XGBoost model stored in this checkpoint.
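
As a minimal sketch, assuming checkpoint is the XGBoostCheckpoint created in the from_model example above, the stored booster can be retrieved and used directly:

import numpy as np
import xgboost

# `checkpoint` is assumed to be the XGBoostCheckpoint from the example above.
booster = checkpoint.get_model()
# The returned object is a regular xgboost.Booster.
predictions = booster.predict(xgboost.DMatrix(np.array([[1, 2]])))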

class ray.train.xgboost.XGBoostPredictor(model: xgboost.core.Booster, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for XGBoost models.

Parameters
  • model – The XGBoost booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.xgboost.xgboost_predictor.XGBoostPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of XGBoostTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of an XGBoostTrainer run.
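
A minimal sketch of the typical workflow, assuming trainer is the XGBoostTrainer configured in the example earlier in this section:

from ray.train.xgboost import XGBoostPredictor

# `trainer` is assumed to be an already-configured XGBoostTrainer.
result = trainer.fit()
# The Result object exposes the final checkpoint produced by training.
predictor = XGBoostPredictor.from_checkpoint(result.checkpoint)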

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, dmatrix_kwargs: Optional[Dict[str, Any]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

The data is converted into an XGBoost DMatrix before being inputted to the model.

Parameters
  • data – A batch of input data.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • dmatrix_kwargs – Dict of keyword arguments passed to xgboost.DMatrix.

  • **predict_kwargs – Keyword arguments passed to xgboost.Booster.predict.

Examples

>>> import numpy as np
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictor.predict(data, feature_columns=[0, 1])
array([0.5, 0.5], dtype=float32)
>>> import pandas as pd
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictor.predict(data, feature_columns=["A", "B"])
   predictions
0          0.5
1          0.5
Returns

Prediction result.

LightGBM

class ray.train.lightgbm.LightGBMTrainer(*args, **kwargs)[source]

Bases: ray.train.gbdt_trainer.GBDTTrainer

A Trainer for data parallel LightGBM training.

This Trainer runs the LightGBM training loop in a distributed manner using multiple Ray Actors.

If you would like to take advantage of LightGBM’s built-in handling for features with the categorical data type, consider using the Categorizer preprocessor to set the dtypes in the dataset.

Note

LightGBMTrainer does not modify or otherwise alter the working of the LightGBM distributed training algorithm. Ray only provides orchestration, data ingest and fault tolerance. For more information on LightGBM distributed training, refer to LightGBM documentation.

Example

import ray

from ray.train.lightgbm import LightGBMTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.

  • label_column – Name of the label column. A column with this name must be present in the training dataset.

  • params – LightGBM training parameters passed to lightgbm.train(). Refer to LightGBM documentation for a list of possible parameters.

  • dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to lightgbm.Dataset objects created on each worker. For example, this can be used to add sample weights with the weights parameter.

  • scaling_config – Configuration for how to scale data parallel training.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

  • **train_kwargs – Additional kwargs passed to lightgbm.train() function.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)
class ray.train.lightgbm.LightGBMCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with LightGBM-specific functionality.

Create this from a generic Checkpoint by calling LightGBMCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(booster: lightgbm.basic.Booster, *, preprocessor: Optional[Preprocessor] = None) LightGBMCheckpoint[source]

Create a Checkpoint that stores a LightGBM model.

Parameters
  • booster – The LightGBM model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A LightGBMCheckpoint containing the specified booster.

Examples

>>> import lightgbm
>>> import numpy as np
>>> from ray.train.lightgbm import LightGBMCheckpoint
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lightgbm.LGBMClassifier().fit(train_X, train_y)
>>> checkpoint = LightGBMCheckpoint.from_model(model.booster_)

You can use a LightGBMCheckpoint to create a LightGBMPredictor and perform inference.

>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> predictor = LightGBMPredictor.from_checkpoint(checkpoint)
get_model() lightgbm.basic.Booster[source]

Retrieve the LightGBM model stored in this checkpoint.
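
As a minimal sketch, assuming checkpoint is the LightGBMCheckpoint created in the from_model example above:

import numpy as np

# `checkpoint` is assumed to be the LightGBMCheckpoint from the example above.
booster = checkpoint.get_model()
# The returned object is a regular lightgbm.Booster.
predictions = booster.predict(np.array([[1, 2]]))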

class ray.train.lightgbm.LightGBMPredictor(model: lightgbm.basic.Booster, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for LightGBM models.

Parameters
  • model – The LightGBM booster to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.lightgbm.lightgbm_predictor.LightGBMPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of LightGBMTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.
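
A minimal sketch of the typical workflow, assuming trainer is the LightGBMTrainer configured in the example earlier in this section:

from ray.train.lightgbm import LightGBMPredictor

# `trainer` is assumed to be an already-configured LightGBMTrainer.
result = trainer.fit()
predictor = LightGBMPredictor.from_checkpoint(result.checkpoint)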

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

Parameters
  • data – A batch of input data.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • **predict_kwargs – Keyword arguments passed to lightgbm.Booster.predict.

Examples

>>> import numpy as np
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictions = predictor.predict(data, feature_columns=[0, 1])
>>> import pandas as pd
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

TensorFlow

class ray.train.tensorflow.TensorflowTrainer(*args, **kwargs)[source]

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Tensorflow training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary TensorFlow process group configured for distributed TensorFlow training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

You can also use ray.train.tensorflow.prepare_dataset_shard() within your training code.

def train_loop_per_worker():
    # Turns off autosharding for a dataset.
    # You should use this if you are doing
    # `session.get_dataset_shard(...).iter_tf_batches(...)`
    # as the data will be already sharded.
    train.tensorflow.prepare_dataset_shard(...)

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

To save a model to use for the TensorflowPredictor, you must save it under the “model” kwarg in Checkpoint passed to session.report().

Example:

import tensorflow as tf

import ray
from ray.air import session, Checkpoint
from ray.train.tensorflow import prepare_dataset_shard, TensorflowTrainer
from ray.air.config import ScalingConfig

input_size = 1

def build_model():
    # toy neural network : 1-layer
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(
            1, activation="linear", input_shape=(input_size,))]
    )

def train_loop_for_worker(config):
    dataset_shard = session.get_dataset_shard("train")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()
        model.compile(
            optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    def to_tf_dataset(dataset, batch_size):
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(
                batch_size=batch_size, dtypes=tf.float32
            ):
                yield tf.expand_dims(batch["x"], 1), batch["y"]

        output_signature = (
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
            tf.TensorSpec(shape=(None,), dtype=tf.float32),
        )
        tf_dataset = tf.data.Dataset.from_generator(
            to_tensor_iterator, output_signature=output_signature
        )
        return prepare_dataset_shard(tf_dataset)

    for epoch in range(config["num_epochs"]):
        tf_dataset = to_tf_dataset(dataset=dataset_shard, batch_size=1)
        model.fit(tf_dataset)
        # You can also use ray.air.callbacks.keras.Callback
        # for reporting and checkpointing instead of reporting manually.
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.get_weights())
            ),
        )

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_for_worker,
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
    train_loop_config={"num_epochs": 2})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • tensorflow_config – Configuration for setting up the TensorFlow backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, tensorflow_config: Optional[ray.train.tensorflow.config.TensorflowConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
class ray.train.tensorflow.TensorflowCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with TensorFlow-specific functionality.

Create this from a generic Checkpoint by calling TensorflowCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_model(model: keras.engine.training.Model, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint[source]

Create a Checkpoint that stores a Keras model.

Parameters
  • model – The Keras model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TensorflowCheckpoint containing the specified model.

Examples

>>> from ray.train.tensorflow import TensorflowCheckpoint
>>> import tensorflow as tf
>>>
>>> model = tf.keras.applications.resnet.ResNet101()  
>>> checkpoint = TensorflowCheckpoint.from_model(model)  
get_model_weights() keras.engine.training.Model[source]

Retrieve the model weights stored in this checkpoint.
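
A minimal sketch of pairing the stored weights with a model definition when building a TensorflowPredictor; build_model is assumed to be the same function that produced the checkpointed weights:

from ray.train.tensorflow import TensorflowPredictor

# `checkpoint` is assumed to be a TensorflowCheckpoint and `build_model`
# the model-building function used during training.
weights = checkpoint.get_model_weights()
predictor = TensorflowPredictor(
    model_definition=build_model, model_weights=weights)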

class ray.train.tensorflow.TensorflowConfig[source]

Bases: ray.train.backend.BackendConfig

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.tensorflow.prepare_dataset_shard(tf_dataset_shard: tensorflow.python.data.ops.dataset_ops.DatasetV2)[source]

A utility function that overrides the default configuration of a TensorFlow Dataset.

This should be used on a TensorFlow Dataset created by calling iter_tf_batches() on a ray.data.Dataset returned by ray.train.get_dataset_shard() since the dataset has already been sharded across the workers.

Parameters

tf_dataset_shard (tf.data.Dataset) – A TensorFlow Dataset.

Returns

A TensorFlow Dataset with:

  • autosharding turned off

  • prefetching turned on with autotune enabled

PublicAPI (beta): This API is in beta and may change before becoming stable.

class ray.train.tensorflow.TensorflowPredictor(model_definition: Union[Callable[[], keras.engine.training.Model], Type[keras.engine.training.Model]], preprocessor: Optional[Preprocessor] = None, model_weights: Optional[list] = None, use_gpu: bool = False)[source]

Bases: ray.train._internal.dl_predictor.DLPredictor

A predictor for TensorFlow models.

Parameters
  • model_definition – A callable that returns a TensorFlow Keras model to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • model_weights – List of weights to use for the model.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model_definition: Union[Callable[[], keras.engine.training.Model], Type[keras.engine.training.Model]], use_gpu: bool = False) ray.train.tensorflow.tensorflow_predictor.TensorflowPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TensorflowTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.

  • model_definition – A callable that returns a TensorFlow Keras model to use. Model weights will be loaded from the checkpoint.
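
A minimal sketch, assuming trainer is the TensorflowTrainer from the example earlier in this section and build_model recreates the checkpointed architecture:

# The workers must have reported their weights under the "model" key.
result = trainer.fit()
predictor = TensorflowPredictor.from_checkpoint(
    result.checkpoint, model_definition=build_model)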

call_model(tensor: Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]]) Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]][source]

Runs inference on a single batch of tensor data.

This method is called by TensorflowPredictor.predict after converting the original data batch to TensorFlow tensors.

Override this method to add custom logic for processing the model input or output.

Example

# List outputs are not supported by default TensorflowPredictor.
def build_model() -> tf.keras.Model:
    input = tf.keras.layers.Input(shape=1)
    model = tf.keras.models.Model(inputs=input, outputs=[input, input])
    return model

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TensorflowPredictor):
    def call_model(self, tensor):
        model_output = super().call_model(tensor)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

predictor = CustomPredictor(model_definition=build_model)
predictions = predictor.predict(data_batch)
Parameters

tensor – A batch of data to predict on, represented as either a single TensorFlow tensor or, for multi-input models, a dictionary of tensors.

Returns

The model outputs, either as a single tensor or a dictionary of tensors.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], dtype: Optional[Union[tensorflow.python.framework.dtypes.DType, Dict[str, tensorflow.python.framework.dtypes.DType]]] = None) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single Tensorflow tensor before being inputted to the model.

If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being inputted to the model. This is useful for multi-modal inputs (for example your model accepts both image and text).

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • dtype – The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.

Examples

>>> import numpy as np
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     return tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(1),
...         ]
...     )
>>>
>>> weights = [np.array([[2.0]]), np.array([0.0])]
>>> predictor = TensorflowPredictor(
...     model_definition=build_model, model_weights=weights)
>>>
>>> data = np.asarray([1, 2, 3])
>>> predictions = predictor.predict(data) 
>>> import pandas as pd
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     input1 = tf.keras.layers.Input(shape=(1,), name="A")
...     input2 = tf.keras.layers.Input(shape=(1,), name="B")
...     merged = tf.keras.layers.Concatenate(axis=1)([input1, input2])
...     output = tf.keras.layers.Dense(2, input_dim=2)(merged)
...     return tf.keras.models.Model(
...         inputs=[input1, input2], outputs=output)
>>>
>>> predictor = TensorflowPredictor(model_definition=build_model)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>>
>>> predictions = predictor.predict(data) 
Returns

Prediction result. The return type will be the same as the input type.

Return type

DataBatchType

PyTorch

class ray.train.torch.TorchTrainer(*args, **kwargs)[source]

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel PyTorch training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

You can also use any of the Torch specific function utils, such as ray.train.torch.get_device() and ray.train.torch.prepare_model()

def train_loop_per_worker():
    # Prepares model for distributed training by wrapping in
    # `DistributedDataParallel` and moving to correct device.
    train.torch.prepare_model(...)

    # Configures the dataloader for distributed training by adding a
    # `DistributedSampler`.
    # You should NOT use this if you are doing
    # `session.get_dataset_shard(...).iter_torch_batches(...)`
    train.torch.prepare_data_loader(...)

    # Returns the current torch device.
    train.torch.get_device()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

To save a model to use for the TorchPredictor, you must save it under the “model” kwarg in Checkpoint passed to session.report().

Example

import torch
import torch.nn as nn

import ray
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model = train.torch.prepare_model(model)

    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.state_dict())
            ),
        )

train_dataset = ray.data.from_items(
    [{"x": x, "y": 2 * x + 1} for x in range(200)]
)
scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset})
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
class ray.train.torch.TorchCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with Torch-specific functionality.

Create this from a generic Checkpoint by calling TorchCheckpoint.from_checkpoint(ckpt).

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_state_dict(state_dict: Dict[str, Any], *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint[source]

Create a Checkpoint that stores a model state dictionary.

Tip

This is the recommended method for creating TorchCheckpoints.

Parameters
  • state_dict – The model state dictionary to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TorchCheckpoint containing the specified state dictionary.

Examples

>>> from ray.train.torch import TorchCheckpoint
>>> import torch
>>>
>>> model = torch.nn.Linear(1, 1)
>>> checkpoint = TorchCheckpoint.from_state_dict(model.state_dict())

To load the state dictionary, call get_model().

>>> checkpoint.get_model(torch.nn.Linear(1, 1))
Linear(in_features=1, out_features=1, bias=True)
classmethod from_model(model: torch.nn.modules.module.Module, *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint[source]

Create a Checkpoint that stores a Torch model.

Note

PyTorch recommends storing state dictionaries. To create a TorchCheckpoint from a state dictionary, call from_state_dict(). To learn more about state dictionaries, read Saving and Loading Models.

Parameters
  • model – The Torch model to store in the checkpoint.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A TorchCheckpoint containing the specified model.

Examples

>>> from ray.train.torch import TorchCheckpoint
>>> import torch
>>>
>>> model = torch.nn.Identity()
>>> checkpoint = TorchCheckpoint.from_model(model)

You can use a TorchCheckpoint to create a TorchPredictor and perform inference.

>>> from ray.train.torch import TorchPredictor
>>>
>>> predictor = TorchPredictor.from_checkpoint(checkpoint)
get_model(model: Optional[torch.nn.modules.module.Module] = None) torch.nn.modules.module.Module[source]

Retrieve the model stored in this checkpoint.

Parameters

model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model. Otherwise, the model will be discarded.

class ray.train.torch.TorchConfig(backend: Optional[str] = None, init_method: str = 'env', timeout_s: int = 1800)[source]

Bases: ray.train.backend.BackendConfig

Configuration for torch process group setup.

See https://pytorch.org/docs/stable/distributed.html for more info.

Parameters
  • backend – The backend to use for training. See torch.distributed.init_process_group for more info and valid values. If set to None, nccl will be used if GPUs are requested, else gloo will be used.

  • init_method – The initialization method to use. Either “env” for environment variable initialization or “tcp” for TCP initialization. Defaults to “env”.

  • timeout_s – Seconds for process group operations to timeout.

PublicAPI (beta): This API is in beta and may change before becoming stable.
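
A minimal sketch of overriding the process group setup, assuming train_loop_per_worker is defined as in the TorchTrainer example above:

from ray.air.config import ScalingConfig
from ray.train.torch import TorchConfig, TorchTrainer

# Force the gloo backend and a longer timeout for process group operations.
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    torch_config=TorchConfig(backend="gloo", timeout_s=3600),
    scaling_config=ScalingConfig(num_workers=2),
)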

ray.train.torch.accelerate(amp: bool = False) None[source]

Enables training optimizations.

Parameters

amp – If true, perform training with automatic mixed precision. Otherwise, use full precision.

Warning

train.torch.accelerate cannot be called more than once, and it must be called before any other train.torch utility function.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.get_device() torch.device[source]

Gets the correct torch device to use for training.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_model(model: torch.nn.modules.module.Module, move_to_device: bool = True, parallel_strategy: Optional[str] = 'ddp', parallel_strategy_kwargs: Optional[Dict[str, Any]] = None, wrap_ddp: bool = True, ddp_kwargs: Optional[Dict[str, Any]] = None) torch.nn.modules.module.Module[source]

Prepares the model for distributed execution.

This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).

Parameters
  • model (torch.nn.Module) – A torch model to prepare.

  • move_to_device – Whether to move the model to the correct device. If set to False, the model needs to manually be moved to the correct device.

  • parallel_strategy ("ddp", "fsdp", or None) – Whether to wrap models in DistributedDataParallel, FullyShardedDataParallel, or neither.

  • parallel_strategy_kwargs (Dict[str, Any]) – Args to pass into DistributedDataParallel or FullyShardedDataParallel initialization if parallel_strategy is set to “ddp” or “fsdp”, respectively.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_optimizer(optimizer: torch.optim.optimizer.Optimizer) torch.optim.optimizer.Optimizer[source]

Wraps optimizer to support automatic mixed precision.

Parameters

optimizer (torch.optim.Optimizer) – The optimizer to prepare.

Returns

A wrapped optimizer.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.train.torch.prepare_data_loader(data_loader: torch.utils.data.dataloader.DataLoader, add_dist_sampler: bool = True, move_to_device: bool = True, auto_transfer: bool = True) torch.utils.data.dataloader.DataLoader[source]

Prepares DataLoader for distributed execution.

This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).

Parameters
  • data_loader (torch.utils.data.DataLoader) – The DataLoader to prepare.

  • add_dist_sampler – Whether to add a DistributedSampler to the provided DataLoader.

  • move_to_device – If set, automatically move the data returned by the data loader to the correct device.

  • auto_transfer – If set and device is GPU, another CUDA stream is created to automatically copy data from host (CPU) memory to device (GPU) memory (the default CUDA stream still runs the training procedure). If device is CPU, it will be disabled regardless of the setting. This configuration will be ignored if move_to_device is False.

PublicAPI (beta): This API is in beta and may change before becoming stable.
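
A minimal sketch of preparing a DataLoader inside train_loop_per_worker; the in-memory TensorDataset is only a stand-in for your own torch dataset:

import torch
from torch.utils.data import DataLoader, TensorDataset

from ray import train

def train_loop_per_worker():
    dataset = TensorDataset(torch.randn(128, 1), torch.randn(128, 1))
    data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
    # Adds a DistributedSampler and moves batches to the correct device.
    data_loader = train.torch.prepare_data_loader(data_loader)
    for X, y in data_loader:
        ...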

ray.train.torch.backward(tensor: torch.Tensor) None[source]

Computes the gradient of the specified tensor w.r.t. graph leaves.

Parameters

tensor (torch.Tensor) – Tensor of which the derivative will be computed.

PublicAPI (beta): This API is in beta and may change before becoming stable.
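
The accelerate, prepare_optimizer, and backward utilities above are typically combined for automatic mixed precision. A minimal sketch of a single worker training step, using a toy model and random data purely for illustration (device handling is omitted for brevity):

import torch
import torch.nn as nn

from ray import train

def train_loop_per_worker():
    # Must be called before any other train.torch utility function.
    train.torch.accelerate(amp=True)

    model = train.torch.prepare_model(nn.Linear(1, 1))
    optimizer = train.torch.prepare_optimizer(
        torch.optim.SGD(model.parameters(), lr=0.1))
    loss_fn = nn.MSELoss()

    # Toy batch; in practice this comes from a dataset shard or DataLoader.
    inputs, labels = torch.randn(32, 1), torch.randn(32, 1)
    loss = loss_fn(model(inputs), labels)
    optimizer.zero_grad()
    # Scales the loss appropriately when AMP is enabled.
    train.torch.backward(loss)
    optimizer.step()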

ray.train.torch.enable_reproducibility(seed: int = 0) None[source]

Limits sources of nondeterministic behavior.

This function:

  • Seeds PyTorch, Python, and NumPy.

  • Disables CUDA convolution benchmarking.

  • Configures PyTorch to use deterministic algorithms.

  • Seeds workers spawned for multi-process data loading.

Parameters

seed – The number to seed libraries and data workers with.

Warning

train.torch.enable_reproducibility() can’t guarantee completely reproducible results across executions. To learn more, read the PyTorch notes on randomness.

PublicAPI (beta): This API is in beta and may change before becoming stable.
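
A minimal sketch: call it at the very top of the training loop on every worker, before models or data loaders are created:

from ray import train

def train_loop_per_worker():
    train.torch.enable_reproducibility(seed=42)
    # ... the rest of the training code runs with seeded libraries.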

class ray.train.torch.TorchWorkerProfiler(trace_dir: Optional[str] = None)[source]

Bases: object

Utility class for running PyTorch Profiler on a Train worker.

Parameters

trace_dir (Optional[str]) – The directory to store traces on the worker node. If None, this will use a default temporary dir.

Warning

DEPRECATED: This API is deprecated and may be removed in a future Ray release.

trace_handler(p: torch.profiler.profiler.profile)[source]

A stateful PyTorch Profiler trace handler.

This will export the Chrome trace to a file on disk.

These exported traces can then be fetched by calling get_and_clear_profile_traces.

Parameters

p – A PyTorch Profiler profile.

get_and_clear_profile_traces()[source]

Reads unread Profiler traces from this worker.

Returns

The traces in a format consumable by TorchTensorboardProfilerCallback.

class ray.train.torch.TorchPredictor(model: torch.nn.modules.module.Module, preprocessor: Optional[Preprocessor] = None, use_gpu: bool = False)[source]

Bases: ray.train._internal.dl_predictor.DLPredictor

A predictor for PyTorch models.

Parameters
  • model – The torch module to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.

PublicAPI (beta): This API is in beta and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None, use_gpu: bool = False) ray.train.torch.torch_predictor.TorchPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of TorchTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.

  • model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model. If the checkpoint already contains the model itself, this model argument will be discarded.

  • use_gpu – If set, the model will be moved to GPU on instantiation and prediction happens on GPU.
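
A minimal sketch, assuming trainer and NeuralNetwork are defined as in the TorchTrainer example above and the workers reported the state dict under the "model" key:

from ray.train.torch import TorchPredictor

result = trainer.fit()
# The checkpoint stores a state dict, so the model architecture is supplied.
predictor = TorchPredictor.from_checkpoint(
    result.checkpoint, model=NeuralNetwork())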

call_model(tensor: Union[torch.Tensor, Dict[str, torch.Tensor]]) Union[torch.Tensor, Dict[str, torch.Tensor]][source]

Runs inference on a single batch of tensor data.

This method is called by TorchPredictor.predict after converting the original data batch to torch tensors.

Override this method to add custom logic for processing the model input or output.

Example

# List outputs are not supported by default TorchPredictor.
class MyModel(torch.nn.Module):
    def forward(self, input_tensor):
        return [input_tensor, input_tensor]

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TorchPredictor):
    def call_model(self, tensor):
        model_output = super().call_model(tensor)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

predictor = CustomPredictor(model=MyModel())
predictions = predictor.predict(data_batch)
Parameters

tensor – A batch of data to predict on, represented as either a single PyTorch tensor or, for multi-input models, a dictionary of tensors.

Returns

The model outputs, either as a single tensor or a dictionary of tensors.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], dtype: Optional[Union[torch.dtype, Dict[str, torch.dtype]]] = None) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single PyTorch tensor before being inputted to the model.

If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being inputted to the model. This is useful for multi-modal inputs (for example your model accepts both image and text).

Parameters
  • data – A batch of input data of DataBatchType.

  • dtype – The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.

Examples

>>> import numpy as np
>>> import torch
>>> from ray.train.torch import TorchPredictor
>>>
>>> model = torch.nn.Linear(2, 1)
>>> predictor = TorchPredictor(model=model)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data, dtype=torch.float)
>>> import pandas as pd
>>> import torch
>>> from ray.train.torch import TorchPredictor
>>>
>>> class CustomModule(torch.nn.Module):
...     def __init__(self):
...         super().__init__()
...         self.linear1 = torch.nn.Linear(1, 1)
...         self.linear2 = torch.nn.Linear(1, 1)
...
...     def forward(self, input_dict: dict):
...         out1 = self.linear1(input_dict["A"].unsqueeze(1))
...         out2 = self.linear2(input_dict["B"].unsqueeze(1))
...         return out1 + out2
>>>
>>> predictor = TorchPredictor(model=CustomModule())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>>
>>> predictions = predictor.predict(data, dtype=torch.float)
Returns

Prediction result. The return type will be the same as the input type.

Return type

DataBatchType

Horovod

class ray.train.horovod.HorovodTrainer(*args, **kwargs)[source]

Bases: ray.train.data_parallel_trainer.DataParallelTrainer

A Trainer for data parallel Horovod training.

This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary Horovod setup configured for distributed Horovod training.

The train_loop_per_worker function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict):
    ...

If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.

Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()

Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.

You could use TensorflowPredictor or TorchPredictor in conjunction with HorovodTrainer. You must save the model under the “model” kwarg in the Checkpoint passed to session.report(), so that it can be used by corresponding predictors.

Example:

import ray
import ray.train as train
import ray.train.torch  # Need this to use `train.torch.get_device()`
import horovod.torch as hvd
import torch
import torch.nn as nn
from ray.air import session, Checkpoint
from ray.train.horovod import HorovodTrainer
from ray.air.config import ScalingConfig

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)
    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    hvd.init()
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    device = train.torch.get_device()
    model.to(device)
    loss_fn = nn.MSELoss()
    lr_scaler = 1
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        op=hvd.Average,
    )
    for epoch in range(num_epochs):
        model.train()
        for batch in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(model=model.state_dict())
            ),
        )
train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()
Parameters
  • train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.

  • train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.

  • horovod_config – Configuration for setting up the Horovod backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (beta): This API is in beta and may change before becoming stable.

__init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, horovod_config: Optional[ray.train.horovod.config.HorovodConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
class ray.train.horovod.HorovodConfig(nics: Optional[Set[str]] = None, verbose: int = 1, key: Optional[str] = None, ssh_port: Optional[int] = None, ssh_identity_file: Optional[str] = None, ssh_str: Optional[str] = None, timeout_s: int = 300, placement_group_timeout_s: int = 100)[source]

Bases: ray.train.backend.BackendConfig

Configurations for Horovod setup.

See https://github.com/horovod/horovod/blob/master/horovod/runner/common/util/settings.py

Parameters
  • nics (Optional[Set[str]]) – Network interfaces that can be used for communication.

  • verbose – Horovod logging verbosity.

  • key (Optional[str]) – Secret used for communication between workers.

  • ssh_port (Optional[int]) – Port for SSH server running on worker nodes.

  • ssh_identity_file (Optional[str]) – Path to the identity file to ssh into different hosts on the cluster.

  • ssh_str (Optional[str]) – CAUTION WHEN USING THIS. Private key file contents. Writes the private key to ssh_identity_file.

  • timeout_s – Timeout parameter for Gloo rendezvous.

  • placement_group_timeout_s – Timeout parameter for Ray Placement Group creation. Currently unused.

PublicAPI (beta): This API is in beta and may change before becoming stable.
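
A minimal sketch of customizing the Horovod backend, assuming train_loop_per_worker is defined as in the HorovodTrainer example above:

from ray.air.config import ScalingConfig
from ray.train.horovod import HorovodConfig, HorovodTrainer

# Increase logging verbosity and the Gloo rendezvous timeout.
trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    horovod_config=HorovodConfig(verbose=2, timeout_s=600),
    scaling_config=ScalingConfig(num_workers=3),
)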

HuggingFace

class ray.train.huggingface.HuggingFaceTrainer(*args, **kwargs)[source]

Bases: ray.train.torch.torch_trainer.TorchTrainer

A Trainer for data parallel HuggingFace Transformers on PyTorch training.

This Trainer runs the transformers.Trainer.train() method on multiple Ray Actors. The training is carried out in a distributed fashion through PyTorch DDP. These actors already have the necessary torch process group configured for distributed PyTorch training. If you have PyTorch >= 1.12.0 installed, you can also run FSDP training by specifying the fsdp argument in TrainingArguments. For more information on configuring FSDP, refer to Hugging Face documentation.

The training function run on every Actor will first invoke the specified trainer_init_per_worker function to obtain an instantiated transformers.Trainer object. The trainer_init_per_worker function will have access to preprocessed train and evaluation datasets.

If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards, with each Actor training on a single shard. All the other datasets will not be split.

Please note that if you use a custom transformers.Trainer subclass, the get_train_dataloader method will be wrapped to disable sharding by transformers.IterableDatasetShard, as the dataset will already be sharded on the Ray AIR side.

HuggingFace loggers will be automatically disabled, and the local_rank argument in TrainingArguments will be automatically set. Please note that if you want to use CPU training, you will need to set the no_cuda argument in TrainingArguments manually; otherwise, an exception (segfault) may be thrown.

This Trainer requires transformers>=4.19.0 package.

Example

# Based on
# huggingface/notebooks/examples/language_modeling_from_scratch.ipynb

# Hugging Face imports
from datasets import load_dataset
import transformers
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

import ray
from ray.train.huggingface import HuggingFaceTrainer
from ray.air.config import ScalingConfig

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
block_size = 128

datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)

def tokenize_function(examples):
    return tokenizer(examples["text"])

tokenized_datasets = datasets.map(
    tokenize_function, batched=True, num_proc=1, remove_columns=["text"]
)

def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {
        k: sum(examples[k], []) for k in examples.keys()
    }
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model
    # supported it.
    # instead of this drop, you can customize this part to your needs.
    total_length = (total_length // block_size) * block_size
    # Split by chunks of max_len.
    result = {
        k: [
            t[i : i + block_size]
            for i in range(0, total_length, block_size)
        ]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result

lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    batch_size=1000,
    num_proc=1,
)
ray_train_ds = ray.data.from_huggingface(lm_datasets["train"])
ray_evaluation_ds = ray.data.from_huggingface(
    lm_datasets["validation"]
)

def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    model_config = AutoConfig.from_pretrained(model_checkpoint)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{model_checkpoint}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
trainer = HuggingFaceTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "evaluation": ray_evaluation_ds},
)
result = trainer.fit()
Parameters
  • trainer_init_per_worker – The function that returns an instantiated transformers.Trainer object and takes in the following arguments: train Torch.Dataset, optional evaluation Torch.Dataset and config as kwargs. The Torch Datasets are automatically created by converting the Ray Datasets internally before they are passed into the function.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset and (optionally) key “evaluation” to denote the evaluation dataset. Can only contain a training dataset and up to one extra dataset to be used for evaluation. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.

  • trainer_init_config – Configurations to pass into trainer_init_per_worker as kwargs.

  • torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer. Same as in TorchTrainer.

  • scaling_config – Configuration for how to scale data parallel training.

  • dataset_config – Configuration for dataset ingest.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

__init__(trainer_init_per_worker: Callable[[torch.utils.data.dataset.Dataset, Optional[torch.utils.data.dataset.Dataset], Any], transformers.trainer.Trainer], *, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], trainer_init_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
setup() None[source]

Called during fit() to perform initial setup on the Trainer.

Note

This method is run on a remote process.

This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.

This method is called prior to preprocess_datasets and training_loop.

class ray.train.huggingface.HuggingFaceCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with HuggingFace-specific functionality.

Create this from a generic Checkpoint by calling HuggingFaceCheckpoint.from_checkpoint(ckpt).

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_model(model: Union[transformers.modeling_utils.PreTrainedModel, torch.nn.modules.module.Module], tokenizer: Optional[transformers.tokenization_utils.PreTrainedTokenizer] = None, *, path: os.PathLike, preprocessor: Optional[Preprocessor] = None) HuggingFaceCheckpoint[source]

Create a Checkpoint that stores a HuggingFace model.

Parameters
  • model – The pretrained transformer or Torch model to store in the checkpoint.

  • tokenizer – The Tokenizer to use in the Transformers pipeline for inference.

  • path – The directory where the checkpoint will be stored.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

A HuggingFaceCheckpoint containing the specified model.
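
Examples

A minimal, hedged sketch of creating a checkpoint from a pretrained model (the GPT-2 model and the path "." are illustrative choices, not part of the original reference):

>>> from transformers import AutoModelForCausalLM, AutoTokenizer
>>> from ray.train.huggingface import HuggingFaceCheckpoint
>>>
>>> model = AutoModelForCausalLM.from_pretrained("gpt2")
>>> tokenizer = AutoTokenizer.from_pretrained("gpt2")
>>> checkpoint = HuggingFaceCheckpoint.from_model(model, tokenizer, path=".")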

get_model(model: Union[Type[transformers.modeling_utils.PreTrainedModel], torch.nn.modules.module.Module], **pretrained_model_kwargs) Union[transformers.modeling_utils.PreTrainedModel, torch.nn.modules.module.Module][source]

Retrieve the model stored in this checkpoint.

get_tokenizer(tokenizer: Type[transformers.tokenization_utils.PreTrainedTokenizer], **kwargs) Optional[transformers.tokenization_utils.PreTrainedTokenizer][source]

Create a tokenizer using the data stored in this checkpoint.

get_training_arguments() transformers.training_args.TrainingArguments[source]

Retrieve the training arguments stored in this checkpoint.
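
Continuing the sketch above, a hedged illustration of restoring the stored objects (get_training_arguments assumes the checkpoint was produced by a HuggingFaceTrainer run, which saves TrainingArguments alongside the model):

>>> from transformers import AutoModelForCausalLM, AutoTokenizer
>>>
>>> model = checkpoint.get_model(AutoModelForCausalLM)
>>> tokenizer = checkpoint.get_tokenizer(AutoTokenizer)
>>> training_args = checkpoint.get_training_arguments()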

class ray.train.huggingface.HuggingFacePredictor(pipeline: Optional[transformers.pipelines.base.Pipeline] = None, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for HuggingFace Transformers PyTorch models.

This predictor uses Transformers Pipelines for inference.

Parameters
  • pipeline – The Transformers pipeline to use for inference.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, *, pipeline_cls: Optional[Type[transformers.pipelines.base.Pipeline]] = None, **pipeline_kwargs) ray.train.huggingface.huggingface_predictor.HuggingFacePredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of HuggingFaceTrainer.

Parameters
  • checkpoint – The checkpoint to load the model, tokenizer and preprocessor from. It is expected to be from the result of a HuggingFaceTrainer run.

  • pipeline_cls – A transformers.pipelines.Pipeline class to use. If not specified, will use the pipeline abstraction wrapper.

  • **pipeline_kwargs – Any kwargs to pass to the pipeline initialization. If pipeline is None, this must contain the ‘task’ argument. Cannot contain ‘model’. Can be used to override the tokenizer with ‘tokenizer’.
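
A hedged sketch of constructing a predictor from a checkpoint (it reuses the checkpoint from the HuggingFaceCheckpoint example above; because no pipeline_cls is given, the ‘task’ kwarg is required):

>>> from ray.train.huggingface import HuggingFacePredictor
>>>
>>> predictor = HuggingFacePredictor.from_checkpoint(
...     checkpoint, task="text-generation"
... )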

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

The data is converted into a list (unless pipeline is a TableQuestionAnsweringPipeline) and passed to the pipeline object.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, use all columns.

  • **predict_kwargs – Additional kwargs to pass to the pipeline object.

Examples

>>> import pandas as pd
>>> from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
>>> from transformers.pipelines import pipeline
>>> from ray.train.huggingface import HuggingFacePredictor
>>>
>>> model_checkpoint = "gpt2"
>>> tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
>>> tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)
>>>
>>> model_config = AutoConfig.from_pretrained(model_checkpoint)
>>> model = AutoModelForCausalLM.from_config(model_config)
>>> predictor = HuggingFacePredictor(
...     pipeline=pipeline(
...         task="text-generation", model=model, tokenizer=tokenizer
...     )
... )
>>>
>>> prompts = pd.DataFrame(
...     ["Complete me", "And me", "Please complete"], columns=["sentences"]
... )
>>> predictions = predictor.predict(prompts)
Returns

Prediction result.

Scikit-Learn

class ray.train.sklearn.SklearnTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

A Trainer for scikit-learn estimator training.

This Trainer runs the fit method of the given estimator in a non-distributed manner on a single Ray Actor.

By default, the n_jobs (or thread_count) estimator parameters will be set to match the number of CPUs assigned to the Ray Actor. This behavior can be disabled by setting set_estimator_cpus=False.

If you wish to use GPU-enabled estimators (e.g. cuML), make sure to set "GPU": 1 in scaling_config.trainer_resources.

The results are reported all at once and not in an iterative fashion. No checkpointing is done during training. This may be changed in the future.

Example:

import ray

from ray.train.sklearn import SklearnTrainer
from sklearn.ensemble import RandomForestRegressor

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = SklearnTrainer(
    estimator=RandomForestRegressor(),
    label_column="y",
    scaling_config=ray.air.config.ScalingConfig(
        trainer_resources={"CPU": 4}
    ),
    datasets={"train": train_dataset}
)
result = trainer.fit()
Parameters
  • estimator – A scikit-learn compatible estimator to use.

  • datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting separate metrics.

  • label_column – Name of the label column. A column with this name must be present in the training dataset. If None, no validation will be performed.

  • params – Optional dict of params to be set on the estimator before fitting. Useful for hyperparameter tuning.

  • scoring

    Strategy to evaluate the performance of the model on the validation sets and for cross-validation. Same as in sklearn.model_selection.cross_validate. If scoring represents a single score, one can use:

    • a single string;

    • a callable that returns a single value.

    If scoring represents multiple scores, one can use:

    • a list or tuple of unique strings;

    • a callable returning a dictionary where the keys are the metric names and the values are the metric scores;

    • a dictionary with metric names as keys and callables as values.

  • cv

    Determines the cross-validation splitting strategy. If specified, cross-validation will be run on the train dataset, in addition to computing metrics for validation datasets. Same as in sklearn.model_selection.cross_validate, with the exception of None. Possible inputs for cv are:

    • None, to skip cross-validation.

    • int, to specify the number of folds in a (Stratified)KFold,

    • CV splitter,

    • An iterable yielding (train, test) splits as arrays of indices.

    For int/None inputs, if the estimator is a classifier and y is either binary or multiclass, StratifiedKFold is used. In all other cases, KFold is used. These splitters are instantiated with shuffle=False so the splits will be the same across calls.

    If you provide a “cv_groups” column in the train dataset, it will be used as group labels for the samples while splitting the dataset into train/test sets. Only used in conjunction with a “Group” cv instance (e.g., GroupKFold). This corresponds to the groups argument in sklearn.model_selection.cross_validate. A usage sketch of cv and scoring is shown after this parameter list.

  • return_train_score_cv – Whether to also return train scores during cross-validation. Ignored if cv is None.

  • parallelize_cv – If set to True, will parallelize cross-validation instead of the estimator. If set to None, will detect if the estimator has any parallelism-related params (n_jobs or thread_count) and parallelize cross-validation if there are none. If False, will not parallelize cross-validation. Cannot be set to True if there are any GPUs assigned to the trainer. Ignored if cv is None.

  • set_estimator_cpus – If set to True, will automatically set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to match the number of available CPUs.

  • scaling_config – Configuration for how to scale training. Only the trainer_resources key can be provided, as the training is not distributed.

  • run_config – Configuration for the execution of the training run.

  • preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.

  • **fit_params – Additional kwargs passed to estimator.fit() method.
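
For illustration, a hedged sketch of enabling cross-validation and a custom scoring metric (this builds on the RandomForestRegressor example above; the fold count and metric are arbitrary choices, not defaults):

import ray

from ray.train.sklearn import SklearnTrainer
from sklearn.ensemble import RandomForestRegressor

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = SklearnTrainer(
    estimator=RandomForestRegressor(),
    label_column="y",
    cv=5,                                # 5-fold KFold on the train dataset
    scoring="neg_mean_absolute_error",   # any sklearn scoring string works
    return_train_score_cv=True,
    scaling_config=ray.air.config.ScalingConfig(
        trainer_resources={"CPU": 4}
    ),
    datasets={"train": train_dataset},
)
result = trainer.fit()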

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

__init__(*, estimator: sklearn.base.BaseEstimator, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: Optional[str] = None, params: Optional[Dict[str, Any]] = None, scoring: Optional[Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float], Iterable[Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float]]], Dict[str, Union[str, Callable[[sklearn.base.BaseEstimator, Union[pandas.core.frame.DataFrame, numpy.ndarray], Union[pandas.core.frame.DataFrame, numpy.ndarray]], float]]]]] = None, cv: Optional[Union[int, Iterable, sklearn.model_selection._split.BaseCrossValidator]] = None, return_train_score_cv: bool = False, parallelize_cv: Optional[bool] = None, set_estimator_cpus: bool = True, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, **fit_params)[source]
training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
class ray.train.sklearn.SklearnCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with sklearn-specific functionality.

Create this from a generic Checkpoint by calling SklearnCheckpoint.from_checkpoint(ckpt).

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_estimator(estimator: sklearn.base.BaseEstimator, *, path: os.PathLike, preprocessor: Optional[Preprocessor] = None) SklearnCheckpoint[source]

Create a Checkpoint that stores an sklearn Estimator.

Parameters
  • estimator – The Estimator to store in the checkpoint.

  • path – The directory where the checkpoint will be stored.

  • preprocessor – A fitted preprocessor to be applied before inference.

Returns

An SklearnCheckpoint containing the specified Estimator.

Examples

>>> from ray.train.sklearn import SklearnCheckpoint
>>> from sklearn.ensemble import RandomForestClassifier
>>>
>>> estimator = RandomForestClassifier()
>>> checkpoint = SklearnCheckpoint.from_estimator(estimator, path=".")

You can use an SklearnCheckpoint to create an SklearnPredictor and perform inference.

>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> predictor = SklearnPredictor.from_checkpoint(checkpoint)
get_estimator() sklearn.base.BaseEstimator[source]

Retrieve the Estimator stored in this checkpoint.
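
Continuing the checkpoint example above, a hedged one-liner (note that the estimator in that sketch is unfitted):

>>> estimator = checkpoint.get_estimator()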

class ray.train.sklearn.SklearnPredictor(estimator: sklearn.base.BaseEstimator, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for scikit-learn compatible estimators.

Parameters
  • estimator – The fitted scikit-learn compatible estimator to use for predictions.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.sklearn.sklearn_predictor.SklearnPredictor[source]

Instantiate the predictor from a Checkpoint.

The checkpoint is expected to be a result of SklearnTrainer.

Parameters

checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a SklearnTrainer run.

predict(data: Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, num_estimator_cpus: Optional[int] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, pyarrow.Table, Dict[str, numpy.ndarray]][source]

Run inference on data batch.

Parameters
  • data – A batch of input data. Either a pandas DataFrame or numpy array.

  • feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.

  • num_estimator_cpus – If set to a value other than None, will set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to the given value.

  • **predict_kwargs – Keyword arguments passed to estimator.predict.

Examples

>>> import numpy as np
>>> from sklearn.ensemble import RandomForestClassifier
>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = RandomForestClassifier().fit(train_X, train_y)
>>> predictor = SklearnPredictor(estimator=model)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use the first and second columns as features
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictions = predictor.predict(data, feature_columns=[0, 1])
>>> import pandas as pd
>>> from sklearn.ensemble import RandomForestClassifier
>>> from ray.train.sklearn import SklearnPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = RandomForestClassifier().fit(train_X, train_y)
>>> predictor = SklearnPredictor(estimator=model)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use the first and second columns as features
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictions = predictor.predict(data, feature_columns=["A", "B"])
Returns

Prediction result.

Reinforcement Learning (RLlib)

class ray.train.rl.RLCheckpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[ray.ObjectRef] = None)[source]

Bases: ray.air.checkpoint.Checkpoint

A Checkpoint with RLlib-specific functionality.

Create this from a generic Checkpoint by calling RLCheckpoint.from_checkpoint(ckpt).

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

get_policy(env: Optional[Any] = None) ray.rllib.policy.policy.Policy[source]

Retrieve the policy stored in this checkpoint.

Parameters

env – Optional environment to instantiate the trainer with. If not given, it is parsed from the saved trainer configuration.

Returns

The policy stored in this checkpoint.
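
A hedged sketch, assuming result is the output of an RLTrainer run such as the examples further below, and obs is a single environment observation (both are placeholders):

>>> from ray.train.rl import RLCheckpoint
>>>
>>> checkpoint = RLCheckpoint.from_checkpoint(result.checkpoint)
>>> policy = checkpoint.get_policy()
>>> action, _, _ = policy.compute_single_action(obs)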

class ray.train.rl.RLPredictor(policy: ray.rllib.policy.policy.Policy, preprocessor: Optional[Preprocessor] = None)[source]

Bases: ray.train.predictor.Predictor

A predictor for RLlib policies.

Parameters
  • policy – The RLlib policy to perform inference with.

  • preprocessor – A preprocessor used to transform data batches prior to prediction.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, env: Optional[Any] = None, **kwargs) ray.train.predictor.Predictor[source]

Create RLPredictor from checkpoint.

This method requires that the checkpoint was created with the Ray AIR RLTrainer.

Parameters
  • checkpoint – The checkpoint to load the model and preprocessor from.

  • env – Optional environment to instantiate the trainer with. If not given, it is parsed from the saved trainer configuration instead.
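
A hedged sketch, assuming a checkpoint produced by the RLTrainer online-training example below (the CartPole observation values are arbitrary):

>>> import numpy as np
>>> from ray.train.rl import RLPredictor
>>>
>>> predictor = RLPredictor.from_checkpoint(result.checkpoint)
>>> # A batch of CartPole observations; one action is returned per row.
>>> obs = np.array([[0.1, 0.0, 0.2, 0.0], [0.0, 0.1, 0.0, 0.2]])
>>> actions = predictor.predict(obs)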

class ray.train.rl.RLTrainer(*args, **kwargs)[source]

Bases: ray.train.base_trainer.BaseTrainer

Reinforcement learning trainer.

This trainer provides an interface to RLlib trainables.

If datasets and preprocessors are used, they can be utilized for offline training, e.g. using behavior cloning. Otherwise, this trainer will use online training.

Parameters
  • algorithm – Algorithm to train on. Can be a string reference, (e.g. "PPO") or a RLlib trainer class.

  • scaling_config – Configuration for how to scale training.

  • run_config – Configuration for the execution of the training run.

  • datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. If specified, datasets will be used for offline training. Will be configured as an RLlib input config item.

  • preprocessor – A preprocessor to preprocess the provided datasets.

  • resume_from_checkpoint – A checkpoint to resume training from.

Example

Online training:

from ray.air.config import RunConfig, ScalingConfig
from ray.train.rl import RLTrainer

trainer = RLTrainer(
    run_config=RunConfig(stop={"training_iteration": 5}),
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    algorithm="PPO",
    config={
        "env": "CartPole-v0",
        "framework": "tf",
        "evaluation_num_workers": 1,
        "evaluation_interval": 1,
        "evaluation_config": {"input": "sampler"},
    },
)
result = trainer.fit()

Example

Offline training (assumes data is stored in /tmp/data-dir):

import ray
from ray.air.config import RunConfig, ScalingConfig
from ray.train.rl import RLTrainer
from ray.rllib.algorithms.bc.bc import BC

dataset = ray.data.read_json(
    "/tmp/data-dir", parallelism=2, ray_remote_args={"num_cpus": 1}
)

trainer = RLTrainer(
    run_config=RunConfig(stop={"training_iteration": 5}),
    scaling_config=ScalingConfig(
        num_workers=2,
        use_gpu=False,
    ),
    datasets={"train": dataset},
    algorithm=BC,
    config={
        "env": "CartPole-v0",
        "framework": "tf",
        "evaluation_num_workers": 1,
        "evaluation_interval": 1,
        "evaluation_config": {"input": "sampler"},
    },
)
result = trainer.fit()

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

training_loop() None[source]

Loop called by fit() to run training and report results to Tune.

Note

This method runs on a remote process.

self.datasets have already been preprocessed by self.preprocessor.

You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.

Example:

from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
as_trainable() Type[ray.tune.trainable.trainable.Trainable][source]

Convert self to a tune.Trainable class.
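
A hedged sketch of using the converted trainable with Ray Tune (the Tuner call is an illustrative assumption, not from the original reference):

from ray import tune

trainable_cls = trainer.as_trainable()
tuner = tune.Tuner(trainable_cls)
results = tuner.fit()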

Monitoring Integrations

Comet

class ray.air.callbacks.comet.CometLoggerCallback(online: bool = True, tags: Optional[List[str]] = None, save_checkpoints: bool = False, **experiment_kwargs)[source]

CometLoggerCallback for logging Tune results to Comet.

Comet (https://comet.ml/site/) is a tool to manage and optimize the entire ML lifecycle, from experiment tracking, model optimization and dataset versioning to model production monitoring.

This Ray Tune LoggerCallback sends metrics and parameters to Comet for tracking.

In order to use the CometLoggerCallback, you must first install Comet via pip install comet_ml.

Then set the following environment variable: export COMET_API_KEY=<Your API Key>

Alternatively, you can also pass in your API Key as an argument to the CometLoggerCallback constructor.

CometLoggerCallback(api_key=<Your API Key>)

Parameters
  • online – Whether to make use of an Online or Offline Experiment. Defaults to True.

  • tags – Tags to add to the logged Experiment. Defaults to None.

  • save_checkpoints – If True, model checkpoints will be saved to Comet ML as artifacts. Defaults to False.

  • **experiment_kwargs – Other keyword arguments will be passed to the constructor for comet_ml.Experiment (or OfflineExperiment if online=False).

Please consult the Comet ML documentation for more information on the Experiment and OfflineExperiment classes: https://comet.ml/site/

Example:

from ray import tune
from ray.air.callbacks.comet import CometLoggerCallback

tune.run(
    train,
    config=config,
    callbacks=[CometLoggerCallback(
        True,
        ['tag1', 'tag2'],
        workspace='my_workspace',
        project_name='my_project_name'
    )]
)

Keras

class ray.air.callbacks.keras.Callback(metrics: Optional[Union[str, List[str], Dict[str, str]]] = None, on: Union[str, List[str]] = 'epoch_end', frequency: Union[int, List[int]] = 1)[source]

Keras callback for Ray AIR reporting and checkpointing.

You can use this in both TuneSession and TrainSession.

Example
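
The following is a hedged sketch, not the original example: it assumes TensorFlow/Keras and Ray Train are installed, and the tiny model and synthetic data are placeholders.

import numpy as np
import tensorflow as tf

from ray.air.callbacks.keras import Callback
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_func():
    model = tf.keras.Sequential(
        [tf.keras.layers.Dense(1, input_shape=(1,))]
    )
    model.compile(optimizer="adam", loss="mse")
    X = np.arange(32, dtype="float32").reshape(-1, 1)
    y = 2 * X + 1
    # Report the Keras "loss" log to Ray AIR at the end of every epoch.
    model.fit(X, y, epochs=2, callbacks=[Callback(metrics=["loss"])])

trainer = TensorflowTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()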

Parameters
  • metrics – Metrics to report. If this is a list, each item describes the metric key reported to Keras, and it will be reported under the same name. If this is a dict, each key will be the name reported and the respective value will be the metric key reported to Keras. If this is None, all Keras logs will be reported.

  • on – When to report metrics. Must be one of the Keras event hooks (without the on_ prefix), e.g. “train_start” or “predict_end”. Defaults to “epoch_end”.

  • frequency – Checkpoint frequency. If this is an integer n, checkpoints are saved every n times each hook is called. If this is a list, it specifies the checkpoint frequencies for each hook individually.

PublicAPI (beta): This API is in beta and may change before becoming stable.

MLflow

class ray.air.callbacks.mlflow.MLflowLoggerCallback(tracking_uri: Optional[str] = None, registry_uri: Optional[str] = None, experiment_name: Optional[str] = None, tags: Optional[Dict] = None, save_artifact: bool = False)[source]

MLflow Logger to automatically log Tune results and config to MLflow.

MLflow (https://mlflow.org) Tracking is an open source library for recording and querying experiments. This Ray Tune LoggerCallback sends information (config parameters, training results & metrics, and artifacts) to MLflow for automatic experiment tracking.

Parameters
  • tracking_uri – The tracking URI for where to manage experiments and runs. This can either be a local file path or a remote server. This arg gets passed directly to mlflow initialization. When using Tune in a multi-node setting, make sure to set this to a remote server and not a local file path.

  • registry_uri – The registry URI that gets passed directly to mlflow initialization.

  • experiment_name – The experiment name to use for this Tune run. If the experiment with the name already exists with MLflow, it will be reused. If not, a new experiment will be created with that name.

  • tags – An optional dictionary of string keys and values to set as tags on the run.

  • save_artifact – If set to True, automatically save the entire contents of the Tune local_dir as an artifact to the corresponding run in MlFlow.

Example:

from ray import tune
from ray.air.callbacks.mlflow import MLflowLoggerCallback

tags = {"user_name": "John", "git_commit_hash": "abc123"}

tune.run(
    train_fn,
    config={
        # define search space here
        "parameter_1": tune.choice([1, 2, 3]),
        "parameter_2": tune.choice([4, 5, 6]),
    },
    callbacks=[MLflowLoggerCallback(
        experiment_name="experiment1",
        tags=tags,
        save_artifact=True)])

Weights and Biases

class ray.air.callbacks.wandb.WandbLoggerCallback(project: Optional[str] = None, group: Optional[str] = None, api_key_file: Optional[str] = None, api_key: Optional[str] = None, excludes: Optional[List[str]] = None, log_config: bool = False, save_checkpoints: bool = False, **kwargs)[source]

Weights and Biases (https://www.wandb.ai/) is a tool for experiment tracking, model optimization, and dataset versioning. This Ray Tune LoggerCallback sends metrics to Wandb for automatic tracking and visualization.

Parameters
  • project – Name of the Wandb project. Mandatory.

  • group – Name of the Wandb group. Defaults to the trainable name.

  • api_key_file – Path to file containing the Wandb API KEY. This file only needs to be present on the node running the Tune script if using the WandbLogger.

  • api_key – Wandb API Key. Alternative to setting api_key_file.

  • excludes – List of metrics that should be excluded from the log.

  • log_config – Boolean indicating if the config parameter of the results dict should be logged. This makes sense if parameters will change during training, e.g. with PopulationBasedTraining. Defaults to False.

  • save_checkpoints – If True, model checkpoints will be saved to Wandb as artifacts. Defaults to False.

  • **kwargs – The keyword arguments will be passed to wandb.init().

Wandb’s group, run_id and run_name are automatically selected by Tune, but can be overwritten by filling out the respective configuration values.

Please see here for all other valid configuration settings: https://docs.wandb.ai/library/init

Example:

from ray import tune
from ray.air.callbacks.wandb import WandbLoggerCallback

tune.run(
    train_fn,
    config={
        # define search space here
        "parameter_1": tune.choice([1, 2, 3]),
        "parameter_2": tune.choice([4, 5, 6]),
    },
    callbacks=[WandbLoggerCallback(
        project="Optimization_Project",
        api_key_file="/path/to/file",
        log_config=True)])