Ray Train: Distributed Deep Learning

Tip

Get in touch with us if you’re using or considering using Ray Train!

Ray Train is a lightweight library for distributed deep learning, allowing you to scale up and speed up training for your deep learning models.

The main features are:

  • Ease of use: Scale your single-process training code to a cluster in just a couple of lines of code.

  • Composability: Ray Train interoperates with Ray Tune to tune your distributed model and Ray Datasets to train on large amounts of data.

  • Interactivity: Ray Train fits into your workflow, with support for running from any environment, including seamless Jupyter notebook support.

Note

This API is in its Beta release (as of Ray 1.9) and may be revised in future Ray releases. If you encounter any bugs, please file an issue on GitHub.

Note

Ray Train replaces Ray SGD as the standard library for distributed deep learning on Ray. Ray SGD has been fully deprecated as of Ray 1.13. If you are using an older version of Ray and are looking for the Ray SGD docs, you can find them in the Ray 1.12 docs.

Intro to Ray Train

Ray Train is a library that aims to simplify distributed deep learning.

Frameworks: Ray Train is built to abstract away the coordination/configuration setup of distributed deep learning frameworks such as PyTorch Distributed and TensorFlow Distributed, allowing users to focus only on implementing training logic.

  • For PyTorch, Ray Train automatically handles the construction of the distributed process group (a sketch of the boilerplate this replaces follows this list).

  • For TensorFlow, Ray Train automatically handles the coordination of the TF_CONFIG environment variable. The current implementation assumes that the user will use a MultiWorkerMirroredStrategy, but this will change in the near future.

  • For Horovod, Ray Train automatically handles the construction of the Horovod runtime and Rendezvous server.
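
For reference, the boilerplate this replaces for PyTorch looks roughly like the hand-written process group setup below. This is a minimal sketch, assuming environment-variable based initialization; manual_setup is an illustrative name, not a Ray API.

import os

import torch
import torch.distributed as dist

def manual_setup():
    # Illustrative only: Ray Train performs the equivalent of this for you
    # on every worker. Assumes RANK, WORLD_SIZE, MASTER_ADDR, and
    # MASTER_PORT are set per process.
    dist.init_process_group(
        backend="nccl" if torch.cuda.is_available() else "gloo",
        rank=int(os.environ["RANK"]),
        world_size=int(os.environ["WORLD_SIZE"]),
    )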

Built for data scientists/ML practitioners: Ray Train has support for standard ML tools and features that practitioners love:

  • Callbacks for early stopping

  • Checkpointing

  • Integrations with TensorBoard, Weights & Biases, and MLflow (see the logging callback sketch after this list)

  • Jupyter notebooks
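
For example, metrics reported from your training function can be routed to these tools through logging callbacks passed to the run call. Here is a minimal sketch, assuming the callback classes from the Ray 1.9 Train API (JsonLoggerCallback, TBXLoggerCallback); the Trainer itself is introduced in the Quick Start below.

from ray import train
from ray.train import Trainer
from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback

def train_func():
    for epoch in range(3):
        # Metrics passed to train.report are forwarded to every callback.
        train.report(epoch=epoch, loss=1.0 / (epoch + 1))

trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
trainer.run(train_func, callbacks=[JsonLoggerCallback(), TBXLoggerCallback()])
trainer.shutdown()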

Integration with the Ray ecosystem: Distributed deep learning often comes with a lot of complexity, and Ray Train plugs into the rest of the Ray ecosystem to help manage it:

  • Use Ray Datasets with Ray Train to handle and train on large amounts of data.

  • Use Ray Tune with Ray Train to leverage cutting-edge hyperparameter techniques and distribute both your training and tuning (see the sketch after this list).

  • You can leverage the Ray cluster launcher to launch autoscaling or spot instance clusters to train your model at scale on any cloud.
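
As a concrete sketch of the Ray Tune integration, a Trainer can be converted into a Tune trainable. This assumes Trainer.to_tune_trainable from the Ray 1.9 API, and the search space below is purely illustrative.

from ray import train, tune
from ray.train import Trainer

def train_func(config):
    for epoch in range(3):
        # A real training loop would use config["lr"] in its optimizer.
        train.report(loss=config["lr"] * (3 - epoch))

trainer = Trainer(backend="torch", num_workers=2)
trainable = trainer.to_tune_trainable(train_func)
analysis = tune.run(trainable, config={"lr": tune.loguniform(1e-4, 1e-1)}, num_samples=4)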

Quick Start

Ray Train abstracts away the complexity of setting up a distributed training system. Let’s walk through the following simple examples:

This example shows how you can use Ray Train with PyTorch.

First, set up your dataset and model.

import torch
import torch.nn as nn

num_samples = 20
input_size = 10
layer_size = 15
output_size = 5

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

# In this example we use a randomly generated dataset.
input = torch.randn(num_samples, input_size)
labels = torch.randn(num_samples, output_size)

Now define your single-worker PyTorch training function.


import torch.optim as optim

def train_func():
    num_epochs = 3
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    for epoch in range(num_epochs):
        output = model(input)
        loss = loss_fn(output, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")

This training function can be executed with:


    train_func()

Now let’s convert this to a distributed multi-worker training function!

All you have to do is use the ray.train.torch.prepare_model and ray.train.torch.prepare_data_loader utility functions to easily set up your model and data for distributed training. This will automatically wrap your model with DistributedDataParallel and place it on the right device, and add a DistributedSampler to your DataLoaders.


from ray import train
import ray.train.torch

def train_func_distributed():
    num_epochs = 3
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    for epoch in range(num_epochs):
        output = model(input)
        loss = loss_fn(output, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")

Then, instantiate a Trainer that uses a "torch" backend with 4 workers, and use it to run the new training function!


    from ray.train import Trainer

    trainer = Trainer(backend="torch", num_workers=4)

    # For GPU Training, set `use_gpu` to True.
    # trainer = Trainer(backend="torch", num_workers=4, use_gpu=True)

    trainer.start()
    results = trainer.run(train_func_distributed)
    trainer.shutdown()
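
trainer.run returns a list with one entry per worker. If the training function returns a value, you get that value back from each of the 4 workers; train_func_with_result and its return value below are illustrative:

def train_func_with_result():
    # ... same training loop as train_func_distributed above ...
    return {"final_loss": 0.0}  # illustrative return value

trainer = Trainer(backend="torch", num_workers=4)
trainer.start()
results = trainer.run(train_func_with_result)  # list of 4 entries, one per worker
trainer.shutdown()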

See Porting code to Ray Train for a more comprehensive example.

This example shows how you can use Ray Train to set up Multi-worker training with Keras.

First, set up your dataset and model.


import numpy as np
import tensorflow as tf


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset


def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model

Now define your single-worker TensorFlow training function.


def train_func():
    batch_size = 64
    single_worker_dataset = mnist_dataset(batch_size)
    single_worker_model = build_and_compile_cnn_model()
    single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

This training function can be executed with:


    train_func()

Now let’s convert this to a distributed multi-worker training function! All you need to do is:

  1. Set the global batch size - each worker processes the same per-worker batch size as in the single-worker code, so with 4 workers and a per-worker batch size of 64, the global batch size is 256.

  2. Choose your TensorFlow distributed training strategy. In this example we use the MultiWorkerMirroredStrategy.


import json
import os

def train_func_distributed():
    per_worker_batch_size = 64
    # This environment variable will be set by Ray Train.
    tf_config = json.loads(os.environ['TF_CONFIG'])
    num_workers = len(tf_config['cluster']['worker'])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling needs to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model()

    multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
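
For reference, TF_CONFIG follows TensorFlow’s standard multi-worker layout. With 4 workers, worker 0 would see something roughly like the following (the addresses are illustrative):

{
    "cluster": {"worker": ["10.0.0.1:12345", "10.0.0.2:12345",
                           "10.0.0.3:12345", "10.0.0.4:12345"]},
    "task": {"type": "worker", "index": 0}
}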

Then, instantiate a Trainer that uses a "tensorflow" backend with 4 workers, and use it to run the new training function!


    from ray.train import Trainer

    trainer = Trainer(backend="tensorflow", num_workers=4)

    # For GPU Training, set `use_gpu` to True.
    # trainer = Trainer(backend="tensorflow", num_workers=4, use_gpu=True)

    trainer.start()
    results = trainer.run(train_func_distributed)
    trainer.shutdown()

See Porting code to Ray Train for a more comprehensive example.

Next steps: Check out the User Guide!