Getting Data in and out of Tune#

Often, you will find yourself needing to pass data into Tune Trainables (datasets, models, other large parameters) and get data out of them (metrics, checkpoints, other artifacts). In this guide, we’ll explore different ways of doing that and see in what circumstances they should be used.

Let’s start by defining a simple Trainable function. We’ll be expanding this function with different functionality as we go.

import random
import time
import pandas as pd


def training_function(config):
    # For now, we have nothing here.
    data = None
    model = {"hyperparameter_a": None, "hyperparameter_b": None}
    epochs = 0

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}

Our training_function function requires a pandas DataFrame, a model with some hyperparameters and the number of epochs to train the model for as inputs. The hyperparameters of the model impact the metric returned, and in each epoch (iteration of training), the trained_model state is changed.

We will run hyperparameter optimization using the Tuner API.

from ray.tune import Tuner
from ray import tune

tuner = Tuner(training_function, tune_config=tune.TuneConfig(num_samples=4))

Getting data into Tune#

First order of business is to provide the inputs for the Trainable. We can broadly separate them into two categories - variables and constants.

Variables are the parameters we want to tune. They will be different for every Trial. For example, those may be the learning rate and batch size for a neural network, number of trees and the maximum depth for a random forest, or the data partition if you are using Tune as an execution engine for batch training.

Constants are the parameters that are the same for every Trial. Those can be the number of epochs, model hyperparameters we want to set but not tune, the dataset and so on. Often, the constants will be quite large (e.g. the dataset or the model).

Warning

Objects from the outer scope of the training_function will also be automatically serialized and sent to Trial Actors, which may lead to unintended behavior. Examples include global locks not working (as each Actor operates on a copy) or general errors related to serialization. Best practice is to not refer to any objects from outer scope in the training_function.

Passing data into a Tune run through search spaces#

Note

TL;DR - use the param_space argument to specify small, serializable constants and variables.

The first way of passing inputs into Trainables is the search space (it may also be called parameter space or config). In the Trainable itself, it maps to the config dict passed in as an argument to the function. You define the search space using the param_space argument of the Tuner. The search space is a dict and may be composed of distributions, which will sample a different value for each Trial, or of constant values. The search space may be composed of nested dictionaries, and those in turn can have distributions as well.

Warning

Each value in the search space will be saved directly in the Trial metadata. This means that every value in the search space must be serializable and take up a small amount of memory.

For example, passing in a large pandas DataFrame or an unserializable model object as a value in the search space will lead to unwanted behavior. At best it will cause large slowdowns and disk space usage as Trial metadata saved to disk will also contain this data. At worst, an exception will be raised, as the data cannot be sent over to the Trial workers. For more details, see How can I avoid bottlenecks?.

Instead, use strings or other identifiers as your values, and initialize/load the objects inside your Trainable directly depending on those.

Note

Datasets can be used as values in the search space directly.

In our example, we want to tune the two model hyperparameters. We also want to set the number of epochs, so that we can easily tweak it later. For the hyperparameters, we will use the tune.uniform distribution. We will also modify the training_function to obtain those values from the config dictionary.

def training_function(config):
    # For now, we have nothing here.
    data = None

    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

Using tune.with_parameters access data in Tune runs#

Note

TL;DR - use the tune.with_parameters util function to specify large constant parameters.

If we have large objects that are constant across Trials, we can use the tune.with_parameters utility to pass them into the Trainable directly. The objects will be stored in the Ray object store so that each Trial worker may access them to obtain a local copy to use in its process.

Tip

Objects put into the Ray object store must be serializable.

Note that the serialization (once) and deserialization (for each Trial) of large objects may incur a performance overhead.

In our example, we will pass the data DataFrame using tune.with_parameters. In order to do that, we need to modify our function signature to include data as an argument.

def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

Next step is to wrap the training_function using tune.with_parameters before passing it into the Tuner. Every keyword argument of the tune.with_parameters call will be mapped to the keyword arguments in the Trainable signature.

data = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4),
)

Loading data in a Tune Trainable#

You can also load data directly in Trainable from e.g. cloud storage, shared file storage such as NFS, or from the local disk of the Trainable worker.

Warning

When loading from disk, ensure that all nodes in your cluster have access to the file you are trying to load.

A common use-case is to load the dataset from S3 or any other cloud storage with pandas, arrow or any other framework.

The working directory of the Trainable worker will be automatically changed to the corresponding Trial directory. For more details, see How do I access relative filepaths in my Tune training function?.

Our tuning run can now be run, though we will not yet obtain any meaningful outputs back.

results = tuner.fit()

Getting data out of Ray Tune#

We can now run our tuning run using the training_function Trainable. The next step is to report metrics to Tune that can be used to guide the optimization. We will also want to checkpoint our trained models so that we can resume the training after an interruption, and to use them for prediction later.

The ray.train.report API is used to get data out of the Trainable workers. It can be called multiple times in the Trainable function. Each call corresponds to one iteration (epoch, step, tree) of training.

Reporting metrics with Tune#

Metrics are values passed through the metrics argument in a train.report call. Metrics can be used by Tune Search Algorithms and Schedulers to direct the search. After the tuning run is complete, you can analyze the results, which include the reported metrics.

Note

Similarly to search space values, each value reported as a metric will be saved directly in the Trial metadata. This means that every value reported as a metric must be serializable and take up a small amount of memory.

Note

Tune will automatically include some metrics, such as the training iteration, timestamp and more. See here for the entire list.

In our example, we want to maximize the metric. We will report it each epoch to Tune, and set the metric and mode arguments in tune.TuneConfig to let Tune know that it should use it as the optimization objective.

from ray import train


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        train.report(metrics={"metric": metric})


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
)

Logging metrics with Tune callbacks#

Every metric logged using train.report can be accessed during the tuning run through Tune Callbacks. Ray Tune provides several built-in integrations with popular frameworks, such as MLFlow, Weights & Biases, CometML and more. You can also use the Callback API to create your own callbacks.

Callbacks are passed in the callback argument of the Tuner’s RunConfig.

In our example, we’ll use the MLFlow callback to track the progress of our tuning run and the changing value of the metric (requires mlflow to be installed).

from ray import train
from ray.train import RunConfig
from ray.tune.logger.mlflow import MLflowLoggerCallback


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        train.report(metrics={"metric": metric})


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

Getting data out of Tune using checkpoints & other artifacts#

Aside from metrics, you may want to save the state of your trained model and any other artifacts to allow resumption from training failure and further inspection and usage. Those cannot be saved as metrics, as they are often far too large and may not be easily serializable. Finally, they should be persisted on disk or cloud storage to allow access after the Tune run is interrupted or terminated.

Ray Train provides a Checkpoint API for that purpose. Checkpoint objects can be created from various sources (dictionaries, directories, cloud storage).

In Ray Tune, Checkpoints are created by the user in their Trainable functions and reported using the optional checkpoint argument of train.report. Checkpoints can contain arbitrary data and can be freely passed around the Ray cluster. After a tuning run is over, Checkpoints can be obtained from the results.

Ray Tune can be configured to automatically sync checkpoints to cloud storage, keep only a certain number of checkpoints to save space (with ray.train.CheckpointConfig) and more.

Note

The experiment state itself is checkpointed separately. See Appendix: Types of data stored by Tune for more details.

In our example, we want to be able to resume the training from the latest checkpoint, and to save the trained_model in a checkpoint every iteration. To accomplish this, we will use the session and Checkpoint APIs.

import os
import pickle
import tempfile

from ray import train
from ray.train import Checkpoint


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Load the checkpoint, if there is any.
    checkpoint = session.get_checkpoint()
    start_epoch = 0
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "model.pkl"), "w") as f:
                checkpoint_dict = pickle.load(f)
        start_epoch = checkpoint_dict["epoch"] + 1
        model = checkpoint_dict["state"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(start_epoch, epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()

        checkpoint_dict = {"state": model, "epoch": epoch}

        # Create the checkpoint.
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            with open(os.path.join(temp_checkpoint_dir, "model.pkl"), "w") as f:
                pickle.dump(checkpoint_dict, f)
            train.report(
                {"metric": metric},
                checkpoint=Checkpoint.from_directory(temp_checkpoint_dir),
            )


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

With all of those changes implemented, we can now run our tuning and obtain meaningful metrics and artifacts.

results = tuner.fit()
results.get_dataframe()
2022-11-30 17:40:28,839 INFO tune.py:762 -- Total run time: 15.79 seconds (15.65 seconds for the tuning loop).
metric time_this_iter_s should_checkpoint done timesteps_total episodes_total training_iteration trial_id experiment_id date ... hostname node_ip time_since_restore timesteps_since_restore iterations_since_restore warmup_time config/epochs config/hyperparameter_a config/hyperparameter_b logdir
0 -58.399962 1.015951 True False NaN NaN 10 0b239_00000 acf38c19d59c4cf2ad7955807657b6ea 2022-11-30_17-40-26 ... ip-172-31-43-110 172.31.43.110 10.282120 0 10 0.003541 10 18.065981 -98.298928 /home/ubuntu/ray_results/training_function_202...
1 -24.461518 1.030420 True False NaN NaN 10 0b239_00001 5ca9e03d7cca46a7852cd501bc3f7b38 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.362581 0 10 0.004031 10 1.544918 -47.741455 /home/ubuntu/ray_results/training_function_202...
2 18.510299 1.034228 True False NaN NaN 10 0b239_00002 aa38dd786c714486a8d69fa5b372df48 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.333781 0 10 0.005286 10 8.129285 28.846415 /home/ubuntu/ray_results/training_function_202...
3 -16.138780 1.020072 True False NaN NaN 10 0b239_00003 5b401e15ab614332b631d552603a8d77 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.242707 0 10 0.003809 10 17.982020 -27.867871 /home/ubuntu/ray_results/training_function_202...

4 rows × 23 columns

Checkpoints, metrics, and the log directory for each trial can be accessed through the ResultGrid output of a Tune experiment. For more information on how to interact with the returned ResultGrid, see Analyzing Tune Experiment Results.

How do I access Tune results after I am finished?#

After you have finished running the Python session, you can still access the results and checkpoints. By default, Tune will save the experiment results to the ~/ray_results local directory. You can configure Tune to persist results in the cloud as well. See How to Configure Persistent Storage in Ray Tune for more information on how to configure storage options for persisting experiment results.

You can restore the Tune experiment by calling Tuner.restore(path_or_cloud_uri, trainable), where path_or_cloud_uri points to a location either on the filesystem or cloud where the experiment was saved to. After the Tuner has been restored, you can access the results and checkpoints by calling Tuner.get_results() to receive the ResultGrid object, and then proceeding as outlined in the previous section.