AIR API¶
Components¶
Preprocessors¶
- class ray.ml.preprocessor.Preprocessor[source]¶
Implements an ML preprocessing operation.
Preprocessors are stateful objects that can be fitted against a Dataset and used to transform both local data batches and distributed datasets. For example, a Normalization preprocessor may calculate the mean and stdev of a field during fitting, and uses these attributes to implement its normalization transform.
- transform_stats() Optional[str] [source]¶
Return Dataset stats for the most recent transform call, if any.
TODO(ekl) we should also be able to provide stats for fit().
- fit(dataset: ray.data.dataset.Dataset) ray.ml.preprocessor.Preprocessor [source]¶
Fit this Preprocessor to the Dataset.
Fitted state attributes will be directly set in the Preprocessor.
Calling it more than once will overwrite all previously fitted state: preprocessor.fit(A).fit(B) is equivalent to preprocessor.fit(B).
- Parameters
dataset – Input dataset.
- Returns
The fitted Preprocessor with state attributes.
- Return type
Preprocessor
- fit_transform(dataset: ray.data.dataset.Dataset) ray.data.dataset.Dataset [source]¶
Fit this Preprocessor to the Dataset and then transform the Dataset.
Calling it more than once will overwrite all previously fitted state: preprocessor.fit_transform(A).fit_transform(B) is equivalent to preprocessor.fit_transform(B).
- Parameters
dataset – Input Dataset.
- Returns
The transformed Dataset.
- Return type
Dataset
- transform(dataset: ray.data.dataset.Dataset) ray.data.dataset.Dataset [source]¶
Transform the given dataset.
- Parameters
dataset – Input Dataset.
- Returns
The transformed Dataset.
- Return type
Dataset
- Raises
PreprocessorNotFittedException – If fit is not called yet.
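As an illustrative sketch of the fit/transform workflow described above (using the MinMaxScaler documented below; the "value" column is hypothetical):
import ray
from ray.ml.preprocessors import MinMaxScaler

# A small dataset with one numeric column.
ds = ray.data.from_items([{"value": v} for v in [0, 5, 10]])

scaler = MinMaxScaler(columns=["value"])
scaler.fit(ds)                  # computes per-column min and max
scaled = scaler.transform(ds)   # values now fall in [0, 1]
print(scaled.take())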
- class ray.ml.preprocessors.BatchMapper(fn: Callable[[pandas.DataFrame], pandas.DataFrame])[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Apply fn to batches of records of the given dataset.
This preprocessor is meant to be generic and supports low-level operations on records. For example, it can be used to add a new column or modify a column in place.
- Parameters
fn – The user-defined function (UDF) to apply to each batch.
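A minimal sketch of adding a derived column with a batch UDF (the column names here are hypothetical):
import pandas as pd

import ray
from ray.ml.preprocessors import BatchMapper

ds = ray.data.from_items([{"value": v} for v in range(4)])

def add_squared(batch: pd.DataFrame) -> pd.DataFrame:
    # Add a new column derived from an existing one.
    batch["value_squared"] = batch["value"] ** 2
    return batch

mapper = BatchMapper(fn=add_squared)
print(mapper.fit_transform(ds).take())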
- class ray.ml.preprocessors.Categorizer(columns: Union[List[str], Dict[str, Optional[pandas.core.dtypes.dtypes.CategoricalDtype]]])[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Transform Dataset columns to Categorical data type.
Note that in the case of automatic inference, you will most likely want to run this preprocessor on the entire dataset before splitting it (e.g. into train and test sets), so that all of the categories are inferred. There is no risk of data leakage when using this preprocessor.
- Parameters
columns – The columns whose data type to change. This can be either a list of columns, in which case the categories will be inferred automatically from the data, or a dict of column: pd.CategoricalDtype or None – if a dtype is specified, it will be applied; if not, it will be automatically inferred.
- class ray.ml.preprocessors.CountVectorizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None, max_features: Optional[int] = None)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Tokenize string columns and convert into token columns.
The created columns will have names in the format {column_name}_{token}. Token features will be sorted by count in descending order.
- Parameters
columns – The columns that will individually be tokenized.
tokenization_fn – The tokenization function to use. If not specified, a simple string.split(" ") will be used.
max_features – If specified, limit the number of tokens. The tokens with the largest counts will be kept.
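A small usage sketch (the "text" column is hypothetical); each distinct token becomes a {column_name}_{token} count column after fitting:
import ray
from ray.ml.preprocessors import CountVectorizer

ds = ray.data.from_items([
    {"text": "hello world"},
    {"text": "hello ray"},
])

vectorizer = CountVectorizer(columns=["text"])
# Produces count columns such as text_hello, text_world, text_ray.
print(vectorizer.fit_transform(ds).take())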
- class ray.ml.preprocessors.Chain(*preprocessors: ray.ml.preprocessor.Preprocessor)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Chain multiple Preprocessors into a single Preprocessor.
Calling fit will invoke fit_transform on the input preprocessors, so that one preprocessor can fit based on columns/values produced by the transform of a preceding preprocessor.
- Parameters
preprocessors – The preprocessors that should be executed sequentially.
- fit_transform(ds: ray.data.dataset.Dataset) ray.data.dataset.Dataset [source]¶
Fit this Preprocessor to the Dataset and then transform the Dataset.
Calling it more than once will overwrite all previously fitted state: preprocessor.fit_transform(A).fit_transform(B) is equivalent to preprocessor.fit_transform(B).
- Parameters
dataset – Input Dataset.
- Returns
The transformed Dataset.
- Return type
Dataset
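A sketch of chaining two of the preprocessors documented on this page (column names are hypothetical):
import ray
from ray.ml.preprocessors import Chain, OrdinalEncoder, StandardScaler

ds = ray.data.from_items([
    {"color": "red", "value": 1.0},
    {"color": "blue", "value": 3.0},
])

# The scaler is fit on the dataset produced by the encoder's transform.
chain = Chain(OrdinalEncoder(columns=["color"]), StandardScaler(columns=["value"]))
print(chain.fit_transform(ds).take())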
- class ray.ml.preprocessors.FeatureHasher(columns: List[str], num_features: int)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Hash the features of the specified columns.
The created columns will have names in the format hash_{column_names}_{hash}, e.g. hash_column1_column2_0, hash_column1_column2_1, …
Note: Currently sparse matrices are not supported. Therefore, it is recommended to not use a large num_features.
- Parameters
columns – The columns of features that should be projected onto a single hashed feature vector.
num_features – The size of the hashed feature vector.
- class ray.ml.preprocessors.HashingVectorizer(columns: List[str], num_features: int, tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Tokenize and hash string columns into token hash columns.
The created columns will have names in the format hash_{column_name}_{hash}.
- Parameters
columns – The columns that will individually be hashed.
num_features – The size of each hashed feature vector.
tokenization_fn – The tokenization function to use. If not specified, a simple string.split(" ") will be used.
- class ray.ml.preprocessors.LabelEncoder(label_column: str)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Encode values within a label column as ordered integer values.
Currently, order within a column is based on the values from the fitted dataset in sorted order.
Transforming values not included in the fitted dataset will be encoded as None.
- Parameters
label_column – The label column that will be encoded.
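A minimal sketch with a hypothetical "species" label column:
import ray
from ray.ml.preprocessors import LabelEncoder

ds = ray.data.from_items([
    {"species": "cat"},
    {"species": "dog"},
    {"species": "cat"},
])

encoder = LabelEncoder(label_column="species")
# "cat" -> 0, "dog" -> 1 (sorted order of the fitted values).
print(encoder.fit_transform(ds).take())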
- class ray.ml.preprocessors.MaxAbsScaler(columns: List[str])[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Scale values within columns based on the absolute max value.
For each column, each value will be transformed to value / abs_max, where abs_max is calculated from the fitted dataset.
When transforming the fitted dataset, transformed values will be in the range [-1, 1].
- Parameters
columns – The columns that will individually be scaled.
- class ray.ml.preprocessors.MinMaxScaler(columns: List[str])[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Scale values within columns based on min and max values.
For each column, each value will be transformed to (value - min) / (max - min), where min and max are calculated from the fitted dataset.
When transforming the fitted dataset, transformed values will be in the range [0, 1].
- Parameters
columns – The columns that will individually be scaled.
- class ray.ml.preprocessors.Normalizer(columns: List[str], norm='l2')[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Normalize each record to have unit norm.
- Supports the following normalization types:
l1: Sum of the absolute values.
l2: Square root of the sum of the squared values.
max: Maximum value.
- Parameters
columns – The columns that in combination define the record to normalize.
norm – “l1”, “l2”, or “max”. Defaults to “l2”.
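A sketch normalizing two hypothetical feature columns jointly to unit L2 norm:
import ray
from ray.ml.preprocessors import Normalizer

ds = ray.data.from_items([{"x": 3.0, "y": 4.0}])

normalizer = Normalizer(columns=["x", "y"], norm="l2")
# The record (3, 4) has L2 norm 5, so it becomes (0.6, 0.8).
print(normalizer.fit_transform(ds).take())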
- class ray.ml.preprocessors.OneHotEncoder(columns: List[str], limit: Optional[Dict[str, int]] = None)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Encode columns as new columns using one-hot encoding.
The transformed dataset will have a new column in the form {column}_{value} for each of the values from the fitted dataset. The value of a column will be set to 1 if the value matches, otherwise 0.
Transforming values not included in the fitted dataset or not among the top popular values (see limit) will result in all of the encoded column values being 0.
Example:
ohe = OneHotEncoder(
    columns=[
        "trip_start_hour",
        "trip_start_day",
        "trip_start_month",
        "dropoff_census_tract",
        "pickup_community_area",
        "dropoff_community_area",
        "payment_type",
        "company",
    ],
    limit={
        "dropoff_census_tract": 25,
        "pickup_community_area": 20,
        "dropoff_community_area": 20,
        "payment_type": 2,
        "company": 7,
    },
)
- Parameters
columns – The columns that will individually be encoded.
limit – If set, only the top “limit” number of most popular values become categorical variables. The less frequent ones will result in all the encoded column values being 0. This is a dict of column to its corresponding limit. The column in this dictionary has to be in columns.
- class ray.ml.preprocessors.OrdinalEncoder(columns: List[str])[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Encode values within columns as ordered integer values.
Currently, order within a column is based on the values from the fitted dataset in sorted order.
Transforming values not included in the fitted dataset will be encoded as None.
- Parameters
columns – The columns that will individually be encoded.
- class ray.ml.preprocessors.PowerTransformer(columns: List[str], power: float, method: str = 'yeo-johnson')[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Apply power function to make data more normally distributed.
See https://en.wikipedia.org/wiki/Power_transform.
- Supports the following methods:
Yeo-Johnson (positive and negative numbers)
Box-Cox (positive numbers only)
Currently, this requires the user to specify the power parameter. In the future, an optimal value can be determined in fit().
- Parameters
columns – The columns that will individually be transformed.
power – The power parameter which is used as the exponent.
method – Supports “yeo-johnson” and “box-cox”. Defaults to “yeo-johnson”.
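A brief sketch with a hypothetical skewed "value" column, passing power and method explicitly as described above:
import ray
from ray.ml.preprocessors import PowerTransformer

ds = ray.data.from_items([{"value": v} for v in [1.0, 10.0, 100.0]])

transformer = PowerTransformer(columns=["value"], power=0.5, method="yeo-johnson")
print(transformer.fit_transform(ds).take())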
- class ray.ml.preprocessors.RobustScaler(columns: List[str], quantile_range: Tuple[float, float] = (0.25, 0.75))[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Scale values within columns based on their quantile range.
For each column, each value will be transformed to (value - median) / (high_quantile - low_quantile), where median, high_quantile, and low_quantile are calculated from the fitted dataset.
- Parameters
columns – The columns that will be scaled individually.
quantile_range – A tuple that defines the lower and upper quantile to scale to. Defaults to the 1st and 3rd quartiles: (0.25, 0.75).
- class ray.ml.preprocessors.SimpleImputer(columns: List[str], strategy: str = 'mean', fill_value: Optional[Union[str, numbers.Number]] = None)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Populate missing values within columns.
- Parameters
columns – The columns that will individually be imputed.
strategy – The strategy to compute the value to impute.
“mean”: The mean of the column (numeric only).
“most_frequent”: The most used value of the column (string or numeric).
“constant”: The value of fill_value (string or numeric).
fill_value – The value to use when strategy is “constant”.
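A sketch imputing missing values in a hypothetical "value" column with the fitted column mean:
import ray
from ray.ml.preprocessors import SimpleImputer

ds = ray.data.from_items([
    {"value": 1.0},
    {"value": None},
    {"value": 3.0},
])

imputer = SimpleImputer(columns=["value"], strategy="mean")
# The missing entry is replaced with the fitted mean (2.0).
print(imputer.fit_transform(ds).take())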
- class ray.ml.preprocessors.StandardScaler(columns: List[str], ddof=0)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Scale values within columns based on mean and standard deviation.
For each column, each value will be transformed to (value - mean) / std, where mean and std are calculated from the fitted dataset.
- Parameters
columns – The columns that will individually be scaled.
ddof – The delta degrees of freedom used to calculate standard deviation.
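A sketch with a hypothetical "value" column illustrating the (value - mean) / std transform:
import ray
from ray.ml.preprocessors import StandardScaler

ds = ray.data.from_items([{"value": v} for v in [1.0, 2.0, 3.0]])

scaler = StandardScaler(columns=["value"])
# With ddof=0, mean = 2.0 and std ≈ 0.816, so the values become
# roughly [-1.22, 0.0, 1.22].
print(scaler.fit_transform(ds).take())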
- class ray.ml.preprocessors.Tokenizer(columns: List[str], tokenization_fn: Optional[Callable[[str], List[str]]] = None)[source]¶
Bases: ray.ml.preprocessor.Preprocessor
Tokenize string columns.
Each string entry will be replaced with a list of tokens.
- Parameters
columns – The columns that will individually be tokenized.
tokenization_fn – The tokenization function to use. If not specified, a simple string.split(" ") will be used.
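A short sketch on a hypothetical "text" column; each entry becomes a list of tokens:
import ray
from ray.ml.preprocessors import Tokenizer

ds = ray.data.from_items([{"text": "hello ray air"}])

tokenizer = Tokenizer(columns=["text"])
# "hello ray air" -> ["hello", "ray", "air"]
print(tokenizer.fit_transform(ds).take())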
- ray.ml.train_test_split(dataset: ray.data.dataset.Dataset, test_size: Union[int, float], *, shuffle: bool = False, seed: Optional[int] = None) Tuple[ray.data.dataset.Dataset, ray.data.dataset.Dataset] [source]¶
Split a Dataset into train and test subsets.
Example
import ray
from ray.ml import train_test_split

ds = ray.data.range(8)
train, test = train_test_split(ds, test_size=0.25)
print(train.take())  # [0, 1, 2, 3, 4, 5]
print(test.take())  # [6, 7]
- Parameters
dataset – Dataset to split.
test_size – If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. The train split will always be the complement of the test split.
shuffle – Whether or not to globally shuffle the dataset before splitting. Defaults to False. This may be a very expensive operation with large datasets.
seed – Fix the random seed to use for shuffle, otherwise one will be chosen based on system randomness. Ignored if shuffle=False.
- Returns
Train and test subsets as two Datasets.
Trainer¶
- class ray.ml.trainer.Trainer(*args, **kwargs)[source]¶
Defines interface for distributed training on Ray.
Note: The base Trainer class cannot be instantiated directly. Only one of its subclasses can be used.
How does a trainer work?
First, initialize the Trainer. The initialization runs locally, so heavyweight setup should not be done in __init__.
Then, when you call trainer.fit(), the Trainer is serialized and copied to a remote Ray actor. The following methods are then called in sequence on the remote actor:
trainer.setup(): Any heavyweight Trainer setup should be specified here.
trainer.preprocess_datasets(): The provided ray.data.Dataset are preprocessed with the provided ray.ml.preprocessor.
trainer.train_loop(): Executes the main training logic.
Calling trainer.fit() will return a ray.result.Result object where you can access metrics from your training run, as well as any checkpoints that may have been saved.
How do I create a new Trainer?
Subclass ray.train.Trainer, and override the training_loop method, and optionally setup.
import torch

from ray.ml.train import Trainer
from ray import tune


class MyPytorchTrainer(Trainer):
    def setup(self):
        self.model = torch.nn.Linear(1, 1)
        self.optimizer = torch.optim.SGD(
            self.model.parameters(), lr=0.1)

    def training_loop(self):
        # You can access any Trainer attributes directly in this method.
        # self.datasets["train"] has already been
        # preprocessed by self.preprocessor
        dataset = self.datasets["train"]

        torch_ds = dataset.to_torch(label_column="y")
        loss_fn = torch.nn.MSELoss()

        for epoch_idx in range(10):
            loss = 0
            num_batches = 0
            for X, y in iter(torch_ds):
                # Compute prediction error
                pred = self.model(X)
                batch_loss = loss_fn(pred, y.float())

                # Backpropagation
                self.optimizer.zero_grad()
                batch_loss.backward()
                self.optimizer.step()

                loss += batch_loss.item()
                num_batches += 1
            loss /= num_batches

            # Use Tune functions to report intermediate
            # results.
            tune.report(loss=loss, epoch=epoch_idx)
How do I use an existing Trainer or one of my custom Trainers?
Initialize the Trainer, and call Trainer.fit()
import ray

train_dataset = ray.data.from_items(
    [{"x": i, "y": i} for i in range(3)])
my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
result = my_trainer.fit()
- Parameters
scaling_config – Configuration for how to scale training.
run_config – Configuration for the execution of the training run.
datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.
preprocessor – A preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
DeveloperAPI: This API may change across minor Ray releases.
- setup() None [source]¶
Called during fit() to perform initial setup on the Trainer.
Note: this method is run on a remote process.
This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.
This method is called prior to preprocess_datasets and training_loop.
- preprocess_datasets() None [source]¶
Called during fit() to preprocess dataset attributes with preprocessor.
Note: This method is run on a remote process.
This method is called prior to entering the training_loop.
If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.
Then, all of the Trainer’s datasets will be transformed by the preprocessor.
The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.
- abstract training_loop() None [source]¶
Loop called by fit() to run training and report results to Tune.
Note: this method runs on a remote process.
self.datasets have already been preprocessed by self.preprocessor.
You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.
Example
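A minimal, hedged sketch of a custom training loop (not the canonical example; the epoch logic is a placeholder):
from ray import tune
from ray.ml.train import Trainer

class MyTrainer(Trainer):
    def training_loop(self):
        # self.datasets have already been preprocessed by self.preprocessor.
        for epoch_idx in range(3):
            # ... run one epoch of training here ...
            tune.report(epoch=epoch_idx)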
- fit() ray.ml.result.Result [source]¶
Runs training.
- Returns:
A Result object containing the training result.
- Raises:
TrainingFailedError – If any failures occur during the execution of self.as_trainable().
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- as_trainable() Type[ray.tune.trainable.Trainable] [source]¶
Convert self to a tune.Trainable class.
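A hedged sketch of using the returned class with Ray Tune directly (the Tuner API documented later is the recommended path; my_trainer is an already-instantiated Trainer, e.g. from the example above):
from ray import tune

trainable_cls = my_trainer.as_trainable()
analysis = tune.run(trainable_cls)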
- class ray.ml.train.integrations.xgboost.XGBoostTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.train.gbdt_trainer.GBDTTrainer
A Trainer for data parallel XGBoost training.
This Trainer runs the XGBoost training loop in a distributed manner using multiple Ray Actors.
Example
import ray

from ray.ml.train.integrations.xgboost import XGBoostTrainer

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset}
)
result = trainer.fit()
- Parameters
datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.
label_column – Name of the label column. A column with this name must be present in the training dataset.
params – XGBoost training parameters. Refer to XGBoost documentation for a list of possible parameters.
dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to xgboost.DMatrix objects created on each worker. For example, this can be used to add sample weights with the weights parameter.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
**train_kwargs – Additional kwargs passed to the xgboost.train() function.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.ml.train.integrations.xgboost.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) Tuple[xgboost.core.Booster, Optional[ray.ml.preprocessor.Preprocessor]] [source]¶
Load a Checkpoint from XGBoostTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of an XGBoostTrainer run.
- Returns
The model and AIR preprocessor contained within.
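A minimal usage sketch, assuming result is the output of trainer.fit() from the XGBoostTrainer example above:
from ray.ml.train.integrations.xgboost import load_checkpoint

# `result` comes from trainer.fit() in the XGBoostTrainer example above.
booster, preprocessor = load_checkpoint(result.checkpoint)
# `booster` is an xgboost Booster; `preprocessor` is the AIR preprocessor, if any.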
- class ray.ml.train.integrations.lightgbm.LightGBMTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.train.gbdt_trainer.GBDTTrainer
A Trainer for data parallel LightGBM training.
This Trainer runs the LightGBM training loop in a distributed manner using multiple Ray Actors.
If you would like to take advantage of LightGBM’s built-in handling for features with the categorical data type, consider using the Categorizer preprocessor to set the dtypes in the dataset.
Example
import ray

from ray.ml.train.integrations.lightgbm import LightGBMTrainer

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = LightGBMTrainer(
    label_column="y",
    params={"objective": "regression"},
    scaling_config={"num_workers": 3},
    datasets={"train": train_dataset}
)
result = trainer.fit()
- Parameters
datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.
label_column – Name of the label column. A column with this name must be present in the training dataset.
params – LightGBM training parameters passed to lightgbm.train(). Refer to LightGBM documentation for a list of possible parameters.
dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations, which in turn are passed to lightgbm.Dataset objects created on each worker. For example, this can be used to add sample weights with the weights parameter.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
**train_kwargs – Additional kwargs passed to the lightgbm.train() function.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.ml.train.integrations.lightgbm.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) Tuple[lightgbm.basic.Booster, Optional[ray.ml.preprocessor.Preprocessor]] [source]¶
Load a Checkpoint from LightGBMTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.
- Returns
The model and AIR preprocessor contained within.
- class ray.ml.train.integrations.tensorflow.TensorflowTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.train.data_parallel_trainer.DataParallelTrainer
A Trainer for data parallel Tensorflow training.
This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary TensorFlow process group configured for distributed TensorFlow training.
The train_loop_per_worker function is expected to take in either 0 or 1 arguments:
def train_loop_per_worker(): ...
def train_loop_per_worker(config: Dict): ...
If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.
If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.
Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.
def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()
You can also use any of the TensorFlow specific function utils.
def train_loop_per_worker():
    # Turns off autosharding for a dataset.
    # You should use this if you are doing
    # `train.get_dataset_shard(...).to_tf(...)`
    # as the data will be already sharded.
    train.tensorflow.prepare_dataset_shard(...)
To save a model to use for the TensorflowPredictor, you must save it under the “model” kwarg in train.save_checkpoint().
Example:
import tensorflow as tf import ray from ray import train from ray.train.tensorflow import prepare_dataset_shard from ray.ml.train.integrations.tensorflow import TensorflowTrainer input_size = 1 def build_model(): # toy neural network : 1-layer return tf.keras.Sequential( [tf.keras.layers.Dense( 1, activation="linear", input_shape=(input_size,))] ) def train_loop_for_worker(config): dataset_shard = train.get_dataset_shard("train") strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = build_model() model.compile( optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) for epoch in range(config["num_epochs"]): tf_dataset = prepare_dataset_shard( dataset_shard.to_tf( label_column="y", output_signature=( tf.TensorSpec(shape=(None, 1), dtype=tf.float32), tf.TensorSpec(shape=(None), dtype=tf.float32), ), batch_size=1, ) ) model.fit(tf_dataset) train.save_checkpoint( epoch=epoch, model=model.get_weights()) train_dataset = ray.data.from_items( [{"x": x, "y": x + 1} for x in range(32)]) trainer = TensorflowTrainer(scaling_config={"num_workers": 3}, datasets={"train": train_dataset}, train_loop_config={"num_epochs": 2}) result = trainer.fit()
- Parameters
train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.
train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.
tensorflow_config – Configuration for setting up the TensorFlow backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.ml.train.integrations.tensorflow.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Union[Callable[[], keras.Model], Type[keras.Model], keras.Model]) Tuple[keras.Model, Optional[ray.ml.preprocessor.Preprocessor]] [source]¶
Load a Checkpoint from TensorflowTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.
model – A callable that returns a TensorFlow Keras model to use, or an instantiated model. Model weights will be loaded from the checkpoint.
- Returns
The model with set weights and AIR preprocessor contained within.
- class ray.ml.train.integrations.torch.TorchTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.train.data_parallel_trainer.DataParallelTrainer
A Trainer for data parallel PyTorch training.
This Trainer runs the function train_loop_per_worker on multiple Ray Actors. These actors already have the necessary torch process group configured for distributed PyTorch training.
The train_loop_per_worker function is expected to take in either 0 or 1 arguments:
def train_loop_per_worker(): ...
def train_loop_per_worker(config: Dict): ...
If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.
If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.
Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.
def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()
You can also use any of the Torch specific function utils.
def train_loop_per_worker():
    # Prepares model for distributed training by wrapping in
    # `DistributedDataParallel` and moving to correct device.
    train.torch.prepare_model(...)

    # Configures the dataloader for distributed training by adding a
    # `DistributedSampler`.
    # You should NOT use this if you are doing
    # `train.get_dataset_shard(...).to_torch(...)`
    train.torch.prepare_data_loader(...)

    # Returns the current torch device.
    train.torch.get_device()
To save a model to use for the TorchPredictor, you must save it under the “model” kwarg in train.save_checkpoint().
Example
import torch
import torch.nn as nn

import ray
from ray import train
from ray.ml.train.integrations.torch import TorchTrainer

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model = train.torch.prepare_model(model)

    for epoch in range(num_epochs):
        for inputs, labels in iter(dataset_shard.to_torch(batch_size=32)):
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        train.save_checkpoint(model=model.state_dict())

train_dataset = ray.data.from_items([1, 2, 3])
scaling_config = {"num_workers": 3}
# If using GPUs, use the below scaling config instead.
# scaling_config = {"num_workers": 3, "use_gpu": True}
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset})
result = trainer.fit()
- Parameters
train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.
train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.
torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.ml.train.integrations.torch.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None) Tuple[torch.nn.modules.module.Module, Optional[ray.ml.preprocessor.Preprocessor]] [source]¶
Load a Checkpoint from TorchTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.
model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model.
- Returns
The model with set weights and AIR preprocessor contained within.
- class ray.ml.train.integrations.huggingface.HuggingFaceTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.train.integrations.torch.torch_trainer.TorchTrainer
A Trainer for data parallel HuggingFace Transformers on PyTorch training.
This Trainer runs the transformers.Trainer.train() method on multiple Ray Actors. The training is carried out in a distributed fashion through PyTorch DDP. These actors already have the necessary torch process group configured for distributed PyTorch training.
The training function run on every Actor will first run the specified trainer_init_per_worker function to obtain an instantiated transformers.Trainer object. The trainer_init_per_worker function will have access to preprocessed train and evaluation datasets.
datasets
dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards, with each Actor training on a single shard. All the other datasets will not be split.Please note that if you use a custom
transformers.Trainer
subclass, theget_train_dataloader
method will be wrapped around to disable sharding bytransformers.IterableDatasetShard
, as the dataset will already be sharded on the Ray AIR side.HuggingFace loggers will be automatically disabled, and the
local_rank
argument inTrainingArguments
will be automatically set. Please note that if you want to use CPU training, you will need to set theno_cuda
argument inTrainingArguments
manually - otherwise, an exception (segfault) may be thrown. Furthermore, ‘steps’ value forsave_strategy
,logging_strategy
andevaluation_strategy
is not yet supported.This Trainer requires
transformers>=4.19.0
package.Example
# Based on # huggingface/notebooks/examples/language_modeling_from_scratch.ipynb # Hugging Face imports from datasets import load_dataset import transformers from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer import ray from ray.ml.train.integrations.huggingface import HuggingFaceTrainer model_checkpoint = "gpt2" tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer" block_size = 128 datasets = load_dataset("wikitext", "wikitext-2-raw-v1") tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint) def tokenize_function(examples): return tokenizer(examples["text"]) tokenized_datasets = datasets.map( tokenize_function, batched=True, num_proc=1, remove_columns=["text"] ) def group_texts(examples): # Concatenate all texts. concatenated_examples = { k: sum(examples[k], []) for k in examples.keys() } total_length = len(concatenated_examples[list(examples.keys())[0]]) # We drop the small remainder, we could add padding if the model # supported it. # instead of this drop, you can customize this part to your needs. total_length = (total_length // block_size) * block_size # Split by chunks of max_len. result = { k: [ t[i : i + block_size] for i in range(0, total_length, block_size) ] for k, t in concatenated_examples.items() } result["labels"] = result["input_ids"].copy() return result lm_datasets = tokenized_datasets.map( group_texts, batched=True, batch_size=1000, num_proc=1, ) ray_train_ds = ray.data.from_huggingface(lm_datasets["train"]) ray_evaluation_ds = ray.data.from_huggingface( lm_datasets["evaluation"] ) def trainer_init_per_worker(train_dataset, eval_dataset, **config): model_config = AutoConfig.from_pretrained(model_checkpoint) model = AutoModelForCausalLM.from_config(model_config) args = transformers.TrainingArguments( output_dir=f"{model_checkpoint}-wikitext2", evaluation_strategy="epoch", learning_rate=2e-5, weight_decay=0.01, ) return transformers.Trainer( model=model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, ) scaling_config = {"num_workers": 3} # If using GPUs, use the below scaling config instead. # scaling_config = {"num_workers": 3, "use_gpu": True} trainer = HuggingFaceTrainer( trainer_init_per_worker=trainer_init_per_worker, scaling_config=scaling_config, datasets={"train": ray_train_ds, "evaluation": ray_evaluation_ds}, ) result = trainer.fit()
- Parameters
trainer_init_per_worker – The function that returns an instantiated transformers.Trainer object and takes in the following arguments: train Torch.Dataset, optional evaluation Torch.Dataset and config as kwargs. The Torch Datasets are automatically created by converting the Ray Datasets internally before they are passed into the function.
datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset and (optionally) key “evaluation” to denote the evaluation dataset. Can only contain a training dataset and up to one extra dataset to be used for evaluation. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.
trainer_init_config – Configurations to pass into trainer_init_per_worker as kwargs.
torch_config – Configuration for setting up the PyTorch backend. If set to None, use the default configuration. This replaces the backend_config arg of DataParallelTrainer. Same as in TorchTrainer.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- setup() None [source]¶
Called during fit() to perform initial setup on the Trainer.
Note: this method is run on a remote process.
This method will not be called on the driver, so any expensive setup operations should be placed here and not in __init__.
This method is called prior to preprocess_datasets and training_loop.
- as_trainable() Type[ray.tune.trainable.Trainable] [source]¶
Convert self to a tune.Trainable class.
- ray.ml.train.integrations.huggingface.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Union[Type[transformers.PreTrainedModel], torch.nn.modules.module.Module], tokenizer: Optional[Type[transformers.PreTrainedTokenizer]] = None, *, tokenizer_kwargs: Optional[Dict[str, Any]] = None, **pretrained_model_kwargs) Tuple[Union[transformers.PreTrainedModel, torch.nn.modules.module.Module], transformers.TrainingArguments, Optional[transformers.PreTrainedTokenizer], Optional[ray.ml.preprocessor.Preprocessor]] [source]¶
Load a Checkpoint from HuggingFaceTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a HuggingFaceTrainer run.
model – Either a transformers.PreTrainedModel class (e.g. AutoModelForCausalLM), or a PyTorch model to load the weights to. This should be the same model used for training.
tokenizer – A transformers.PreTrainedTokenizer class to load the model tokenizer to. If not specified, the tokenizer will not be loaded. Will throw an exception if specified, but no tokenizer was found in the checkpoint.
tokenizer_kwargs – Dict of kwargs to pass to the tokenizer.from_pretrained call. Ignored if tokenizer is None.
**pretrained_model_kwargs – Kwargs to pass to the model.from_pretrained call. Ignored if model is not a transformers.PreTrainedModel class.
- Returns
The model, TrainingArguments, tokenizer and AIR preprocessor contained within. Those can be used to initialize a transformers.Trainer object locally.
- class ray.ml.train.integrations.sklearn.SklearnTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.trainer.Trainer
A Trainer for scikit-learn estimator training.
This Trainer runs the fit method of the given estimator in a non-distributed manner on a single Ray Actor.
By default, the n_jobs (or thread_count) estimator parameters will be set to match the number of CPUs assigned to the Ray Actor. This behavior can be disabled by setting set_estimator_cpus=False.
If you wish to use GPU-enabled estimators (e.g. cuML), make sure to set "GPU": 1 in scaling_config.trainer_resources.
The results are reported all at once and not in an iterative fashion. No checkpointing is done during training. This may be changed in the future.
Example
import ray

from ray.ml.train.integrations.sklearn import SklearnTrainer
from sklearn.ensemble import RandomForestRegressor

train_dataset = ray.data.from_items(
    [{"x": x, "y": x + 1} for x in range(32)])
trainer = SklearnTrainer(
    sklearn_estimator=RandomForestRegressor,
    label_column="y",
    scaling_config={
        "trainer_resources": {"CPU": 4}
    },
    datasets={"train": train_dataset}
)
result = trainer.fit()
- Parameters
estimator – A scikit-learn compatible estimator to use.
datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting separate metrics.
label_column – Name of the label column. A column with this name must be present in the training dataset. If None, no validation will be performed.
params – Optional dict of params to be set on the estimator before fitting. Useful for hyperparameter tuning.
scoring –
Strategy to evaluate the performance of the model on the validation sets and for cross-validation. Same as in sklearn.model_selection.cross_validation. If scoring represents a single score, one can use:
a single string;
a callable that returns a single value.
If scoring represents multiple scores, one can use:
a list or tuple of unique strings;
a callable returning a dictionary where the keys are the metric names and the values are the metric scores;
a dictionary with metric names as keys and callables as values.
cv –
Determines the cross-validation splitting strategy. If specified, cross-validation will be run on the train dataset, in addition to computing metrics for validation datasets. Same as in sklearn.model_selection.cross_validation, with the exception of None. Possible inputs for cv are:
None, to skip cross-validation.
int, to specify the number of folds in a (Stratified)KFold,
CV splitter,
An iterable yielding (train, test) splits as arrays of indices.
For int/None inputs, if the estimator is a classifier and y is either binary or multiclass, StratifiedKFold is used. In all other cases, KFold is used. These splitters are instantiated with shuffle=False so the splits will be the same across calls.
If you provide a “cv_groups” column in the train dataset, it will be used as group labels for the samples used while splitting the dataset into train/test set. Only used in conjunction with a “Group” cv instance (e.g., GroupKFold). This corresponds to the groups argument in sklearn.model_selection.cross_validation.
return_train_score_cv – Whether to also return train scores during cross-validation. Ignored if cv is None.
parallelize_cv – If set to True, will parallelize cross-validation instead of the estimator. If set to None, will detect if the estimator has any parallelism-related params (n_jobs or thread_count) and parallelize cross-validation if there are none. If False, will not parallelize cross-validation. Cannot be set to True if there are any GPUs assigned to the trainer. Ignored if cv is None.
set_estimator_cpus – If set to True, will automatically set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to match the number of available CPUs.
scaling_config – Configuration for how to scale training. Only the trainer_resources key can be provided, as the training is not distributed.
run_config – Configuration for the execution of the training run.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
**fit_params – Additional kwargs passed to the estimator.fit() method.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- training_loop() None [source]¶
Loop called by fit() to run training and report results to Tune.
Note: this method runs on a remote process.
self.datasets have already been preprocessed by self.preprocessor.
You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.
Example
- ray.ml.train.integrations.sklearn.load_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) Tuple[sklearn.base.BaseEstimator, Optional[ray.ml.preprocessor.Preprocessor]] [source]¶
Load a Checkpoint from SklearnTrainer.
- Parameters
checkpoint – The checkpoint to load the estimator and preprocessor from. It is expected to be from the result of a SklearnTrainer run.
- Returns
The estimator and AIR preprocessor contained within.
- class ray.ml.train.data_parallel_trainer.DataParallelTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.trainer.Trainer
A Trainer for data parallel training.
You should subclass this Trainer if your Trainer follows the SPMD (single program, multiple data) programming paradigm - you want multiple processes to run the same function, but on different data.
This Trainer runs the function train_loop_per_worker on multiple Ray Actors.
The train_loop_per_worker function is expected to take in either 0 or 1 arguments:
def train_loop_per_worker(): ...
def train_loop_per_worker(config: Dict): ...
If train_loop_per_worker accepts an argument, then train_loop_config will be passed in as the argument. This is useful if you want to tune the values in train_loop_config as hyperparameters.
If the datasets dict contains a training dataset (denoted by the “train” key), then it will be split into multiple dataset shards that can then be accessed by ray.train.get_dataset_shard("train") inside train_loop_per_worker. All the other datasets will not be split and ray.train.get_dataset_shard(...) will return the entire Dataset.
Inside the train_loop_per_worker function, you can use any of the Ray Train function utils.
def train_loop_per_worker():
    # Report intermediate results for callbacks or logging.
    train.report(...)

    # Checkpoints the provided args as restorable state.
    train.save_checkpoint(...)

    # Returns dict of last saved checkpoint.
    train.load_checkpoint()

    # Returns the Ray Dataset shard for the given key.
    train.get_dataset_shard("my_dataset")

    # Returns the total number of workers executing training.
    train.get_world_size()

    # Returns the rank of this worker.
    train.get_world_rank()

    # Returns the rank of the worker on the current node.
    train.get_local_rank()
How do I use DataParallelTrainer or any of its subclasses?
Example:
import ray
from ray import train


def train_loop_for_worker():
    dataset_shard_for_this_worker = train.get_dataset_shard("train")
    assert len(dataset_shard_for_this_worker) == 1


train_dataset = ray.data.from_items([1, 2, 3])
assert len(train_dataset) == 3
trainer = DataParallelTrainer(train_loop_per_worker=train_loop_for_worker,
                              scaling_config={"num_workers": 3},
                              datasets={"train": train_dataset})
result = trainer.fit()
How do I develop on top of DataParallelTrainer?
In many cases, using DataParallelTrainer directly is sufficient to execute functions on multiple actors.
However, you may want to subclass DataParallelTrainer and create a custom Trainer for the following 2 use cases:
Use Case 1: You want to do data parallel training, but want to have a predefined training_loop_per_worker.
Use Case 2: You want to implement a custom Training backend that automatically handles additional setup or teardown logic on each actor, so that the users of this new trainer do not have to implement this logic. For example, a TensorflowTrainer can be built on top of DataParallelTrainer that automatically handles setting the proper environment variables for distributed Tensorflow on each actor.
For 1, you can set a predefined training loop in __init__
from ray.ml.train.data_parallel_trainer import DataParallelTrainer


class MyDataParallelTrainer(DataParallelTrainer):
    def __init__(self, *args, **kwargs):
        predefined_train_loop_per_worker = lambda: 1
        super().__init__(predefined_train_loop_per_worker, *args, **kwargs)
For 2, you can implement the ray.train.Backend and ray.train.BackendConfig interfaces.
from dataclasses import dataclass
from ray.train.backend import Backend, BackendConfig


class MyBackend(Backend):
    def on_start(self, worker_group, backend_config):
        def set_env_var(env_var_value):
            import os
            os.environ["MY_ENV_VAR"] = env_var_value

        worker_group.execute(set_env_var, backend_config.env_var)


@dataclass
class MyBackendConfig(BackendConfig):
    env_var: str = "default_value"

    def backend_cls(self):
        return MyBackend


class MyTrainer(DataParallelTrainer):
    def __init__(self, train_loop_per_worker, my_backend_config: MyBackendConfig, **kwargs):
        super().__init__(
            train_loop_per_worker,
            backend_config=my_backend_config, **kwargs)
- Parameters
train_loop_per_worker – The training function to execute. This can either take in no arguments or a config dict.
train_loop_config – Configurations to pass into train_loop_per_worker if it accepts an argument.
backend_config – Configuration for setting up a Backend (e.g. Torch, Tensorflow, Horovod) on each worker to enable distributed communication. If no Backend should be set up, then set this to None.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
datasets – Any Ray Datasets to use for training. Use the key “train” to denote which dataset is the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
DeveloperAPI: This API may change across minor Ray releases.
- training_loop() None [source]¶
Loop called by fit() to run training and report results to Tune.
Note: this method runs on a remote process.
self.datasets have already been preprocessed by self.preprocessor.
You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.
Example
- class ray.ml.train.gbdt_trainer.GBDTTrainer(*args, **kwargs)[source]¶
Bases: ray.ml.trainer.Trainer
Common logic for gradient-boosting decision tree (GBDT) frameworks like XGBoost-Ray and LightGBM-Ray.
- Parameters
datasets – Ray Datasets to use for training and validation. Must include a “train” key denoting the training dataset. If a preprocessor is provided and has not already been fit, it will be fit on the training dataset. All datasets will be transformed by the preprocessor if one is provided. All non-training datasets will be used as separate validation sets, each reporting a separate metric.
label_column – Name of the label column. A column with this name must be present in the training dataset.
params – Framework specific training parameters.
dmatrix_params – Dict of dataset name:dict of kwargs passed to respective xgboost_ray.RayDMatrix initializations.
scaling_config – Configuration for how to scale data parallel training.
run_config – Configuration for the execution of the training run.
preprocessor – A ray.ml.preprocessor.Preprocessor to preprocess the provided datasets.
resume_from_checkpoint – A checkpoint to resume training from.
**train_kwargs – Additional kwargs passed to the framework train() function.
DeveloperAPI: This API may change across minor Ray releases.
- preprocess_datasets() None [source]¶
Called during fit() to preprocess dataset attributes with preprocessor.
Note: This method is run on a remote process.
This method is called prior to entering the training_loop.
If the Trainer has both a datasets dict and a preprocessor, the datasets dict contains a training dataset (denoted by the “train” key), and the preprocessor has not yet been fit, then it will be fit on the train dataset.
Then, all of the Trainer’s datasets will be transformed by the preprocessor.
The transformed datasets will be set back in the self.datasets attribute of the Trainer to be used when overriding training_loop.
- training_loop() None [source]¶
Loop called by fit() to run training and report results to Tune.
Note: this method runs on a remote process.
self.datasets have already been preprocessed by self.preprocessor.
You can use the Tune Function API functions (tune.report() and tune.save_checkpoint()) inside this training loop.
Example
- as_trainable() Type[ray.tune.trainable.Trainable] [source]¶
Convert self to a tune.Trainable class.
Tuner¶
- class ray.tune.tuner.Tuner(trainable: Optional[Union[str, Callable, Type[ray.tune.trainable.Trainable], ray.ml.trainer.Trainer]] = None, param_space: Optional[Dict[str, Any]] = None, tune_config: Optional[ray.tune.tune_config.TuneConfig] = None, run_config: Optional[ray.ml.config.RunConfig] = None, _tuner_kwargs: Optional[Dict] = None, _tuner_internal: Optional[ray.tune.impl.tuner_internal.TunerInternal] = None)[source]¶
Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune.
- Parameters
trainable – The trainable to be tuned.
param_space – Search space of the tuning job. One thing to note is that both preprocessor and dataset can be tuned here.
tune_config – Tuning algorithm specific configs. Refer to ray.tune.tune_config.TuneConfig for more info.
run_config – Runtime configuration that is specific to individual trials. If passed, this will overwrite the run config passed to the Trainer, if applicable. Refer to ray.ml.config.RunConfig for more info.
Usage pattern:
from sklearn.datasets import load_breast_cancer from ray import tune from ray.data import from_pandas from ray.ml.config import RunConfig from ray.ml.train.integrations.xgboost import XGBoostTrainer from ray.tune.tuner import Tuner def get_dataset(): data_raw = load_breast_cancer(as_frame=True) dataset_df = data_raw["data"] dataset_df["target"] = data_raw["target"] dataset = from_pandas(dataset_df) return dataset trainer = XGBoostTrainer( label_column="target", params={}, datasets={"train": get_dataset()}, ) param_space = { "scaling_config": { "num_workers": tune.grid_search([2, 4]), "resources_per_worker": { "CPU": tune.grid_search([1, 2]), }, }, # You can even grid search various datasets in Tune. # "datasets": { # "train": tune.grid_search( # [ds1, ds2] # ), # }, "params": { "objective": "binary:logistic", "tree_method": "approx", "eval_metric": ["logloss", "error"], "eta": tune.loguniform(1e-4, 1e-1), "subsample": tune.uniform(0.5, 1.0), "max_depth": tune.randint(1, 9), }, } tuner = Tuner(trainable=trainer, param_space=param_space, run_config=RunConfig(name="my_tune_run")) analysis = tuner.fit()
To retry a failed tune run, you can then do
tuner = Tuner.restore(experiment_checkpoint_dir)
tuner.fit()
experiment_checkpoint_dir can be easily located near the end of the console output of your first failed run.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- classmethod restore(path: str) ray.tune.tuner.Tuner [source]¶
Restores Tuner after a previously failed run.
- Parameters
path – The path where the previous failed run is checkpointed. This information could be easily located near the end of the console output of previous run. Note: depending on whether ray client mode is used or not, this path may or may not exist on your local machine.
- fit() ray.tune.result_grid.ResultGrid [source]¶
Executes hyperparameter tuning job as configured and returns result.
Failure handling: For the kind of exception that happens during the execution of a trial, one may inspect it together with the stacktrace through the returned result grid. See ResultGrid for reference. Each trial may fail up to a certain number of times. This is configured by RunConfig.FailureConfig.max_failures.
Exceptions that happen beyond trials will be thrown by this method as well. In such cases, there will be an instruction like the following printed out at the end of the console output to inform users on how to resume.
Please use tuner = Tuner.restore(“~/ray_results/tuner_resume”) to resume.
- Raises
RayTaskError – If the exception happens inside the trainable; otherwise, TuneError.
- class ray.tune.result_grid.ResultGrid(experiment_analysis: ray.tune.analysis.experiment_analysis.ExperimentAnalysis)[source]¶
A set of Result objects returned from a call to tuner.fit().
You can use it to inspect the trials that were run, as well as to obtain the best result.
The constructor is a private API.
Usage pattern:
result_grid = tuner.fit()
for i in range(len(result_grid)):
    result = result_grid[i]
    if not result.error:
        print(f"Trial finishes successfully with metrics {result.metrics}.")
    else:
        print(f"Trial errors out with {result.error}.")
best_result = result_grid.get_best_result()
best_checkpoint = best_result.checkpoint
best_metrics = best_result.metrics
Note that trials of all statuses are included in the final result grid. If a trial is not in a terminated state, its latest result and checkpoint as seen by Tune will be provided.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- get_best_result(metric: Optional[str] = None, mode: Optional[str] = None, scope: str = 'last', filter_nan_and_inf: bool = True) ray.ml.result.Result [source]¶
Get the best result from all the trials run.
- Parameters
metric – Key for trial info to order on. Defaults to the metric specified in your Tuner's TuneConfig.
mode – One of [min, max]. Defaults to the mode specified in your Tuner's TuneConfig.
scope – One of [all, last, avg, last-5-avg, last-10-avg]. If scope=last, only look at each trial's final step for metric, and compare across trials based on mode=[min,max]. If scope=avg, consider the simple average over all steps for metric and compare across trials based on mode=[min,max]. If scope=last-5-avg or scope=last-10-avg, consider the simple average over the last 5 or 10 steps for metric and compare across trials based on mode=[min,max]. If scope=all, find each trial's min/max score for metric based on mode, and compare trials based on mode=[min,max].
filter_nan_and_inf – If True (default), NaN or infinite values are disregarded and these trials are never selected as the best trial.
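A minimal sketch of retrieving the best result after tuning, assuming the trainable reports a metric named "loss" (the metric name is illustrative):

result_grid = tuner.fit()
best_result = result_grid.get_best_result(metric="loss", mode="min")
print(best_result.metrics)     # final metrics of the best trial
print(best_result.checkpoint)  # checkpoint of the best trial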
- get_dataframe(filter_metric: Optional[str] = None, filter_mode: Optional[str] = None) pandas.core.frame.DataFrame [source]¶
Return dataframe of all trials with their configs and reported results.
By default, this returns the last reported results for each trial.
If filter_metric and filter_mode are set, the results from each trial are filtered for this metric and mode. For example, if filter_metric="some_metric" and filter_mode="max", then for each trial, every received result is checked, and the one where some_metric is maximal is returned.
Example
result_grid = tuner.fit(...)

# Get last reported results per trial
df = result_grid.get_dataframe()

# Get best ever reported accuracy per trial
df = result_grid.get_dataframe(filter_metric="accuracy", filter_mode="max")
- Parameters
filter_metric – Metric to filter best result for.
filter_mode – If filter_metric is given, one of ["min", "max"] to specify whether to find the minimum or maximum result.
- Returns
Pandas DataFrame with each trial as a row and their results as columns.
Predictors¶
- class ray.ml.predictor.Predictor[source]¶
Predictors load models from checkpoints to perform inference.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, **kwargs) ray.ml.predictor.Predictor [source]¶
Create a specific predictor from a checkpoint.
- Parameters
checkpoint – Checkpoint to load predictor data from.
kwargs – Arguments specific to predictor implementations.
- Returns
Predictor object.
- Return type
- predict(data: Union[pd.DataFrame, np.ndarray], **kwargs) Union[pd.DataFrame, np.ndarray] [source]¶
Perform inference on a batch of data.
- Parameters
data – A batch of input data. Either a pandas DataFrame or a numpy array.
kwargs – Arguments specific to predictor implementations.
- Returns
Prediction result.
- Return type
DataBatchType
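A minimal sketch of the typical Predictor workflow, using XGBoostPredictor (documented below) as a concrete implementation; it assumes checkpoint is a Checkpoint produced by an XGBoostTrainer run on two numeric features.

import numpy as np
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

# `checkpoint` is assumed to come from an XGBoostTrainer result.
predictor = XGBoostPredictor.from_checkpoint(checkpoint)
batch = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(batch)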
- class ray.ml.batch_predictor.BatchPredictor(checkpoint: ray.ml.checkpoint.Checkpoint, predictor_cls: Type[ray.ml.predictor.Predictor], **predictor_kwargs)[source]¶
Batch predictor class.
Takes a predictor class and a checkpoint and provides an interface to run batch scoring on Ray datasets.
This batch predictor wraps around a predictor class and executes it in a distributed way when calling predict().
- checkpoint¶
Checkpoint loaded by the distributed predictor objects.
- predictor_cls¶
Predictor class reference. When scoring, each scoring worker will create an instance of this class and call predict(batch) on it.
- **predictor_kwargs¶
Keyword arguments passed to the predictor on initialization.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, predictor_cls: Type[ray.ml.predictor.Predictor], **kwargs) ray.ml.batch_predictor.BatchPredictor [source]¶
Create a specific predictor from a checkpoint.
- Parameters
checkpoint – Checkpoint to load predictor data from.
kwargs – Arguments specific to predictor implementations.
- Returns
Predictor object.
- Return type
- predict(data: ray.data.dataset.Dataset, *, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: int = 1, num_gpus_per_worker: int = 0, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) ray.data.dataset.Dataset [source]¶
Run batch scoring on dataset.
- Parameters
data – Ray dataset to run batch prediction on.
batch_size – Split dataset into batches of this size for prediction.
min_scoring_workers – Minimum number of scoring actors.
max_scoring_workers – If set, specify the maximum number of scoring actors.
num_cpus_per_worker – Number of CPUs to allocate per scoring worker.
num_gpus_per_worker – Number of GPUs to allocate per scoring worker.
ray_remote_args – Additional resource requirements to request from ray.
predict_kwargs – Keyword arguments passed to the predictor's predict() method.
- Returns
Dataset containing scoring results.
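A minimal sketch of distributed batch scoring, assuming checkpoint was produced by an XGBoostTrainer run whose model was trained on feature columns "A" and "B" (the column names and values are illustrative):

import pandas as pd
import ray
from ray.ml.batch_predictor import BatchPredictor
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)

# Score a Ray dataset with two scoring workers, one CPU each.
ds = ray.data.from_pandas(pd.DataFrame({"A": range(100), "B": range(100)}))
predictions = batch_predictor.predict(
    ds,
    batch_size=32,
    min_scoring_workers=2,
    num_cpus_per_worker=1,
)
predictions.show(5)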
- class ray.ml.predictors.integrations.xgboost.XGBoostPredictor(model: xgboost.core.Booster, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]¶
Bases:
ray.ml.predictor.Predictor
A predictor for XGBoost models.
- Parameters
model – The XGBoost booster to use for predictions.
preprocessor – A preprocessor used to transform data batches prior to prediction.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.xgboost.xgboost_predictor.XGBoostPredictor [source]¶
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of XGBoostTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of an XGBoostTrainer run.
- predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, dmatrix_kwargs: Optional[Dict[str, Any]] = None, **predict_kwargs) pandas.core.frame.DataFrame [source]¶
Run inference on data batch.
The data is converted into an XGBoost DMatrix before being passed to the model.
- Parameters
data – A batch of input data. Either a pandas DataFrame or numpy array.
feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.
dmatrix_kwargs – Dict of keyword arguments passed to xgboost.DMatrix.
**predict_kwargs – Keyword arguments passed to xgboost.Booster.predict.
Examples:
import numpy as np
import xgboost as xgb

from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = xgb.XGBClassifier().fit(train_X, train_y)
predictor = XGBoostPredictor(model=model.get_booster())

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])
import pandas as pd
import xgboost as xgb

from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = xgb.XGBClassifier().fit(train_X, train_y)
predictor = XGBoostPredictor(model=model.get_booster())

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
- Returns
Prediction result.
- Return type
pd.DataFrame
- class ray.ml.predictors.integrations.lightgbm.LightGBMPredictor(model: lightgbm.basic.Booster, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]¶
Bases:
ray.ml.predictor.Predictor
A predictor for LightGBM models.
- Parameters
model – The LightGBM booster to use for predictions.
preprocessor – A preprocessor used to transform data batches prior to prediction.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.lightgbm.lightgbm_predictor.LightGBMPredictor [source]¶
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of LightGBMTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a LightGBMTrainer run.
- predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, **predict_kwargs) pandas.core.frame.DataFrame [source]¶
Run inference on data batch.
- Parameters
data – A batch of input data. Either a pandas DataFrame or numpy array.
feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.
**predict_kwargs – Keyword arguments passed to lightgbm.Booster.predict.
Examples:
import numpy as np
import lightgbm as lgbm

from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = lgbm.LGBMClassifier().fit(train_X, train_y)
predictor = LightGBMPredictor(model=model.booster_)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])
import pandas as pd
import lightgbm as lgbm

from ray.ml.predictors.integrations.lightgbm import LightGBMPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = lgbm.LGBMClassifier().fit(train_X, train_y)
predictor = LightGBMPredictor(model=model.booster_)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
- Returns
Prediction result.
- Return type
pd.DataFrame
- class ray.ml.predictors.integrations.tensorflow.TensorflowPredictor(model_definition: Union[Callable[[], <MagicMock name='mock.keras.Model' id='140040349448656'>], Type[<MagicMock name='mock.keras.Model' id='140040349448656'>]], preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None, model_weights: Optional[list] = None)[source]¶
Bases:
ray.ml.predictor.Predictor
A predictor for TensorFlow models.
- Parameters
model_definition – A callable that returns a TensorFlow Keras model to use for predictions.
preprocessor – A preprocessor used to transform data batches prior to prediction.
model_weights – List of weights to use for the model.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model_definition: Union[Callable[[], <MagicMock name='mock.keras.Model' id='140040349448656'>], Type[<MagicMock name='mock.keras.Model' id='140040349448656'>]]) ray.ml.predictors.integrations.tensorflow.tensorflow_predictor.TensorflowPredictor [source]¶
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of TensorflowTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TensorflowTrainer run.
model_definition – A callable that returns a TensorFlow Keras model to use. Model weights will be loaded from the checkpoint.
- predict(data: Union[pd.DataFrame, np.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, dtype: Optional[<MagicMock name='mock.dtypes.DType' id='140038620202576'>] = None) Union[pd.DataFrame, np.ndarray] [source]¶
Run inference on data batch.
The data is converted into a TensorFlow Tensor before being passed to the model.
- Parameters
data – A batch of input data. Either a pandas DataFrame or numpy array.
feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.
dtype – The TensorFlow dtype to use when creating the TensorFlow tensor. If set to None, then automatically infer the dtype.
Examples:
import numpy as np
import tensorflow as tf

from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor

def build_model():
    return tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(2,)),
            tf.keras.layers.Dense(1),
        ]
    )

predictor = TensorflowPredictor(model_definition=build_model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)
import pandas as pd
import tensorflow as tf

from ray.ml.predictors.integrations.tensorflow import TensorflowPredictor

def build_model():
    return tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(1,)),
            tf.keras.layers.Dense(1),
        ]
    )

predictor = TensorflowPredictor(model_definition=build_model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=["A"])
- Returns
Prediction result.
- Return type
DataBatchType
- class ray.ml.predictors.integrations.torch.TorchPredictor(model: torch.nn.modules.module.Module, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]¶
Bases:
ray.ml.predictor.Predictor
A predictor for PyTorch models.
- Parameters
model – The torch module to use for predictions.
preprocessor – A preprocessor used to transform data batches prior to prediction.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, model: Optional[torch.nn.modules.module.Module] = None) ray.ml.predictors.integrations.torch.torch_predictor.TorchPredictor [source]¶
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of TorchTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a TorchTrainer run.
model – If the checkpoint contains a model state dict, and not the model itself, then the state dict will be loaded to this model.
- predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[List[str]], List[int], List[List[int]]]] = None, dtype: Optional[torch.dtype] = None, unsqueeze: bool = True) Union[pandas.core.frame.DataFrame, numpy.ndarray] [source]¶
Run inference on data batch.
The data is converted into a torch Tensor before being passed to the model.
- Parameters
data – A batch of input data. Either a pandas DataFrame or numpy array.
feature_columns – The names or indices of the columns in the data to use as features to predict on. If this arg is a list of lists or a dict of string-list pairs, then the data batch will be converted into multiple tensors, which are then concatenated before being fed into the model. This is useful for multi-input models. If None, then use all columns in data.
dtype – The dtypes to use for the tensors. This should match the format of feature_columns, or be a single dtype, in which case it will be applied to all tensors. If None, then automatically infer the dtype.
unsqueeze – If set to True, the feature tensors will be unsqueezed (reshaped to (N, 1)) before being concatenated into the final features tensor. Otherwise, they will be left as is, that is (N, ). Defaults to True.
Examples:
import numpy as np
import torch

from ray.ml.predictors.integrations.torch import TorchPredictor

model = torch.nn.Linear(2, 1)
predictor = TorchPredictor(model=model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)
import pandas as pd
import torch

from ray.ml.predictors.integrations.torch import TorchPredictor

model = torch.nn.Linear(1, 1)
predictor = TorchPredictor(model=model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first column as the feature
predictions = predictor.predict(data, feature_columns=["A"])
- Returns
Prediction result.
- Return type
DataBatchType
- class ray.ml.predictors.integrations.sklearn.SklearnPredictor(estimator: sklearn.base.BaseEstimator, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]¶
Bases:
ray.ml.predictor.Predictor
A predictor for scikit-learn compatible estimators.
- Parameters
estimator – The fitted scikit-learn compatible estimator to use for predictions.
preprocessor – A preprocessor used to transform data batches prior to prediction.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint) ray.ml.predictors.integrations.sklearn.sklearn_predictor.SklearnPredictor [source]¶
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of SklearnTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a SklearnTrainer run.
- predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[Union[List[str], List[int]]] = None, num_estimator_cpus: Optional[int] = 1, **predict_kwargs) pandas.core.frame.DataFrame [source]¶
Run inference on data batch.
- Parameters
data – A batch of input data. Either a pandas DataFrame or numpy array.
feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, then use all columns in data.
num_estimator_cpus – If set to a value other than None, will set the values of all n_jobs and thread_count parameters in the estimator (including in nested objects) to the given value.
**predict_kwargs – Keyword arguments passed to estimator.predict.
Examples:
import numpy as np
from sklearn.ensemble import RandomForestClassifier

from ray.ml.predictors.integrations.sklearn import SklearnPredictor

train_X = np.array([[1, 2], [3, 4]])
train_y = np.array([0, 1])

model = RandomForestClassifier().fit(train_X, train_y)
predictor = SklearnPredictor(estimator=model)

data = np.array([[1, 2], [3, 4]])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = np.array([[1, 2, 8], [3, 4, 9]])
predictions = predictor.predict(data, feature_columns=[0, 1])
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

from ray.ml.predictors.integrations.sklearn import SklearnPredictor

train_X = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
train_y = pd.Series([0, 1])

model = RandomForestClassifier().fit(train_X, train_y)
predictor = SklearnPredictor(estimator=model)

# Pandas dataframe.
data = pd.DataFrame([[1, 2], [3, 4]], columns=["A", "B"])
predictions = predictor.predict(data)

# Only use first and second column as the feature
data = pd.DataFrame([[1, 2, 8], [3, 4, 9]], columns=["A", "B", "C"])
predictions = predictor.predict(data, feature_columns=["A", "B"])
- Returns
Prediction result.
- Return type
pd.DataFrame
- class ray.ml.predictors.integrations.huggingface.HuggingFacePredictor(pipeline: Optional[<MagicMock name='mock.Pipeline' id='140038619696592'>] = None, preprocessor: Optional[ray.ml.preprocessor.Preprocessor] = None)[source]¶
Bases:
ray.ml.predictor.Predictor
A predictor for HuggingFace Transformers PyTorch models.
This predictor uses Transformers Pipelines for inference.
- Parameters
pipeline – The Transformers pipeline to use for inference.
preprocessor – A preprocessor used to transform data batches prior to prediction.
- classmethod from_checkpoint(checkpoint: ray.ml.checkpoint.Checkpoint, *, pipeline: Optional[Type[<MagicMock name='mock.Pipeline' id='140038619696592'>]] = None, **pipeline_kwargs) ray.ml.predictors.integrations.huggingface.huggingface_predictor.HuggingFacePredictor [source]¶
Instantiate the predictor from a Checkpoint.
The checkpoint is expected to be a result of HuggingFaceTrainer.
- Parameters
checkpoint – The checkpoint to load the model and preprocessor from. It is expected to be from the result of a HuggingFaceTrainer run.
pipeline – A transformers.pipelines.Pipeline class to use. If not specified, will use the pipeline abstraction wrapper.
**pipeline_kwargs – Any kwargs to pass to the pipeline initialization. If pipeline is None, this must contain the ‘task’ argument. Cannot contain ‘model’.
- predict(data: Union[pandas.core.frame.DataFrame, numpy.ndarray], feature_columns: Optional[List[str]] = None, **pipeline_call_kwargs) Union[pandas.core.frame.DataFrame, numpy.ndarray] [source]¶
Run inference on data batch.
The data is converted into a list (unless pipeline is a TableQuestionAnsweringPipeline) and passed to the pipeline object.
- Parameters
data – A batch of input data. Either a pandas DataFrame or numpy array.
feature_columns – The names or indices of the columns in the data to use as features to predict on. If None, use all columns.
**pipeline_call_kwargs – Additional kwargs to pass to the pipeline object.
Examples:
import pandas as pd
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
from transformers.pipelines import pipeline

from ray.ml.predictors.integrations.huggingface import HuggingFacePredictor

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)

model_config = AutoConfig.from_pretrained(model_checkpoint)
model = AutoModelForCausalLM.from_config(model_config)
predictor = HuggingFacePredictor(
    pipeline=pipeline(
        task="text-generation", model=model, tokenizer=tokenizer
    )
)

prompts = pd.DataFrame(
    ["Complete me", "And me", "Please complete"], columns=["sentences"]
)
predictions = predictor.predict(prompts)
- Returns
Prediction result.
- Return type
DataBatchType
Serving¶
- ray.serve.model_wrappers.ModelWrapperDeployment¶
alias of Deployment(name=ModelWrapperDeployment,version=None,route_prefix=/ModelWrapperDeployment)
- class ray.serve.model_wrappers.ModelWrapper(predictor_cls: Union[str, Type[ray.ml.predictor.Predictor]], checkpoint: Union[ray.ml.checkpoint.Checkpoint, Dict], http_adapter: Union[str, Callable[[Any], Any]] = 'ray.serve.http_adapters.json_to_ndarray', batching_params: Optional[Union[Dict[str, int], bool]] = None, **predictor_kwargs)[source]¶
Serve any Ray AIR predictor from an AIR checkpoint.
- Parameters
predictor_cls (str, Type[Predictor]) – The class or path for the predictor class. The type must be a subclass of ray.ml.predictor.Predictor.
checkpoint (Checkpoint, dict) – The checkpoint object or a dictionary describing the object. The checkpoint object must be a subclass of ray.ml.checkpoint.Checkpoint. The dictionary should be in the form of {"checkpoint_cls": "import.path.MyCheckpoint", "uri": "uri_to_load_from"}. Serve will then call MyCheckpoint.from_uri("uri_to_load_from") to instantiate the object.
http_adapter (str, HTTPAdapterFn, None) – The FastAPI input conversion function. By default, Serve will use the NdArray schema and convert to a numpy array. You can pass in any FastAPI dependency resolver that returns an array. When you pass in a string, Serve will import it. Please refer to the Serve HTTP adapters documentation to learn more.
batching_params (dict, None, False) – Override the default parameters to ray.serve.batch(). Pass False to disable batching.
**predictor_kwargs – Additional keyword arguments passed to the Predictor.from_checkpoint() call.
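A minimal sketch of serving an AIR checkpoint, assuming checkpoint was produced by an XGBoostTrainer run and that a local Serve instance can be started; the deployment name is illustrative and the exact deploy call is an assumption based on the Serve Deployment API of this release.

from ray import serve
from ray.serve.model_wrappers import ModelWrapperDeployment
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor

serve.start()

# Deploy the checkpoint behind an HTTP endpoint; requests are converted
# to numpy arrays by the default http_adapter.
ModelWrapperDeployment.options(name="XGBoostService").deploy(
    predictor_cls=XGBoostPredictor,
    checkpoint=checkpoint,  # assumed to come from an XGBoostTrainer result
)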
Outputs¶
- class ray.ml.checkpoint.Checkpoint(local_path: Optional[str] = None, data_dict: Optional[dict] = None, uri: Optional[str] = None, obj_ref: Optional[<MagicMock name='mock.ObjectRef' id='140040716323920'>] = None)[source]¶
Ray ML Checkpoint.
This implementation provides methods to translate between different checkpoint storage locations: Local storage, external storage (e.g. cloud storage), and data dict representations.
The constructor is a private API; instead, the from_ methods should be used to create checkpoint objects (e.g. Checkpoint.from_directory()).
When converting between different checkpoint formats, it is guaranteed that a full round trip of conversions (e.g. directory –> dict –> obj ref –> directory) will recover the original checkpoint data. No guarantees are made about the compatibility of intermediate representations.
Examples
Example for an arbitrary data checkpoint:
from ray.ml.checkpoint import Checkpoint

# Create checkpoint data dict
checkpoint_data = {"data": 123}

# Create checkpoint object from data
checkpoint = Checkpoint.from_dict(checkpoint_data)

# Save checkpoint to temporary location
path = checkpoint.to_directory()

# This path can then be passed around, e.g. to a different function

# At some other location, recover Checkpoint object from path
checkpoint = Checkpoint.from_directory(path)

# Convert into dictionary again
recovered_data = checkpoint.to_dict()

# It is guaranteed that the original data has been recovered
assert recovered_data == checkpoint_data
Example using MLflow for saving and loading a classifier:
from ray.ml.checkpoint import Checkpoint
from sklearn.ensemble import RandomForestClassifier
import mlflow.sklearn

# Create an sklearn classifier
clf = RandomForestClassifier(max_depth=7, random_state=0)
# ... e.g. train model with clf.fit()

# Save model using MLflow
mlflow.sklearn.save_model(clf, "model_directory")

# Create checkpoint object from path
checkpoint = Checkpoint.from_directory("model_directory")

# Convert into dictionary
checkpoint_dict = checkpoint.to_dict()

# This dict can then be passed around, e.g. to a different function

# At some other location, recover checkpoint object from dict
checkpoint = Checkpoint.from_dict(checkpoint_dict)

# Convert into a directory again
checkpoint.to_directory("other_directory")

# We can now use MLflow to re-load the model
clf = mlflow.sklearn.load_model("other_directory")

# It is guaranteed that the original data was recovered
assert isinstance(clf, RandomForestClassifier)
Checkpoints can be pickled and sent to remote processes. Please note that checkpoints pointing to local directories will be pickled as data representations, so the full checkpoint data will be contained in the checkpoint object. If you want to avoid this, consider passing only the checkpoint directory to the remote task and re-construct your checkpoint object in that function. Note that this will only work if the “remote” task is scheduled on the same node or a node that also has access to the local data path (e.g. on a shared file system like NFS).
Checkpoints pointing to object store references will keep the object reference intact - this means that these checkpoints cannot be properly deserialized on other Ray clusters or outside a Ray cluster. If you need persistence across clusters, use the to_uri() or to_directory() methods to persist your checkpoints to disk.
PublicAPI: This API is stable across Ray releases.
- classmethod from_bytes(data: bytes) ray.ml.checkpoint.Checkpoint [source]¶
Create a checkpoint from the given byte string.
- Parameters
data – Data object containing pickled checkpoint data.
- Returns
checkpoint object.
- Return type
- to_bytes() bytes [source]¶
Return Checkpoint serialized as bytes object.
- Returns
Bytes object containing checkpoint data.
- Return type
bytes
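A brief sketch of a bytes round trip; the data values are illustrative.

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"data": 123})
blob = checkpoint.to_bytes()            # serialize checkpoint to a bytes object
restored = Checkpoint.from_bytes(blob)  # recover the checkpoint
assert restored.to_dict()["data"] == 123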
- classmethod from_dict(data: dict) ray.ml.checkpoint.Checkpoint [source]¶
Create checkpoint object from dictionary.
- Parameters
data – Dictionary containing checkpoint data.
- Returns
checkpoint object.
- Return type
- to_dict() dict [source]¶
Return checkpoint data as dictionary.
- Returns
Dictionary containing checkpoint data.
- Return type
dict
- classmethod from_object_ref(obj_ref: <MagicMock name='mock.ObjectRef' id='140040716323920'>) ray.ml.checkpoint.Checkpoint [source]¶
Create checkpoint object from object reference.
- Parameters
obj_ref – ObjectRef pointing to checkpoint data.
- Returns
checkpoint object.
- Return type
- to_object_ref() <MagicMock name='mock.ObjectRef' id='140040716323920'> [source]¶
Return checkpoint data as object reference.
- Returns
ObjectRef pointing to checkpoint data.
- Return type
ray.ObjectRef
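A brief sketch of storing checkpoint data in the Ray object store; it assumes a running Ray instance (ray.init()) and illustrative data values.

import ray
from ray.ml.checkpoint import Checkpoint

ray.init()

checkpoint = Checkpoint.from_dict({"data": 123})
obj_ref = checkpoint.to_object_ref()                   # put data into the object store
same_checkpoint = Checkpoint.from_object_ref(obj_ref)  # recover the checkpoint
assert same_checkpoint.to_dict()["data"] == 123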
- classmethod from_directory(path: str) ray.ml.checkpoint.Checkpoint [source]¶
Create checkpoint object from directory.
- Parameters
path – Directory containing checkpoint data. The caller promises not to delete the directory (i.e. ownership of the directory is handed over to this Checkpoint).
- Returns
checkpoint object.
- Return type
- to_directory(path: Optional[str] = None, dedup: bool = True) str [source]¶
Write checkpoint data to directory.
- Parameters
path – Target directory to restore data in. If not specified, a temporary directory will be created and used.
- Returns
Directory containing checkpoint data.
- Return type
str
- as_directory() Iterator[str] [source]¶
Return checkpoint directory path in a context.
This function makes checkpoint data available as a directory while avoiding unnecessary copies and left-over temporary data.
If the checkpoint is already a directory checkpoint, it will return the existing path. If it is not, it will create a temporary directory, which will be deleted after the context is exited.
Users should treat the returned checkpoint directory as read-only and avoid changing any data within it, as it might get deleted when exiting the context.
Example
with checkpoint.as_directory() as checkpoint_dir:
    # Do some read-only processing of files within checkpoint_dir
    pass

# At this point, if a temporary directory was created, it will have
# been deleted.
- classmethod from_uri(uri: str) ray.ml.checkpoint.Checkpoint [source]¶
Create checkpoint object from location URI (e.g. cloud storage).
Valid locations currently include AWS S3 (s3://), Google Cloud Storage (gs://), HDFS (hdfs://), and local files (file://).
- Parameters
uri – Source location URI to read data from.
- Returns
checkpoint object.
- Return type
- to_uri(uri: str) str [source]¶
Write checkpoint data to location URI (e.g. cloud storage).
- Parameters
uri – Target location URI to write data to.
- Returns
Cloud location containing checkpoint data.
- Return type
str
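A brief sketch of persisting a checkpoint to external storage; the bucket URI is hypothetical and must point to storage you can write to.

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"data": 123})
uri = checkpoint.to_uri("s3://my-bucket/my-checkpoint")  # hypothetical bucket URI
restored = Checkpoint.from_uri(uri)
assert restored.to_dict()["data"] == 123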
- get_internal_representation() Tuple[str, Union[dict, str, <MagicMock name='mock.ObjectRef' id='140040716323920'>]] [source]¶
Return tuple of (type, data) for the internal representation.
The internal representation can be used e.g. to compare checkpoint objects for equality or to access the underlying data storage.
The returned type is a string and one of ["local_path", "data_dict", "uri", "object_ref"]. The data is the respective data value.
Note that paths converted from file://... will be returned as local_path (without the file:// prefix) and not as uri.
- Returns:
Tuple of type and data.
DeveloperAPI: This API may change across minor Ray releases.
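A brief sketch, assuming a dict-backed checkpoint reports the "data_dict" representation type listed above:

from ray.ml.checkpoint import Checkpoint

checkpoint = Checkpoint.from_dict({"data": 123})
rep_type, rep_data = checkpoint.get_internal_representation()
assert rep_type == "data_dict"  # dict-backed checkpoints expose their data dict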
- class ray.ml.result.Result(metrics: Optional[Dict[str, Any]], checkpoint: Optional[ray.ml.checkpoint.Checkpoint], error: Optional[Exception])[source]¶
The final result of an ML training run or a Tune trial.
This is the class produced by Trainer.fit(). It contains a checkpoint, which can be used for resuming training and for creating a Predictor object. It also contains a metrics object describing training metrics. error is included so that unsuccessful runs and trials can be represented as well.
The constructor is a private API.
- Parameters
metrics – The final metrics as reported by a Trainable.
checkpoint – The final checkpoint of the Trainable.
error – The execution error of the Trainable run, if the trial finishes in error.
- property config: Optional[Dict[str, Any]]¶
The config associated with the result.
Configs¶
- class ray.ml.config.ScalingConfigDataClass(trainer_resources: Optional[Dict] = None, num_workers: Optional[int] = None, use_gpu: bool = False, resources_per_worker: Optional[Dict] = None, placement_strategy: str = 'PACK')[source]¶
Configuration for scaling training.
This is the schema for the scaling_config dict, and after beta, this will be the actual representation for Scaling config objects.
- trainer_resources: Resources to allocate for the trainer. If none is provided, will default to 1 CPU.
- num_workers: The number of workers (Ray actors) to launch. Each worker will reserve 1 CPU by default. The number of CPUs reserved by each worker can be overridden with the resources_per_worker argument.
- use_gpu: If True, training will be done on GPUs (1 per worker). Defaults to False. The number of GPUs reserved by each worker can be overridden with the resources_per_worker argument.
- resources_per_worker: If specified, the resources defined in this Dict will be reserved for each worker. The CPU and GPU keys (case-sensitive) can be defined to override the number of CPU/GPUs used by each worker.
- placement_strategy: The placement strategy to use for the placement group of the Ray actors. See Placement Group Strategies for the possible options.
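A minimal sketch of a scaling_config dict following the schema above, as it might be passed to an AIR Trainer; the values are illustrative.

# Illustrative values; keys follow the ScalingConfigDataClass schema above.
scaling_config = {
    "num_workers": 2,                    # launch 2 Ray actor workers
    "use_gpu": False,                    # train on CPU
    "resources_per_worker": {"CPU": 2},  # reserve 2 CPUs per worker
    "placement_strategy": "PACK",        # pack workers onto as few nodes as possible
}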
- property num_cpus_per_worker¶
The number of CPUs to set per worker.
- property num_gpus_per_worker¶
The number of GPUs to set per worker.
- property additional_resources_per_worker¶
Resources per worker, not including CPU or GPU resources.
- class ray.ml.config.FailureConfig[source]¶
Configuration related to failure handling of each run/trial.
- Parameters
max_failures – Tries to recover a run at least this many times. Will recover from the latest checkpoint if present. Setting to -1 will lead to infinite recovery retries. Setting to 0 will disable retries. Defaults to 0.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- class ray.ml.config.RunConfig(name: Optional[str] = None, local_dir: Optional[str] = None, callbacks: Optional[List[Callback]] = None, stop: Optional[Union[Mapping, Stopper, Callable[[str, Mapping], bool]]] = None, failure: Optional[ray.ml.config.FailureConfig] = None, sync_config: Optional[ray.tune.syncer.SyncConfig] = None, verbose: Union[int, ray.tune.utils.log.Verbosity] = Verbosity.V3_TRIAL_DETAILS)[source]¶
Runtime configuration for individual trials that are run.
This contains information that applies to individual runs of Trainable classes. This includes both running a Trainable by itself or running a hyperparameter tuning job on top of a Trainable (applies to each trial).
Upon resume, Ray Tune will automatically apply the same run config so that the resumed run uses the same run config as the original run.
- Parameters
name – Name of the trial or experiment. If not provided, will be deduced from the Trainable.
local_dir – Local dir to save training results to. Defaults to ~/ray_results.
stop – Stop conditions to consider. Refer to ray.tune.stopper.Stopper for more info. Stoppers should be serializable.
callbacks – Callbacks to invoke. Refer to ray.tune.callback.Callback for more info. Callbacks should be serializable. Currently only stateless callbacks are supported for resumed runs (any state of the callback will not be checkpointed by Tune and thus will not take effect in resumed runs).
failure – The failure mode configuration.
sync_config – Configuration object for syncing. See tune.SyncConfig.
verbose – 0, 1, 2, or 3. Verbosity mode. 0 = silent, 1 = only status updates, 2 = status and brief results, 3 = status and detailed results. Defaults to 3.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
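A minimal sketch of constructing a RunConfig with a FailureConfig; the argument values are illustrative, and it assumes FailureConfig accepts max_failures as documented above.

from ray.ml.config import FailureConfig, RunConfig

run_config = RunConfig(
    name="my_experiment",
    local_dir="~/ray_results",
    failure=FailureConfig(max_failures=2),  # retry each trial up to 2 times
    verbose=2,                              # status and brief results
)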