RaySGD API

Trainer

class ray.util.sgd.v2.Trainer(backend: Union[str, ray.util.sgd.v2.backends.backend.BackendConfig], num_workers: int = 1, use_gpu: bool = False, resources_per_worker: Optional[Dict[str, float]] = None, logdir: Optional[str] = None, max_retries: int = 3)[source]

A class for enabling seamless distributed deep learning.

Directory structure:

  • A logdir is created during instantiation. This will hold all the results/checkpoints for the lifetime of the Trainer. By default, it will be of the form ~/ray_results/sgd_<datestring>.

  • A run_dir is created for each run call. This will hold the checkpoints and results for a single trainer.run() or trainer.run_iterator() call. It will be of the form run_<run_id>.

Parameters
  • backend (Union[str, BackendConfig]) – The backend used for distributed communication. If configurations are needed, a subclass of BackendConfig can be passed in. Supported str values: {“torch”, “tensorflow”, “horovod”}.

  • num_workers (int) – The number of workers (Ray actors) to launch. Defaults to 1. Each worker will reserve 1 CPU by default. The number of CPUs reserved by each worker can be overridden with the resources_per_worker argument.

  • use_gpu (bool) – If True, training will be done on GPUs (1 per worker). Defaults to False. The number of GPUs reserved by each worker can be overridden with the resources_per_worker argument.

  • resources_per_worker (Optional[Dict]) – If specified, the resources defined in this Dict will be reserved for each worker. The CPU and GPU keys (case-sensitive) can be defined to override the number of CPU/GPUs used by each worker.

  • logdir (Optional[str]) – Path to the file directory where logs should be persisted. If this is not specified, one will be generated.

  • max_retries (int) – Number of retries when Ray actors fail. Defaults to 3. Set to -1 for unlimited retries.
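
For illustration, here is a minimal sketch of constructing a Trainer with the parameters above; the worker count and resource values are arbitrary examples:

from ray.util.sgd.v2 import Trainer

# Two workers, each reserving 2 CPUs and 1 GPU (example values only).
trainer = Trainer(
    backend="torch",
    num_workers=2,
    use_gpu=True,
    resources_per_worker={"CPU": 2, "GPU": 1},
)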

create_logdir(log_dir: Union[str, pathlib.Path, None]) → pathlib.Path[source]

Create logdir for the Trainer.

create_run_dir()[source]

Create rundir for the particular training run.

start(initialization_hook: Optional[Callable[[], None]] = None)[source]

Starts the training execution service.

Parameters

initialization_hook (Optional[Callable]) – The function to call on each worker when it is instantiated.
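
As a sketch, an initialization hook can run per-worker setup before training starts; the environment variable set here is purely illustrative:

import os

from ray.util.sgd.v2 import Trainer

def init_hook():
    # Runs once on each worker actor when it is instantiated.
    os.environ["OMP_NUM_THREADS"] = "1"

trainer = Trainer(backend="torch", num_workers=2)
trainer.start(initialization_hook=init_hook)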

run(train_func: Union[Callable[], T], Callable[[Dict[str, Any]], T]], config: Optional[Dict[str, Any]] = None, callbacks: Optional[List[ray.util.sgd.v2.callbacks.callback.SGDCallback]] = None, checkpoint: Union[Dict, str, pathlib.Path, None] = None, checkpoint_strategy: Optional[ray.util.sgd.v2.checkpoint.CheckpointStrategy] = None) → List[T][source]

Runs a training function in a distributed manner.

Parameters
  • train_func (Callable) – The training function to execute. This can either take in no arguments or a config dict.

  • config (Optional[Dict]) – Configurations to pass into train_func. If None then an empty Dict will be created.

  • callbacks (Optional[List[SGDCallback]]) – A list of Callbacks which will be executed during training. If this is not set, no default Callbacks are currently used.

  • checkpoint (Optional[Dict|str|Path]) – The checkpoint data that should be loaded onto each worker and accessed by the training function via sgd.load_checkpoint(). If this is a str or Path then the value is expected to be a path to a file that contains a serialized checkpoint dict. If this is None then no checkpoint will be loaded.

  • checkpoint_strategy (Optional[CheckpointStrategy]) – The configurations for saving checkpoints.

Returns

A list of results from the training function. Each value in the list corresponds to the output of the training function from each worker.
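
A minimal sketch of a typical run() call, assuming a placeholder training function that only reports its epoch number:

from ray.util import sgd
from ray.util.sgd.v2 import Trainer

def train_func(config):
    for epoch in range(config["epochs"]):
        # Real training logic would go here.
        sgd.report(epoch=epoch)

trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
# One result per worker is returned.
results = trainer.run(train_func, config={"epochs": 2})
trainer.shutdown()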

run_iterator(train_func: Union[Callable[], T], Callable[[Dict[str, Any]], T]], config: Optional[Dict[str, Any]] = None, checkpoint: Union[Dict, str, pathlib.Path, None] = None, checkpoint_strategy: Optional[ray.util.sgd.v2.checkpoint.CheckpointStrategy] = None) → ray.util.sgd.v2.trainer.SGDIterator[source]

Same as run except returns an iterator over the results.

This is useful if you want more control over how the intermediate results are handled, or how the Trainer is used with Ray Tune.

def train_func(config):
    ...
    for _ in range(config["epochs"]):
        train_metrics = train(...)
        val_metrics = validate(...)
        sgd.report(**train_metrics, **val_metrics)
    return model

iterator = trainer.run_iterator(train_func, config=config)

for result in iterator:
    do_stuff(result)
    latest_ckpt = trainer.latest_checkpoint

assert iterator.is_finished()
model = iterator.get_final_results()[0]
Parameters
  • train_func (Callable) – The training function to execute. This can either take in no arguments or a config dict.

  • config (Optional[Dict]) – Configurations to pass into train_func. If None then an empty Dict will be created.

  • checkpoint (Optional[Dict|Path|str]) – The checkpoint data that should be loaded onto each worker and accessed by the training function via sgd.load_checkpoint(). If this is a str or Path then the value is expected to be a path to a file that contains a serialized checkpoint dict. If this is None then no checkpoint will be loaded.

  • checkpoint_strategy (Optional[CheckpointStrategy]) – The configurations for saving checkpoints.

Returns

An Iterator over the intermediate results from sgd.report().

property latest_run_dir

Path to the log directory for the latest call to run().

Returns None if run() has not been called.

property latest_checkpoint_dir

Path to the checkpoint directory.

Returns None if run() has not been called or if sgd.save_checkpoint() has not been called from train_func within the most recent call to run().

property latest_checkpoint_path

Path to the latest persisted checkpoint from the latest run.

Returns None if run() has not been called or if sgd.save_checkpoint() has not been called from train_func within the most recent call to run().

property latest_checkpoint

The latest saved checkpoint.

This checkpoint may not be saved to disk.

Returns None if run() has not been called or if sgd.save_checkpoint() has not been called from train_func.
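
As an illustrative sketch, these properties can be inspected after a run whose training function saves a checkpoint (the function below is a placeholder):

from ray.util import sgd
from ray.util.sgd.v2 import Trainer

def train_func():
    sgd.save_checkpoint(epoch=0)

trainer = Trainer(backend="torch")
trainer.start()
trainer.run(train_func)

print(trainer.latest_run_dir)          # <logdir>/run_<run_id>
print(trainer.latest_checkpoint_path)  # path to the most recent persisted checkpoint
print(trainer.latest_checkpoint)       # the checkpoint dict itself (may not be on disk)
trainer.shutdown()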

shutdown()[source]

Shuts down the training execution service.

to_tune_trainable(train_func: Callable[[Dict[str, Any]], T]) → Type[ray.tune.trainable.Trainable][source]

Creates a Tune Trainable from the input training function.

Parameters

train_func (Callable) – The function that should be executed on each training worker.

Returns

A Trainable that can directly be passed into tune.run().
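
A sketch of wrapping a training function for Tune; the search space and the reported metric are illustrative:

from ray import tune
from ray.util import sgd
from ray.util.sgd.v2 import Trainer

def train_func(config):
    # Placeholder metric so Tune has something to optimize.
    sgd.report(loss=config["lr"])

trainer = Trainer(backend="torch", num_workers=2)
trainable = trainer.to_tune_trainable(train_func)

analysis = tune.run(trainable, config={"lr": tune.grid_search([0.01, 0.1])})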

to_worker_group(train_cls: Type, *args, **kwargs) → ray.util.sgd.v2.trainer.SGDWorkerGroup[source]

Returns Ray actors with the provided class and the backend started.

This is useful if you want to provide your own class for training and have more control over execution, but still want to use Ray SGD to set up the appropriate backend configurations (torch, tf, etc.).

import ray
from ray.util.sgd.v2 import Trainer

# User-defined training class; the name TrainingWorker is arbitrary.
class TrainingWorker:
    def __init__(self, config):
        self.config = config

    def train_epoch(self):
        ...
        return 1

config = {"lr": 0.1}
trainer = Trainer(num_workers=2, backend="torch")
workers = trainer.to_worker_group(train_cls=TrainingWorker, config=config)
futures = [w.train_epoch.remote() for w in workers]
assert ray.get(futures) == [1, 1]
assert ray.get(workers[0].train_epoch.remote()) == 1
workers.shutdown()
Parameters
  • train_cls (Type) – The class definition to use for the Ray actors/workers.

  • args, kwargs – Arguments to pass into the __init__ of the provided train_cls.

SGDIterator

class ray.util.sgd.v2.SGDIterator(backend_executor: ray.util.sgd.v2.backends.backend.BackendExecutor, train_func: Union[Callable[], T], Callable[[Dict[str, Any]], T]], checkpoint: Union[Dict, str, pathlib.Path, None], checkpoint_strategy: Optional[ray.util.sgd.v2.checkpoint.CheckpointStrategy], run_dir: pathlib.Path)[source]

An iterator over SGD results. Returned by trainer.run_iterator.

get_final_results(force: bool = False) → List[T][source]

Gets the training func return values from each worker.

If force is True, then immediately finish training and return even if all the intermediate results have not been processed yet. Else, intermediate results must be processed before obtaining the final results. Defaults to False.

BackendConfig

class ray.util.sgd.v2.BackendConfig[source]

Parent class for configurations of training backend.

TorchConfig

class ray.util.sgd.v2.TorchConfig(backend: Optional[str] = None, init_method: str = 'env', timeout_s: int = 1800)[source]

Configuration for torch process group setup.

See https://pytorch.org/docs/stable/distributed.html for more info.

Parameters
  • backend (str) – The backend to use for training. See torch.distributed.init_process_group for more info and valid values. If set to None, nccl will be used if GPUs are requested, else gloo will be used.

  • init_method (str) – The initialization method to use. Either “env” for environment variable initialization or “tcp” for TCP initialization. Defaults to “env”.

  • timeout_s (int) – Seconds for process group operations to timeout.
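
A sketch of passing a TorchConfig instead of the plain "torch" string; the backend and timeout values are arbitrary examples:

from ray.util.sgd.v2 import Trainer, TorchConfig

# Explicitly use gloo with a shorter timeout (example values only).
torch_config = TorchConfig(backend="gloo", timeout_s=300)
trainer = Trainer(backend=torch_config, num_workers=2)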

TensorflowConfig

class ray.util.sgd.v2.TensorflowConfig[source]

HorovodConfig

class ray.util.sgd.v2.HorovodConfig(nics: Optional[Set[str]] = None, verbose: int = 1)[source]

Configurations for Horovod setup.

See https://github.com/horovod/horovod/blob/master/horovod/runner/common/util/settings.py.

Parameters
  • nics (Optional[Set[str]]) – Network interfaces that can be used for communication.

  • verbose (int) – Horovod logging verbosity.
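
Similarly, a sketch of passing a HorovodConfig; the network interface name is illustrative:

from ray.util.sgd.v2 import HorovodConfig, Trainer

# Restrict Horovod communication to one NIC (example value only).
horovod_config = HorovodConfig(nics={"eth0"}, verbose=2)
trainer = Trainer(backend=horovod_config, num_workers=2)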

SGDCallback

class ray.util.sgd.v2.SGDCallback[source]

Abstract SGD callback class.

handle_result(results: List[Dict], **info)[source]

Called every time sgd.report() is called.

Parameters
  • results (List[Dict]) – List of results from the training function. Each value in the list corresponds to the output of the training function from each worker.

  • **info – kwargs dict for forward compatibility.

start_training(logdir: str, **info)[source]

Called once on training start.

Parameters
  • logdir (str) – Path to the file directory where logs should be persisted.

  • **info – kwargs dict for forward compatibility.

finish_training(error: bool = False, **info)[source]

Called once after training is over.

Parameters
  • error (bool) – If True, there was an exception during training.

  • **info – kwargs dict for forward compatibility.
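
As an illustrative sketch, a custom callback subclasses SGDCallback and overrides only the hooks it needs; the printing behavior here is arbitrary:

from typing import Dict, List

from ray.util.sgd.v2 import SGDCallback

class PrintingCallback(SGDCallback):
    def handle_result(self, results: List[Dict], **info):
        # One result dict per worker, produced by each sgd.report() call.
        print(results)

    def finish_training(self, error: bool = False, **info):
        if error:
            print("Training finished with an error.")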

JsonLoggerCallback

class ray.util.sgd.v2.callbacks.JsonLoggerCallback(logdir: Optional[str] = None, filename: Optional[str] = None, workers_to_log: Union[int, List[int], None] = 0)[source]

Logs SGD results in json format.

Parameters
  • logdir (Optional[str]) – Path to directory where the results file should be. If None, will be set by the Trainer.

  • filename (Optional[str]) – Filename in logdir to save results to.

  • workers_to_log (int|List[int]|None) – Worker indices to log. If None, will log all workers. By default, will log the worker with index 0.

TBXLoggerCallback

class ray.util.sgd.v2.callbacks.TBXLoggerCallback(logdir: Optional[str] = None, worker_to_log: int = 0)[source]

Logs SGD results in TensorboardX format.

Parameters
  • logdir (Optional[str]) – Path to directory where the results file should be. If None, will be set by the Trainer.

  • worker_to_log (int) – Worker index to log. By default, will log the worker with index 0.
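
A sketch of attaching the logger callbacks to a run; the training function is a placeholder:

from ray.util import sgd
from ray.util.sgd.v2 import Trainer
from ray.util.sgd.v2.callbacks import JsonLoggerCallback, TBXLoggerCallback

def train_func():
    sgd.report(loss=0.0)  # placeholder metric

trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
trainer.run(train_func, callbacks=[JsonLoggerCallback(), TBXLoggerCallback()])
trainer.shutdown()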

CheckpointStrategy

class ray.util.sgd.v2.CheckpointStrategy(num_to_keep: Optional[int] = None, checkpoint_score_attribute: str = '_timestamp', checkpoint_score_order: str = 'max')[source]

Configurable parameters for defining the SGD checkpointing strategy.

Default behavior is to persist all checkpoints to disk. If num_to_keep is set, the default retention policy is to keep the checkpoints with maximum timestamp, i.e. the most recent checkpoints.

Parameters
  • num_to_keep (Optional[int]) – The number of checkpoints to keep on disk for this run. If a checkpoint is persisted to disk after there are already this many checkpoints, then an existing checkpoint will be deleted. If this is None then checkpoints will not be deleted. If this is 0 then no checkpoints will be persisted to disk.

  • checkpoint_score_attribute (str) – The attribute that will be used to score checkpoints to determine which checkpoints should be kept on disk when there are greater than num_to_keep checkpoints. This attribute must be a key from the checkpoint dictionary which has a numerical value.

  • checkpoint_score_order (str) – If “max”, then checkpoints with highest values of checkpoint_score_attribute will be kept. If “min”, then checkpoints with lowest values of checkpoint_score_attribute will be kept.
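
As a sketch, the strategy below keeps only the two best checkpoints by a reported "loss" value; the attribute name and training function are illustrative:

from ray.util import sgd
from ray.util.sgd.v2 import CheckpointStrategy, Trainer

def train_func():
    for epoch in range(5):
        # "loss" must be a numerical key in the checkpoint dict.
        sgd.save_checkpoint(epoch=epoch, loss=1.0 / (epoch + 1))

strategy = CheckpointStrategy(
    num_to_keep=2,
    checkpoint_score_attribute="loss",
    checkpoint_score_order="min",
)

trainer = Trainer(backend="torch")
trainer.start()
trainer.run(train_func, checkpoint_strategy=strategy)
trainer.shutdown()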

Training Function Utilities

sgd.report

ray.util.sgd.v2.report(**kwargs) → None[source]

Reports all keyword arguments to SGD as intermediate results.

import time

from ray.util import sgd
from ray.util.sgd.v2 import Trainer

def train_func():
    for _ in range(100):
        time.sleep(1)
        sgd.report(hello="world")

trainer = Trainer(backend="torch")
trainer.start()
trainer.run(train_func)
trainer.shutdown()
Parameters

**kwargs – Any key value pair to be reported by SGD. If callbacks are provided, they are executed on these intermediate results.

sgd.load_checkpoint

ray.util.sgd.v2.load_checkpoint() → Optional[Dict][source]

Loads checkpoint data onto the worker.

from ray.util import sgd
from ray.util.sgd.v2 import Trainer

def train_func():
    checkpoint = sgd.load_checkpoint()
    for epoch in range(checkpoint["epoch"], 5):
        print(epoch)

trainer = Trainer(backend="torch")
trainer.start()
trainer.run(train_func, checkpoint={"epoch": 3})
# 3
# 4
trainer.shutdown()

Returns

The most recently saved checkpoint if sgd.save_checkpoint() has been called. Otherwise, the checkpoint that the session was originally initialized with. None if neither exist.

sgd.save_checkpoint

ray.util.sgd.v2.save_checkpoint(**kwargs) → None[source]

Checkpoints all keyword arguments to SGD as restorable state.

import time

from ray.util import sgd
from ray.util.sgd.v2 import Trainer

def train_func():
    for epoch in range(100):
        time.sleep(1)
        sgd.save_checkpoint(epoch=epoch)

trainer = Trainer(backend="torch")
trainer.start()
trainer.run(train_func)
trainer.shutdown()
Parameters

**kwargs – Any key value pair to be checkpointed by SGD.

sgd.world_rank

ray.util.sgd.v2.world_rank() → int[source]

Get the world rank of this worker.

import time

from ray.util import sgd
from ray.util.sgd.v2 import Trainer

def train_func():
    for _ in range(100):
        time.sleep(1)
        if sgd.world_rank() == 0:
            print("Worker 0")

trainer = Trainer(backend="torch")
trainer.start()
trainer.run(train_func)
trainer.shutdown()