Get Started with Distributed Training using Horovod#

Ray Train configures the Horovod environment and Rendezvous server for you, allowing you to run your DistributedOptimizer training script. See the Horovod documentation for more information.


import os
import tempfile

import horovod.torch as hvd
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
import ray.train.torch  # Need this to use `train.torch.get_device()`
from ray.train.horovod import HorovodTrainer
import torch
import torch.nn as nn

# If using GPUs, set this to True.
use_gpu = False

input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))

def train_loop_per_worker():
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()
    device = train.torch.get_device()
    loss_fn = nn.MSELoss()
    lr_scaler = 1
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
    for epoch in range(num_epochs):
        for batch in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            print(f"epoch: {epoch}, loss: {loss.item()}")

        with tempfile.TemporaryDirectory() as tmpdir:
  , os.path.join(tmpdir, ""))
                {"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)

train_dataset =[{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
    datasets={"train": train_dataset},
result =

Update your training function#

First, update your training function to support distributed training.

If you have a training function that already runs with the Horovod Ray Executor, you shouldn’t need to make any additional changes.

To onboard onto Horovod, visit the Horovod guide.

Create a HorovodTrainer#

Trainers are the primary Ray Train classes to use to manage state and execute training. For Horovod, use a HorovodTrainer that you can setup like this:

from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = HorovodTrainer(
    scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)

When training with Horovod, always use a HorovodTrainer, irrespective of the training framework, for example, PyTorch or TensorFlow.

To customize the backend setup, you can pass a HorovodConfig:

from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig

trainer = HorovodTrainer(

For more configurability, see the DataParallelTrainer API.

Run a training function#

With a distributed training function and a Ray Train Trainer, you are now ready to start training.

Further reading#

Ray Train’s HorovodTrainer replaces the distributed communication backend of the native libraries with its own implementation. Thus, the remaining integration points remain the same. If you’re using Horovod with PyTorch or Tensorflow, refer to the respective guides for further configuration and information.

If you are implementing your own Horovod-based training routine without using any of the training libraries, read through the User Guides, as you can apply much of the content to generic use cases and adapt them easily.