Ray AIR API#
Components#
Preprocessor#
- class ray.data.preprocessor.Preprocessor[source]#
Implements an ML preprocessing operation.
Preprocessors are stateful objects that can be fitted against a Dataset and used to transform both local data batches and distributed datasets. For example, a Normalization preprocessor may calculate the mean and stdev of a field during fitting, and use these attributes to implement its normalization transform.
Preprocessors can also be stateless and transform data without needing to be fitted. For example, a preprocessor may simply remove a column, which does not require any state to be fitted.
If you are implementing your own Preprocessor sub-class, you should override the following:
- _fit, if your preprocessor is stateful. Otherwise, set _is_fittable=False.
- _transform_pandas and/or _transform_numpy. For best performance, implement both. Otherwise, the data will be converted to match the implemented method.
PublicAPI (beta): This API is in beta and may change before becoming stable.
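If you are writing a stateful sub-class, a minimal sketch might look like the following (the class name, column handling, and the stats_ attribute are illustrative assumptions, not part of the API):

from typing import Dict

import numpy as np
import pandas as pd

from ray.data import Dataset
from ray.data.preprocessor import Preprocessor


class MeanCenterer(Preprocessor):
    # Hypothetical preprocessor that subtracts a column's mean.

    def __init__(self, column: str):
        self.column = column

    def _fit(self, dataset: Dataset) -> Preprocessor:
        # Stateful: compute and store the column mean on the distributed Dataset.
        self.stats_ = {"mean": dataset.mean(self.column)}
        return self

    def _transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        df[self.column] = df[self.column] - self.stats_["mean"]
        return df

    def _transform_numpy(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        batch[self.column] = batch[self.column] - self.stats_["mean"]
        return batch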
- transform_stats() → Optional[str] [source]#
Return Dataset stats for the most recent transform call, if any.
- fit(dataset: Dataset) → Preprocessor [source]#
Fit this Preprocessor to the Dataset.
Fitted state attributes will be directly set in the Preprocessor.
Calling it more than once will overwrite all previously fitted state: preprocessor.fit(A).fit(B) is equivalent to preprocessor.fit(B).
- Parameters
dataset – Input dataset.
- Returns
The fitted Preprocessor with state attributes.
- Return type
Preprocessor
- fit_transform(dataset: Dataset) → Dataset [source]#
Fit this Preprocessor to the Dataset and then transform the Dataset.
Calling it more than once will overwrite all previously fitted state: preprocessor.fit_transform(A).fit_transform(B) is equivalent to preprocessor.fit_transform(B).
- Parameters
dataset – Input Dataset.
- Returns
The transformed Dataset.
- Return type
Dataset
- transform(dataset: Dataset) → Dataset [source]#
Transform the given dataset.
- Parameters
dataset – Input Dataset.
- Returns
The transformed Dataset.
- Return type
Dataset
- Raises
PreprocessorNotFittedException – if fit is not called yet.
- transform_batch(data: DataBatchType) → DataBatchType [source]#
Transform a single batch of data.
The data will be converted to the format supported by the Preprocessor, based on which _transform_* methods are implemented.
- Parameters
data – Input data batch.
- Returns
The transformed data batch. This may differ from the input type depending on which _transform_* methods are implemented.
- Return type
DataBatchType
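For example, after fitting a preprocessor on a distributed Dataset, you can reuse it on small in-memory batches (a minimal sketch using StandardScaler; the scaled values depend on the fitted statistics):

import pandas as pd
import ray
from ray.data.preprocessors import StandardScaler

ds = ray.data.from_pandas(pd.DataFrame({"X": [0.0, 1.0, 2.0]}))
scaler = StandardScaler(columns=["X"])
scaler.fit(ds)  # distributed fit

local_batch = pd.DataFrame({"X": [1.5, 2.5]})
scaled_batch = scaler.transform_batch(local_batch)  # local, in-memory transform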
Generic Preprocessors#
- class ray.data.preprocessors.BatchMapper(fn: Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[Union[numpy.ndarray, Dict[str, numpy.ndarray]]], Union[numpy.ndarray, Dict[str, numpy.ndarray]]]], batch_format: Optional[ray.air.util.data_batch_conversion.BatchFormat], batch_size: Optional[Union[int, typing_extensions.Literal[default]]] = 'default')[source]#
Bases:
ray.data.preprocessor.Preprocessor
Apply an arbitrary operation to a dataset.
BatchMapper applies a user-defined function to batches of a dataset. A batch is a Pandas DataFrame that represents a small amount of data. By modifying batches instead of individual records, this class can efficiently transform a dataset with vectorized operations.
Use this preprocessor to apply stateless operations that aren't already built-in.
Tip
BatchMapper doesn't need to be fit. You can call transform without calling fit.
Examples
Use BatchMapper
to apply arbitrary operations like dropping a column.>>> import pandas as pd >>> import numpy as np >>> from typing import Dict >>> import ray >>> from ray.data.preprocessors import BatchMapper >>> >>> df = pd.DataFrame({"X": [0, 1, 2], "Y": [3, 4, 5]}) >>> ds = ray.data.from_pandas(df) >>> >>> def fn(batch: pd.DataFrame) -> pd.DataFrame: ... return batch.drop("Y", axis="columns") >>> >>> preprocessor = BatchMapper(fn, batch_format="pandas") >>> preprocessor.transform(ds) Dataset(num_blocks=1, num_rows=3, schema={X: int64}) >>> >>> def fn_numpy(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: ... return {"X": batch["X"]} >>> preprocessor = BatchMapper(fn_numpy, batch_format="numpy") >>> preprocessor.transform(ds) Dataset(num_blocks=1, num_rows=3, schema={X: int64})
- Parameters
fn – The function to apply to data batches.
batch_size – The desired number of rows in each data batch provided to fn. Semantics are the same as in dataset.map_batches(): specifying None will use the entire underlying blocks as batches (blocks may contain a different number of rows), and the actual size of the batch provided to fn may be smaller than batch_size if batch_size doesn't evenly divide the block(s) sent to a given map task. Defaults to 4096, which is the same default value as dataset.map_batches().
batch_format – The preferred batch format to use in the UDF. If not given, it is inferred from the input dataset's data format.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.Chain(*preprocessors: ray.data.preprocessor.Preprocessor)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Combine multiple preprocessors into a single Preprocessor.
When you call fit, each preprocessor is fit on the dataset produced by the preceding preprocessor's fit_transform.
Example
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import * >>> >>> df = pd.DataFrame({ ... "X0": [0, 1, 2], ... "X1": [3, 4, 5], ... "Y": ["orange", "blue", "orange"], ... }) >>> ds = ray.data.from_pandas(df) >>> >>> preprocessor = Chain( ... StandardScaler(columns=["X0", "X1"]), ... Concatenator(include=["X0", "X1"], output_column_name="X"), ... LabelEncoder(label_column="Y") ... ) >>> preprocessor.fit_transform(ds).to_pandas() Y X 0 1 [-1.224744871391589, -1.224744871391589] 1 0 [0.0, 0.0] 2 1 [1.224744871391589, 1.224744871391589]
- Parameters
preprocessors – The preprocessors to sequentially compose.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.Concatenator(output_column_name: str = 'concat_out', include: Optional[List[str]] = None, exclude: Optional[Union[str, List[str]]] = None, dtype: Optional[numpy.dtype] = None, raise_if_missing: bool = False)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Combine numeric columns into a column of type TensorDtype.
This preprocessor concatenates numeric columns and stores the result in a new column. The new column contains TensorArrayElement objects of shape \((m,)\), where \(m\) is the number of columns concatenated. The \(m\) concatenated columns are dropped after concatenation.
Examples
>>> import numpy as np >>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import Concatenator
Concatenator combines numeric columns into a column of TensorDtype.
>>> df = pd.DataFrame({"X0": [0, 3, 1], "X1": [0.5, 0.2, 0.9]}) >>> ds = ray.data.from_pandas(df) >>> concatenator = Concatenator() >>> concatenator.fit_transform(ds).to_pandas() concat_out 0 [0.0, 0.5] 1 [3.0, 0.2] 2 [1.0, 0.9]
By default, the created column is called "concat_out", but you can specify a different name.
>>> concatenator = Concatenator(output_column_name="tensor") >>> concatenator.fit_transform(ds).to_pandas() tensor 0 [0.0, 0.5] 1 [3.0, 0.2] 2 [1.0, 0.9]
Sometimes, you might not want to concatenate all of the columns in your dataset. In this case, you can exclude columns with the exclude
parameter.>>> df = pd.DataFrame({"X0": [0, 3, 1], "X1": [0.5, 0.2, 0.9], "Y": ["blue", "orange", "blue"]}) >>> ds = ray.data.from_pandas(df) >>> concatenator = Concatenator(exclude=["Y"]) >>> concatenator.fit_transform(ds).to_pandas() Y concat_out 0 blue [0.0, 0.5] 1 orange [3.0, 0.2] 2 blue [1.0, 0.9]
Alternatively, you can specify which columns to concatenate with the include
parameter.>>> concatenator = Concatenator(include=["X0", "X1"]) >>> concatenator.fit_transform(ds).to_pandas() Y concat_out 0 blue [0.0, 0.5] 1 orange [3.0, 0.2] 2 blue [1.0, 0.9]
Note that if a column is in both include and exclude
, the column is excluded.>>> concatenator = Concatenator(include=["X0", "X1", "Y"], exclude=["Y"]) >>> concatenator.fit_transform(ds).to_pandas() Y concat_out 0 blue [0.0, 0.5] 1 orange [3.0, 0.2] 2 blue [1.0, 0.9]
By default, the concatenated tensor is a dtype common to the input columns. However, you can also explicitly set the dtype with the dtype
parameter.>>> concatenator = Concatenator(include=["X0", "X1"], dtype=np.float32) >>> concatenator.fit_transform(ds) Dataset(num_blocks=1, num_rows=3, schema={Y: object, concat_out: TensorDtype(shape=(2,), dtype=float32)})
- Parameters
output_column_name – The desired name for the new column. Defaults to "concat_out".
include – A list of columns to concatenate. If None, all columns are concatenated.
exclude – A list of columns to exclude from concatenation. If a column is in both include and exclude, the column is excluded.
dtype – The dtype to convert the output tensors to. If unspecified, the dtype is determined by standard coercion rules.
raise_if_missing – If True, an error is raised if any of the columns in include or exclude don't exist. Defaults to False.
- Raises
ValueError – if raise_if_missing is True and a column in include or exclude doesn't exist in the dataset.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.SimpleImputer(columns: List[str], strategy: str = 'mean', fill_value: Optional[Union[str, numbers.Number]] = None)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Replace missing values with imputed values.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import SimpleImputer >>> df = pd.DataFrame({"X": [0, None, 3, 3], "Y": [None, "b", "c", "c"]}) >>> ds = ray.data.from_pandas(df) >>> ds.to_pandas() X Y 0 0.0 None 1 NaN b 2 3.0 c 3 3.0 c
The "mean" strategy imputes missing values with the mean of non-missing values. This strategy doesn't work with categorical data.
>>> preprocessor = SimpleImputer(columns=["X"], strategy="mean") >>> preprocessor.fit_transform(ds).to_pandas() X Y 0 0.0 None 1 2.0 b 2 3.0 c 3 3.0 c
The "most_frequent" strategy imputes missing values with the most frequent value in each column.
>>> preprocessor = SimpleImputer(columns=["X", "Y"], strategy="most_frequent") >>> preprocessor.fit_transform(ds).to_pandas() X Y 0 0.0 c 1 3.0 b 2 3.0 c 3 3.0 c
The "constant" strategy imputes missing values with the value specified by fill_value.
>>> preprocessor = SimpleImputer( ... columns=["Y"], ... strategy="constant", ... fill_value="?", ... ) >>> preprocessor.fit_transform(ds).to_pandas() X Y 0 0.0 ? 1 NaN b 2 3.0 c 3 3.0 c
- Parameters
columns – The columns to apply imputation to.
strategy – How imputed values are chosen.
- "mean": The mean of non-missing values. This strategy only works with numeric columns.
- "most_frequent": The most common value.
- "constant": The value passed to fill_value.
fill_value – The value to use when strategy is "constant".
- Raises
ValueError – if strategy is not "mean", "most_frequent", or "constant".
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- Dataset.train_test_split(test_size: Union[int, float], *, shuffle: bool = False, seed: Optional[int] = None) → Tuple[ray.data.dataset.Dataset[ray.data.block.T], ray.data.dataset.Dataset[ray.data.block.T]] [source]
Split the dataset into train and test subsets.
Examples
>>> import ray >>> ds = ray.data.range(8) >>> train, test = ds.train_test_split(test_size=0.25) >>> train.take() [0, 1, 2, 3, 4, 5] >>> test.take() [6, 7]
- Parameters
test_size – If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. The train split will always be the complement of the test split.
shuffle – Whether or not to globally shuffle the dataset before splitting. Defaults to False. This may be a very expensive operation with large datasets.
seed – Fix the random seed to use for shuffle, otherwise one will be chosen based on system randomness. Ignored if shuffle=False.
- Returns
Train and test subsets as two Datasets.
Categorical Encoders#
- class ray.data.preprocessors.Categorizer(columns: List[str], dtypes: Optional[Dict[str, pandas.core.dtypes.dtypes.CategoricalDtype]] = None)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Convert columns to pd.CategoricalDtype.
Use this preprocessor with frameworks that have built-in support for pd.CategoricalDtype, like LightGBM.
Warning
If you don't specify dtypes, fit this preprocessor before splitting your dataset into train and test splits. This ensures categories are consistent across splits.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import Categorizer >>> >>> df = pd.DataFrame( ... { ... "sex": ["male", "female", "male", "female"], ... "level": ["L4", "L5", "L3", "L4"], ... }) >>> ds = ray.data.from_pandas(df) >>> categorizer = Categorizer(columns=["sex", "level"]) >>> categorizer.fit_transform(ds).schema().types [CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5'], ordered=False)]
If you know the categories in advance, you can specify the categories with the dtypes
parameter.>>> categorizer = Categorizer( ... columns=["sex", "level"], ... dtypes={"level": pd.CategoricalDtype(["L3", "L4", "L5", "L6"], ordered=True)}, ... ) >>> categorizer.fit_transform(ds).schema().types [CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5', 'L6'], ordered=True)]
- Parameters
columns – The columns to convert to pd.CategoricalDtype.
dtypes – An optional dictionary that maps columns to pd.CategoricalDtype objects. If you don't include a column in dtypes, the categories are inferred.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.LabelEncoder(label_column: str)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Encode labels as integer targets.
LabelEncoder encodes labels as integer targets that range from \(0\) to \(n - 1\), where \(n\) is the number of unique labels.
If you transform a label that isn't in the fitted dataset, then the label is encoded as float("nan").
Examples
>>> import pandas as pd >>> import ray >>> df = pd.DataFrame({ ... "sepal_width": [5.1, 7, 4.9, 6.2], ... "sepal_height": [3.5, 3.2, 3, 3.4], ... "species": ["setosa", "versicolor", "setosa", "virginica"] ... }) >>> ds = ray.data.from_pandas(df) >>> >>> from ray.data.preprocessors import LabelEncoder >>> encoder = LabelEncoder(label_column="species") >>> encoder.fit_transform(ds).to_pandas() sepal_width sepal_height species 0 5.1 3.5 0 1 7.0 3.2 1 2 4.9 3.0 0 3 6.2 3.4 2
If you transform a label not present in the original dataset, then the new label is encoded as float("nan").
>>> df = pd.DataFrame({ ... "sepal_width": [4.2], ... "sepal_height": [2.7], ... "species": ["bracteata"] ... }) >>> ds = ray.data.from_pandas(df) >>> encoder.transform(ds).to_pandas() sepal_width sepal_height species 0 4.2 2.7 NaN
- Parameters
label_column – A column containing labels that you want to encode.
See also
OrdinalEncoder
If you're encoding ordered features, use OrdinalEncoder instead of LabelEncoder.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.MultiHotEncoder(columns: List[str], *, max_categories: Optional[Dict[str, int]] = None)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Multi-hot encode categorical data.
This preprocessor replaces each list of categories with an \(m\)-length binary list, where \(m\) is the number of unique categories in the column or the value specified in max_categories. The \(i\text{-th}\) element of the binary list is \(1\) if category \(i\) is in the input list and \(0\) otherwise.
Columns must contain hashable objects or lists of hashable objects. Also, you can't have both types in the same column.
Note
The logic is similar to scikit-learn's MultiLabelBinarizer.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import MultiHotEncoder >>> >>> df = pd.DataFrame({ ... "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"], ... "genre": [ ... ["comedy", "action", "sports"], ... ["animation", "comedy", "action"], ... ["documentary"], ... ], ... }) >>> ds = ray.data.from_pandas(df) >>> >>> encoder = MultiHotEncoder(columns=["genre"]) >>> encoder.fit_transform(ds).to_pandas() name genre 0 Shaolin Soccer [1, 0, 1, 0, 1] 1 Moana [1, 1, 1, 0, 0] 2 The Smartest Guys in the Room [0, 0, 0, 1, 0]
If you specify max_categories, then MultiHotEncoder
creates features for only the most frequent categories.>>> encoder = MultiHotEncoder(columns=["genre"], max_categories={"genre": 3}) >>> encoder.fit_transform(ds).to_pandas() name genre 0 Shaolin Soccer [1, 1, 1] 1 Moana [1, 1, 0] 2 The Smartest Guys in the Room [0, 0, 0] >>> encoder.stats_ OrderedDict([('unique_values(genre)', {'comedy': 0, 'action': 1, 'sports': 2})])
- Parameters
columns – The columns to separately encode.
max_categories – The maximum number of features to create for each column. If a value isn't specified for a column, then a feature is created for every unique category in that column.
See also
OneHotEncoder
If you're encoding individual categories instead of lists of categories, use OneHotEncoder.
OrdinalEncoder
If your categories are ordered, you may want to use OrdinalEncoder.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.OneHotEncoder(columns: List[str], *, max_categories: Optional[Dict[str, int]] = None)[source]#
Bases:
ray.data.preprocessor.Preprocessor
One-hot encode categorical data.
This preprocessor creates a column named {column}_{category} for each unique {category} in {column}. The value of a column is 1 if the category matches and 0 otherwise.
If you encode an infrequent category (see max_categories) or a category that isn't in the fitted dataset, then the category is encoded as all 0s.
Columns must contain hashable objects or lists of hashable objects.
Note
Lists are treated as categories. If you want to encode individual list elements, use MultiHotEncoder.
Example
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import OneHotEncoder >>> >>> df = pd.DataFrame({"color": ["red", "green", "red", "red", "blue", "green"]}) >>> ds = ray.data.from_pandas(df) >>> encoder = OneHotEncoder(columns=["color"]) >>> encoder.fit_transform(ds).to_pandas() color_blue color_green color_red 0 0 0 1 1 0 1 0 2 0 0 1 3 0 0 1 4 1 0 0 5 0 1 0
If you one-hot encode a value that isn't in the fitted dataset, then the value is encoded with zeros.
>>> df = pd.DataFrame({"color": ["yellow"]}) >>> batch = ray.data.from_pandas(df) >>> encoder.transform(batch).to_pandas() color_blue color_green color_red 0 0 0 0
Likewise, if you one-hot encode an infrequent value, then the value is encoded with zeros.
>>> encoder = OneHotEncoder(columns=["color"], max_categories={"color": 2}) >>> encoder.fit_transform(ds).to_pandas() color_red color_green 0 1 0 1 0 1 2 1 0 3 1 0 4 0 0 5 0 1
- Parameters
columns – The columns to separately encode.
max_categories – The maximum number of features to create for each column. If a value isn't specified for a column, then a feature is created for every category in that column.
See also
MultiHotEncoder
If you want to encode individual list elements, use MultiHotEncoder.
OrdinalEncoder
If your categories are ordered, you may want to use OrdinalEncoder.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.OrdinalEncoder(columns: List[str], *, encode_lists: bool = True)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Encode values within columns as ordered integer values.
OrdinalEncoder encodes categorical features as integers that range from \(0\) to \(n - 1\), where \(n\) is the number of categories.
If you transform a value that isn't in the fitted dataset, then the value is encoded as float("nan").
Columns must contain either hashable values or lists of hashable values. Also, you can't have both scalars and lists in the same column.
Examples
Use OrdinalEncoder
to encode categorical features as integers.>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import OrdinalEncoder >>> df = pd.DataFrame({ ... "sex": ["male", "female", "male", "female"], ... "level": ["L4", "L5", "L3", "L4"], ... }) >>> ds = ray.data.from_pandas(df) >>> encoder = OrdinalEncoder(columns=["sex", "level"]) >>> encoder.fit_transform(ds).to_pandas() sex level 0 1 1 1 0 2 2 1 0 3 0 1
If you transform a value not present in the original dataset, then the value is encoded as float("nan").
>>> df = pd.DataFrame({"sex": ["female"], "level": ["L6"]}) >>> ds = ray.data.from_pandas(df) >>> encoder.transform(ds).to_pandas() sex level 0 0 NaN
OrdinalEncoder
can also encode categories in a list.>>> df = pd.DataFrame({ ... "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"], ... "genre": [ ... ["comedy", "action", "sports"], ... ["animation", "comedy", "action"], ... ["documentary"], ... ], ... }) >>> ds = ray.data.from_pandas(df) >>> encoder = OrdinalEncoder(columns=["genre"]) >>> encoder.fit_transform(ds).to_pandas() name genre 0 Shaolin Soccer [2, 0, 4] 1 Moana [1, 2, 0] 2 The Smartest Guys in the Room [3]
- Parameters
columns – The columns to separately encode.
encode_lists – If True, encode list elements. If False, encode whole lists (i.e., replace each list with an integer). True by default.
See also
OneHotEncoder
Another preprocessor that encodes categorical data.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
Feature Scalers#
- class ray.data.preprocessors.MaxAbsScaler(columns: List[str])[source]#
Bases:
ray.data.preprocessor.Preprocessor
Scale each column by its absolute max value.
The general formula is given by
\[x' = \frac{x}{\max{\vert x \vert}}\]
where \(x\) is the column and \(x'\) is the transformed column. If \(\max{\vert x \vert} = 0\) (i.e., the column contains all zeros), then the column is unmodified.
Tip
This is the recommended way to scale sparse data. If your data isn't sparse, you can use MinMaxScaler or StandardScaler instead.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import MaxAbsScaler >>> >>> df = pd.DataFrame({"X1": [-6, 3], "X2": [2, -4], "X3": [0, 0]}) # noqa: E501 >>> ds = ray.data.from_pandas(df) >>> ds.to_pandas() X1 X2 X3 0 -6 2 0 1 3 -4 0
Columns are scaled separately.
>>> preprocessor = MaxAbsScaler(columns=["X1", "X2"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 -1.0 0.5 0 1 0.5 -1.0 0
Zero-valued columns aren't scaled.
>>> preprocessor = MaxAbsScaler(columns=["X3"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 -6 2 0.0 1 3 -4 0.0
- Parameters
columns – The columns to separately scale.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.MinMaxScaler(columns: List[str])[source]#
Bases:
ray.data.preprocessor.Preprocessor
Scale each column by its range.
The general formula is given by
\[x' = \frac{x - \min(x)}{\max(x) - \min(x)}\]
where \(x\) is the column and \(x'\) is the transformed column. If \(\max(x) - \min(x) = 0\) (i.e., the column is constant-valued), then the transformed column will get filled with zeros.
Transformed values are always in the range \([0, 1]\).
Tip
This can be used as an alternative to StandardScaler.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import MinMaxScaler >>> >>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]}) # noqa: E501 >>> ds = ray.data.from_pandas(df) >>> ds.to_pandas() X1 X2 X3 0 -2 -3 1 1 0 -3 1 2 2 3 1
Columns are scaled separately.
>>> preprocessor = MinMaxScaler(columns=["X1", "X2"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 0.0 0.0 1 1 0.5 0.0 1 2 1.0 1.0 1
Constant-valued columns get filled with zeros.
>>> preprocessor = MinMaxScaler(columns=["X3"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 -2 -3 0.0 1 0 -3 0.0 2 2 3 0.0
- Parameters
columns – The columns to separately scale.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.Normalizer(columns: List[str], norm='l2')[source]#
Bases:
ray.data.preprocessor.Preprocessor
Scales each sample to have unit norm.
This preprocessor works by dividing each sample (i.e., row) by the sampleβs norm. The general formula is given by
\[s' = \frac{s}{\lVert s \rVert_p}\]
where \(s\) is the sample, \(s'\) is the transformed sample, \(\lVert s \rVert_p\) is the norm of the sample, and \(p\) is the norm type.
The following norms are supported:
"l1" (\(L^1\)): Sum of the absolute values.
"l2" (\(L^2\)): Square root of the sum of the squared values.
"max" (\(L^\infty\)): Maximum value.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import Normalizer >>> >>> df = pd.DataFrame({"X1": [1, 1], "X2": [1, 0], "X3": [0, 1]}) >>> ds = ray.data.from_pandas(df) >>> ds.to_pandas() X1 X2 X3 0 1 1 0 1 1 0 1
The \(L^2\)-norm of the first sample is \(\sqrt{2}\), and the \(L^2\)-norm of the second sample is \(1\).
>>> preprocessor = Normalizer(columns=["X1", "X2"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 0.707107 0.707107 0 1 1.000000 0.000000 1
The \(L^1\)-norm of the first sample is \(2\), and the \(L^1\)-norm of the second sample is \(1\).
>>> preprocessor = Normalizer(columns=["X1", "X2"], norm="l1") >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 0.5 0.5 0 1 1.0 0.0 1
The \(L^\infty\)-norm of both samples is \(1\).
>>> preprocessor = Normalizer(columns=["X1", "X2"], norm="max") >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 1.0 1.0 0 1 1.0 0.0 1
- Parameters
columns – The columns to scale. For each row, these columns are scaled to unit-norm.
norm – The norm to use. The supported values are "l1", "l2", or "max". Defaults to "l2".
- Raises
ValueError – if norm is not "l1", "l2", or "max".
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.PowerTransformer(columns: List[str], power: float, method: str = 'yeo-johnson')[source]#
Bases:
ray.data.preprocessor.Preprocessor
Apply a power transform to make your data more normally distributed.
Some models expect data to be normally distributed. By making your data more Gaussian-like, you might be able to improve your model's performance.
This preprocessor supports two transformations: Yeo-Johnson and Box-Cox.
Box-Cox requires all data to be positive.
Warning
You need to manually specify the transform's power parameter. If you choose a bad value, the transformation might not work well.
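Examples
A minimal usage sketch (the column, values, and power are illustrative; outputs are omitted because they depend on the chosen power):

import pandas as pd
import ray
from ray.data.preprocessors import PowerTransformer

df = pd.DataFrame({"X": [0.1, 0.5, 2.0, 9.0]})
ds = ray.data.from_pandas(df)

# Yeo-Johnson transform with a manually chosen power parameter.
transformer = PowerTransformer(columns=["X"], power=0.5)
transformed = transformer.fit_transform(ds)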
- Parameters
columns – The columns to separately transform.
power – A parameter that determines how your data is transformed. Practitioners typically set power between \(-2.5\) and \(2.5\), although you may need to try different values to find one that works well.
method – A string representing which transformation to apply. Supports "yeo-johnson" and "box-cox". If you choose "box-cox", your data needs to be positive. Defaults to "yeo-johnson".
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.RobustScaler(columns: List[str], quantile_range: Tuple[float, float] = (0.25, 0.75))[source]#
Bases:
ray.data.preprocessor.Preprocessor
Scale and translate each column using quantiles.
The general formula is given by
\[x' = \frac{x - \mu_{1/2}}{\mu_h - \mu_l}\]
where \(x\) is the column, \(x'\) is the transformed column, \(\mu_{1/2}\) is the column median, and \(\mu_{h}\) and \(\mu_{l}\) are the high and low quantiles, respectively. By default, \(\mu_{h}\) is the third quartile and \(\mu_{l}\) is the first quartile.
Tip
This scaler works well when your data contains many outliers.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import RobustScaler >>> >>> df = pd.DataFrame({ ... "X1": [1, 2, 3, 4, 5], ... "X2": [13, 5, 14, 2, 8], ... "X3": [1, 2, 2, 2, 3], ... }) >>> ds = ray.data.from_pandas(df) >>> ds.to_pandas() X1 X2 X3 0 1 13 1 1 2 5 2 2 3 14 2 3 4 2 2 4 5 8 3
RobustScaler
separately scales each column.>>> preprocessor = RobustScaler(columns=["X1", "X2"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 -1.0 0.625 1 1 -0.5 -0.375 2 2 0.0 0.750 2 3 0.5 -0.750 2 4 1.0 0.000 3
- Parameters
columns – The columns to separately scale.
quantile_range – A tuple that defines the lower and upper quantiles. Values must be between 0 and 1. Defaults to the 1st and 3rd quartiles: (0.25, 0.75).
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.StandardScaler(columns: List[str])[source]#
Bases:
ray.data.preprocessor.Preprocessor
Translate and scale each column by its mean and standard deviation, respectively.
The general formula is given by
\[x' = \frac{x - \bar{x}}{s}\]
where \(x\) is the column, \(x'\) is the transformed column, \(\bar{x}\) is the column average, and \(s\) is the column's sample standard deviation. If \(s = 0\) (i.e., the column is constant-valued), then the transformed column will contain zeros.
Warning
StandardScaler works best when your data is normal. If your data isn't approximately normal, then the transformed features won't be meaningful.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import StandardScaler >>> >>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]}) >>> ds = ray.data.from_pandas(df) >>> ds.to_pandas() X1 X2 X3 0 -2 -3 1 1 0 -3 1 2 2 3 1
Columns are scaled separately.
>>> preprocessor = StandardScaler(columns=["X1", "X2"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 -1.224745 -0.707107 1 1 0.000000 -0.707107 1 2 1.224745 1.414214 1
Constant-valued columns get filled with zeros.
>>> preprocessor = StandardScaler(columns=["X3"]) >>> preprocessor.fit_transform(ds).to_pandas() X1 X2 X3 0 -2 -3 0.0 1 0 -3 0.0 2 2 3 0.0
- Parameters
columns – The columns to separately scale.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
K-Bins Discretizers#
- class ray.data.preprocessors.CustomKBinsDiscretizer(columns: List[str], bins: Union[Iterable[float], pandas.core.indexes.interval.IntervalIndex, Dict[str, Union[Iterable[float], pandas.core.indexes.interval.IntervalIndex]]], *, right: bool = True, include_lowest: bool = False, duplicates: str = 'raise', dtypes: Optional[Dict[str, Union[pandas.core.dtypes.dtypes.CategoricalDtype, Type[numpy.integer]]]] = None)[source]#
Bases:
ray.data.preprocessors.discretizer._AbstractKBinsDiscretizer
Bin values into discrete intervals using custom bin edges.
Columns must contain numerical values.
Examples
Use CustomKBinsDiscretizer
to bin continuous features.>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import CustomKBinsDiscretizer >>> df = pd.DataFrame({ ... "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1], ... "value_2": [10, 15, 13, 12, 23, 25], ... }) >>> ds = ray.data.from_pandas(df) >>> discretizer = CustomKBinsDiscretizer( ... columns=["value_1", "value_2"], ... bins=[0, 1, 4, 10, 25] ... ) >>> discretizer.transform(ds).to_pandas() value_1 value_2 0 0 2 1 1 3 2 1 3 3 2 3 4 2 3 5 1 3
You can also specify different bin edges per column.
>>> discretizer = CustomKBinsDiscretizer( ... columns=["value_1", "value_2"], ... bins={"value_1": [0, 1, 4], "value_2": [0, 18, 35, 70]}, ... ) >>> discretizer.transform(ds).to_pandas() value_1 value_2 0 0.0 0 1 1.0 0 2 1.0 0 3 NaN 0 4 NaN 1 5 1.0 1
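You can also label the bins by passing an ordered pd.CategoricalDtype through dtypes (a sketch; the category labels are illustrative and assume one label per bin):

import pandas as pd
import ray
from ray.data.preprocessors import CustomKBinsDiscretizer

ds = ray.data.from_pandas(pd.DataFrame({"value": [0.5, 2.0, 7.5]}))

discretizer = CustomKBinsDiscretizer(
    columns=["value"],
    bins=[0, 1, 4, 10],
    dtypes={"value": pd.CategoricalDtype(["low", "mid", "high"], ordered=True)},
)
binned = discretizer.transform(ds)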
- Parameters
columns – The columns to discretize.
bins – Defines custom bin edges. Can be an iterable of numbers, a pd.IntervalIndex, or a dict mapping columns to either of them. Note that pd.IntervalIndex for bins must be non-overlapping.
right – Indicates whether bins include the rightmost edge.
include_lowest – Indicates whether the first interval should be left-inclusive.
duplicates – Can be either 'raise' or 'drop'. If bin edges are not unique, raise a ValueError or drop non-uniques.
dtypes – An optional dictionary that maps columns to pd.CategoricalDtype objects or np.integer types. If you don't include a column in dtypes or specify it as an integer dtype, the outputted column will consist of ordered integers corresponding to bins. If you use a pd.CategoricalDtype, the outputted column will be a pd.CategoricalDtype with the categories being mapped to bins. You can use pd.CategoricalDtype(categories, ordered=True) to preserve information about bin order.
See also
UniformKBinsDiscretizer
If you want to bin data into uniform width bins.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.UniformKBinsDiscretizer(columns: List[str], bins: Union[int, Dict[str, int]], *, right: bool = True, include_lowest: bool = False, duplicates: str = 'raise', dtypes: Optional[Dict[str, Union[pandas.core.dtypes.dtypes.CategoricalDtype, Type[numpy.integer]]]] = None)[source]#
Bases:
ray.data.preprocessors.discretizer._AbstractKBinsDiscretizer
Bin values into discrete intervals (bins) of uniform width.
Columns must contain numerical values.
Examples
Use UniformKBinsDiscretizer
to bin continuous features.>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import UniformKBinsDiscretizer >>> df = pd.DataFrame({ ... "value_1": [0.2, 1.4, 2.5, 6.2, 9.7, 2.1], ... "value_2": [10, 15, 13, 12, 23, 25], ... }) >>> ds = ray.data.from_pandas(df) >>> discretizer = UniformKBinsDiscretizer( ... columns=["value_1", "value_2"], bins=4 ... ) >>> discretizer.fit_transform(ds).to_pandas() value_1 value_2 0 0 0 1 0 1 2 0 0 3 2 0 4 3 3 5 0 3
You can also specify a different number of bins per column.
>>> discretizer = UniformKBinsDiscretizer( ... columns=["value_1", "value_2"], bins={"value_1": 4, "value_2": 3} ... ) >>> discretizer.fit_transform(ds).to_pandas() value_1 value_2 0 0 0 1 0 0 2 0 0 3 2 0 4 3 2 5 0 2
- Parameters
columns – The columns to discretize.
bins – Defines the number of equal-width bins. Can be either an integer (which will be applied to all columns) or a dict that maps columns to integers. The range is extended by 0.1% on each side to include the minimum and maximum values.
right – Indicates whether bins include the rightmost edge or not.
include_lowest – Whether the first interval should be left-inclusive or not.
duplicates – Can be either 'raise' or 'drop'. If bin edges are not unique, raise a ValueError or drop non-uniques.
dtypes – An optional dictionary that maps columns to pd.CategoricalDtype objects or np.integer types. If you don't include a column in dtypes or specify it as an integer dtype, the outputted column will consist of ordered integers corresponding to bins. If you use a pd.CategoricalDtype, the outputted column will be a pd.CategoricalDtype with the categories being mapped to bins. You can use pd.CategoricalDtype(categories, ordered=True) to preserve information about bin order.
See also
CustomKBinsDiscretizer
If you want to specify your own bin edges.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
Image Preprocessors#
- class ray.data.preprocessors.TorchVisionPreprocessor(columns: List[str], transform: Callable[[np.ndarray], torch.Tensor], batched: bool = False)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Apply a TorchVision transform to image columns.
Examples
>>> import ray >>> dataset = ray.data.read_images("s3://anonymous@air-example-data-2/imagenet-sample-images") >>> dataset Dataset(num_blocks=..., num_rows=..., schema={image: ArrowTensorType(shape=(..., 3), dtype=float)})
TorchVisionPreprocessor passes ndarrays to your transform. To convert ndarrays to Torch tensors, add ToTensor
to your pipeline.>>> from torchvision import transforms >>> from ray.data.preprocessors import TorchVisionPreprocessor >>> transform = transforms.Compose([ ... transforms.ToTensor(), ... transforms.Resize((224, 224)), ... ]) >>> preprocessor = TorchVisionPreprocessor(["image"], transform=transform) >>> preprocessor.transform(dataset) Dataset(num_blocks=..., num_rows=..., schema={image: ArrowTensorType(shape=(3, 224, 224), dtype=float)})
For better performance, set batched to True and replace ToTensor with a batch-supporting Lambda
.>>> def to_tensor(batch: np.ndarray) -> torch.Tensor: ... tensor = torch.as_tensor(batch, dtype=torch.float) ... # (B, H, W, C) -> (B, C, H, W) ... tensor = tensor.permute(0, 3, 1, 2).contiguous() ... # [0., 255.] -> [0., 1.] ... tensor = tensor.div(255) ... return tensor >>> transform = transforms.Compose([ ... transforms.Lambda(to_tensor), ... transforms.Resize((224, 224)) ... ]) >>> preprocessor = TorchVisionPreprocessor( ... ["image"], transform=transform, batched=True ... ) >>> preprocessor.transform(dataset) Dataset(num_blocks=..., num_rows=..., schema={image: ArrowTensorType(shape=(3, 224, 224), dtype=float)})
- Parameters
columns – The columns to apply the TorchVision transform to.
transform – The TorchVision transform you want to apply. This transform should accept an np.ndarray as input and return a torch.Tensor as output.
batched – If True, apply transform to batches of shape \((B, H, W, C)\). Otherwise, apply transform to individual images.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
Text Encoders#
- class ray.data.preprocessors.CountVectorizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None, max_features: Optional[int] = None)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Count the frequency of tokens in a column of strings.
CountVectorizer operates on columns that contain strings. For example:
corpus 0 I dislike Python 1 I like Python
This preprocessor creates a column named like {column}_{token} for each unique token. These columns represent the frequency of token {token} in column {column}. For example:
corpus_I corpus_Python corpus_dislike corpus_like 0 1 1 1 0 1 1 1 0 1
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import CountVectorizer >>> >>> df = pd.DataFrame({ ... "corpus": [ ... "Jimmy likes volleyball", ... "Bob likes volleyball too", ... "Bob also likes fruit jerky" ... ] ... }) >>> ds = ray.data.from_pandas(df) >>> >>> vectorizer = CountVectorizer(["corpus"]) >>> vectorizer.fit_transform(ds).to_pandas() corpus_likes corpus_volleyball corpus_Bob corpus_Jimmy corpus_too corpus_also corpus_fruit corpus_jerky 0 1 1 0 1 0 0 0 0 1 1 1 1 0 1 0 0 0 2 1 0 1 0 0 1 1 1
You can limit the number of tokens in the vocabulary with max_features.
>>> vectorizer = CountVectorizer(["corpus"], max_features=3) >>> vectorizer.fit_transform(ds).to_pandas() corpus_likes corpus_volleyball corpus_Bob 0 1 1 0 1 1 1 1 2 1 0 1
- Parameters
columns – The columns to separately tokenize and count.
tokenization_fn – The function used to generate tokens. This function should accept a string as input and return a list of tokens as output. If unspecified, the tokenizer uses a function equivalent to lambda s: s.split(" ").
max_features – The maximum number of tokens to encode in the transformed dataset. If specified, only the most frequent tokens are encoded.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.FeatureHasher(columns: List[str], num_features: int)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Apply the hashing trick to a table that describes token frequencies.
FeatureHasher creates num_features columns named hash_{index}, where index ranges from \(0\) to num_features \(- 1\). The column hash_{index} describes the frequency of tokens that hash to index.
Distinct tokens can correspond to the same index. However, if num_features is large enough, then columns probably correspond to a unique token.
This preprocessor is memory efficient and quick to pickle. However, given a transformed column, you can't know which tokens correspond to it. This might make it hard to determine which tokens are important to your model.
Warning
Sparse matrices aren't supported. If you use a large num_features, this preprocessor might behave poorly.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import FeatureHasher
The data below describes the frequencies of tokens in "I like Python" and "I dislike Python".
>>> df = pd.DataFrame({ ... "I": [1, 1], ... "like": [1, 0], ... "dislike": [0, 1], ... "Python": [1, 1] ... }) >>> ds = ray.data.from_pandas(df)
FeatureHasher hashes each token to determine its index. For example, the index of "I"
is \(hash(\texttt{"I"}) \pmod 8 = 5\).>>> hasher = FeatureHasher(columns=["I", "like", "dislike", "Python"], num_features=8) >>> hasher.fit_transform(ds).to_pandas().to_numpy() array([[0, 0, 0, 2, 0, 1, 0, 0], [0, 0, 0, 1, 0, 1, 1, 0]])
Notice the hash collision: both "like" and "Python" correspond to index \(3\). You can avoid hash collisions like these by increasing num_features.
- Parameters
columns – The columns to apply the hashing trick to. Each column should describe the frequency of a token.
num_features – The number of features used to represent the vocabulary. You should choose a value large enough to prevent hash collisions between distinct tokens.
See also
CountVectorizer
Use this preprocessor to generate inputs for FeatureHasher.
ray.data.preprocessors.HashingVectorizer
If your input data describes documents rather than token frequencies, use HashingVectorizer.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.HashingVectorizer(columns: List[str], num_features: int, tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Count the frequency of tokens using the hashing trick.
This preprocessor creates num_features columns named like hash_{column_name}_{index}. If num_features is large enough relative to the size of your vocabulary, then each column approximately corresponds to the frequency of a unique token.
HashingVectorizer is memory efficient and quick to pickle. However, given a transformed column, you can't know which tokens correspond to it. This might make it hard to determine which tokens are important to your model.
Note
This preprocessor transforms each input column to a document-term matrix.
A document-term matrix is a table that describes the frequency of tokens in a collection of documents. For example, the strings "I like Python" and "I dislike Python" might have the document-term matrix below:
corpus_I corpus_Python corpus_dislike corpus_like 0 1 1 1 0 1 1 1 0 1
To generate the matrix, you typically map each token to a unique index. For example:
token index 0 I 0 1 Python 1 2 dislike 2 3 like 3
The problem with this approach is that memory use scales linearly with the size of your vocabulary.
HashingVectorizer circumvents this problem by computing indices with a hash function: \(\texttt{index} = hash(\texttt{token})\).
Warning
Sparse matrices aren't currently supported. If you use a large num_features, this preprocessor might behave poorly.
Examples
>>> import pandas as pd >>> import ray >>> from ray.data.preprocessors import HashingVectorizer >>> >>> df = pd.DataFrame({ ... "corpus": [ ... "Jimmy likes volleyball", ... "Bob likes volleyball too", ... "Bob also likes fruit jerky" ... ] ... }) >>> ds = ray.data.from_pandas(df) >>> >>> vectorizer = HashingVectorizer(["corpus"], num_features=8) >>> vectorizer.fit_transform(ds).to_pandas() hash_corpus_0 hash_corpus_1 hash_corpus_2 hash_corpus_3 hash_corpus_4 hash_corpus_5 hash_corpus_6 hash_corpus_7 0 1 0 1 0 0 0 0 1 1 1 0 1 0 0 0 1 1 2 0 0 1 1 0 2 1 0
- Parameters
columns – The columns to separately tokenize and count.
num_features – The number of features used to represent the vocabulary. You should choose a value large enough to prevent hash collisions between distinct tokens.
tokenization_fn – The function used to generate tokens. This function should accept a string as input and return a list of tokens as output. If unspecified, the tokenizer uses a function equivalent to lambda s: s.split(" ").
See also
CountVectorizer
Another method for counting token frequencies. Unlike HashingVectorizer, CountVectorizer creates a feature for each unique token. This enables you to compute the inverse transformation.
FeatureHasher
This preprocessor is similar to HashingVectorizer, except it expects a table describing token frequencies. In contrast, HashingVectorizer expects a column containing documents.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.data.preprocessors.Tokenizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]#
Bases:
ray.data.preprocessor.Preprocessor
Replace each string with a list of tokens.
Examples
>>> import pandas as pd >>> import ray >>> df = pd.DataFrame({"text": ["Hello, world!", "foo bar\nbaz"]}) >>> ds = ray.data.from_pandas(df)
The default tokenization_fn
delimits strings using the space character.>>> from ray.data.preprocessors import Tokenizer >>> tokenizer = Tokenizer(columns=["text"]) >>> tokenizer.transform(ds).to_pandas() text 0 [Hello,, world!] 1 [foo, bar\nbaz]
If the default logic isn't adequate for your use case, you can specify a custom tokenization_fn
.>>> import string >>> def tokenization_fn(s): ... for character in string.punctuation: ... s = s.replace(character, "") ... return s.split() >>> tokenizer = Tokenizer(columns=["text"], tokenization_fn=tokenization_fn) >>> tokenizer.transform(ds).to_pandas() text 0 [Hello, world] 1 [foo, bar, baz]
- Parameters
columns – The columns to tokenize.
tokenization_fn – The function used to generate tokens. This function should accept a string as input and return a list of tokens as output. If unspecified, the tokenizer uses a function equivalent to lambda s: s.split(" ").
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
Trainer#
- class ray.train.trainer.BaseTrainer(*args, **kwargs)[source]#
Defines interface for distributed training on Ray.
Note: The base BaseTrainer class cannot be instantiated directly. Only one of its subclasses can be used.
How does a trainer work?
First, initialize the Trainer. The initialization runs locally, so heavyweight setup should not be done in __init__.
Then, when you call trainer.fit(), the Trainer is serialized and copied to a remote Ray actor. The following methods are then called in sequence on the remote actor:
- trainer.setup(): Any heavyweight Trainer setup should be specified here.
- trainer.preprocess_datasets(): The provided ray.data.Dataset are preprocessed with the provided ray.data.Preprocessor.
- trainer.train_loop(): Executes the main training logic.
Calling trainer.fit() will return a ray.result.Result object where you can access metrics from your training run, as well as any checkpoints that may have been saved.
How do I create a new Trainer?
Subclass ray.train.trainer.BaseTrainer, override the training_loop method, and optionally setup.
import torch
from ray.train.trainer import BaseTrainer
from ray import tune
from ray.air import session

class MyPytorchTrainer(BaseTrainer):
    def setup(self):
        self.model = torch.nn.Linear(1, 1)
        self.optimizer = torch.optim.SGD(
            self.model.parameters(), lr=0.1)

    def training_loop(self):
        # You can access any Trainer attributes directly in this method.
        # self.datasets["train"] has already been
        # preprocessed by self.preprocessor
        dataset = self.datasets["train"]

        torch_ds = dataset.iter_torch_batches(dtypes=torch.float)
        loss_fn = torch.nn.MSELoss()

        for epoch_idx in range(10):
            loss = 0
            num_batches = 0
            for batch in torch_ds:
                X, y = torch.unsqueeze(batch["x"], 1), batch["y"]
                # Compute prediction error
                pred = self.model(X)
                batch_loss = loss_fn(pred, y)

                # Backpropagation
                self.optimizer.zero_grad()
                batch_loss.backward()
                self.optimizer.step()

                loss += batch_loss.item()
                num_batches += 1
            loss /= num_batches

            # Use Tune functions to report intermediate
            # results.
            session.report({"loss": loss, "epoch": epoch_idx})
How do I use an existing Trainer or one of my custom Trainers?
Initialize the Trainer, and call Trainer.fit()
import ray

train_dataset = ray.data.from_items(
    [{"x": i, "y": i} for i in range(3)])
my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = my_trainer.fit()
- Parameters
scaling_config – Configuration for how to scale training.
run_config – Configuration for the execution of the training run.
datasets – Any Ray Datasets to use for training. Use the key "train" to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.
preprocessor – A preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
DeveloperAPI: This API may change across minor Ray releases.
- __init__(*, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]#
- setup() → None [source]#
Called during fit() to perform initial setup on the Trainer.
Note
This method is run on a remote process.
This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.
This method is called prior to preprocess_datasets and training_loop.
- preprocess_datasets() → None [source]#
Called during fit() to preprocess dataset attributes with preprocessor.
Note
This method is run on a remote process.
This method is called prior to entering the training_loop.
If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the "train" key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.
Then, all the Trainer's datasets will be transformed by the preprocessor.
The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.
- abstract training_loop() → None [source]#
Loop called by fit() to run training and report results to Tune.
Note
This method runs on a remote process.
self.datasets have already been preprocessed by self.preprocessor.
You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.
from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
- fit() → ray.air.result.Result [source]#
Runs training.
- Returns
A Result object containing the training result.
- Raises
TrainingFailedError – If any failures occur during the execution of self.as_trainable().
PublicAPI (beta): This API is in beta and may change before becoming stable.
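For example (a short sketch reusing the hypothetical MyPytorchTrainer and train_dataset from the examples above):

trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = trainer.fit()

print(result.metrics)     # last metrics reported via session.report()
print(result.checkpoint)  # last saved checkpoint, if any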
Abstract Classes#
- class ray.train.data_parallel_trainer.DataParallelTrainer(*args, **kwargs)[source]#
Bases:
ray.train.base_trainer.BaseTrainer
A Trainer for data parallel training.
You should subclass this Trainer if your Trainer follows the SPMD (single program, multiple data) programming paradigm - you want multiple processes to run the same function, but on different data.
This Trainer runs the function train_loop_per_worker on multiple Ray Actors.
The train_loop_per_worker function is expected to take in either 0 or 1 arguments:
def train_loop_per_worker(): ...
def train_loop_per_worker(config: Dict): ...
If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.
If the datasets dict contains a training dataset (denoted by the "train" key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.
Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.
def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()
Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.
How do I use DataParallelTrainer or any of its subclasses?
Example:
import ray
from ray.air import session

def train_loop_for_worker():
    dataset_shard_for_this_worker = session.get_dataset_shard("train")

    assert len(dataset_shard_for_this_worker) == 1

train_dataset = ray.data.from_items([1, 2, 3])
assert len(train_dataset) == 3
trainer = DataParallelTrainer(
    ray.air.config.ScalingConfig(num_workers=3),
    datasets={"train": train_dataset},
)
result = trainer.fit()
How do I develop on top of DataParallelTrainer?
In many cases, using DataParallelTrainer directly is sufficient to execute functions on multiple actors.
However, you may want to subclass DataParallelTrainer and create a custom Trainer for the following 2 use cases:
Use Case 1: You want to do data parallel training, but want to have a predefined training_loop_per_worker.
Use Case 2: You want to implement a custom Backend that automatically handles additional setup or teardown logic on each actor, so that the users of this new trainer do not have to implement this logic. For example, a TensorflowTrainer can be built on top of DataParallelTrainer that automatically handles setting the proper environment variables for distributed Tensorflow on each actor.
For 1, you can set a predefined training loop in __init__
from ray.train.data_parallel_trainer import DataParallelTrainer

class MyDataParallelTrainer(DataParallelTrainer):
    def __init__(self, *args, **kwargs):
        predefined_train_loop_per_worker = lambda: 1
        super().__init__(predefined_train_loop_per_worker, *args, **kwargs)
For 2, you can implement the ray.train.Backend and ray.train.BackendConfig interfaces.
from dataclasses import dataclass
from ray.train.backend import Backend, BackendConfig

class MyBackend(Backend):
    def on_start(self, worker_group, backend_config):
        def set_env_var(env_var_value):
            import os
            os.environ["MY_ENV_VAR"] = env_var_value

        worker_group.execute(set_env_var, backend_config.env_var)

@dataclass
class MyBackendConfig(BackendConfig):
    env_var: str = "default_value"

    def backend_cls(self):
        return MyBackend

class MyTrainer(DataParallelTrainer):
    def __init__(self, train_loop_per_worker, my_backend_config: MyBackendConfig, **kwargs):
        super().__init__(
            train_loop_per_worker,
            backend_config=my_backend_config, **kwargs)
- Parameters
train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.
train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.
backend_config – Configuration for setting up a Backend (e.g. Torch, Tensorflow, Horovod) on each worker to enable distributed communication. If no Backend should be set up, then set this to None.
scaling_config – Configuration for how to scale data parallel training.
dataset_config – Configuration for dataset ingest. This is merged with the default dataset config for the given trainer (cls._dataset_config).
run_config – Configuration for the execution of the training run.
datasets – Any Ray Datasets to use for training. Use the key "train" to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.
preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
DeveloperAPI: This API may change across minor Ray releases.
- __init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, backend_config: Optional[ray.train.backend.BackendConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]#
- preprocess_datasets() None [source]#
Called during fit() to preprocess dataset attributes with preprocessor.
Note
This method is run on a remote process.
This method is called prior to entering the training_loop.
If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the "train" key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.
Then, all of the Trainer's datasets will be transformed by the preprocessor.
The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.
- training_loop() None [source]#
Loop called by fit() to run training and report results to Tune.
Note
This method runs on a remote process.
self.datasets have already been preprocessed by self.preprocessor.
You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.
Example:
from ray.air import session  # needed for session.report in the loop below
from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
- get_dataset_config() Dict[str, ray.air.config.DatasetConfig] [source]#
Return a copy of this Trainerβs final dataset configs.
- Returns
The merged default + user-supplied dataset config.
- class ray.train.gbdt_trainer.GBDTTrainer(*args, **kwargs)[source]#
Bases:
ray.train.base_trainer.BaseTrainer
Abstract class for scaling gradient-boosting decision tree (GBDT) frameworks.
Inherited by XGBoostTrainer and LightGBMTrainer.
- Parameters
datasets – Ray Datasets to use for training and validation. Must include a "train" key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.
label_column – Name of the label column. A column with this name must be present in the training dataset.
params – Framework specific training parameters.
dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
preprocessor – A ray.data.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
**train_kwargs – Additional kwargs passed to the framework train() function.
DeveloperAPI: This API may change across minor Ray releases.
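As a concrete illustration (not from the original docs), a minimal sketch of the XGBoost subclass of this abstract trainer; the dataset and parameter values are illustrative only and assume xgboost-ray is installed.
import ray
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

train_ds = ray.data.from_items([{"x": i, "y": i % 2} for i in range(100)])
trainer = XGBoostTrainer(
    datasets={"train": train_ds},
    label_column="y",
    params={"objective": "binary:logistic"},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()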
- __init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)[source]#
- preprocess_datasets() None [source]#
Called during fit() to preprocess dataset attributes with preprocessor.
Note
This method is run on a remote process.
This method is called prior to entering the training_loop.
If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the "train" key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.
Then, all of the Trainer's datasets will be transformed by the preprocessor.
The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.
- training_loop() None [source]#
Loop called by fit() to run training and report results to Tune.
Note
This method runs on a remote process.
self.datasets have already been preprocessed by self.preprocessor.
You can use the Tune Function API functions (session.report() and session.get_checkpoint()) inside this training loop.
Example:
from ray.air import session  # needed for session.report in the loop below
from ray.train.trainer import BaseTrainer

class MyTrainer(BaseTrainer):
    def training_loop(self):
        for epoch_idx in range(5):
            ...
            session.report({"epoch": epoch_idx})
- class ray.air.util.check_ingest.DummyTrainer(*args, **kwargs)[source]#
Bases:
ray.train.data_parallel_trainer.DataParallelTrainer
A Trainer that does nothing except read the data for a given number of epochs.
It prints out as many debugging statistics as possible.
This is useful for debugging data ingest problems. This trainer supports the same scaling options as any other Trainer (e.g., num_workers, use_gpu).
DeveloperAPI: This API may change across minor Ray releases.
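A hedged usage sketch (not from the original docs) for debugging ingest throughput with this trainer; the dataset and scaling values are illustrative only.
import ray
from ray.air.config import ScalingConfig
from ray.air.util.check_ingest import DummyTrainer

# Read a synthetic dataset for one epoch and print ingest statistics.
ds = ray.data.range_tensor(1000)
trainer = DummyTrainer(
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": ds},
    num_epochs=1,
)
result = trainer.fit()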
- __init__(*args, scaling_config: Optional[ray.air.config.ScalingConfig] = None, num_epochs: int = 1, prefetch_blocks: int = 1, batch_size: Optional[int] = 4096, **kwargs)[source]#
- preprocess_datasets()[source]#
Called during fit() to preprocess dataset attributes with preprocessor.
Note
This method is run on a remote process.
This method is called prior to entering the training_loop.
If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the "train" key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.
Then, all of the Trainer's datasets will be transformed by the preprocessor.
The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.
Dataset Iteration#
- class ray.data.DatasetIterator[source]
An iterator for reading items from a Dataset or DatasetPipeline.
For Datasets, each iteration call represents a complete read of all items in the Dataset. For DatasetPipelines, each iteration call represents one pass (epoch) over the base Dataset. Note that for DatasetPipelines, each pass iterates over the original Dataset, instead of a window (if .window() was used).
If using Ray AIR, each trainer actor should get its own iterator by calling session.get_dataset_shard("train").
>>> import ray
>>> ds = ray.data.range(5)
>>> ds
Dataset(num_blocks=5, num_rows=5, schema=<class 'int'>)
>>> ds.iterator()
DatasetIterator(Dataset(num_blocks=5, num_rows=5, schema=<class 'int'>))
>>> ds = ds.repeat(); ds
DatasetPipeline(num_windows=inf, num_stages=2)
>>> ds.iterator()
DatasetIterator(DatasetPipeline(num_windows=inf, num_stages=2))
Tip
For debugging purposes, use make_local_dataset_iterator() to create a local DatasetIterator from a Dataset, a Preprocessor, and a DatasetConfig.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- abstract iter_batches(*, prefetch_blocks: int = 0, batch_size: int = 256, batch_format: typing_extensions.Literal[default, numpy, pandas] = 'default', drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]] [source]
Return a local batched iterator over the dataset.
Examples
>>> import ray
>>> for batch in ray.data.range(
...     1000000
... ).iterator().iter_batches():
...     print(batch)
Time complexity: O(1)
- Parameters
prefetch_blocks β The number of blocks to prefetch ahead of the current block during the scan.
batch_size β The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than
batch_size
rows ifdrop_last
isFalse
. Defaults to 256.batch_format β The format in which to return each batch. Specify βdefaultβ to use the default block format (promoting tables to Pandas and tensors to NumPy), βpandasβ to select
pandas.DataFrame
, βpyarrowβ to selectpyarrow.Table
, or βnumpyβ to selectnumpy.ndarray
for tensor datasets andDict[str, numpy.ndarray]
for tabular datasets. Default is βdefaultβ.drop_last β Whether to drop the last batch if itβs incomplete.
local_shuffle_buffer_size β If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained.
local_shuffle_seed β The seed to use for the local random shuffle.
- Returns
An iterator over record batches.
- abstract iter_torch_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, dtypes: Optional[Union[torch.dtype, Dict[str, torch.dtype]]] = None, device: Optional[str] = None, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[TorchTensorBatchType] [source]
Return a local batched iterator of Torch Tensors over the dataset.
This iterator will yield single-tensor batches if the underlying dataset consists of a single column; otherwise, it will yield a dictionary of column-tensors. If looking for more flexibility in the tensor conversion (e.g. casting dtypes) or the batch format, try using iter_batches directly.
Examples
>>> import ray
>>> for batch in ray.data.range(
...     12,
... ).iterator().iter_torch_batches(batch_size=4):
...     print(batch.shape)
torch.Size([4, 1])
torch.Size([4, 1])
torch.Size([4, 1])
Time complexity: O(1)
- Parameters
prefetch_blocks β The number of blocks to prefetch ahead of the current block during the scan.
batch_size β The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different number of rows). The final batch may include fewer than
batch_size
rows ifdrop_last
isFalse
. Defaults to 256.dtypes β The Torch dtype(s) for the created tensor(s); if None, the dtype will be inferred from the tensor data.
device β The device on which the tensor should be placed; if None, the Torch tensor will be constructed on the CPU.
drop_last β Whether to drop the last batch if itβs incomplete.
local_shuffle_buffer_size β If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to
batch_size
, and thereforebatch_size
must also be specified when using local shuffling.local_shuffle_seed β The seed to use for the local random shuffle.
- Returns
An iterator over Torch Tensor batches.
- to_tf(feature_columns: Union[str, List[str]], label_columns: Union[str, List[str]], *, prefetch_blocks: int = 0, batch_size: int = 1, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) tf.data.Dataset [source]
Return a TF Dataset over this dataset.
Warning
If your dataset contains ragged tensors, this method errors. To prevent errors, resize tensors or disable tensor extension casting.
Examples
>>> import ray
>>> ds = ray.data.read_csv(
...     "s3://anonymous@air-example-data/iris.csv"
... )
>>> it = ds.iterator(); it
DatasetIterator(Dataset(num_blocks=1, num_rows=150, schema={sepal length (cm): double, sepal width (cm): double, petal length (cm): double, petal width (cm): double, target: int64}))
If your model accepts a single tensor as input, specify a single feature column.
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your model accepts a dictionary as input, specify a list of feature columns.
>>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
<_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your dataset contains multiple features but your model accepts a single tensor as input, combine features with Concatenator.
>>> from ray.data.preprocessors import Concatenator
>>> preprocessor = Concatenator(output_column_name="features", exclude="target")
>>> it = preprocessor.transform(ds).iterator()
>>> it
DatasetIterator(Dataset(num_blocks=1, num_rows=150, schema={target: int64, features: TensorDtype(shape=(4,), dtype=float64)}))
>>> it.to_tf("features", "target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
- Parameters
feature_columns – Columns that correspond to model inputs. If this is a string, the input data is a tensor. If this is a list, the input data is a dict that maps column names to their tensor representation.
label_columns – Columns that correspond to model targets. If this is a string, the target data is a tensor. If this is a list, the target data is a dict that maps column names to their tensor representation.
prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.
batch_size β Record batch size. Defaults to 1.
drop_last β Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.
local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.
local_shuffle_seed – The seed to use for the local random shuffle.
- Returns
A tf.data.Dataset that yields inputs and targets.
- abstract stats() str [source]
Returns a string containing execution timing information.
- ray.air.util.check_ingest.make_local_dataset_iterator(dataset: ray.data.dataset.Dataset, preprocessor: ray.data.preprocessor.Preprocessor, dataset_config: ray.air.config.DatasetConfig) ray.data.dataset_iterator.DatasetIterator [source]#
A helper function to create a local DatasetIterator, like the one returned by get_dataset_shard().
This function should only be used for development and debugging. It will raise an exception if called by a worker instead of the driver.
- Parameters
dataset β The input Dataset.
preprocessor β The preprocessor that will be applied to the input dataset.
dataset_config β The dataset config normally passed to the trainer.
DeveloperAPI: This API may change across minor Ray releases.
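A minimal, driver-side sketch (not from the original docs) of this helper; the BatchMapper preprocessor and the DatasetConfig values are illustrative assumptions.
import ray
from ray.air.config import DatasetConfig
from ray.air.util.check_ingest import make_local_dataset_iterator
from ray.data.preprocessors import BatchMapper

ds = ray.data.from_items([{"x": i} for i in range(32)])
prep = BatchMapper(lambda df: df * 2, batch_format="pandas")

# Build a local iterator that applies the same preprocessing and ingest
# config a trainer would use, then inspect a few batches on the driver.
it = make_local_dataset_iterator(ds, prep, DatasetConfig(split=False))
for batch in it.iter_batches(batch_size=8):
    print(batch)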
Training Result#
- class ray.air.result.Result(metrics: Optional[Dict[str, Any]], checkpoint: Optional[ray.air.checkpoint.Checkpoint], error: Optional[Exception], log_dir: Optional[pathlib.Path], metrics_dataframe: Optional[pd.DataFrame], best_checkpoints: Optional[List[Tuple[ray.air.checkpoint.Checkpoint, Dict[str, Any]]]])[source]#
The final result of an ML training run or a Tune trial.
This is the class produced by Trainer.fit(). It contains a checkpoint, which can be used for resuming training and for creating a Predictor object. It also contains a metrics object describing training metrics.
error is included so that unsuccessful runs and trials can be represented as well.
The constructor is a private API.
- Parameters
metrics – The final metrics as reported by a Trainable.
checkpoint β The final checkpoint of the Trainable.
error β The execution error of the Trainable run, if the trial finishes in error.
log_dir β Directory where the trial logs are saved.
metrics_dataframe β The full result dataframe of the Trainable. The dataframe is indexed by iterations and contains reported metrics.
best_checkpoints – A list of tuples of the best checkpoints saved by the Trainable and their associated metrics. The number of saved checkpoints is determined by the checkpoint_config argument of run_config (by default, all checkpoints will be saved).
PublicAPI (beta): This API is in beta and may change before becoming stable.
- property config: Optional[Dict[str, Any]]#
The config associated with the result.
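A short, hedged sketch (not from the original docs) of inspecting a Result; it assumes trainer is any already-configured AIR trainer.
result = trainer.fit()
print(result.metrics)           # final reported metrics
print(result.checkpoint)        # final checkpoint, usable by a Predictor
print(result.error)             # None if the run finished successfully
df = result.metrics_dataframe   # per-iteration metrics as a pandas DataFrame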
Training Session#
- ray.air.session.report(metrics: Dict, *, checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None) None [source]#
Report metrics and optionally save a checkpoint.
Each invocation of this method will automatically increment the underlying iteration number. The physical meaning of this "iteration" is defined by the user (or, more specifically, by the way they call report). It does not necessarily map to one epoch.
This API is the canonical way to report metrics from Tune and Train, and replaces the legacy tune.report, with tune.checkpoint_dir, train.report and train.save_checkpoint calls.
Note on directory checkpoints: AIR will take ownership of checkpoints passed to report() by moving them to a new path. The original directory will no longer be accessible to the caller after the report call.
Example
- Parameters
metrics β The metrics you want to report.
checkpoint β The optional checkpoint you want to report.
- ray.air.session.get_checkpoint() Optional[ray.air.checkpoint.Checkpoint] [source]#
Access the sessionβs last checkpoint to resume from if applicable.
- Returns
- Checkpoint object if the session is currently being resumed.
Otherwise, return None.
######## Using it in the *per worker* train loop (TrainSession) ######
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer  # import needed for the trainer below

def train_func():
    ckpt = session.get_checkpoint()
    if ckpt:
        with ckpt.as_directory() as loaded_checkpoint_dir:
            import tensorflow as tf

            model = tf.keras.models.load_model(loaded_checkpoint_dir)
    else:
        model = build_model()  # user-defined model constructor

    model.save("my_model", overwrite=True)
    session.report(
        metrics={"iter": 1},
        checkpoint=Checkpoint.from_directory("my_model"),
    )

scaling_config = ScalingConfig(num_workers=2)
trainer = TensorflowTrainer(
    train_loop_per_worker=train_func, scaling_config=scaling_config
)
result = trainer.fit()

# trainer2 will pick up from the checkpoint saved by trainer1.
trainer2 = TensorflowTrainer(
    train_loop_per_worker=train_func,
    scaling_config=scaling_config,
    # this is ultimately what is accessed through
    # ``Session.get_checkpoint()``
    resume_from_checkpoint=result.checkpoint,
)
result2 = trainer2.fit()
- ray.air.session.get_trial_resources() PlacementGroupFactory [source]#
Trial resources for the corresponding trial.
- ray.air.session.get_trial_dir() str [source]#
Log directory corresponding to the trial directory for a Tune session. If calling from a Train session, this will give the trial directory of its parent Tune session.
from ray import tune
from ray.air import session

def train_func():
    # Example:
    # >>> session.get_trial_dir()
    # ~/ray_results/<exp-name>/<trial-dir>
    ...

tuner = tune.Tuner(train_func)
tuner.fit()
- ray.air.session.get_world_size() int [source]#
Get the current world size (i.e. total number of workers) for this run.
import time

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_loop_per_worker(config):
    assert session.get_world_size() == 4

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(train_loop_per_worker,
    # num_workers must match the assertion above.
    scaling_config=ScalingConfig(num_workers=4),
    datasets={"train": train_dataset})
trainer.fit()
- ray.air.session.get_world_rank() int [source]#
Get the world rank of this worker.
import time

import ray
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_loop_per_worker():
    for iter in range(100):
        time.sleep(1)
        if session.get_world_rank() == 0:
            print("Worker 0")

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={"train": train_dataset})
trainer.fit()
- ray.air.session.get_local_rank() int [source]#
Get the local rank of this worker (rank of the worker on its node).
import time

import ray
import torch
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_loop_per_worker():
    if torch.cuda.is_available():
        torch.cuda.set_device(session.get_local_rank())
    ...

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={"train": train_dataset})
trainer.fit()
- ray.air.session.get_local_world_size() int [source]#
Get the local world size of this node (i.e. the number of workers running on this node).
Example
>>> import ray
>>> from ray.air import session
>>> from ray.air.config import ScalingConfig
>>> from ray.train.torch import TorchTrainer
>>>
>>> def train_loop_per_worker():
...     return session.get_local_world_size()
>>>
>>> train_dataset = ray.data.from_items(
...     [{"x": x, "y": x + 1} for x in range(32)])
>>> trainer = TorchTrainer(train_loop_per_worker,
...     scaling_config=ScalingConfig(num_workers=1),
...     datasets={"train": train_dataset})
>>> trainer.fit()
- ray.air.session.get_node_rank() int [source]#
Get the rank of the node that this worker is running on.
Example
>>> import ray
>>> from ray.air import session
>>> from ray.air.config import ScalingConfig
>>> from ray.train.torch import TorchTrainer
>>>
>>> def train_loop_per_worker():
...     return session.get_node_rank()
>>>
>>> train_dataset = ray.data.from_items(
...     [{"x": x, "y": x + 1} for x in range(32)])
>>> trainer = TorchTrainer(train_loop_per_worker,
...     scaling_config=ScalingConfig(num_workers=1),
...     datasets={"train": train_dataset})
>>> trainer.fit()
- ray.air.session.get_dataset_shard(dataset_name: Optional[str] = None) Optional[DatasetIterator] [source]#
Returns the ray.data.DatasetIterator shard for this worker.
Call iter_torch_batches() or to_tf() on this shard to convert it to the appropriate framework-specific data type.
import ray
from ray import train
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker():
    model = Net()  # user-defined model
    for iter in range(100):
        # Trainer will automatically handle sharding.
        data_shard = session.get_dataset_shard("train")
        for batch in data_shard.iter_torch_batches():
            ...
    return model

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = TorchTrainer(train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataset})
trainer.fit()
- Parameters
dataset_name – If a Dictionary of Datasets was passed to Trainer, then specifies which dataset shard to return.
- Returns
The DatasetIterator shard to use for this worker. If no dataset is passed into Trainer, then return None.
Trainer Configs#
- class ray.air.config.ScalingConfig(trainer_resources: Optional[Union[Dict, Domain, Dict[str, List]]] = None, num_workers: Optional[Union[int, Domain, Dict[str, List]]] = None, use_gpu: Union[bool, Domain, Dict[str, List]] = False, resources_per_worker: Optional[Union[Dict, Domain, Dict[str, List]]] = None, placement_strategy: Union[str, Domain, Dict[str, List]] = 'PACK', _max_cpu_fraction_per_node: Optional[Union[float, Domain, Dict[str, List]]] = None)[source]#
Configuration for scaling training.
- Parameters
trainer_resources β Resources to allocate for the trainer. If None is provided, will default to 1 CPU.
num_workers – The number of workers (Ray actors) to launch. Each worker will reserve 1 CPU by default. The number of CPUs reserved by each worker can be overridden with the resources_per_worker argument.
use_gpu – If True, training will be done on GPUs (1 per worker). Defaults to False. The number of GPUs reserved by each worker can be overridden with the resources_per_worker argument.
resources_per_worker – If specified, the resources defined in this Dict will be reserved for each worker. The CPU and GPU keys (case-sensitive) can be defined to override the number of CPU/GPUs used by each worker.
placement_strategy – The placement strategy to use for the placement group of the Ray actors. See Placement Group Strategies for the possible options.
_max_cpu_fraction_per_node β [Experimental] The max fraction of CPUs per node that Train will use for scheduling training actors. The remaining CPUs can be used for dataset tasks. It is highly recommended that you set this to less than 1.0 (e.g., 0.8) when passing datasets to trainers, to avoid hangs / CPU starvation of dataset tasks. Warning: this feature is experimental and is not recommended for use with autoscaling (scale-up will not trigger properly).
PublicAPI (beta): This API is in beta and may change before becoming stable.
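A hedged example of a ScalingConfig combining these options; the numbers are illustrative only.
from ray.air.config import ScalingConfig

scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
    # Reserve 2 CPUs and 1 GPU for each of the 4 workers.
    resources_per_worker={"CPU": 2, "GPU": 1},
    placement_strategy="SPREAD",
)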
- property total_resources#
Map of total resources required for the trainer.
- property num_cpus_per_worker#
The number of CPUs to set per worker.
- property num_gpus_per_worker#
The number of GPUs to set per worker.
- property additional_resources_per_worker#
Resources per worker, not including CPU or GPU resources.
- as_placement_group_factory() PlacementGroupFactory [source]#
Returns a PlacementGroupFactory to specify resources for Tune.
- classmethod from_placement_group_factory(pgf: PlacementGroupFactory) ScalingConfig [source]#
Create a ScalingConfig from a Tune PlacementGroupFactory.
- class ray.air.config.DatasetConfig(fit: Optional[bool] = None, split: Optional[bool] = None, required: Optional[bool] = None, transform: Optional[bool] = None, max_object_store_memory_fraction: Optional[float] = None, global_shuffle: Optional[bool] = None, randomize_block_order: Optional[bool] = None, per_epoch_preprocessor: Optional[ray.data.preprocessor.Preprocessor] = None, use_stream_api: Optional[int] = None, stream_window_size: Optional[int] = None)[source]#
Configuration for ingest of a single Dataset.
See the AIR Dataset configuration guide for usage examples.
This config defines how the Dataset should be read into the DataParallelTrainer. It configures the preprocessing, splitting, and ingest strategy per-dataset.
DataParallelTrainers declare default DatasetConfigs for each dataset passed in the datasets argument. Users have the opportunity to selectively override these configs by passing the dataset_config argument. Trainers can also define user-customizable values (e.g., XGBoostTrainer doesn't support streaming ingest).
- Parameters
fit β Whether to fit preprocessors on this dataset. This can be set on at most one dataset at a time. True by default for the βtrainβ dataset only.
split β Whether the dataset should be split across multiple workers. True by default for the βtrainβ dataset only.
required β Whether to raise an error if the Dataset isnβt provided by the user. False by default.
transform β Whether to transform the dataset with the fitted preprocessor. This must be enabled at least for the dataset that is fit. True by default.
max_object_store_memory_fraction – [Experimental] The maximum fraction of Ray's shared-memory object store to use for the dataset. The default value is -1, meaning that the preprocessed dataset should be cached, which may cause spilling if its size is larger than the object store's capacity. Pipelined ingest (all other values, 0 or higher) is experimental. Note that the absolute memory capacity used is based on the object store capacity at invocation time; this does not currently cover autoscaling cases where the size of the cluster may change.
global_shuffle β Whether to enable global shuffle (per pipeline window in streaming mode). Note that this is an expensive all-to-all operation, and most likely you want to use local shuffle instead. See https://docs.ray.io/en/master/data/faq.html and https://docs.ray.io/en/master/ray-air/check-ingest.html. False by default.
randomize_block_order β Whether to randomize the iteration order over blocks. The main purpose of this is to prevent data fetching hotspots in the cluster when running many parallel workers / trials on the same data. We recommend enabling it always. True by default.
per_epoch_preprocessor – [Experimental] A preprocessor to re-apply on each pass of the dataset. The main use case for this is to apply a random transform on a training dataset on each epoch. The per-epoch preprocessor will be applied after all other preprocessors and in parallel with the dataset consumer.
use_stream_api β Deprecated. Use max_object_store_memory_fraction instead.
stream_window_size β Deprecated. Use max_object_store_memory_fraction instead.
PublicAPI (beta): This API is in beta and may change before becoming stable.
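A hedged sketch (not from the original docs) of overriding per-dataset ingest configs when constructing a trainer; TorchTrainer, the datasets, and the chosen options are illustrative assumptions.
import ray
from ray.air.config import DatasetConfig, ScalingConfig
from ray.train.torch import TorchTrainer

train_ds = ray.data.from_items([{"x": i, "y": i} for i in range(32)])
side_ds = ray.data.from_items([{"z": i} for i in range(8)])

trainer = TorchTrainer(
    train_loop_per_worker=lambda: None,
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_ds, "side": side_ds},
    dataset_config={
        # Randomize block order for the training dataset.
        "train": DatasetConfig(randomize_block_order=True),
        # Do not split or transform the auxiliary dataset.
        "side": DatasetConfig(split=False, transform=False),
    },
)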
- fill_defaults() ray.air.config.DatasetConfig [source]#
Return a copy of this config with all default values filled in.
- static merge(a: Dict[str, ray.air.config.DatasetConfig], b: Optional[Dict[str, ray.air.config.DatasetConfig]]) Dict[str, ray.air.config.DatasetConfig] [source]#
Merge two given DatasetConfigs, the second taking precedence.
- Raises
ValueError β if validation fails on the merged configs.
- static validated(config: Dict[str, DatasetConfig], datasets: Dict[str, Dataset]) Dict[str, DatasetConfig] [source]#
Validate the given config and datasets are usable.
Returns dict of validated configs with defaults filled out.
- class ray.air.config.FailureConfig(max_failures: int = 0, fail_fast: Union[bool, str] = False)[source]#
Configuration related to failure handling of each training/tuning run.
- Parameters
max_failures β Tries to recover a run at least this many times. Will recover from the latest checkpoint if present. Setting to -1 will lead to infinite recovery retries. Setting to 0 will disable retries. Defaults to 0.
fail_fast – Whether to fail upon the first error. Only used for Ray Tune - this does not apply to single training runs (e.g. with Trainer.fit()). If fail_fast="raise" is provided, Ray Tune will automatically raise the exception received by the Trainable. fail_fast="raise" can easily leak resources and should be used with caution (it is best used with ray.init(local_mode=True)).
PublicAPI (beta): This API is in beta and may change before becoming stable.
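A minimal hedged example; the retry count is illustrative only.
from ray.air.config import FailureConfig, RunConfig

# Retry a failed run up to 3 times, resuming from the latest checkpoint.
run_config = RunConfig(failure_config=FailureConfig(max_failures=3))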
- class ray.air.config.CheckpointConfig(num_to_keep: Optional[int] = None, checkpoint_score_attribute: Optional[str] = None, checkpoint_score_order: str = 'max', checkpoint_frequency: int = 0, checkpoint_at_end: Optional[bool] = None)[source]#
Configurable parameters for defining the checkpointing strategy.
Default behavior is to persist all checkpoints to disk. If num_to_keep is set, the default retention policy is to keep the checkpoints with maximum timestamp, i.e. the most recent checkpoints.
- Parameters
num_to_keep – The number of checkpoints to keep on disk for this run. If a checkpoint is persisted to disk after there are already this many checkpoints, then an existing checkpoint will be deleted. If this is None then checkpoints will not be deleted. Must be >= 1.
checkpoint_score_attribute – The attribute that will be used to score checkpoints to determine which checkpoints should be kept on disk when there are greater than num_to_keep checkpoints. This attribute must be a key from the checkpoint dictionary which has a numerical value. By default, the last checkpoints will be kept.
checkpoint_score_order – Either "max" or "min". If "max", then checkpoints with highest values of checkpoint_score_attribute will be kept. If "min", then checkpoints with lowest values of checkpoint_score_attribute will be kept.
checkpoint_frequency – Number of iterations between checkpoints. If 0 this will disable checkpointing. Please note that most trainers will still save one checkpoint at the end of training. This attribute is only supported by trainers that don't take in custom training loops.
checkpoint_at_end β If True, will save a checkpoint at the end of training. This attribute is only supported by trainers that donβt take in custom training loops. Defaults to True for trainers that support it and False for generic function trainables.
PublicAPI (beta): This API is in beta and may change before becoming stable.
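A hedged example that keeps only the two best checkpoints by a reported metric; the metric name is illustrative.
from ray.air.config import CheckpointConfig, RunConfig

run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        num_to_keep=2,
        checkpoint_score_attribute="loss",
        checkpoint_score_order="min",
    )
)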
- class ray.air.config.RunConfig(name: Optional[str] = None, local_dir: Optional[str] = None, callbacks: Optional[List[Callback]] = None, stop: Optional[Union[Mapping, Stopper, Callable[[str, Mapping], bool]]] = None, failure_config: Optional[ray.air.config.FailureConfig] = None, sync_config: Optional[SyncConfig] = None, checkpoint_config: Optional[ray.air.config.CheckpointConfig] = None, progress_reporter: Optional[ProgressReporter] = None, verbose: Union[int, Verbosity] = 3, log_to_file: Union[bool, str, Tuple[str, str]] = False)[source]#
Runtime configuration for training and tuning runs.
Upon resuming from a training or tuning run checkpoint, Ray Train/Tune will automatically apply the RunConfig from the previously checkpointed run.
- Parameters
name β Name of the trial or experiment. If not provided, will be deduced from the Trainable.
local_dir – Local dir to save training results to. Defaults to ~/ray_results.
stop – Stop conditions to consider. Refer to ray.tune.stopper.Stopper for more info. Stoppers should be serializable.
callbacks β Callbacks to invoke. Refer to ray.tune.callback.Callback for more info. Callbacks should be serializable. Currently only stateless callbacks are supported for resumed runs. (any state of the callback will not be checkpointed by Tune and thus will not take effect in resumed runs).
failure_config β Failure mode configuration.
sync_config β Configuration object for syncing. See tune.SyncConfig.
checkpoint_config β Checkpointing configuration.
progress_reporter β Progress reporter for reporting intermediate experiment progress. Defaults to CLIReporter if running in command-line, or JupyterNotebookReporter if running in a Jupyter notebook.
verbose – 0, 1, 2, or 3. Verbosity mode. 0 = silent, 1 = only status updates, 2 = status and brief results, 3 = status and detailed results. Defaults to 3.
log_to_file – Log stdout and stderr to files in trial directories. If this is False (default), no files are written. If true, outputs are written to trialdir/stdout and trialdir/stderr, respectively. If this is a single string, this is interpreted as a file relative to the trialdir, to which both streams are written. If this is a Sequence (e.g. a Tuple), it has to have length 2 and the elements indicate the files to which stdout and stderr are written, respectively.
PublicAPI (beta): This API is in beta and may change before becoming stable.
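A hedged example combining the configs above into a single RunConfig; names and values are illustrative only.
from ray.air.config import CheckpointConfig, FailureConfig, RunConfig

run_config = RunConfig(
    name="my_experiment",
    local_dir="~/ray_results",
    failure_config=FailureConfig(max_failures=2),
    checkpoint_config=CheckpointConfig(num_to_keep=3),
    verbose=1,
)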
Checkpoint#
- class ray.air.checkpoint.Checkpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]#
Ray AIR Checkpoint.
An AIR Checkpoint is a common interface for accessing models across different AIR components and libraries. A Checkpoint can have its data represented in one of three ways:
as a directory on local (on-disk) storage
as a directory on an external storage (e.g., cloud storage)
as an in-memory dictionary
The Checkpoint object also has methods to translate between different checkpoint storage locations. These storage representations provide flexibility in distributed environments, where you may want to recreate an instance of the same model on multiple nodes or across different Ray clusters.
Example:
from ray.air.checkpoint import Checkpoint

# Create checkpoint data dict
checkpoint_data = {"data": 123}

# Create checkpoint object from data
checkpoint = Checkpoint.from_dict(checkpoint_data)

# Save checkpoint to a directory on the file system.
path = checkpoint.to_directory()

# This path can then be passed around,
# e.g. to a different function or a different script.
# You can also use `checkpoint.to_uri/from_uri` to
# read from/write to cloud storage

# In another function or script, recover Checkpoint object from path
checkpoint = Checkpoint.from_directory(path)

# Convert into dictionary again
recovered_data = checkpoint.to_dict()

# It is guaranteed that the original data has been recovered
assert recovered_data == checkpoint_data
Checkpoints can be used to instantiate a Predictor, BatchPredictor, or PredictorDeployment class.
The constructor is a private API; instead, the from_ methods should be used to create checkpoint objects (e.g. Checkpoint.from_directory()).
Other implementation notes:
When converting between different checkpoint formats, it is guaranteed that a full round trip of conversions (e.g. directory -> dict -> directory) will recover the original checkpoint data. There are no guarantees made about compatibility of intermediate representations.
New data can be added to a Checkpoint during conversion. Consider the following conversion: directory -> dict (adding dict["foo"] = "bar") -> directory -> dict (expect to see dict["foo"] = "bar"). Note that the second directory will contain pickle files with the serialized additional field data in them.
Similarly with a dict as a source: dict -> directory (add file "foo.txt") -> dict -> directory (will have "foo.txt" in it again). Note that the second dict representation will contain an extra field with the serialized additional files in it.
Checkpoints can be pickled and sent to remote processes. Please note that checkpoints pointing to local directories will be pickled as data representations, so the full checkpoint data will be contained in the checkpoint object. If you want to avoid this, consider passing only the checkpoint directory to the remote task and re-construct your checkpoint object in that function. Note that this will only work if the βremoteβ task is scheduled on the same node or a node that also has access to the local data path (e.g. on a shared file system like NFS).
If you need persistence across clusters, use the to_uri() or to_directory() methods to persist your checkpoints to disk.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- property uri: Optional[str]#
Return checkpoint URI, if available.
This will return a URI to cloud storage if this checkpoint is persisted on cloud, or a local file:// URI if this checkpoint is persisted on local disk and available on the current node.
In all other cases, this will return None. Users can then choose to persist to cloud with Checkpoint.to_uri().
Example
>>> from ray.air import Checkpoint
>>> checkpoint = Checkpoint.from_uri("s3://some-bucket/some-location")
>>> assert checkpoint.uri == "s3://some-bucket/some-location"
>>> checkpoint = Checkpoint.from_dict({"data": 1})
>>> assert checkpoint.uri == None
- Returns
Checkpoint URI if this URI is reachable from the current node (e.g. cloud storage or locally available file URI).
- classmethod from_bytes(data: bytes) ray.air.checkpoint.Checkpoint [source]#
Create a checkpoint from the given byte string.
- Parameters
data β Data object containing pickled checkpoint data.
- Returns
checkpoint object.
- Return type
- to_bytes() bytes [source]#
Return Checkpoint serialized as bytes object.
- Returns
Bytes object containing checkpoint data.
- Return type
bytes
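A minimal sketch of round-tripping a checkpoint through its bytes representation, e.g. to ship it over a custom transport; the payload is illustrative.
from ray.air.checkpoint import Checkpoint

ckpt = Checkpoint.from_dict({"weights": [1, 2, 3]})
blob = ckpt.to_bytes()                  # serialize to a byte string
restored = Checkpoint.from_bytes(blob)  # reconstruct the checkpoint
assert restored.to_dict()["weights"] == [1, 2, 3]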
- classmethod from_dict(data: dict) ray.air.checkpoint.Checkpoint [source]#
Create checkpoint object from dictionary.
- Parameters
data β Dictionary containing checkpoint data.
- Returns
checkpoint object.
- Return type
- to_dict() dict [source]#
Return checkpoint data as dictionary.
- Returns
Dictionary containing checkpoint data.
- Return type
dict
- classmethod from_directory(path: Union[str, os.PathLike]) ray.air.checkpoint.Checkpoint [source]#
Create checkpoint object from directory.
- Parameters
path β Directory containing checkpoint data. The caller promises to not delete the directory (gifts ownership of the directory to this Checkpoint).
- Returns
checkpoint object.
- Return type
- classmethod from_checkpoint(other: ray.air.checkpoint.Checkpoint) ray.air.checkpoint.Checkpoint [source]#
Create a checkpoint from a generic Checkpoint.
This method can be used to create a framework-specific checkpoint from a generic Checkpoint object.
Examples
>>> result = TorchTrainer.fit(...)
>>> checkpoint = TorchCheckpoint.from_checkpoint(result.checkpoint)
>>> model = checkpoint.get_model()
Linear(in_features=1, out_features=1, bias=True)
- to_directory(path: Optional[str] = None) str [source]#
Write checkpoint data to directory.
- Parameters
path β Target directory to restore data in. If not specified, will create a temporary directory.
- Returns
Directory containing checkpoint data.
- Return type
str
- as_directory() Iterator[str] [source]#
Return checkpoint directory path in a context.
This function makes checkpoint data available as a directory while avoiding unnecessary copies and left-over temporary data.
If the checkpoint is already a directory checkpoint, it will return the existing path. If it is not, it will create a temporary directory, which will be deleted after the context is exited.
Users should treat the returned checkpoint directory as read-only and avoid changing any data within it, as it might get deleted when exiting the context.
Example:
with checkpoint.as_directory() as checkpoint_dir:
    # Do some read-only processing of files within checkpoint_dir
    pass

# At this point, if a temporary directory was created, it will have
# been deleted.
- classmethod from_uri(uri: str) ray.air.checkpoint.Checkpoint [source]#
Create checkpoint object from location URI (e.g. cloud storage).
Valid locations currently include AWS S3 (s3://), Google cloud storage (gs://), HDFS (hdfs://), and local files (file://).
- Parameters
uri β Source location URI to read data from.
- Returns
checkpoint object.
- Return type
- to_uri(uri: str) str [source]#
Write checkpoint data to location URI (e.g. cloud storage).
- Parameters
uri β Target location URI to write data to.
- Returns
Cloud location containing checkpoint data.
- Return type
str
- get_internal_representation() Tuple[str, Union[dict, str, ray.ObjectRef]] [source]#
Return tuple of (type, data) for the internal representation.
The internal representation can be used e.g. to compare checkpoint objects for equality or to access the underlying data storage.
The returned type is a string and one of ["local_path", "data_dict", "uri"].
The data is the respective data value.
Note that paths converted from file://... will be returned as local_path (without the file:// prefix) and not as uri.
- Returns
Tuple of type and data.
DeveloperAPI: This API may change across minor Ray releases.
Predictor#
- class ray.train.predictor.Predictor(preprocessor: Optional[ray.data.preprocessor.Preprocessor] = None)[source]#
Predictors load models from checkpoints to perform inference.
Note
The base Predictor class cannot be instantiated directly. Only one of its subclasses can be used.
How does a Predictor work?
Predictors expose a predict method that accepts an input batch of type DataBatchType and outputs predictions of the same type as the input batch.
When the predict method is called, the following occurs:
The input batch is converted into a pandas DataFrame. Tensor input (like a np.ndarray) will be converted into a single-column pandas DataFrame.
If there is a Preprocessor saved in the provided Checkpoint, the preprocessor will be used to transform the DataFrame.
The transformed DataFrame will be passed to the model for inference (via the predictor._predict_pandas method).
The predictions will be output by predict in the same type as the original input.
How do I create a new Predictor?
To implement a new Predictor for your particular framework, you should subclass the base Predictor and implement the following two methods:
_predict_pandas: Given a pandas.DataFrame input, return a pandas.DataFrame containing predictions.
from_checkpoint: Logic for creating a Predictor from an AIR Checkpoint.
Optionally, implement _predict_numpy for better performance when working with tensor data to avoid extra copies from Pandas conversions.
PublicAPI (beta): This API is in beta and may change before becoming stable.
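A hedged sketch (not from the original docs) of such a subclass; MyPredictor, the checkpoint layout, and the column handling are hypothetical and only illustrate where the two required methods go.
import pandas as pd

from ray.air.checkpoint import Checkpoint
from ray.train.predictor import Predictor

class MyPredictor(Predictor):
    def __init__(self, factor: float, preprocessor=None):
        super().__init__(preprocessor)
        self._factor = factor

    @classmethod
    def from_checkpoint(cls, checkpoint: Checkpoint, **kwargs) -> "MyPredictor":
        # Assumes the checkpoint was created with
        # Checkpoint.from_dict({"factor": ...}).
        data = checkpoint.to_dict()
        return cls(factor=data["factor"])

    def _predict_pandas(self, data: pd.DataFrame, **kwargs) -> pd.DataFrame:
        # "Model" inference: scale the first column by the stored factor.
        return pd.DataFrame({"predictions": data.iloc[:, 0] * self._factor})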
- abstract classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, **kwargs) ray.train.predictor.Predictor [source]#
Create a specific predictor from a checkpoint.
- Parameters
checkpoint β Checkpoint to load predictor data from.
kwargs β Arguments specific to predictor implementations.
- Returns
Predictor object.
- Return type
- classmethod from_pandas_udf(pandas_udf: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame]) ray.train.predictor.Predictor [source]#
Create a Predictor from a Pandas UDF.
- Parameters
pandas_udf β A function that takes a pandas.DataFrame and other optional kwargs and returns a pandas.DataFrame.
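A minimal sketch of wrapping a stateless pandas UDF and scoring one in-memory batch with it; the UDF and column name are illustrative only.
import pandas as pd

from ray.train.predictor import Predictor

predictor = Predictor.from_pandas_udf(
    lambda df: pd.DataFrame({"preds": df["x"] * 2})
)
print(predictor.predict(pd.DataFrame({"x": [1, 2, 3]})))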
- get_preprocessor() Optional[ray.data.preprocessor.Preprocessor] [source]#
Get the preprocessor to use prior to executing predictions.
- set_preprocessor(preprocessor: Optional[ray.data.preprocessor.Preprocessor]) None [source]#
Set the preprocessor to use prior to executing predictions.
- classmethod preferred_batch_format() ray.air.util.data_batch_conversion.BatchFormat [source]#
Batch format hint for upstream producers to try yielding best block format.
The preferred batch format to use if both _predict_pandas and _predict_numpy are implemented. Defaults to Pandas.
Can be overridden by predictor classes depending on the framework type, e.g. TorchPredictor prefers NumPy and XGBoostPredictor prefers Pandas as the native batch format.
DeveloperAPI: This API may change across minor Ray releases.
- predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], **kwargs) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]] [source]#
Perform inference on a batch of data.
- Parameters
data – A batch of input data of type DataBatchType.
kwargs – Arguments specific to predictor implementations. These are passed directly to _predict_numpy or _predict_pandas.
- Returns
Prediction result. The return type will be the same as the input type.
- Return type
DataBatchType
Data Types#
- ray.train.predictor.DataBatchType#
alias of Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]]
Batch Predictor#
- class ray.train.batch_predictor.BatchPredictor(checkpoint: ray.air.checkpoint.Checkpoint, predictor_cls: Type[ray.train.predictor.Predictor], **predictor_kwargs)[source]#
Batch predictor class.
Takes a predictor class and a checkpoint and provides an interface to run batch scoring on Ray datasets.
This batch predictor wraps around a predictor class and executes it in a distributed way when calling predict().
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_pandas_udf(pandas_udf: Callable[[pandas.core.frame.DataFrame], pandas.core.frame.DataFrame]) ray.train.batch_predictor.BatchPredictor [source]#
Create a Predictor from a Pandas UDF.
- Parameters
pandas_udf β A function that takes a pandas.DataFrame and other optional kwargs and returns a pandas.DataFrame.
- get_preprocessor() ray.data.preprocessor.Preprocessor [source]#
Get the preprocessor to use prior to executing predictions.
- set_preprocessor(preprocessor: ray.data.preprocessor.Preprocessor) None [source]#
Set the preprocessor to use prior to executing predictions.
- predict(data: Union[ray.data.dataset.Dataset, ray.data.dataset_pipeline.DatasetPipeline], *, feature_columns: Optional[List[str]] = None, keep_columns: Optional[List[str]] = None, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: Optional[int] = None, num_gpus_per_worker: Optional[int] = None, separate_gpu_stage: bool = True, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) Union[ray.data.dataset.Dataset, ray.data.dataset_pipeline.DatasetPipeline] [source]#
Run batch scoring on a Dataset.
- Parameters
data β Ray dataset or pipeline to run batch prediction on.
feature_columns – List of columns in the preprocessed dataset to use for prediction. Columns not specified will be dropped from data before being passed to the predictor. If None, use all columns in the preprocessed dataset.
keep_columns – List of columns in the preprocessed dataset to include in the prediction result. This is useful for calculating final accuracies/metrics on the result dataset. If None, the columns in the output dataset will contain just the prediction results.
batch_size β Split dataset into batches of this size for prediction.
min_scoring_workers β Minimum number of scoring actors.
max_scoring_workers β If set, specify the maximum number of scoring actors.
num_cpus_per_worker β Number of CPUs to allocate per scoring worker.
num_gpus_per_worker β Number of GPUs to allocate per scoring worker.
separate_gpu_stage β If using GPUs, specifies whether to execute GPU processing in a separate stage (enabled by default). This avoids running expensive preprocessing steps on GPU workers.
ray_remote_args β Additional resource requirements to request from ray.
predict_kwargs β Keyword arguments passed to the predictorβs
predict()
method.
- Returns
Dataset containing scoring results.
Examples
import pandas as pd

import ray
from ray.train.batch_predictor import BatchPredictor

def calculate_accuracy(df):
    return pd.DataFrame({"correct": df["preds"] == df["label"]})

# Create a batch predictor that returns identity as the predictions.
batch_pred = BatchPredictor.from_pandas_udf(
    lambda data: pd.DataFrame({"preds": data["feature_1"]}))

# Create a dummy dataset.
ds = ray.data.from_pandas(pd.DataFrame({
    "feature_1": [1, 2, 3], "label": [1, 2, 3]}))

# Execute batch prediction using this predictor.
predictions = batch_pred.predict(ds,
    feature_columns=["feature_1"], keep_columns=["label"])

# print predictions and calculate final accuracy
print(predictions)
correct = predictions.map_batches(calculate_accuracy)
print(f"Final accuracy: {correct.sum(on='correct') / correct.count()}")
Dataset(num_blocks=1, num_rows=3, schema={preds: int64, label: int64}) Final accuracy: 1.0
- predict_pipelined(data: ray.data.dataset.Dataset, *, blocks_per_window: Optional[int] = None, bytes_per_window: Optional[int] = None, feature_columns: Optional[List[str]] = None, keep_columns: Optional[List[str]] = None, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: Optional[int] = None, num_gpus_per_worker: Optional[int] = None, separate_gpu_stage: bool = True, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) ray.data.dataset_pipeline.DatasetPipeline [source]#
Setup a prediction pipeline for batch scoring.
Unlike predict(), this generates a DatasetPipeline object and does not perform execution. Execution can be triggered by pulling from the pipeline.
This is a convenience wrapper around calling window() on the Dataset prior to passing it to BatchPredictor.predict().
- Parameters
data β Ray dataset to run batch prediction on.
blocks_per_window β The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.
bytes_per_window β Specify the window size in bytes instead of blocks. This will be treated as an upper bound for the window size, but each window will still include at least one block. This is mutually exclusive with
blocks_per_window
.feature_columns β List of columns in data to use for prediction. Columns not specified will be dropped from
data
before being passed to the predictor. If None, use all columns.keep_columns β List of columns in
data
to include in the prediction result. This is useful for calculating final accuracies/metrics on the result dataset. If None, the columns in the output dataset will contain just the prediction results.batch_size β Split dataset into batches of this size for prediction.
min_scoring_workers β Minimum number of scoring actors.
max_scoring_workers β If set, specify the maximum number of scoring actors.
num_cpus_per_worker β Number of CPUs to allocate per scoring worker.
num_gpus_per_worker β Number of GPUs to allocate per scoring worker.
separate_gpu_stage β If using GPUs, specifies whether to execute GPU processing in a separate stage (enabled by default). This avoids running expensive preprocessing steps on GPU workers.
ray_remote_args β Additional resource requirements to request from ray.
predict_kwargs β Keyword arguments passed to the predictorβs
predict()
method.
- Returns
DatasetPipeline that generates scoring results.
Examples
import pandas as pd

import ray
from ray.train.batch_predictor import BatchPredictor

# Create a batch predictor that always returns `42` for each input.
batch_pred = BatchPredictor.from_pandas_udf(
    lambda data: pd.DataFrame({"a": [42] * len(data)}))

# Create a dummy dataset.
ds = ray.data.range_tensor(1000, parallelism=4)

# Setup a prediction pipeline.
print(batch_pred.predict_pipelined(ds, blocks_per_window=1))
DatasetPipeline(num_windows=4, num_stages=3)
Tuner#
- class ray.tune.tuner.Tuner(trainable: Optional[Union[str, Callable, Type[ray.tune.trainable.trainable.Trainable], BaseTrainer]] = None, *, param_space: Optional[Dict[str, Any]] = None, tune_config: Optional[ray.tune.tune_config.TuneConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, _tuner_kwargs: Optional[Dict] = None, _tuner_internal: Optional[ray.tune.impl.tuner_internal.TunerInternal] = None)[source]#
Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune.
- Parameters
trainable β The trainable to be tuned.
param_space β Search space of the tuning job. One thing to note is that both preprocessor and dataset can be tuned here.
tune_config β Tuning algorithm specific configs. Refer to ray.tune.tune_config.TuneConfig for more info.
run_config β Runtime configuration that is specific to individual trials. If passed, this will overwrite the run config passed to the Trainer, if applicable. Refer to ray.air.config.RunConfig for more info.
Usage pattern:
from sklearn.datasets import load_breast_cancer

from ray import tune
from ray.data import from_pandas
from ray.air.config import RunConfig, ScalingConfig
from ray.train.xgboost import XGBoostTrainer
from ray.tune.tuner import Tuner

def get_dataset():
    data_raw = load_breast_cancer(as_frame=True)
    dataset_df = data_raw["data"]
    dataset_df["target"] = data_raw["target"]
    dataset = from_pandas(dataset_df)
    return dataset

trainer = XGBoostTrainer(
    label_column="target",
    params={},
    datasets={"train": get_dataset()},
)

param_space = {
    "scaling_config": ScalingConfig(
        num_workers=tune.grid_search([2, 4]),
        resources_per_worker={
            "CPU": tune.grid_search([1, 2]),
        },
    ),
    # You can even grid search various datasets in Tune.
    # "datasets": {
    #     "train": tune.grid_search(
    #         [ds1, ds2]
    #     ),
    # },
    "params": {
        "objective": "binary:logistic",
        "tree_method": "approx",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}
tuner = Tuner(trainable=trainer,
    param_space=param_space,
    run_config=RunConfig(name="my_tune_run"))
analysis = tuner.fit()
To retry a failed tune run, you can then do
tuner = Tuner.restore(experiment_checkpoint_dir)
tuner.fit()
experiment_checkpoint_dir can be easily located near the end of the console output of your first failed run.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod restore(path: str, trainable: Optional[Union[str, Callable, Type[ray.tune.trainable.trainable.Trainable], BaseTrainer]] = None, resume_unfinished: bool = True, resume_errored: bool = False, restart_errored: bool = False, overwrite_trainable: Optional[Union[str, Callable, Type[ray.tune.trainable.trainable.Trainable], BaseTrainer]] = None) Tuner [source]#
Restores Tuner after a previously failed run.
All trials from the existing run will be added to the result table. The argument flags control how existing but unfinished or errored trials are resumed.
Finished trials are always added to the overview table. They will not be resumed.
Unfinished trials can be controlled with the resume_unfinished flag. If True (default), they will be continued. If False, they will be added as terminated trials (even if they were only created and never trained).
Errored trials can be controlled with the resume_errored and restart_errored flags. The former will resume errored trials from their latest checkpoints. The latter will restart errored trials from scratch and prevent loading their last checkpoints.
- Parameters
path β The path where the previous failed run is checkpointed. This information could be easily located near the end of the console output of previous run. Note: depending on whether ray client mode is used or not, this path may or may not exist on your local machine.
trainable β The trainable to use upon resuming the experiment. This should be the same trainable that was used to initialize the original Tuner. NOTE: Starting in 2.5, this will be a required parameter.
resume_unfinished β If True, will continue to run unfinished trials.
resume_errored β If True, will re-schedule errored trials and try to restore from their latest checkpoints.
restart_errored β If True, will re-schedule errored trials but force restarting them from scratch (no checkpoint will be loaded).
overwrite_trainable β Deprecated. Use the
trainable
argument instead.
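For example, a minimal sketch of resuming the failed run from the usage pattern above while retrying errored trials; the experiment path and the trainer object are carried over from that example and are assumptions here:

from ray.tune import Tuner

tuner = Tuner.restore(
    "~/ray_results/my_tune_run",   # experiment checkpoint dir printed by the failed run
    trainable=trainer,             # the same trainable passed to the original Tuner
    resume_errored=True,           # resume errored trials from their latest checkpoints
)
results = tuner.fit()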
- classmethod can_restore(path: Union[str, pathlib.Path]) bool [source]#
Checks whether a given directory contains a restorable Tune experiment.
Usage Pattern:
Use this utility to switch between starting a new Tune experiment and restoring when possible. This is useful for experiment fault-tolerance when re-running a failed tuning script.
import os

from ray.tune import Tuner
from ray.air import RunConfig


def train_fn(config):
    # Make sure to implement checkpointing so that progress gets
    # saved on restore.
    pass


name = "exp_name"
local_dir = "~/ray_results"
exp_dir = os.path.join(local_dir, name)

if Tuner.can_restore(exp_dir):
    tuner = Tuner.restore(exp_dir, resume_errored=True)
else:
    tuner = Tuner(
        train_fn,
        run_config=RunConfig(name=name, local_dir=local_dir),
    )
tuner.fit()
- Parameters
path β The path to the experiment directory of the Tune experiment. This can be either a local directory (e.g. ~/ray_results/exp_name) or a remote URI (e.g. s3://bucket/exp_name).
- Returns
True if this path exists and contains the Tuner state to resume from
- Return type
bool
- fit() ray.tune.result_grid.ResultGrid [source]#
Executes hyperparameter tuning job as configured and returns result.
Failure handling: Exceptions raised during the execution of a trial can be inspected, together with their stack traces, through the returned result grid. See
ResultGrid
for reference. Each trial may fail up to a certain number of times; this is configured by
RunConfig.FailureConfig.max_failures
.
Exceptions that happen outside of trials are also raised by this method. In such cases, an instruction like the following is printed at the end of the console output to tell you how to resume:
Please use tuner = Tuner.restore("~/ray_results/tuner_resume") to resume.
- Raises
RayTaskError β If user-provided trainable raises an exception
TuneError β General Ray Tune error.
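As a short sketch, per-trial failures can also be surfaced from the returned result grid after fit() completes (this assumes the tuner constructed in the usage pattern above):

result_grid = tuner.fit()
for i in range(len(result_grid)):
    result = result_grid[i]
    if result.error:
        print(f"Trial {i} failed with: {result.error}")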
- get_results() ray.tune.result_grid.ResultGrid [source]#
Get results of a hyperparameter tuning run.
This method returns the same results as
fit()
and can be used to retrieve the results after restoring a tuner without callingfit()
again.
If the tuner has not been fit before, an error will be raised.
from ray.tune import Tuner

tuner = Tuner.restore("/path/to/experiment")
results = tuner.get_results()
- Returns
Result grid of a previously fitted tuning run.
TuneConfig#
- class ray.tune.tune_config.TuneConfig(mode: Optional[str] = None, metric: Optional[str] = None, search_alg: Optional[Union[ray.tune.search.searcher.Searcher, ray.tune.search.search_algorithm.SearchAlgorithm]] = None, scheduler: Optional[ray.tune.schedulers.trial_scheduler.TrialScheduler] = None, num_samples: int = 1, max_concurrent_trials: Optional[int] = None, time_budget_s: Optional[Union[int, float, datetime.timedelta]] = None, reuse_actors: Optional[bool] = None, trial_name_creator: Optional[Callable[[ray.tune.experiment.trial.Trial], str]] = None, trial_dirname_creator: Optional[Callable[[ray.tune.experiment.trial.Trial], str]] = None, chdir_to_trial_dir: bool = True)[source]#
Tune specific configs.
- Parameters
metric β Metric to optimize. This metric should be reported with
tune.report()
. If set, will be passed to the search algorithm and scheduler.mode β Must be one of [min, max]. Determines whether objective is minimizing or maximizing the metric attribute. If set, will be passed to the search algorithm and scheduler.
search_alg β Search algorithm for optimization. Default to random search.
scheduler β Scheduler for executing the experiment. Choose among FIFO (default), MedianStopping, AsyncHyperBand, HyperBand and PopulationBasedTraining. Refer to ray.tune.schedulers for more options.
num_samples β Number of times to sample from the hyperparameter space. Defaults to 1. If
grid_search
is provided as an argument, the grid will be repeatednum_samples
of times. If this is -1, (virtually) infinite samples are generated until a stopping condition is met.max_concurrent_trials β Maximum number of trials to run concurrently. Must be non-negative. If None or 0, no limit will be applied. This is achieved by wrapping the
search_alg
in aConcurrencyLimiter
, and thus setting this argument will raise an exception if thesearch_alg
is already aConcurrencyLimiter
. Defaults to None.time_budget_s β Global time budget in seconds after which all trials are stopped. Can also be a
datetime.timedelta
object.reuse_actors β Whether to reuse actors between different trials when possible. This can drastically speed up experiments that start and stop actors often (e.g., PBT in time-multiplexing mode). This requires trials to have the same resource requirements. Defaults to
True
for function trainables (including most Ray AIR trainers) andFalse
for class and registered trainables (e.g. RLlib).trial_name_creator β Optional function that takes in a Trial and returns its name (i.e. its string representation). Be sure to include some unique identifier (such as
Trial.trial_id
) in each trialβs name. NOTE: This API is in alpha and subject to change.trial_dirname_creator β Optional function that takes in a trial and generates its trial directory name as a string. Be sure to include some unique identifier (such as
Trial.trial_id
) in each trial's directory name. Otherwise, trials could overwrite artifacts and checkpoints of other trials. The return value cannot be a path. NOTE: This API is in alpha and subject to change.
chdir_to_trial_dir – Whether to change the working directory of each worker to its corresponding trial directory. Defaults to
True
to prevent contention between workers saving trial-level outputs. If set toFalse
, files are accessible with paths relative to the original working directory. However, all workers on the same node now share the same working directory, so be sure to usesession.get_trial_dir()
as the path to save any outputs.
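A minimal illustrative sketch of passing a TuneConfig to a Tuner; the toy objective function and search space below are assumptions, not part of the API:

from ray import tune
from ray.air import session
from ray.tune import Tuner
from ray.tune.tune_config import TuneConfig


def objective(config):
    # Report the metric that TuneConfig optimizes.
    session.report({"loss": (config["x"] - 2) ** 2})


tuner = Tuner(
    objective,
    param_space={"x": tune.uniform(0, 10)},
    tune_config=TuneConfig(
        metric="loss",
        mode="min",
        num_samples=20,
        max_concurrent_trials=4,
    ),
)
results = tuner.fit()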
PublicAPI (beta): This API is in beta and may change before becoming stable.
Tuner Results#
- class ray.tune.result_grid.ResultGrid(experiment_analysis: ray.tune.analysis.experiment_analysis.ExperimentAnalysis)[source]#
A set of
Result
objects for interacting with Ray Tune results.
You can use it to inspect the trials and obtain the best result.
The constructor is a private API. This object can only be created as a result of
Tuner.fit()
.
Example
>>> import random
>>> from ray import air, tune
>>> def random_error_trainable(config):
...     if random.random() < 0.5:
...         return {"loss": 0.0}
...     else:
...         raise ValueError("This is an error")
>>> tuner = tune.Tuner(
...     random_error_trainable,
...     run_config=air.RunConfig(name="example-experiment"),
...     tune_config=tune.TuneConfig(num_samples=10),
... )
>>> result_grid = tuner.fit()
>>> for i in range(len(result_grid)):
...     result = result_grid[i]
...     if not result.error:
...         print(f"Trial finishes successfully with metrics"
...               f"{result.metrics}.")
...     else:
...         print(f"Trial failed with error {result.error}.")
You can also use
result_grid
for more advanced analysis.
>>> # Get the best result based on a particular metric.
>>> best_result = result_grid.get_best_result(
...     metric="loss", mode="min")
>>> # Get the best checkpoint corresponding to the best result.
>>> best_checkpoint = best_result.checkpoint
>>> # Get a dataframe for the last reported results of all of the trials
>>> df = result_grid.get_dataframe()
>>> # Get a dataframe for the minimum loss seen for each trial
>>> df = result_grid.get_dataframe(metric="loss", mode="min")
Note that trials of all statuses are included in the final result grid. If a trial is not in terminated state, its latest result and checkpoint as seen by Tune will be provided.
See Analyzing Tune Experiment Results for more usage examples.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- get_best_result(metric: Optional[str] = None, mode: Optional[str] = None, scope: str = 'last', filter_nan_and_inf: bool = True) ray.air.result.Result [source]#
Get the best result from all the trials run.
- Parameters
metric β Key for trial info to order on. Defaults to the metric specified in your Tunerβs
TuneConfig
.mode β One of [min, max]. Defaults to the mode specified in your Tunerβs
TuneConfig
.scope β One of [all, last, avg, last-5-avg, last-10-avg]. If
scope=last
, only look at each trialβs final step formetric
, and compare across trials based onmode=[min,max]
. Ifscope=avg
, consider the simple average over all steps formetric
and compare across trials based onmode=[min,max]
. Ifscope=last-5-avg
orscope=last-10-avg
, consider the simple average over the last 5 or 10 steps formetric
and compare across trials based onmode=[min,max]
. Ifscope=all
, find each trialβs min/max score formetric
based onmode
, and compare trials based onmode=[min,max]
.filter_nan_and_inf β If True (default), NaN or infinite values are disregarded and these trials are never selected as the best trial.
- get_dataframe(filter_metric: Optional[str] = None, filter_mode: Optional[str] = None) pandas.core.frame.DataFrame [source]#
Return dataframe of all trials with their configs and reported results.
By default, this returns the last reported results for each trial.
If
filter_metric
andfilter_mode
are set, the results from each trial are filtered for this metric and mode. For example, iffilter_metric="some_metric"
andfilter_mode="max"
, for each trial, every received result is checked, and the one wheresome_metric
is maximal is returned.
Example
result_grid = Tuner.fit(...)

# Get last reported results per trial
df = result_grid.get_dataframe()

# Get best ever reported accuracy per trial
df = result_grid.get_dataframe(
    filter_metric="accuracy", filter_mode="max"
)
- Parameters
filter_metric β Metric to filter best result for.
filter_mode β If
filter_metric
is given, one of["min", "max"]
to specify if we should find the minimum or maximum result.
- Returns
Pandas DataFrame with each trial as a row and their results as columns.
- property errors#
Returns the exceptions of errored trials.
- property num_errors#
Returns the number of errored trials.
- property num_terminated#
Returns the number of terminated (but not errored) trials.
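A small sketch of using these properties to summarize a finished run (this assumes result_grid came from an earlier Tuner.fit() call as in the examples above):

print(f"{result_grid.num_terminated} trials finished, "
      f"{result_grid.num_errors} trials errored")
for error in result_grid.errors:
    print("Trial error:", error)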
Serving#
- ray.serve.air_integrations.PredictorDeployment#
alias of Deployment(name=PredictorDeployment,version=None,route_prefix=/PredictorDeployment)
- class ray.serve.air_integrations.PredictorWrapper(predictor_cls: Union[str, Type[Predictor]], checkpoint: Union[Checkpoint, str], http_adapter: Union[str, Callable[[Any], Any]] = 'ray.serve.http_adapters.json_to_ndarray', batching_params: Optional[Union[Dict[str, int], bool]] = None, predict_kwargs: Optional[Dict[str, Any]] = None, **predictor_from_checkpoint_kwargs)[source]#
Serve any Ray AIR predictor from an AIR checkpoint.
- Parameters
predictor_cls β The class or path for predictor class. The type must be a subclass of
ray.train.predictor.Predictor
.checkpoint β
The checkpoint object or a uri to load checkpoint from
The checkpoint object must be an instance of
ray.air.checkpoint.Checkpoint
The URI string will be used to construct a checkpoint object using
Checkpoint.from_uri("uri_to_load_from")
.
http_adapter – The FastAPI input conversion function. By default, Serve will use the NdArray schema and convert to numpy array. You can pass in any FastAPI dependency resolver that returns an array. When you pass in a string, Serve will import it. Please refer to the Serve HTTP adapters documentation to learn more.
batching_params β override the default parameters to
ray.serve.batch()
. PassFalse
to disable batching.predict_kwargs β optional keyword arguments passed to the
Predictor.predict
method upon each call.**predictor_from_checkpoint_kwargs β Additional keyword arguments passed to the
Predictor.from_checkpoint()
call.
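A hedged sketch of serving an AIR checkpoint over HTTP with the PredictorDeployment alias above; the checkpoint variable and the deployment name are assumptions:

from ray import serve
from ray.serve.air_integrations import PredictorDeployment
from ray.train.xgboost import XGBoostPredictor

# `checkpoint` is assumed to come from an earlier XGBoostTrainer run.
serve.run(
    PredictorDeployment.options(name="XGBoostService").bind(
        XGBoostPredictor, checkpoint
    )
)
# The default http_adapter (json_to_ndarray) converts JSON payloads to a
# NumPy ndarray before they reach the predictor.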
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
Trainer and Predictor Integrations#
XGBoost#
- class ray.train.xgboost.XGBoostTrainer(*args, **kwargs)[source]
Bases:
ray.train.gbdt_trainer.GBDTTrainer
A Trainer for data parallel XGBoost training.
This Trainer runs the XGBoost training loop in a distributed manner using multiple Ray Actors.
Note
XGBoostTrainer
does not modify or otherwise alter the working of the XGBoost distributed training algorithm. Ray only provides orchestration, data ingest and fault tolerance. For more information on XGBoost distributed training, refer to XGBoost documentation.
Example
import ray

from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
- Parameters
datasets β Ray Datasets to use for training and validation. Must include a βtrainβ key denoting the training dataset. If a
preprocessor
is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by thepreprocessor
if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.label_column β Name of the label column. A column with this name must be present in the training dataset.
params β XGBoost training parameters. Refer to XGBoost documentation for a list of possible parameters.
dmatrix_params β Dict of
dataset name:dict of kwargs
passed to respectivexgboost_ray.RayDMatrix
initializations, which in turn are passed toxgboost.DMatrix
objects created on each worker. For example, this can be used to add sample weights with theweights
parameter.scaling_config β Configuration for how to scale data parallel training.
run_config β Configuration for the execution of the training run.
preprocessor β A ray.data.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint β A checkpoint to resume training from.
**train_kwargs β Additional kwargs passed to
xgboost.train()
function.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- __init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)
- class ray.train.xgboost.XGBoostCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]
Bases:
ray.air.checkpoint.Checkpoint
A
Checkpoint
with XGBoost-specific functionality.Create this from a generic
Checkpoint
by callingXGBoostCheckpoint.from_checkpoint(ckpt)
.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_model(booster: xgboost.core.Booster, *, preprocessor: Optional[Preprocessor] = None) XGBoostCheckpoint [source]
Create a
Checkpoint
that stores an XGBoost model.- Parameters
booster β The XGBoost model to store in the checkpoint.
preprocessor β A fitted preprocessor to be applied before inference.
- Returns
An
XGBoostCheckpoint
containing the specified booster.
Examples
>>> import numpy as np
>>> import ray
>>> from ray.train.xgboost import XGBoostCheckpoint
>>> import xgboost
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgboost.XGBClassifier().fit(train_X, train_y)
>>> checkpoint = XGBoostCheckpoint.from_model(model.get_booster())
You can use an
XGBoostCheckpoint
to create an
XGBoostPredictor
and perform inference.
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> predictor = XGBoostPredictor.from_checkpoint(checkpoint)
- get_model() xgboost.core.Booster [source]
Retrieve the XGBoost model stored in this checkpoint.
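A short sketch, continuing the checkpoint created in the example above:

booster = checkpoint.get_model()  # returns the stored xgboost.core.Booster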
- class ray.train.xgboost.XGBoostPredictor(model: xgboost.core.Booster, preprocessor: Optional[Preprocessor] = None)[source]
Bases:
ray.train.predictor.Predictor
A predictor for XGBoost models.
- Parameters
model β The XGBoost booster to use for predictions.
preprocessor β A preprocessor used to transform data batches prior to prediction.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.xgboost.xgboost_predictor.XGBoostPredictor [source]
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of
XGBoostTrainer
.- Parameters
checkpoint β The checkpoint to load the model and preprocessor from. It is expected to be from the result of a
XGBoostTrainer
run.
- predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, dmatrix_kwargs: Optional[Dict[str, Any]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]] [source]
Run inference on data batch.
The data is converted into an XGBoost DMatrix before being inputted to the model.
- Parameters
data β A batch of input data.
feature_columns β The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in
data
.dmatrix_kwargs β Dict of keyword arguments passed to
xgboost.DMatrix
.**predict_kwargs β Keyword arguments passed to
xgboost.Booster.predict
.
Examples
>>> import numpy as np
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictor.predict(data, feature_columns=[0, 1])
array([0.5, 0.5], dtype=float32)
>>> import pandas as pd
>>> import xgboost as xgb
>>> from ray.train.xgboost import XGBoostPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = xgb.XGBClassifier().fit(train_X, train_y)
>>> predictor = XGBoostPredictor(model=model.get_booster())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictor.predict(data, feature_columns=["A", "B"])
   predictions
0          0.5
1          0.5
- Returns
Prediction result.
LightGBM#
- class ray.train.lightgbm.LightGBMTrainer(*args, **kwargs)[source]
Bases:
ray.train.gbdt_trainer.GBDTTrainer
A Trainer for data parallel LightGBM training.
This Trainer runs the LightGBM training loop in a distributed manner using multiple Ray Actors.
If you would like to take advantage of LightGBMβs built-in handling for features with the categorical data type, consider using the
Categorizer
preprocessor to set the dtypes in the dataset.
Note
LightGBMTrainer
does not modify or otherwise alter the working of the LightGBM distributed training algorithm. Ray only provides orchestration, data ingest and fault tolerance. For more information on LightGBM distributed training, refer to LightGBM documentation.
Example
import ray

from ray.train.lightgbm import LightGBMTrainer
from ray.air.config import ScalingConfig

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    scaling_config=ScalingConfig(num_workers=3),
    datasets={"train": train_dataset}
)
result = trainer.fit()
- Parameters
datasets β Ray Datasets to use for training and validation. Must include a βtrainβ key denoting the training dataset. If a
preprocessor
is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by thepreprocessor
if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.label_column β Name of the label column. A column with this name must be present in the training dataset.
params β LightGBM training parameters passed to
lightgbm.train()
. Refer to LightGBM documentation for a list of possible parameters.dmatrix_params β Dict of
dataset name:dict of kwargs
passed to respectivexgboost_ray.RayDMatrix
initializations, which in turn are passed tolightgbm.Dataset
objects created on each worker. For example, this can be used to add sample weights with theweights
parameter.scaling_config β Configuration for how to scale data parallel training.
run_config β Configuration for the execution of the training run.
preprocessor β A ray.data.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint β A checkpoint to resume training from.
**train_kwargs β Additional kwargs passed to
lightgbm.train()
function.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- __init__(*, datasets: Dict[str, Union[Dataset, Callable[[], Dataset]]], label_column: str, params: Dict[str, Any], dmatrix_params: Optional[Dict[str, Dict[str, Any]]] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, run_config: Optional[ray.air.config.RunConfig] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None, **train_kwargs)
- class ray.train.lightgbm.LightGBMCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]
Bases:
ray.air.checkpoint.Checkpoint
A
Checkpoint
with LightGBM-specific functionality.Create this from a generic
Checkpoint
by callingLightGBMCheckpoint.from_checkpoint(ckpt)
.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_model(booster: lightgbm.basic.Booster, *, preprocessor: Optional[Preprocessor] = None) LightGBMCheckpoint [source]
Create a
Checkpoint
that stores a LightGBM model.- Parameters
booster β The LightGBM model to store in the checkpoint.
preprocessor β A fitted preprocessor to be applied before inference.
- Returns
A
LightGBMCheckpoint
containing the specified booster.
Examples
>>> import lightgbm
>>> import numpy as np
>>> from ray.train.lightgbm import LightGBMCheckpoint
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lightgbm.LGBMClassifier().fit(train_X, train_y)
>>> checkpoint = LightGBMCheckpoint.from_model(model.booster_)
You can use a
LightGBMCheckpoint
to create a
LightGBMPredictor
and perform inference.
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> predictor = LightGBMPredictor.from_checkpoint(checkpoint)
- get_model() lightgbm.basic.Booster [source]
Retrieve the LightGBM model stored in this checkpoint.
- class ray.train.lightgbm.LightGBMPredictor(model: lightgbm.basic.Booster, preprocessor: Optional[Preprocessor] = None)[source]
Bases:
ray.train.predictor.Predictor
A predictor for LightGBM models.
- Parameters
model β The LightGBM booster to use for predictions.
preprocessor β A preprocessor used to transform data batches prior to prediction.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint) ray.train.lightgbm.lightgbm_predictor.LightGBMPredictor [source]
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of
LightGBMTrainer
.- Parameters
checkpoint β The checkpoint to load the model and preprocessor from. It is expected to be from the result of a
LightGBMTrainer
run.
- predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]] [source]
Run inference on data batch.
- Parameters
data β A batch of input data.
feature_columns β The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in
data
.**predict_kwargs β Keyword arguments passed to
lightgbm.Booster.predict
.
Examples
>>> import numpy as np
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = np.array([[1, 2], [3, 4]])
>>> train_y = np.array([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> data = np.array([[1, 2], [3, 4]])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = np.array([[1, 2, 8], [3, 4, 9]])
>>> predictions = predictor.predict(data, feature_columns=[0, 1])
>>> import pandas as pd
>>> import lightgbm as lgbm
>>> from ray.train.lightgbm import LightGBMPredictor
>>>
>>> train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> train_y = pd.Series([0, 1])
>>>
>>> model = lgbm.LGBMClassifier().fit(train_X, train_y)
>>> predictor = LightGBMPredictor(model=model.booster_)
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>> predictions = predictor.predict(data)
>>>
>>> # Only use first and second column as the feature
>>> data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
>>> predictions = predictor.predict(data, feature_columns=["A", "B"])
- Returns
Prediction result.
TensorFlow#
- class ray.train.tensorflow.TensorflowTrainer(*args, **kwargs)[source]
Bases:
ray.train.data_parallel_trainer.DataParallelTrainer
A Trainer for data parallel Tensorflow training.
This Trainer runs the function
train_loop_per_worker
on multiple Ray Actors. These actors already have the necessary TensorFlow process group configured for distributed TensorFlow training.
The
train_loop_per_worker
function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
def train_loop_per_worker(config: Dict): ...
If
train_loop_per_worker
accepts an argument, thentrain_loop_config
will be passed in as the argument. This is useful if you want to tune the values intrain_loop_config
as hyperparameters.
If the
datasets
dict contains a training dataset (denoted by the βtrainβ key), then it will be split into multiple dataset shards that can then be accessed bysession.get_dataset_shard("train")
insidetrain_loop_per_worker
. All the other datasets will not be split andsession.get_dataset_shard(...)
will return the entire Dataset.
Inside the
train_loop_per_worker
function, you can use any of the Ray AIR session methods.
Warning
Ray will not automatically set any environment variables or configuration related to local parallelism / threading aside from βOMP_NUM_THREADSβ. If you desire greater control over TensorFlow threading, use the
tf.config.threading
module (eg.tf.config.threading.set_inter_op_parallelism_threads(num_cpus)
) at the beginning of yourtrain_loop_per_worker
function.def train_loop_per_worker(): # Report intermediate results for callbacks or logging and # checkpoint data. session.report(...) # Returns dict of last saved checkpoint. session.get_checkpoint() # Returns the Ray Dataset shard for the given key. session.get_dataset_shard("my_dataset") # Returns the total number of workers executing training. session.get_world_size() # Returns the rank of this worker. session.get_world_rank() # Returns the rank of the worker on the current node. session.get_local_rank()
Any returns from the
train_loop_per_worker
will be discarded and not used or persisted anywhere.
To save a model to use for the
TensorflowPredictor
, you must save it under the βmodelβ kwarg inCheckpoint
passed tosession.report()
.
Example:
import tensorflow as tf

import ray
from ray.air import session, Checkpoint
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

# If using GPUs, set this to True.
use_gpu = False

def build_model():
    # toy neural network : 1-layer
    return tf.keras.Sequential(
        [tf.keras.layers.Dense(
            1, activation="linear", input_shape=(1,))]
    )

def train_loop_per_worker(config):
    dataset_shard = session.get_dataset_shard("train")
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()
        model.compile(
            optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    tf_dataset = dataset_shard.to_tf(
        feature_columns="x",
        label_columns="y",
        batch_size=1
    )
    for epoch in range(config["num_epochs"]):
        model.fit(tf_dataset)
        # You can also use ray.air.integrations.keras.Callback
        # for reporting and checkpointing instead of reporting manually.
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.get_weights())
            ),
        )

train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=3, use_gpu=use_gpu),
    datasets={"train": train_dataset},
    train_loop_config={"num_epochs": 2},
)
result = trainer.fit()
- Parameters
train_loop_per_worker β The training function to execute. This can either take in no arguments or a
config
dict.train_loop_config β Configurations to pass into
train_loop_per_worker
if it accepts an argument.tensorflow_config β Configuration for setting up the TensorFlow backend. If set to None, use the default configuration. This replaces the
backend_config
arg ofDataParallelTrainer
.scaling_config β Configuration for how to scale data parallel training.
dataset_config β Configuration for dataset ingest.
run_config β Configuration for the execution of the training run.
datasets β Any Ray Datasets to use for training. Use the key βtrainβ to denote which dataset is the training dataset. If a
preprocessor
is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by thepreprocessor
if one is provided.preprocessor β A ray.data.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint β A checkpoint to resume training from.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- __init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, tensorflow_config: Optional[ray.train.tensorflow.config.TensorflowConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
- class ray.train.tensorflow.TensorflowCheckpoint(*args, **kwargs)[source]
Bases:
ray.air.checkpoint.Checkpoint
A
Checkpoint
with TensorFlow-specific functionality.Create this from a generic
Checkpoint
by callingTensorflowCheckpoint.from_checkpoint(ckpt)
.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- class Flavor(value)[source]
Bases:
enum.Enum
An enumeration.
- classmethod from_model(model: keras.engine.training.Model, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint [source]
Create a
Checkpoint
that stores a Keras model.The checkpoint created with this method needs to be paired with
model
when used.- Parameters
model β The Keras model, whose weights are stored in the checkpoint.
preprocessor β A fitted preprocessor to be applied before inference.
- Returns
A
TensorflowCheckpoint
containing the specified model.
Examples
>>> from ray.train.tensorflow import TensorflowCheckpoint
>>> import tensorflow as tf
>>>
>>> model = tf.keras.applications.resnet.ResNet101()
>>> checkpoint = TensorflowCheckpoint.from_model(model)
- classmethod from_h5(file_path: str, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint [source]
Create a
Checkpoint
that stores a Keras model from H5 format.
The checkpoint generated by this method contains all the information needed, so no
model
needs to be supplied when using this checkpoint.
file_path
must maintain validity even after this function returns. Some new files/directories may be added to the parent directory offile_path
, as a side effect of this method.- Parameters
file_path β The path to the .h5 file to load model from. This is the same path that is used for
model.save(path)
.preprocessor β A fitted preprocessor to be applied before inference.
- Returns
A
TensorflowCheckpoint
converted from h5 format.
Examples
>>> import tensorflow as tf
>>> import ray
>>> from ray.train.batch_predictor import BatchPredictor
>>> from ray.train.tensorflow import (
...     TensorflowCheckpoint, TensorflowTrainer, TensorflowPredictor
... )
>>> from ray.air import session
>>> from ray.air.config import ScalingConfig
>>> def train_func():
...     model = tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(10),
...             tf.keras.layers.Dense(1),
...         ]
...     )
...     model.save("my_model.h5")
...     checkpoint = TensorflowCheckpoint.from_h5("my_model.h5")
...     session.report({"my_metric": 1}, checkpoint=checkpoint)
>>> trainer = TensorflowTrainer(
...     train_loop_per_worker=train_func,
...     scaling_config=ScalingConfig(num_workers=2))
>>> result_checkpoint = trainer.fit().checkpoint
>>> batch_predictor = BatchPredictor.from_checkpoint(
...     result_checkpoint, TensorflowPredictor)
>>> batch_predictor.predict(ray.data.range(3))
- classmethod from_saved_model(dir_path: str, *, preprocessor: Optional[Preprocessor] = None) TensorflowCheckpoint [source]
Create a
Checkpoint
that stores a Keras model from SavedModel format.
The checkpoint generated by this method contains all the information needed, so no
model
needs to be supplied when using this checkpoint.
dir_path
must maintain validity even after this function returns. Some new files/directories may be added todir_path
, as a side effect of this method.- Parameters
dir_path β The directory containing the saved model. This is the same directory as used by
model.save(dir_path)
.preprocessor β A fitted preprocessor to be applied before inference.
- Returns
A
TensorflowCheckpoint
converted from SavedModel format.
Examples
>>> import tensorflow as tf
>>> import ray
>>> from ray.train.batch_predictor import BatchPredictor
>>> from ray.train.tensorflow import (
...     TensorflowCheckpoint, TensorflowTrainer, TensorflowPredictor)
>>> from ray.air import session
>>> from ray.air.config import ScalingConfig
>>> def train_fn():
...     model = tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(10),
...             tf.keras.layers.Dense(1),
...         ])
...     model.save("my_model")
...     checkpoint = TensorflowCheckpoint.from_saved_model("my_model")
...     session.report({"my_metric": 1}, checkpoint=checkpoint)
>>> trainer = TensorflowTrainer(
...     train_loop_per_worker=train_fn,
...     scaling_config=ScalingConfig(num_workers=2))
>>> result_checkpoint = trainer.fit().checkpoint
>>> batch_predictor = BatchPredictor.from_checkpoint(
...     result_checkpoint, TensorflowPredictor)
>>> batch_predictor.predict(ray.data.range(3))
- get_model(model: Optional[Union[keras.engine.training.Model, Callable[[], keras.engine.training.Model]]] = None, model_definition: Optional[Callable[[], keras.engine.training.Model]] = None) keras.engine.training.Model [source]
Retrieve the model stored in this checkpoint.
- Parameters
model β This arg is expected only if the original checkpoint was created via
TensorflowCheckpoint.from_model
.model_definition β This parameter is deprecated. Use
model
instead.
- Returns
The Tensorflow Keras model stored in the checkpoint.
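A brief sketch: a checkpoint created with from_model() only stores weights, so pass the same architecture back in when loading; build_model below is an assumed helper that rebuilds that architecture:

from ray.train.tensorflow import TensorflowCheckpoint

checkpoint = TensorflowCheckpoint.from_model(build_model())
restored_model = checkpoint.get_model(model=build_model())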
- class ray.train.tensorflow.TensorflowConfig[source]
Bases:
ray.train.backend.BackendConfig
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.train.tensorflow.prepare_dataset_shard(tf_dataset_shard: tensorflow.python.data.ops.dataset_ops.DatasetV2)[source]
A utility function that overrides default config for Tensorflow Dataset.
This should be used on a TensorFlow
Dataset
created by callingiter_tf_batches()
on aray.data.Dataset
returned byray.air.session.get_dataset_shard()
since the dataset has already been sharded across the workers.- Parameters
tf_dataset_shard (tf.data.Dataset) β A TensorFlow Dataset.
- Returns
A TensorFlow Dataset with autosharding turned off and prefetching turned on with autotune enabled.
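A minimal sketch of using this utility inside train_loop_per_worker; it mirrors the TensorflowTrainer example above, and the feature/label column names are assumptions:

from ray.air import session
from ray.train.tensorflow import prepare_dataset_shard

def train_loop_per_worker():
    dataset_shard = session.get_dataset_shard("train")
    # Disable autosharding and enable prefetching on the already-sharded data.
    tf_dataset = prepare_dataset_shard(
        dataset_shard.to_tf(
            feature_columns="x", label_columns="y", batch_size=32
        )
    )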
PublicAPI (beta): This API is in beta and may change before becoming stable.
- class ray.train.tensorflow.TensorflowPredictor(*, model: Optional[keras.engine.training.Model] = None, preprocessor: Optional[Preprocessor] = None, use_gpu: bool = False)[source]
Bases:
ray.train._internal.dl_predictor.DLPredictor
A predictor for TensorFlow models.
- Parameters
model β A Tensorflow Keras model to use for predictions.
preprocessor β A preprocessor used to transform data batches prior to prediction.
model_weights β List of weights to use for the model.
use_gpu β If set, the model will be moved to GPU on instantiation and prediction happens on GPU.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model_definition: Optional[Union[Callable[[], keras.engine.training.Model], Type[keras.engine.training.Model]]] = None, use_gpu: Optional[bool] = False) ray.train.tensorflow.tensorflow_predictor.TensorflowPredictor [source]
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of
TensorflowTrainer
.- Parameters
checkpoint β The checkpoint to load the model and preprocessor from. It is expected to be from the result of a
TensorflowTrainer
run.model_definition β A callable that returns a TensorFlow Keras model to use. Model weights will be loaded from the checkpoint. This is only needed if the
checkpoint
was created fromTensorflowCheckpoint.from_model
.use_gpu β Whether GPU should be used during prediction.
- call_model(inputs: Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]]) Union[tensorflow.python.framework.ops.Tensor, Dict[str, tensorflow.python.framework.ops.Tensor]] [source]
Runs inference on a single batch of tensor data.
This method is called by
TensorflowPredictor.predict
after converting the original data batch to TensorFlow tensors.
Override this method to add custom logic for processing the model input or output.
Example
# List outputs are not supported by default TensorflowPredictor.
def build_model() -> tf.keras.Model:
    input = tf.keras.layers.Input(shape=1)
    model = tf.keras.models.Model(inputs=input, outputs=[input, input])
    return model

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TensorflowPredictor):
    def call_model(self, inputs):
        model_output = super().call_model(inputs)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

# The constructor takes a built Keras model (see the signature above).
predictor = CustomPredictor(model=build_model())
predictions = predictor.predict(data_batch)
- Parameters
inputs β A batch of data to predict on, represented as either a single TensorFlow tensor or for multi-input models, a dictionary of tensors.
- Returns
The model outputs, either as a single tensor or a dictionary of tensors.
DeveloperAPI: This API may change across minor Ray releases.
- predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], dtype: Optional[Union[tensorflow.python.framework.dtypes.DType, Dict[str, tensorflow.python.framework.dtypes.DType]]] = None) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]] [source]
Run inference on data batch.
If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single Tensorflow tensor before being inputted to the model.
If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being inputted to the model. This is useful for multi-modal inputs (for example your model accepts both image and text).
- Parameters
data β A batch of input data. Either a pandas DataFrame or numpy array.
dtype β The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.
Examples
>>> import numpy as np
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     return tf.keras.Sequential(
...         [
...             tf.keras.layers.InputLayer(input_shape=()),
...             tf.keras.layers.Flatten(),
...             tf.keras.layers.Dense(1),
...         ]
...     )
>>>
>>> weights = [np.array([[2.0]]), np.array([0.0])]
>>> predictor = TensorflowPredictor(model=build_model())
>>>
>>> data = np.asarray([1, 2, 3])
>>> predictions = predictor.predict(data)
>>> import pandas as pd
>>> import tensorflow as tf
>>> from ray.train.tensorflow import TensorflowPredictor
>>>
>>> def build_model():
...     input1 = tf.keras.layers.Input(shape=(1,), name="A")
...     input2 = tf.keras.layers.Input(shape=(1,), name="B")
...     merged = tf.keras.layers.Concatenate(axis=1)([input1, input2])
...     output = tf.keras.layers.Dense(2, input_dim=2)(merged)
...     return tf.keras.models.Model(
...         inputs=[input1, input2], outputs=output)
>>>
>>> predictor = TensorflowPredictor(model=build_model())
>>>
>>> # Pandas dataframe.
>>> data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
>>>
>>> predictions = predictor.predict(data)
- Returns
- Prediction result. The return type will be the same as the
input type.
- Return type
DataBatchType
PyTorch#
- class ray.train.torch.TorchTrainer(*args, **kwargs)[source]
Bases:
ray.train.data_parallel_trainer.DataParallelTrainer
A Trainer for data parallel PyTorch training.
This Trainer runs the function
train_loop_per_worker
on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training.The
train_loop_per_worker
function is expected to take in either 0 or 1 arguments:

def train_loop_per_worker():
    ...
from typing import Dict, Any

def train_loop_per_worker(config: Dict[str, Any]):
    ...
If
train_loop_per_worker
accepts an argument, thentrain_loop_config
will be passed in as the argument. This is useful if you want to tune the values intrain_loop_config
as hyperparameters.
If the
datasets
dict contains a training dataset (denoted by the βtrainβ key), then it will be split into multiple dataset shards that can then be accessed bysession.get_dataset_shard("train")
insidetrain_loop_per_worker
. All the other datasets will not be split andsession.get_dataset_shard(...)
will return the entire Dataset.
Inside the
train_loop_per_worker
function, you can use any of the Ray AIR session methods. See full example code below.

def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Get dict of last saved checkpoint.
    session.get_checkpoint()

    # Session returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Get the total number of workers executing training.
    session.get_world_size()

    # Get the rank of this worker.
    session.get_world_rank()

    # Get the rank of the worker on the current node.
    session.get_local_rank()
You can also use any of the Torch specific function utils, such as
ray.train.torch.get_device()
andray.train.torch.prepare_model()
def train_loop_per_worker():
    # Prepares model for distributed training by wrapping in
    # `DistributedDataParallel` and moving to correct device.
    train.torch.prepare_model(...)

    # Configures the dataloader for distributed training by adding a
    # `DistributedSampler`.
    # You should NOT use this if you are doing
    # `session.get_dataset_shard(...).iter_torch_batches(...)`
    train.torch.prepare_data_loader(...)

    # Get the current torch device.
    train.torch.get_device()
Any returns from the
train_loop_per_worker
will be discarded and not used or persisted anywhere.
To save a model to use for the
TorchPredictor
, you must save it under the βmodelβ kwarg inCheckpoint
passed tosession.report()
.
Note
When you wrap the
model
withprepare_model
, the keys of itsstate_dict
are prefixed bymodule.
. For example,layer1.0.bn1.bias
becomesmodule.layer1.0.bn1.bias
. However, when savingmodel
throughsession.report()
allmodule.
prefixes are stripped. As a result, when you load from a saved checkpoint, make sure that you first loadstate_dict
to the model before callingprepare_model
. Otherwise, you will run into errors likeError(s) in loading state_dict for DistributedDataParallel: Missing key(s) in state_dict: "module.conv1.weight", ...
. See the snippet below.

from torchvision.models import resnet18

from ray.air import session
from ray.air.checkpoint import Checkpoint
import ray.train as train

def train_func():
    ...
    model = resnet18()
    model = train.torch.prepare_model(model)
    for epoch in range(3):
        ...
        ckpt = Checkpoint.from_dict({
            "epoch": epoch,
            "model": model.state_dict(),
            # "model": model.module.state_dict(),
            # ** The above two are equivalent **
        })
        session.report({"foo": "bar"}, ckpt)
Example
import torch
import torch.nn as nn

import ray
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig
from ray.air.config import CheckpointConfig

# If using GPUs, set this to True.
use_gpu = False

# Define NN layers architecture, epochs, and number of workers
input_size = 1
layer_size = 32
output_size = 1
num_epochs = 200
num_workers = 3

# Define your network structure
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

# Define your train worker loop
def train_loop_per_worker():
    # Fetch training set from the session
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()

    # Loss function, optimizer, prepare model for training.
    # This moves the data and prepares model for distributed
    # execution
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=0.01)
    model = train.torch.prepare_model(model)

    # Iterate over epochs and batches
    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(
                batch_size=32, dtypes=torch.float,
                device=train.torch.get_device()):

            # Add batch or unsqueeze as an additional dimension [32, x]
            inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
            output = model(inputs)

            # Make output shape same as the labels
            loss = loss_fn(output.squeeze(), labels)

            # Zero out grads, do backward, and update optimizer
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Print what's happening with loss every 20 epochs
        if epoch % 20 == 0:
            print(f"epoch: {epoch}/{num_epochs}, loss: {loss:.3f}")

        # Report and record metrics, checkpoint model at end of each
        # epoch
        session.report({"loss": loss.item(), "epoch": epoch},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.state_dict()))
        )

torch.manual_seed(42)
train_dataset = ray.data.from_items(
    [{"x": x, "y": 2 * x + 1} for x in range(200)]
)

# Define scaling and run configs
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
run_config = RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=1))

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_dataset})

result = trainer.fit()
best_checkpoint_loss = result.metrics['loss']

# Assert loss is less than 0.09
assert best_checkpoint_loss <= 0.09
- Parameters
train_loop_per_worker β The training function to execute. This can either take in no arguments or a
config
dict.train_loop_config β Configurations to pass into
train_loop_per_worker
if it accepts an argument.torch_config β Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the
backend_config
arg ofDataParallelTrainer
.scaling_config β Configuration for how to scale data parallel training.
dataset_config β Configuration for dataset ingest.
run_config β Configuration for the execution of the training run.
datasets β Any Ray Datasets to use for training. Use the key βtrainβ to denote which dataset is the training dataset. If a
preprocessor
is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by thepreprocessor
if one is provided.preprocessor β A
ray.data.Preprocessor
to preprocess the provided datasets.resume_from_checkpoint β A checkpoint to resume training from.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- __init__(train_loop_per_worker: Union[Callable[[], None], Callable[[Dict], None]], *, train_loop_config: Optional[Dict] = None, torch_config: Optional[ray.train.torch.config.TorchConfig] = None, scaling_config: Optional[ray.air.config.ScalingConfig] = None, dataset_config: Optional[Dict[str, ray.air.config.DatasetConfig]] = None, run_config: Optional[ray.air.config.RunConfig] = None, datasets: Optional[Dict[str, Union[Dataset, Callable[[], Dataset]]]] = None, preprocessor: Optional[Preprocessor] = None, resume_from_checkpoint: Optional[ray.air.checkpoint.Checkpoint] = None)[source]
- class ray.train.torch.TorchCheckpoint(local_path: Optional[Union[str, os.PathLike]] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None)[source]
Bases:
ray.air.checkpoint.Checkpoint
A
Checkpoint
with Torch-specific functionality.Create this from a generic
Checkpoint
by callingTorchCheckpoint.from_checkpoint(ckpt)
.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_state_dict(state_dict: Dict[str, Any], *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint [source]
Create a
Checkpoint
that stores a model state dictionary.Tip
This is the recommended method for creating
TorchCheckpoints
.- Parameters
state_dict β The model state dictionary to store in the checkpoint.
preprocessor β A fitted preprocessor to be applied before inference.
- Returns
A
TorchCheckpoint
containing the specified state dictionary.
Examples
import torch
import torch.nn as nn

from ray.train.torch import TorchCheckpoint

# Set manual seed
torch.manual_seed(42)

# Function to create a NN model
def create_model() -> nn.Module:
    model = nn.Sequential(nn.Linear(1, 10),
                          nn.ReLU(),
                          nn.Linear(10, 1))
    return model

# Create a TorchCheckpoint from our model's state_dict
model = create_model()
checkpoint = TorchCheckpoint.from_state_dict(model.state_dict())

# Now load the model from the TorchCheckpoint by providing the
# model architecture
model_from_chkpt = checkpoint.get_model(create_model())

# Assert they have the same state dict
assert str(model.state_dict()) == str(model_from_chkpt.state_dict())
print("worked")
- classmethod from_model(model: torch.nn.modules.module.Module, *, preprocessor: Optional[Preprocessor] = None) TorchCheckpoint [source]
Create a
Checkpoint
that stores a Torch model.Note
PyTorch recommends storing state dictionaries. To create a
TorchCheckpoint
from a state dictionary, callfrom_state_dict()
. To learn more about state dictionaries, read Saving and Loading Models. # noqa: E501- Parameters
model β The Torch model to store in the checkpoint.
preprocessor β A fitted preprocessor to be applied before inference.
- Returns
A
TorchCheckpoint
containing the specified model.
Examples
from ray.train.torch import TorchCheckpoint
from ray.train.torch import TorchPredictor
import torch

# Set manual seed
torch.manual_seed(42)

# Create an identity model and send a random tensor to it
model = torch.nn.Identity()
input = torch.randn(2, 2)
output = model(input)

# Create a checkpoint
checkpoint = TorchCheckpoint.from_model(model)

# You can use a TorchCheckpoint to create a TorchPredictor
# and perform inference.
predictor = TorchPredictor.from_checkpoint(checkpoint)
pred = predictor.predict(input.numpy())

# Convert prediction dictionary value into a tensor
pred = torch.tensor(pred['predictions'])

# Assert the output from the original and checkpoint model are the same
assert torch.equal(output, pred)
print("worked")
- get_model(model: Optional[torch.nn.modules.module.Module] = None) torch.nn.modules.module.Module [source]
Retrieve the model stored in this checkpoint.
- Parameters
model β If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this
model
. Otherwise, the model will be discarded.
- class ray.train.torch.TorchConfig(backend: Optional[str] = None, init_method: str = 'env', timeout_s: int = 1800)[source]
Bases:
ray.train.backend.BackendConfig
Configuration for torch process group setup.
See https://pytorch.org/docs/stable/distributed.html for more info.
- Parameters
backend β The backend to use for training. See
torch.distributed.init_process_group
for more info and valid values. If set to None, nccl will be used if GPUs are requested, else gloo will be used.
init_method – The initialization method to use. Either "env" for environment variable initialization or "tcp" for TCP initialization. Defaults to "env".
timeout_s β Seconds for process group operations to timeout.
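An illustrative sketch of forcing the Gloo backend for a CPU-only cluster; the training function and worker count are assumptions:

from ray.train.torch import TorchTrainer, TorchConfig
from ray.air.config import ScalingConfig

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    torch_config=TorchConfig(backend="gloo"),
    scaling_config=ScalingConfig(num_workers=2),
)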
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.train.torch.accelerate(amp: bool = False) None [source]
Enables training optimizations.
- Parameters
amp β If true, perform training with automatic mixed precision. Otherwise, use full precision.
Warning
train.torch.accelerate
cannot be called more than once, and it must be called before any othertrain.torch
utility function.PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.train.torch.get_device() torch.device [source]
Gets the correct torch device to use for training.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.train.torch.prepare_model(model: torch.nn.modules.module.Module, move_to_device: bool = True, parallel_strategy: Optional[str] = 'ddp', parallel_strategy_kwargs: Optional[Dict[str, Any]] = None) torch.nn.modules.module.Module [source]
Prepares the model for distributed execution.
This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).
- Parameters
model (torch.nn.Module) β A torch model to prepare.
move_to_device β Whether to move the model to the correct device. If set to False, the model needs to manually be moved to the correct device.
parallel_strategy ("ddp", "fsdp", or None) β Whether to wrap models in
DistributedDataParallel
,FullyShardedDataParallel
, or neither.parallel_strategy_kwargs (Dict[str, Any]) β Args to pass into
DistributedDataParallel
orFullyShardedDataParallel
initialization ifparallel_strategy
is set to βddpβ or βfsdpβ, respectively.
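A short sketch of wrapping a model with FSDP instead of the default DDP; the model itself is a placeholder:

import torch.nn as nn
from ray import train

def train_loop_per_worker():
    model = nn.Linear(4, 1)  # placeholder model
    # Moves the model to the right device and wraps it in
    # FullyShardedDataParallel instead of DistributedDataParallel.
    model = train.torch.prepare_model(model, parallel_strategy="fsdp")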
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.train.torch.prepare_optimizer(optimizer: torch.optim.optimizer.Optimizer) torch.optim.optimizer.Optimizer [source]
Wraps optimizer to support automatic mixed precision.
- Parameters
optimizer (torch.optim.Optimizer) – The optimizer to prepare.
- Returns
A wrapped optimizer.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.train.torch.prepare_data_loader(data_loader: torch.utils.data.dataloader.DataLoader, add_dist_sampler: bool = True, move_to_device: bool = True, auto_transfer: bool = True) torch.utils.data.dataloader.DataLoader [source]
Prepares DataLoader for distributed execution.
This allows you to use the same exact code regardless of number of workers or the device type being used (CPU, GPU).
- Parameters
data_loader (torch.utils.data.DataLoader) β The DataLoader to prepare.
add_dist_sampler β Whether to add a DistributedSampler to the provided DataLoader.
move_to_device β If set, automatically move the data returned by the data loader to the correct device.
auto_transfer β If set and the device is a GPU, another CUDA stream is created to automatically copy data from host (CPU) memory to device (GPU) memory (the default CUDA stream still runs the training procedure). If the device is a CPU, this is disabled regardless of the setting. This configuration is ignored if move_to_device is False.
PublicAPI (beta): This API is in beta and may change before becoming stable.
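A minimal sketch of preparing a DataLoader inside a training function; the synthetic dataset is a placeholder:

import torch
from ray import train

def train_loop_per_worker():
    dataset = torch.utils.data.TensorDataset(
        torch.randn(8, 4), torch.randn(8, 1)
    )
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=2)
    # Adds a DistributedSampler and moves each batch to the assigned device.
    data_loader = train.torch.prepare_data_loader(data_loader)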
- ray.train.torch.backward(tensor: torch.Tensor) None [source]
Computes the gradient of the specified tensor w.r.t. graph leaves.
- Parameters
tensor (torch.Tensor) β Tensor of which the derivative will be computed.
PublicAPI (beta): This API is in beta and may change before becoming stable.
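A minimal sketch of using train.torch.backward in place of loss.backward() so that gradient scaling is applied when AMP is enabled; the model, data, and loss are placeholders:

import torch
from ray import train

def train_loop_per_worker():
    train.torch.accelerate(amp=True)
    model = train.torch.prepare_model(torch.nn.Linear(4, 1))
    optimizer = train.torch.prepare_optimizer(
        torch.optim.SGD(model.parameters(), lr=0.01)
    )
    inputs, labels = torch.randn(8, 4), torch.randn(8, 1)
    loss = torch.nn.functional.mse_loss(model(inputs), labels)
    # Computes gradients (with AMP scaling) instead of calling loss.backward().
    train.torch.backward(loss)
    optimizer.step()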
- ray.train.torch.enable_reproducibility(seed: int = 0) None [source]
Limits sources of nondeterministic behavior.
This function:
Seeds PyTorch, Python, and NumPy.
Disables CUDA convolution benchmarking.
Configures PyTorch to use deterministic algorithms.
Seeds workers spawned for multi-process data loading.
- Parameters
seed β The number to seed libraries and data workers with.
Warning
train.torch.enable_reproducibility() can't guarantee completely reproducible results across executions. To learn more, read the PyTorch notes on randomness.
PublicAPI (beta): This API is in beta and may change before becoming stable.
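A minimal sketch of enabling reproducibility at the start of the training function; the rest of the loop is elided:

from ray import train

def train_loop_per_worker():
    # Seed libraries and configure deterministic behavior on every worker.
    train.torch.enable_reproducibility(seed=42)
    ...  # model setup and training loop here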
- class ray.train.torch.TorchPredictor(model: torch.nn.modules.module.Module, preprocessor: Optional[Preprocessor] = None, use_gpu: bool = False)[source]
Bases:
ray.train._internal.dl_predictor.DLPredictor
A predictor for PyTorch models.
- Parameters
model β The torch module to use for predictions.
preprocessor β A preprocessor used to transform data batches prior to prediction.
use_gpu β If set, the model will be moved to GPU on instantiation and prediction happens on GPU.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- classmethod from_checkpoint(checkpoint: ray.air.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None, use_gpu: bool = False) ray.train.torch.torch_predictor.TorchPredictor [source]
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of TorchTrainer.
- Parameters
checkpoint β The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.
model β If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model. If the checkpoint already contains the model itself, this model argument will be discarded.
use_gpu β If set, the model will be moved to GPU on instantiation and prediction happens on GPU.
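As a hedged sketch, one way to obtain a compatible checkpoint without running a full training job is to build one from an in-memory model via TorchCheckpoint.from_model, assuming that helper is available in your Ray version:

import torch
from ray.train.torch import TorchCheckpoint, TorchPredictor

model = torch.nn.Linear(2, 1)
# Assumes TorchCheckpoint.from_model stores the model itself in the checkpoint.
checkpoint = TorchCheckpoint.from_model(model)
predictor = TorchPredictor.from_checkpoint(checkpoint)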
- call_model(inputs: Union[torch.Tensor, Dict[str, torch.Tensor]]) Union[torch.Tensor, Dict[str, torch.Tensor]] [source]
Runs inference on a single batch of tensor data.
This method is called by TorchPredictor.predict after converting the original data batch to torch tensors.
Override this method to add custom logic for processing the model input or output.
- Parameters
inputs β A batch of data to predict on, represented as either a single PyTorch tensor or for multi-input models, a dictionary of tensors.
- Returns
The model outputs, either as a single tensor or a dictionary of tensors.
Example
import numpy as np
import torch
from ray.train.torch import TorchPredictor

# List outputs are not supported by the default TorchPredictor,
# so define a custom TorchPredictor and override call_model.
class MyModel(torch.nn.Module):
    def forward(self, input_tensor):
        return [input_tensor, input_tensor]

# Use a custom predictor to format model output as a dict.
class CustomPredictor(TorchPredictor):
    def call_model(self, inputs):
        model_output = super().call_model(inputs)
        return {
            str(i): model_output[i] for i in range(len(model_output))
        }

# Create our data batch.
data_batch = np.array([1, 2])

# Create the custom predictor and predict.
predictor = CustomPredictor(model=MyModel())
predictions = predictor.predict(data_batch)
print(f"Predictions: {predictions.get('0')}, {predictions.get('1')}")
Predictions: [1 2], [1 2]
DeveloperAPI: This API may change across minor Ray releases.
- predict(data: Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]], dtype: Optional[Union[torch.dtype, Dict[str, torch.dtype]]] = None) Union[numpy.ndarray, pandas.DataFrame, Dict[str, numpy.ndarray]] [source]
Run inference on data batch.
If the provided data is a single array or a dataframe/table with a single column, it will be converted into a single PyTorch tensor before being passed to the model.
If the provided data is a multi-column table or a dict of numpy arrays, it will be converted into a dict of tensors before being passed to the model. This is useful for multi-modal inputs (for example, your model accepts both image and text).
- Parameters
data β A batch of input data of type DataBatchType.
dtype β The dtypes to use for the tensors. Either a single dtype for all tensors or a mapping from column name to dtype.
- Returns
Prediction result. The return type will be the same as the input type.
- Return type
DataBatchType
Example
import numpy as np
import pandas as pd
import torch
import ray
from ray.train.torch import TorchPredictor

# Define a custom PyTorch module
class CustomModule(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = torch.nn.Linear(1, 1)
        self.linear2 = torch.nn.Linear(1, 1)

    def forward(self, input_dict: dict):
        out1 = self.linear1(input_dict["A"].unsqueeze(1))
        out2 = self.linear2(input_dict["B"].unsqueeze(1))
        return out1 + out2

# Set manual seed so we get consistent output
torch.manual_seed(42)

# Use a standard PyTorch model
model = torch.nn.Linear(2, 1)
predictor = TorchPredictor(model=model)

# Define our data
data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data, dtype=torch.float)
print(f"Standard model predictions: {predictions}")
print("---")

# Use the custom PyTorch model with TorchPredictor
predictor = TorchPredictor(model=CustomModule())

# Define our data and predict with the custom model
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data, dtype=torch.float)
print(f"Custom model predictions: {predictions}")
Standard model predictions: {'predictions': array([[1.5487633],
       [3.8037925]], dtype=float32)}
---
Custom model predictions:   predictions
0  [0.61623406]
1    [2.857038]
Horovod#
- class ray.train.horovod.HorovodTrainer(*args, **kwargs)[source]
Bases:
ray.train.data_parallel_trainer.DataParallelTrainer
A Trainer for data parallel Horovod training.
This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary Horovod setup configured for distributed Horovod training.
The train_loop_per_worker function is expected to take in either 0 or 1 arguments:
def train_loop_per_worker():
    ...

def train_loop_per_worker(config: Dict):
    ...
If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.
If the datasets dict contains a training dataset (denoted by the "train" key), then it will be split into multiple dataset shards that can then be accessed by session.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and session.get_dataset_shard(...) will return the entire Dataset.
Inside the train_loop_per_worker function, you can use any of the Ray AIR session methods.
def train_loop_per_worker():
    # Report intermediate results for callbacks or logging and
    # checkpoint data.
    session.report(...)

    # Returns dict of last saved checkpoint.
    session.get_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    session.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    session.get_world_size()

    # Returns the rank of this worker.
    session.get_world_rank()

    # Returns the rank of the worker on the current node.
    session.get_local_rank()
Any returns from the train_loop_per_worker will be discarded and not used or persisted anywhere.
You could use TensorflowPredictor or TorchPredictor in conjunction with HorovodTrainer. You must save the model under the "model" kwarg in the Checkpoint passed to session.report(), so that it can be used by corresponding predictors.
Example:
import ray
import ray.train as train