Get Started with Distributed Training using Horovod#
Ray Train configures the Horovod environment and Rendezvous server for you, allowing you to run your DistributedOptimizer training script. See the Horovod documentation for more information.
Quickstart#
import os
import tempfile
import horovod.torch as hvd
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
import ray.train.torch # Need this to use `train.torch.get_device()`
from ray.train.horovod import HorovodTrainer
import torch
import torch.nn as nn
# If using GPUs, set this to True.
use_gpu = False
input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))


def train_loop_per_worker():
    hvd.init()

    # Get this worker's shard of the "train" dataset.
    dataset_shard = train.get_dataset_shard("train")

    model = NeuralNetwork()
    device = train.torch.get_device()
    model.to(device)

    loss_fn = nn.MSELoss()
    lr_scaler = 1
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        op=hvd.Average,
    )

    for epoch in range(num_epochs):
        model.train()
        for batch in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        # Report metrics and save a checkpoint at the end of each epoch.
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report(
                {"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)
            )


train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()
Update your training function#
First, update your training function to support distributed training.
If you have a training function that already runs with the Horovod Ray Executor, you shouldn’t need to make any additional changes.
To onboard onto Horovod, visit the Horovod guide.
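If your function isn't Horovod-ready yet, the core additions are small. Below is a minimal sketch of the Horovod-specific steps a PyTorch training function typically adds; the model and optimizer here are placeholders, not part of the original example:

import horovod.torch as hvd
import torch


def train_func():
    # 1. Initialize Horovod on every worker.
    hvd.init()

    # Placeholder model and optimizer for illustration.
    model = torch.nn.Linear(1, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    # 2. Wrap the optimizer so gradients are averaged across workers.
    optimizer = hvd.DistributedOptimizer(
        optimizer, named_parameters=model.named_parameters()
    )

    # 3. Broadcast the initial state from rank 0 so all workers start in sync.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    # ... run your usual training loop here.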
Create a HorovodTrainer#
Trainers are the primary Ray Train classes used to manage state and execute training. For Horovod, use a HorovodTrainer, which you can set up like this:
from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = HorovodTrainer(
    train_func,
    scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2),
)
When training with Horovod, always use a HorovodTrainer, regardless of the underlying training framework (for example, PyTorch or TensorFlow).
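For instance, the same HorovodTrainer can wrap a TensorFlow/Keras training function. The sketch below assumes Horovod's Keras bindings (horovod.tensorflow.keras) are installed; the tiny model and data are placeholders for illustration only:

import horovod.tensorflow.keras as hvd
import numpy as np
import tensorflow as tf

from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer


def train_func():
    # The Horovod setup is the same as in a standalone Keras script.
    hvd.init()

    model = tf.keras.Sequential(
        [tf.keras.Input(shape=(1,)), tf.keras.layers.Dense(1)]
    )
    optimizer = hvd.DistributedOptimizer(tf.keras.optimizers.SGD(0.1))
    model.compile(optimizer=optimizer, loss="mse")

    # Keep workers in sync by broadcasting initial variables from rank 0.
    callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]

    x = np.arange(32, dtype=np.float32).reshape(-1, 1)
    y = x + 1
    model.fit(x, y, batch_size=8, epochs=1, callbacks=callbacks, verbose=0)


# Still a HorovodTrainer, even though the framework is TensorFlow.
trainer = HorovodTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2),
)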
To customize the backend setup, you can pass a HorovodConfig:
from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig
trainer = HorovodTrainer(
    train_func,
    horovod_config=HorovodConfig(...),
    scaling_config=ScalingConfig(num_workers=2),
)
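As an illustration, a customized setup might restrict Horovod traffic to a specific network interface and raise the rendezvous verbosity. The field names used here (nics, verbose, timeout_s) are assumptions based on the HorovodConfig API; check its reference for the authoritative list:

from ray.train import ScalingConfig
from ray.train.horovod import HorovodConfig, HorovodTrainer

# Assumed HorovodConfig fields: `nics`, `verbose`, and `timeout_s`.
horovod_config = HorovodConfig(
    nics={"eth0"},   # restrict Horovod/Gloo traffic to this interface
    verbose=2,       # more detailed rendezvous logging
    timeout_s=300,   # how long workers wait for the rendezvous to form
)

trainer = HorovodTrainer(
    train_func,
    horovod_config=horovod_config,
    scaling_config=ScalingConfig(num_workers=2),
)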
For more configurability, see the DataParallelTrainer API.
Run a training function#
With a distributed training function and a Ray Train Trainer, you are now ready to start training.
trainer.fit()
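fit() returns a Result object. As a small illustration (assuming the training function reports metrics and a checkpoint, as in the Quickstart), you can inspect what came back:

result = trainer.fit()

# Metrics from the last `train.report()` call in the training function.
print(result.metrics)

# The checkpoint saved alongside that report, if any.
print(result.checkpoint)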
Further reading#
Ray Train’s HorovodTrainer replaces the distributed communication backend of the native libraries with its own implementation, so the remaining integration points stay the same. If you’re using Horovod with PyTorch or TensorFlow, refer to the respective guides for further configuration and information.
If you are implementing your own Horovod-based training routine without using any of the training libraries, read through the User Guides, as much of that content applies to generic use cases and can be adapted easily.