How to Configure Storage Options for a Distributed Tune Experiment#

See also

Before diving into storage options, one can take a look at the different types of data stored by Tune.

Tune allows you to configure persistent storage options to enable the following use cases in a distributed Ray cluster:

  • Trial-level fault tolerance: When trials are restored (e.g. after a node failure or when the experiment was paused), they may be scheduled on different nodes, but they still need access to their latest checkpoint.

  • Experiment-level fault tolerance: For an entire experiment to be restored (e.g. if the cluster crashes unexpectedly), Tune needs to be able to access the latest experiment state, along with all trial checkpoints to start from where the experiment left off.

  • Post-experiment analysis: A consolidated location storing data from all trials is useful for post-experiment analysis such as accessing the best checkpoints and hyperparameter configs after the cluster has already been terminated.

  • Bridge with downstream serving/batch inference tasks: With a configured storage, you can easily access the models and artifacts generated by trials, share them with others or use them in downstream tasks.

Storage Options in Tune#

Tune provides support for three scenarios:

  1. When running Tune on a distributed cluster without any external persistent storage.

  2. When using a network filesystem (NFS) mounted to all machines in the cluster.

  3. When using cloud storage (e.g. AWS S3 or Google Cloud Storage) accessible by all machines in the cluster.
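
As a rough illustration of how these scenarios differ from a configuration point of view, the sketch below classifies a storage path string. The helper name `classify_storage_path` and the set of URI schemes are assumptions made for this example and are not part of the Tune API. A node-local directory and an NFS mount look identical as paths, so the sketch can only tell cloud URIs apart from filesystem paths.

```python
from urllib.parse import urlparse

# Hypothetical set of URI schemes treated as cloud storage in this sketch.
CLOUD_SCHEMES = {"s3", "gs"}


def classify_storage_path(storage_path: str) -> str:
    """Return "cloud" for a cloud URI, otherwise "filesystem"."""
    scheme = urlparse(storage_path).scheme
    return "cloud" if scheme in CLOUD_SCHEMES else "filesystem"


print(classify_storage_path("s3://bucket-name/sub-path/"))  # cloud
print(classify_storage_path("/path/to/shared/storage/"))    # filesystem
print(classify_storage_path("~/ray_results"))               # filesystem
```

Whether a filesystem path is shared between nodes is a property of the cluster setup, not of the path, which is why scenarios (1) and (2) need different syncing settings even though their paths look alike.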

Situation (1) is the default scenario if neither a network filesystem nor cloud storage is provided. In this scenario, we assume that we only have the local filesystems of each machine in the Ray cluster for storing experiment outputs.

Note

Although we are considering distributed Tune experiments in this guide, a network filesystem or cloud storage can also be configured for single-node experiments. This can be useful to persist your experiment results in external storage if, for example, the instance you run your experiment on clears its local storage after termination.

See also

See SyncConfig for the full set of configuration options as well as more details.

Configure Tune without external persistent storage#

If you’re using neither a shared filesystem nor cloud storage, Ray Tune will resort to the default mechanism of periodically synchronizing data saved on worker nodes to the head node. This treats the head node’s local filesystem as the main storage location of the distributed Tune experiment.

By default, workers will sync to the head node whenever a trial running on that worker has finished saving a checkpoint. This can be configured by sync_on_checkpoint and sync_period in SyncConfig:

from ray import tune
from ray.air.config import RunConfig

tuner = tune.Tuner(
    trainable,
    run_config=RunConfig(
        name="experiment_name",
        storage_path="~/ray_results",
        sync_config=tune.SyncConfig(
            syncer="auto",
            # Sync approximately every minute rather than on every checkpoint
            sync_on_checkpoint=False,
            sync_period=60,
        )
    )
)
tuner.fit()

In the snippet above, we disabled forceful syncing on trial checkpoints and adjusted the sync period to 60 seconds. Setting the sync period to a lower value (in seconds) will sync from remote nodes more often. This will lead to more robust trial recovery, but it will also lead to more synchronization overhead.
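
To make this trade-off concrete, here is a back-of-the-envelope sketch. It assumes, as a simplification, that syncs happen exactly every sync_period seconds and that a worker can fail at any moment, so anything written since the last sync (up to sync_period seconds of results) may not have reached the head node yet. The function name `sync_tradeoff` is hypothetical.

```python
def sync_tradeoff(sync_period_s: float, run_time_s: float = 3600.0) -> dict:
    """Estimate sync count and worst-case unsynced window for one worker."""
    return {
        # Number of periodic syncs issued during the run.
        "num_syncs": int(run_time_s // sync_period_s),
        # Longest stretch of results that could be missing on the head node.
        "max_unsynced_window_s": sync_period_s,
    }


for period in (30, 60, 300):
    print(period, sync_tradeoff(period))
```

Under these assumptions, halving the sync period halves the window of data at risk but doubles the number of sync operations per worker.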

In this example, all experiment results can be found on the head node at ~/ray_results/experiment_name for further processing.

Note

If you don’t provide a SyncConfig at all, Tune uses this head-node syncing mechanism with its default settings.

Tip

Please note that this approach is likely the least efficient one. You should always try to use shared or cloud storage if possible when training on a multi-node cluster. Using a network filesystem or cloud storage is recommended when training a large number of distributed trials, since the default scenario with many worker nodes can introduce significant overhead.

Configuring Tune with a network filesystem (NFS)#

If all Ray nodes have access to a network filesystem, e.g. AWS EFS or Google Cloud Filestore, they can all write experiment outputs to this directory.

All we need to do is set the shared network filesystem as the path to save results and disable Ray Tune’s default syncing behavior.

from ray import air, tune

tuner = tune.Tuner(
    trainable,
    run_config=air.RunConfig(
        name="experiment_name",
        storage_path="/path/to/shared/storage/",
        sync_config=tune.SyncConfig(
            syncer=None  # Disable syncing
        )
    )
)
tuner.fit()

In this example, all experiment results can be found in the shared storage at /path/to/shared/storage/experiment_name for further processing.

Configuring Tune with cloud storage (AWS S3, Google Cloud Storage)#

If all nodes in a Ray cluster have access to cloud storage, e.g. AWS S3 or Google Cloud Storage (GCS), then all experiment outputs can be saved in a shared cloud bucket.

We can configure cloud storage by telling Ray Tune to upload to a remote storage_path:

from ray import tune
from ray.air.config import RunConfig

tuner = tune.Tuner(
    trainable,
    run_config=RunConfig(
        name="experiment_name",
        storage_path="s3://bucket-name/sub-path/",
    )
)
tuner.fit()

Ray AIR automatically configures a default syncer that uses pyarrow to perform syncing with the specified cloud storage_path. You can also pass a custom Syncer object to a tune.SyncConfig within the air.RunConfig if you want to implement custom logic for uploading/downloading from the cloud. See How can I upload my Tune results to cloud storage? and How can I use the awscli or gsutil command line commands for syncing? for more details and examples of custom syncing.

In this example, all experiment results can be found in the shared storage at s3://bucket-name/sub-path/experiment_name for further processing.
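
In each of the configurations above, the results end up in a directory formed by joining storage_path and the experiment name. The sketch below reproduces that joining rule for the paths used in this guide. The helper `experiment_dir` is hypothetical and only mirrors the locations stated in the text; it is not how Tune resolves paths internally, and it assumes a POSIX system for the filesystem case.

```python
import os
import posixpath
from urllib.parse import urlparse


def experiment_dir(storage_path: str, name: str) -> str:
    """Join storage_path and the experiment name into one location."""
    if urlparse(storage_path).scheme:
        # Cloud URI: join with forward slashes, dropping a trailing slash.
        return posixpath.join(storage_path.rstrip("/"), name)
    # Local or NFS path: expand "~" and join with the OS path separator.
    return os.path.join(os.path.expanduser(storage_path), name)


print(experiment_dir("s3://bucket-name/sub-path/", "experiment_name"))
# s3://bucket-name/sub-path/experiment_name
print(experiment_dir("/path/to/shared/storage/", "experiment_name"))
# /path/to/shared/storage/experiment_name (on a POSIX system)
```

The resulting string is the location to look in for post-experiment analysis, and the one to use when restoring the experiment.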

Note

The head node will not have access to all experiment results locally. If you want to process e.g. the best checkpoint further, you will first have to fetch it from the cloud storage.

Experiment restoration should also be done using the experiment directory at the cloud storage URI, rather than the local experiment directory on the head node. See here for an example.

Examples#

Let’s show some examples of configuring storage location and synchronization options. We’ll also show how to resume the experiment for each of the examples, in the case that your experiment gets interrupted. See How to Define Stopping Criteria for a Ray Tune Experiment for more information on resuming experiments.

In each example, we’ll give a practical explanation of how trial checkpoints are saved across the cluster and the external storage location (if one is provided). See Appendix: Types of data stored by Tune for an overview of other experiment data that Tune needs to persist.

Example: Running Tune with cloud storage#

Let’s assume that you’re running this example script from your Ray cluster’s head node.

In the example below, my_trainable is a Tune trainable that implements saving and loading checkpoints.

import os
import ray
from ray import air, tune
from your_module import my_trainable

# Look for the existing cluster and connect to it
ray.init()

# Set the local caching directory. Results will be stored here
# before they are synced to remote storage. This env variable is ignored
# if `storage_path` below is set to a local directory.
os.environ["RAY_AIR_LOCAL_CACHE_DIR"] = "/tmp/mypath"

tuner = tune.Tuner(
    my_trainable,
    run_config=air.RunConfig(
        # Name of your experiment
        name="my-tune-exp",
        # Configure how experiment data and checkpoints are persisted.
        # We recommend cloud storage checkpointing, as the data outlives the
        # cluster when instances are terminated, and it has better performance.
        storage_path="s3://my-checkpoints-bucket/path/",
        checkpoint_config=air.CheckpointConfig(
            # We'll keep the best five checkpoints at all times
            # (with the highest AUC scores, a metric reported by the trainable)
            checkpoint_score_attribute="max-auc",
            checkpoint_score_order="max",
            num_to_keep=5,
        ),
    ),
)
# This starts the run!
results = tuner.fit()

In this example, here’s how trial checkpoints will be saved:

  • On head node where we are running from:
    • /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (but only for trials running on this node)

  • On worker nodes:
    • /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (but only for trials running on this node)

  • S3:
    • s3://my-checkpoints-bucket/path/my-tune-exp/<trial_name>/checkpoint_<step> (all trials)
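
The checkpoint_config in the script above asks Tune to keep only the five checkpoints with the highest "max-auc" score. The following toy model illustrates that selection rule in plain Python. The scores and checkpoint names are made up for illustration, and the sketch ignores any additional bookkeeping Tune may perform (for example, whether the most recent checkpoint is retained regardless of its score).

```python
import heapq


def checkpoints_to_keep(scores: dict, num_to_keep: int, order: str = "max") -> list:
    """Return the names of the num_to_keep best checkpoints by score."""
    pick = heapq.nlargest if order == "max" else heapq.nsmallest
    best = pick(num_to_keep, scores.items(), key=lambda item: item[1])
    return [name for name, _ in best]


# Hypothetical "max-auc" values reported at each checkpoint.
auc_by_checkpoint = {
    "checkpoint_1": 0.71,
    "checkpoint_2": 0.78,
    "checkpoint_3": 0.75,
    "checkpoint_4": 0.82,
    "checkpoint_5": 0.80,
    "checkpoint_6": 0.69,
    "checkpoint_7": 0.85,
}

print(checkpoints_to_keep(auc_by_checkpoint, num_to_keep=5))
# ['checkpoint_7', 'checkpoint_4', 'checkpoint_5', 'checkpoint_2', 'checkpoint_3']
```

In this model, checkpoint_1 and checkpoint_6 would be the ones removed from storage.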

If this run stopped for any reason (e.g. the user pressed CTRL+C, or the run was terminated due to out-of-memory issues), you can resume it at any time, starting from the experiment checkpoint state saved in the cloud:

from ray import tune
from your_module import my_trainable

tuner = tune.Tuner.restore(
    "s3://my-checkpoints-bucket/path/my-tune-exp",
    trainable=my_trainable,
    resume_errored=True
)
tuner.fit()

There are a few options for restoring an experiment: resume_unfinished, resume_errored and restart_errored. Please see the documentation of Tuner.restore() for more details.
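
As a rough mental model of these three flags, the sketch below decides what happens to a single trial on restore. It is a plain-Python illustration, not Tune's implementation: the status strings, the default values shown, and the choice to reject setting both errored-trial flags at once are assumptions of this example. The documentation of Tuner.restore() remains the authoritative reference.

```python
def plan_trial(status, resume_unfinished=True,
               resume_errored=False, restart_errored=False):
    """Decide what happens to one trial when the experiment is restored."""
    if resume_errored and restart_errored:
        # Design choice of this sketch: treat the two flags as exclusive.
        raise ValueError("choose either resume_errored or restart_errored")
    if status == "TERMINATED":
        return "skip (already finished)"
    if status == "ERROR":
        if resume_errored:
            return "resume from latest checkpoint"
        if restart_errored:
            return "restart from scratch"
        return "skip (left as errored)"
    # Any other status means the trial had not finished yet.
    if resume_unfinished:
        return "resume from latest checkpoint"
    return "skip (left unfinished)"


for status in ("TERMINATED", "ERROR", "RUNNING"):
    print(status, "->", plan_trial(status, resume_errored=True))
```

With resume_errored=True, as in the restore call above, both interrupted and errored trials would continue from their latest checkpoints, while finished trials are left alone.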

Example: Running Tune without external persistent storage (default scenario)#

Now, let’s take a look at an example using the default syncing behavior described above. Again, we’re running this example script from the Ray cluster’s head node.

import ray
from ray import air, tune
from your_module import my_trainable

# Look for the existing cluster and connect to it
ray.init()

tuner = tune.Tuner(
    my_trainable,
    run_config=air.RunConfig(
        name="my-tune-exp",
        storage_path="/tmp/mypath",
        checkpoint_config=air.CheckpointConfig(
            checkpoint_score_attribute="max-auc",
            checkpoint_score_order="max",
            num_to_keep=5,
        ),
    )
)
# This starts the run!
results = tuner.fit()

In this example, here’s how trial checkpoints will be saved:

  • On head node where we are running from:
    • /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (all trials, since they have been synced to the head node)

  • On worker nodes:
    • /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (but only for trials running on this node)

This experiment can be resumed from the head node:

from ray import tune
from your_module import my_trainable

tuner = tune.Tuner.restore(
    "/tmp/mypath/my-tune-exp",
    trainable=my_trainable,
    resume_errored=True
)
tuner.fit()