How to Configure Storage Options for a Distributed Tune Experiment
See also
Before diving into storage options, one can take a look at the different types of data stored by Tune.
Tune allows you to configure persistent storage options to enable the following use cases in a distributed Ray cluster:
Trial-level fault tolerance: When trials are restored (e.g. after a node failure or when the experiment was paused), they may be scheduled on different nodes, but they would still need access to their latest checkpoint.
Experiment-level fault tolerance: For an entire experiment to be restored (e.g. if the cluster crashes unexpectedly), Tune needs to be able to access the latest experiment state, along with all trial checkpoints to start from where the experiment left off.
Post-experiment analysis: A consolidated location storing data from all trials is useful for post-experiment analysis such as accessing the best checkpoints and hyperparameter configs after the cluster has already been terminated.
Bridge with downstream serving/batch inference tasks: With a configured storage, you can easily access the models and artifacts generated by trials, share them with others or use them in downstream tasks.
Storage Options in Tune#
Tune provides support for three scenarios:
1. When running Tune on a distributed cluster without any external persistent storage.
2. When using a network filesystem (NFS) mounted to all machines in the cluster.
3. When using cloud storage (e.g. AWS S3 or Google Cloud Storage) accessible by all machines in the cluster.
Situation (1) is the default scenario if neither a network filesystem nor cloud storage is provided. In this scenario, we assume that we only have the local filesystems of each machine in the Ray cluster for storing experiment outputs.
Note
Although we are considering distributed Tune experiments in this guide, a network filesystem or cloud storage can also be configured for single-node experiments. This can be useful to persist your experiment results in external storage if, for example, the instance you run your experiment on clears its local storage after termination.
See also
See SyncConfig for the full set of configuration options as well as more details.
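As a rough illustration, the three scenarios differ only in a handful of settings that the sections below pass to RunConfig and SyncConfig. The helper `storage_settings` and its argument names are hypothetical; it returns a plain dictionary for comparison and does not call Tune itself.

```python
from typing import Any, Dict, Optional


def storage_settings(
    upload_dir: Optional[str] = None,
    shared_dir: Optional[str] = None,
    local_dir: str = "~/ray_results",
) -> Dict[str, Any]:
    """Summarize the settings this guide uses for each storage scenario.

    Hypothetical helper for illustration only; it mirrors the values shown
    in the snippets of this guide and is not part of the Tune API.
    """
    if upload_dir and shared_dir:
        raise ValueError("Choose either cloud storage or a shared filesystem.")
    if upload_dir:
        # Scenario 3: results are uploaded to a cloud bucket.
        return {"scenario": 3, "local_dir": local_dir,
                "upload_dir": upload_dir, "syncer": "auto"}
    if shared_dir:
        # Scenario 2: every node writes to the shared mount, syncing is disabled.
        return {"scenario": 2, "local_dir": shared_dir,
                "upload_dir": None, "syncer": None}
    # Scenario 1 (default): worker nodes sync to the head node.
    return {"scenario": 1, "local_dir": local_dir,
            "upload_dir": None, "syncer": "auto"}


print(storage_settings())
print(storage_settings(shared_dir="/path/to/shared/storage/"))
print(storage_settings(upload_dir="s3://bucket-name/sub-path/"))
```

The paths and bucket name are the placeholder values used elsewhere in this guide.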
Configure Tune without external persistent storage#
If you’re using neither a shared filesystem nor cloud storage, Ray Tune will resort to the default mechanism of periodically synchronizing data saved on worker nodes to the head node. This treats the head node’s local filesystem as the main storage location of the distributed Tune experiment.
By default, workers will sync to the head node whenever a trial running on that worker has finished saving a checkpoint. This can be configured by sync_on_checkpoint and sync_period in SyncConfig:
from ray import tune
from ray.air.config import RunConfig

tuner = tune.Tuner(
    trainable,
    run_config=RunConfig(
        name="experiment_name",
        local_dir="~/ray_results",
        sync_config=tune.SyncConfig(
            syncer="auto",
            # Sync approximately every minute rather than on every checkpoint
            sync_on_checkpoint=False,
            sync_period=60,
        )
    )
)
tuner.fit()
In the snippet above, we disabled forceful syncing on trial checkpoints and adjusted the sync period to 60 seconds. Setting the sync period to a lower value (in seconds) will sync from remote nodes more often. This will lead to more robust trial recovery, but it will also lead to more synchronization overhead.
In this example, all experiment results can be found on the head node at ~/ray_results/experiment_name for further processing.
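To make the trade-off between sync_on_checkpoint and sync_period more concrete, here is a deliberately simplified model of when a sync would be triggered. It is a sketch of the semantics described above, not Tune's actual scheduling code; the function names, the 10-second tick, and the 20-second checkpoint interval are all assumptions made for illustration.

```python
def should_sync(now, last_sync, sync_period, checkpoint_saved, sync_on_checkpoint):
    """Simplified rule: sync on a checkpoint if enabled, else once per period."""
    if sync_on_checkpoint and checkpoint_saved:
        return True
    return now - last_sync >= sync_period


def count_syncs(duration_s, checkpoint_every_s, sync_period_s,
                sync_on_checkpoint, tick_s=10):
    """Count how many syncs the simplified rule triggers over a run."""
    last_sync = 0
    syncs = 0
    for now in range(tick_s, duration_s + 1, tick_s):
        # Assume the trial saves a checkpoint at fixed intervals.
        checkpoint_saved = now % checkpoint_every_s == 0
        if should_sync(now, last_sync, sync_period_s,
                       checkpoint_saved, sync_on_checkpoint):
            syncs += 1
            last_sync = now
    return syncs


# Ten minutes of training with a checkpoint every 20 seconds
periodic_only = count_syncs(600, 20, 60, sync_on_checkpoint=False)
on_checkpoint = count_syncs(600, 20, 60, sync_on_checkpoint=True)
print(periodic_only, on_checkpoint)
```

In this toy model, syncing on every checkpoint triggers three times as many syncs as the 60-second period alone, which is the overhead the configuration above is meant to reduce.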
Note
If you don’t provide a SyncConfig at all, Tune falls back to this head-node syncing mechanism with its default settings.
Tip
Please note that this approach is likely the least efficient one. You should always try to use shared or cloud storage if possible when training on a multi-node cluster. Using a network filesystem or cloud storage is recommended when training a large number of distributed trials, since the default scenario with many worker nodes can introduce significant overhead.
Configuring Tune with a network filesystem (NFS)#
If all Ray nodes have access to a network filesystem, e.g. AWS EFS or Google Cloud Filestore, they can all write experiment outputs to this directory.
All we need to do is set the shared network filesystem as the path to save results and disable Ray Tune’s default syncing behavior.
from ray import air, tune

tuner = tune.Tuner(
    trainable,
    run_config=air.RunConfig(
        name="experiment_name",
        local_dir="/path/to/shared/storage/",
        sync_config=tune.SyncConfig(
            syncer=None  # Disable syncing
        )
    )
)
tuner.fit()
In this example, all experiment results can be found in the shared storage at /path/to/shared/storage/experiment_name for further processing.
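Before pointing local_dir at a shared mount, it can help to confirm that the directory is actually writable from the node you are on. The following is a minimal sketch using only the Python standard library. The helper name `check_shared_dir` is hypothetical, and it only verifies the current node, so it would have to be run on every node to say anything about the whole cluster.

```python
import os
import tempfile
import uuid


def check_shared_dir(path: str) -> bool:
    """Return True if a small marker file can be written to and read from path."""
    marker = os.path.join(path, f".tune_write_test_{uuid.uuid4().hex}")
    try:
        with open(marker, "w") as f:
            f.write("ok")
        with open(marker) as f:
            return f.read() == "ok"
    except OSError:
        # Missing directory, read-only mount, permission problems, etc.
        return False
    finally:
        # Always clean up the marker file if it was created.
        if os.path.exists(marker):
            os.remove(marker)


# A temporary directory stands in for the shared mount in this demonstration.
with tempfile.TemporaryDirectory() as shared:
    print(check_shared_dir(shared))
```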
Configuring Tune with cloud storage (AWS S3, Google Cloud Storage)#
If all nodes in a Ray cluster have access to cloud storage, e.g. AWS S3 or Google Cloud Storage (GCS), then all experiment outputs can be saved in a shared cloud bucket.
We can configure cloud storage by telling Ray Tune to upload to a remote upload_dir:
from ray import tune
from ray.air.config import RunConfig

tuner = tune.Tuner(
    trainable,
    run_config=RunConfig(
        name="experiment_name",
        sync_config=tune.SyncConfig(
            upload_dir="s3://bucket-name/sub-path/",
            syncer="auto",
        )
    )
)
tuner.fit()
syncer="auto" automatically configures a default syncer that uses pyarrow to perform syncing with the specified cloud upload_dir. The syncer config can also take in a custom Syncer if you want to implement custom logic for uploading/downloading from the cloud.
See How can I upload my Tune results to cloud storage? and How can I use the awscli or gsutil command line commands for syncing? for more details and examples of custom syncing.
In this example, all experiment results can be found in cloud storage at s3://bucket-name/sub-path/experiment_name for further processing.
Note
The head node will not have access to all experiment results locally. If you want to process e.g. the best checkpoint further, you will first have to fetch it from the cloud storage.
Experiment restoration should also be done using the experiment directory at the cloud storage URI, rather than the local experiment directory on the head node. See here for an example.
Examples#
Let’s show some examples of configuring storage location and synchronization options. We’ll also show how to resume the experiment for each of the examples, in the case that your experiment gets interrupted. See How to Define Stopping Criteria for a Ray Tune Experiment for more information on resuming experiments.
In each example, we’ll give a practical explanation of how trial checkpoints are saved across the cluster and the external storage location (if one is provided). See Appendix: Types of data stored by Tune for an overview of other experiment data that Tune needs to persist.
Example: Running Tune with cloud storage#
Let’s assume that you’re running this example script from your Ray cluster’s head node.
In the example below, my_trainable is a Tune trainable that implements saving and loading checkpoints.
import ray
from ray import air, tune
from your_module import my_trainable

# Look for the existing cluster and connect to it
ray.init()

# Configure how experiment data and checkpoints are sync'd
# We recommend cloud storage checkpointing as it survives the cluster when
# instances are terminated and has better performance
sync_config = tune.SyncConfig(
    upload_dir="s3://my-checkpoints-bucket/path/",  # requires AWS credentials
)

tuner = tune.Tuner(
    my_trainable,
    run_config=air.RunConfig(
        # Name of your experiment
        name="my-tune-exp",
        # Directory where each node's results are stored before being
        # sync'd to cloud storage
        local_dir="/tmp/mypath",
        # See above! we will sync our checkpoints to S3 directory
        sync_config=sync_config,
        checkpoint_config=air.CheckpointConfig(
            # We'll keep the best five checkpoints at all times
            # (with the highest AUC scores, a metric reported by the trainable)
            checkpoint_score_attribute="max-auc",
            checkpoint_score_order="max",
            num_to_keep=5,
        ),
    ),
)

# This starts the run!
results = tuner.fit()
In this example, here’s how trial checkpoints will be saved:
- On head node where we are running from: /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (but only for trials running on this node)
- On worker nodes: /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (but only for trials running on this node)
- S3: s3://my-checkpoints-bucket/path/my-tune-exp/<trial_name>/checkpoint_<step> (all trials)
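The path layout listed above can be sketched as a small function that builds both the node-local and the cloud location of a checkpoint. The helper `checkpoint_paths`, the trial name `trial_a`, and the step string `000010` are hypothetical and chosen only to fill in the `<trial_name>` and `<step>` placeholders; the function does not talk to Tune or to S3.

```python
import posixpath
from typing import Dict, Optional


def checkpoint_paths(
    local_dir: str,
    experiment_name: str,
    trial_name: str,
    step: str,
    upload_dir: Optional[str] = None,
) -> Dict[str, str]:
    """Build the checkpoint locations following the layout shown above."""
    relative = posixpath.join(experiment_name, trial_name, f"checkpoint_{step}")
    # Every node stores checkpoints of its own trials under local_dir.
    paths = {"local": posixpath.join(local_dir, relative)}
    if upload_dir:
        # With cloud storage configured, the same relative layout is used
        # underneath the upload_dir URI.
        paths["cloud"] = upload_dir.rstrip("/") + "/" + relative
    return paths


paths = checkpoint_paths(
    local_dir="/tmp/mypath",
    experiment_name="my-tune-exp",
    trial_name="trial_a",   # hypothetical trial name
    step="000010",          # hypothetical step label
    upload_dir="s3://my-checkpoints-bucket/path/",
)
print(paths["local"])
print(paths["cloud"])
```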
If this run stops for any reason (e.g. the user presses CTRL+C, or the process is terminated due to out-of-memory issues), you can resume it at any time, starting from the experiment checkpoint state saved in the cloud:
from ray import tune
from your_module import my_trainable

tuner = tune.Tuner.restore(
    "s3://my-checkpoints-bucket/path/my-tune-exp",
    trainable=my_trainable,
    resume_errored=True
)
tuner.fit()
There are a few options for restoring an experiment: resume_unfinished, resume_errored and restart_errored. Please see the documentation of Tuner.restore() for more details.
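As a rough mental model of how these three flags interact, the sketch below maps a trial's status to what happens to it on restore. It reflects one reading of the option names (resume from the latest checkpoint versus restart from scratch) and is not Tune's implementation; the function `restore_action`, the status strings, and the choice to reject both errored-trial flags at once are assumptions made here, so check the Tuner.restore() documentation for the authoritative behavior.

```python
def restore_action(status, resume_unfinished=True,
                   resume_errored=False, restart_errored=False):
    """Return what a restored experiment would do with a trial (simplified)."""
    if resume_errored and restart_errored:
        # Assumption of this sketch: treat the combination as ambiguous.
        raise ValueError("Pick only one of resume_errored / restart_errored.")
    if status == "TERMINATED":
        # Finished trials are left as they are.
        return "keep results"
    if status == "ERROR":
        if resume_errored:
            return "resume from latest checkpoint"
        if restart_errored:
            return "restart from scratch"
        return "keep as errored"
    # Anything else (e.g. pending, running, or paused) counts as unfinished.
    return "resume from latest checkpoint" if resume_unfinished else "do not run"


for status in ("TERMINATED", "ERROR", "PAUSED"):
    print(status, "->", restore_action(status, resume_errored=True))
```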
Example: Running Tune without external persistent storage (default scenario)#
Now, let’s take a look at an example using the default syncing behavior described above. Again, we’re running this example script from the Ray cluster’s head node.
import ray
from ray import air, tune
from your_module import my_trainable

# Look for the existing cluster and connect to it
ray.init()

tuner = tune.Tuner(
    my_trainable,
    run_config=air.RunConfig(
        name="my-tune-exp",
        local_dir="/tmp/mypath",
        # Use the default syncing behavior
        # You don't have to pass an empty sync config - but we
        # do it here for clarity and comparison
        sync_config=tune.SyncConfig(),
        checkpoint_config=air.CheckpointConfig(
            checkpoint_score_attribute="max-auc",
            checkpoint_score_order="max",
            num_to_keep=5,
        ),
    )
)

# This starts the run!
results = tuner.fit()
In this example, here’s how trial checkpoints will be saved:
- On head node where we are running from: /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (all trials, since they have been synced to the head node)
- On worker nodes: /tmp/mypath/my-tune-exp/<trial_name>/checkpoint_<step> (but only for trials running on this node)
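The difference between the head node and the worker nodes in this default scenario can be sketched as follows. The function `trials_with_local_checkpoints`, the node names, and the trial placement are hypothetical, and the model assumes that syncing to the head node has already completed.

```python
from typing import Dict, List


def trials_with_local_checkpoints(
    node: str, head_node: str, trial_to_node: Dict[str, str]
) -> List[str]:
    """List the trials whose checkpoints are on a node's local disk."""
    if node == head_node:
        # The head node receives synced checkpoints from every worker.
        return sorted(trial_to_node)
    # A worker only holds checkpoints of the trials it ran itself.
    return sorted(t for t, n in trial_to_node.items() if n == node)


# Hypothetical placement of three trials across a small cluster
placement = {"trial_a": "head", "trial_b": "worker-1", "trial_c": "worker-2"}
print(trials_with_local_checkpoints("head", "head", placement))
print(trials_with_local_checkpoints("worker-1", "head", placement))
```

This is why the experiment in this scenario is restored from the head node: it is the only machine that holds the checkpoints of all trials.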
This experiment can be resumed from the head node:
from ray import tune
from your_module import my_trainable

tuner = tune.Tuner.restore(
    "/tmp/mypath/my-tune-exp",
    trainable=my_trainable,
    resume_errored=True
)
tuner.fit()