Distributed PyTorch

The RaySGD TorchTrainer simplifies distributed model training for PyTorch.

../_images/raysgd-actors.svg

Tip

Get in touch with us if you’re using or considering using RaySGD!

The TorchTrainer is a wrapper around torch.distributed.launch with a Python API to easily incorporate distributed training into a larger Python application, as opposed to needing to wrap your training code in bash scripts.

For end to end examples leveraging RaySGD TorchTrainer, jump to TorchTrainer Examples.

Basic Usage

Setting up training

Tip

If you want to leverage multi-node data parallel training with PyTorch while using RayTune without using RaySGD, check out the Tune PyTorch user guide and Tune’s distributed pytorch integrations.

The TorchTrainer can be constructed from a custom PyTorch TrainingOperator subclass that defines training components like the model, data, optimizer, loss, and lr_scheduler. These components are all automatically replicated across different machines and devices so that training can be executed in parallel.

Warning

You should call self.register(...) and self.register_data(...) inside the setup method of your custom TrainingOperator to register the necessary training components with Ray SGD.

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

from ray.util.sgd.torch import TrainingOperator
from ray.util.sgd.torch.examples.train_example import LinearDataset

class MyTrainingOperator(TrainingOperator):
    def setup(self, config):
        # Setup all components needed for training here. This could include
        # data, models, optimizers, loss & schedulers.

        # Setup data loaders.
        train_dataset, val_dataset = LinearDataset(2, 5), LinearDataset(2,
                                                                        5)
        train_loader = DataLoader(train_dataset,
                                  batch_size=config["batch_size"])
        val_loader = DataLoader(val_dataset,
                                batch_size=config["batch_size"])

        # Setup model.
        model = nn.Linear(1, 1)

        # Setup optimizer.
        optimizer = torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4))

        # Setup loss.
        criterion = torch.nn.BCELoss()

        # Setup scheduler.
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.9)

        # Register all of these components with Ray SGD.
        # This allows Ray SGD to do framework level setup like Cuda, DDP,
        # Distributed Sampling, FP16.
        # We also assign the return values of self.register to instance
        # attributes so we can access it in our custom training/validation
        # methods.
        self.model, self.optimizer, self.criterion, self.scheduler = \
            self.register(models=model, optimizers=optimizer,
                          criterion=criterion,
                          schedulers=scheduler)
        self.register_data(train_loader=train_loader, validation_loader=val_loader)

Under the hood, TorchTrainer will create replicas of your model (controlled by num_workers), each of which is managed by a Ray actor.

Before instantiating the trainer, first start or connect to a Ray cluster:

import ray

ray.init()
# or ray.init(address="auto") to connect to a running cluster.

And then you can instantiate the trainer object using your custom TrainingOperator:

from ray.util.sgd import TorchTrainer

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    scheduler_step_freq="epoch",  # if scheduler is used
    config={"lr": 0.001, "batch_size": 64})

You can also set the number of workers and whether the workers will use GPUs:

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    config={"lr": 0.001},
    num_workers=100,
    use_gpu=True)

Executing Training

Now that the trainer is constructed, here’s how to train the model.

for i in range(10):
    metrics = trainer.train()
    val_metrics = trainer.validate()

Each train call makes one pass over the training data (trains on 1 epoch), and each validate call runs the model on the validation data. Override training and validation methods in your Training Operator (Custom Training and Validation) to calculate custom metrics or customize the training/validation process.

Tip

Setting the batch size: Using a provided ray.util.sgd.utils.BATCH_SIZE variable, you can provide a global batch size that will be divided among all workers automatically.

from torch.utils.data import DataLoader
from ray.util.sgd.utils import BATCH_SIZE

class MyTrainingOperator(TrainingOperator):
    def setup(self, config):
        ...
        # Create data loaders.
        # config[BATCH_SIZE] == provided BATCH_SIZE // num_workers
        train_dataset, val_dataset = LinearDataset(2, 5), LinearDataset(2, 5)
        train_loader = DataLoader(train_dataset, batch_size=config[BATCH_SIZE])
        val_loader = DataLoader(val_dataset, batch_size=config[BATCH_SIZE])
        ...
trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    config={BATCH_SIZE: 1024},
    num_workers=128
)

# Each worker will process 1024 // 128 samples per batch
stats = Trainer.train()

You can also obtain profiling information:

>>> from ray.tune.logger import pretty_print
>>> print(pretty_print(trainer.train(profile=True)))

batch_count: 16
epoch: 1
last_train_loss: 0.15574650466442108
mean_train_loss: 7.475177114367485
num_samples: 1000
profile:
  mean_apply_s: 2.639293670654297e-05
  mean_fwd_s: 0.00012960433959960938
  mean_grad_s: 0.00016553401947021483
  train_epoch_s: 0.023712158203125

After training, you may want to reappropriate the Ray cluster. To release Ray resources obtained by the Trainer:

trainer.shutdown()

Note

Be sure to call trainer.save() or trainer.get_model() before shutting down.

See the documentation on the TorchTrainer here: TorchTrainer.

See the documentation on the TrainingOperator here: PyTorch TrainingOperator.

Custom Training and Validation

If you would like to implement custom training and validation logic, you can do so by overriding the appropiate methods inside your PyTorch TrainingOperator subclass.

For both training and validation, there are two granularities that you can provide customization - per epoch and per batch. These correspond to train_batch, train_epoch, validate, and validate_batch. Other useful methods to override include state_dict and load_state_dict. You can use these to save and load additional state for your custom TrainingOperator.

Custom training is necessary if you are using multiple models, optimizers, or schedulers.

Below is a partial example of a custom TrainingOperator that provides a train_batch implementation for a Deep Convolutional GAN.

import torch
from ray.util.sgd.torch import TrainingOperator

class GANOperator(TrainingOperator):
    def setup(self, config):
        """Setup for this operator.

        This is where you define the training state and register it with Ray SGD.

        Args:
            config (dict): Custom configuration value to be passed to
                all creator and operator constructors. Same as ``self.config``.
        """
        ...
        self.models, self.optimizers, ... = self.register(...)
        self.register_data(...)

    def train_batch(self, batch, batch_info):
        """Trains on one batch of data from the data creator.

        Example taken from:
            https://github.com/eriklindernoren/PyTorch-GAN/blob/
            a163b82beff3d01688d8315a3fd39080400e7c01/implementations/dcgan/dcgan.py

        Args:
            batch: One item of the validation iterator.
            batch_info (dict): Information dict passed in from ``train_epoch``.

        Returns:
            A dict of metrics. Defaults to "loss" and "num_samples",
                corresponding to the total number of datapoints in the batch.
        """
        Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor
        discriminator, generator = self.models
        optimizer_D, optimizer_G = self.optimizers

        # Adversarial ground truths
        valid = Variable(Tensor(batch.shape[0], 1).fill_(1.0), requires_grad=False)
        fake = Variable(Tensor(batch.shape[0], 1).fill_(0.0), requires_grad=False)

        # Configure input
        real_imgs = Variable(batch.type(Tensor))

        # -----------------
        #  Train Generator
        # -----------------

        optimizer_G.zero_grad()

        # Sample noise as generator input
        z = Variable(Tensor(np.random.normal(0, 1, (
                batch.shape[0], self.config["latent_dim"]))))

        # Generate a batch of images
        gen_imgs = generator(z)

        # Loss measures generator's ability to fool the discriminator
        g_loss = adversarial_loss(discriminator(gen_imgs), valid)

        g_loss.backward()
        optimizer_G.step()

        # ---------------------
        #  Train Discriminator
        # ---------------------

        optimizer_D.zero_grad()

        # Measure discriminator's ability to classify real from generated samples
        real_loss = adversarial_loss(discriminator(real_imgs), valid)
        fake_loss = adversarial_loss(discriminator(gen_imgs.detach()), fake)
        d_loss = (real_loss + fake_loss) / 2

        d_loss.backward()
        optimizer_D.step()

        return {
            "loss_g": g_loss.item(),
            "loss_d": d_loss.item(),
            "num_samples": imgs.shape[0]
        }

trainer = TorchTrainer(
    training_operator_cls=GANOperator,
    num_workers=num_workers,
    config=config,
    use_gpu=True
)

for i in range(5):
    stats = trainer.train()
    print(stats)

See the DCGAN example for an end to end example. It constructs two models and two optimizers and uses a custom training operator to provide a non-standard training loop.

Custom DistributedDataParallel Wrappers

TorchTrainer automatically applies a DistributedDataParallel wrapper to your model.

DistributedDataParallel(model, device_ids=self.device_ids)

You can also pass in additional arguments to DistributedDataParallel by setting the ddp_args field in your TrainingOperator.

from ray.util.sgd.torch import TrainingOperator

class CustomOperator(TrainingOperator):
    def setup(self, config):
        ...
        self.model, ... = self.register(..., ddp_args={"find_unused_parameters": True})

If you want to use a custom wrapper for distributed training or if you want to wrap in DistributedDataParallel yourself, you can do so by setting TorchTrainer(wrap_ddp=False).

Note

Make sure to register the model before it is wrapped in DistributedDataParallel or a custom wrapper.

from ray.util.sgd.torch import TrainingOperator

class CustomOperator(TrainingOperator):
    def setup(self, config):
        ...
        self.model, ... = self.register(...)
        self.new_model = CustomDataParallel(self.model,
                                             device_ids=self.device_ids)

    def train_batch(self, batch, batch_idx):
        output = self.new_model(batch)
        # calculate loss, etc
        return {"loss": loss}

trainer = TorchTrainer(
    training_operator_cls=CustomOperator,
    num_workers=2,
    use_gpu=True
    wrap_ddp=False)

Backwards Compatibility

In previous versions of Ray, creator functions (model_creator, optimizer_creator, etc.) were necessary to setup the training components. These creator functions are no longer used and instead training component setup should be specified inside the setup method of a TrainingOperator subclass. However, if you have these creator functions already and do not want to change your code, you can easily use these creator functions to create a custom TrainingOperator.

from ray.util.sgd import TorchTrainer

MyTrainingOperator = TrainingOperator.from_creators(
    model_creator=model_creator, optimizer_creator=optimizer_creator,
    loss_creator=loss_creator, scheduler_creator=scheduler_creator,
    data_creator=data_creator)

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    scheduler_step_freq="epoch",  # if scheduler_creator is passed in
    config={"lr": 0.001, "batch_size": 64})

Initialization Functions

Use the initialization_hook parameter to initialize state on each worker process when they are started. This is useful when setting an environment variable:

def initialization_hook():
    print("NCCL DEBUG SET")
    # Need this for avoiding a connection restart issue
    os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
    os.environ["NCCL_LL_THRESHOLD"] = "0"
    os.environ["NCCL_DEBUG"] = "INFO"

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    initialization_hook=initialization_hook,
    config={"lr": 0.001}
    num_workers=100,
    use_gpu=True)

Save and Load

If you want to save or reload the training procedure, you can use trainer.save and trainer.load, which wraps the relevant torch.save and torch.load calls. This should work across a distributed cluster even without a NFS because it takes advantage of Ray’s distributed object store.

Tip

Make sure to override the state_dict and load_state_dict methods in your custom TrainingOperator if necessary.

checkpoint_path = os.path.join(tempfile.mkdtemp(), "checkpoint")
trainer_1.save(checkpoint_path)
# You can only have 1 trainer alive at a time
trainer_1.shutdown()

trainer_2 = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    num_workers=num_workers)
trainer_2.load(checkpoint_path)

Retrieving the model

The trained torch model can be extracted for use within the same Python program with trainer.get_model(). This will load the state dictionary of the model(s).

trainer.train()
model = trainer.get_model()  # Returns multiple models if the model_creator does.

Training & Validation Results

The output for trainer.train() and trainer.validate() are first collected on a per-batch basis. These results are then averaged: first across each batch in the epoch, and then across all workers.

By default, the output of train contains the following:

# Total number of samples trained on in this epoch.
num_samples
# Current training epoch.
epoch
# Number of batches trained on in this epoch averaged across all workers.
batch_count
# Training loss averaged across all batches on all workers.
train_loss
# Training loss for the last batch in epoch averaged across all workers.
last_train_loss

And for validate:

# Total number of samples validated on.
num_samples
# Number of batches validated on averaged across all workers.
batch_count
# Validation loss averaged across all batches on all workers.
val_loss
# Validation loss for last batch averaged across all workers.
last_val_loss
# Validation accuracy for last batch averaged across all workers.
val_accuracy
# Validation accuracy for last batch averaged across all workers.
last_val_accuracy

If train or validate are run with reduce_results=False, results are not averaged across workers and a list of results for each worker is returned. If run with profile=True, timing stats for a single worker is returned alongside the results above.

To add additional metrics to return you should implement your own custom training operator (Custom Training and Validation). If overriding train_batch or validate_batch, the result outputs are automatically averaged across all batches, and the results for the last batch are automatically returned. If overriding train_epoch or validate you may find ray.util.sgd.utils.AverageMeterCollection (Utils) useful to handle this averaging.

Mixed Precision (FP16) Training

You can enable mixed precision training for PyTorch with the use_fp16 flag. This automatically converts the model(s) and optimizer(s) to train using mixed-precision. This requires NVIDIA Apex, which can be installed from the NVIDIA/Apex repository:

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    num_workers=4,
    use_fp16=True)

Apex is a Pytorch extension with NVIDIA-maintained utilities to streamline mixed precision and distributed training. When use_fp16=True, you should not manually cast your model or data to .half(). The flag informs the Trainer to call amp.initialize on the created models and optimizers and optimize using the scaled loss: amp.scale_loss(loss, optimizer).

To specify particular parameters for amp.initialize, you can use the apex_args field when calling self.register in your TrainingOperator. Valid arguments can be found on the Apex documentation:

class MyTrainingOperator(TrainingOperator):
    def setup(self, config):
        models = [...]
        optimizers = [...]
        model, optimizer = self.register(
            models=models,
            optimizers=optimizers,
            apex_args={
                opt_level="03",
                num_losses=2,
                verbosity=0
            }
        )

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    num_workers=4,
    use_fp16=True
)

Note that if implementing custom training (Custom Training and Validation), you will need to manage loss scaling manually.

Distributed Multi-node Training

You can scale your training to multiple nodes without making any modifications to your training code.

To train across a cluster, first make sure that the Ray cluster is started (see Distributed Ray Overview for more details).

Then, in your program, you’ll need to connect to this cluster via ray.init:

ray.init(address="auto")  # or a specific redis address of the form "ip-address:port"

After connecting, you can scale up the number of workers seamlessly across multiple nodes:

trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    num_workers=100
)
trainer.train()
model = trainer.get_model()

Advanced: Fault Tolerance

For distributed deep learning, jobs are often run on infrastructure where nodes can be pre-empted frequently (i.e., spot instances in the cloud). To overcome this, RaySGD provides fault tolerance features that enable training to continue regardless of node failures.

trainer.train(max_retries=N)

During each train method, each parallel worker iterates through the iterable, synchronizing gradients and parameters at each batch. These synchronization primitives can hang when one or more of the parallel workers becomes unresponsive (i.e., when a node is lost). To address this, we’ve implemented the following protocol.

  1. If any worker node is lost, Ray will mark the training task as complete (ray.wait will return).

  2. Ray will throw RayActorException when fetching the result for any worker, so the Trainer class will call ray.get on the “finished” training task.

  3. Upon catching this exception, the Trainer class will kill all of its workers.

  4. The Trainer will then detect the quantity of available resources (either CPUs or GPUs). It will then restart as many workers as it can, each resuming from the last checkpoint. Note that this may result in fewer workers than initially specified.

  5. If there are no available resources, the Trainer will apply an exponential backoff before retrying to create workers.

  6. If there are available resources and the Trainer has fewer workers than initially specified, then it will scale up its worker pool until it reaches the initially specified num_workers.

Note that we assume the Trainer itself is not on a pre-emptible node. To allow the entire Trainer to recover from failure, you must use Tune to execute the training.

Simultaneous Multi-model Training

In certain scenarios, such as training GANs, you may want to use multiple models in the training loop. You can do this by registering multiple models, optimizers, or schedulers in the setup method of TrainingOperator. You must implement custom training and validation (Custom Training and Validation) to train across multiple models.

You can see the DCGAN script for an end-to-end example.

from ray.util.sgd.torch import TorchTrainer, TrainingOperator

def train(*, model=None, criterion=None, optimizer=None, dataloader=None):
    model.train()
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
    return {
        "accuracy": correct / total,
        "train_loss": train_loss / (batch_idx + 1)
    }

def model_creator(config):
    return Discriminator(), Generator()

def optimizer_creator(models, config):
    net_d, net_g = models
    discriminator_opt = optim.Adam(
        net_d.parameters(), lr=config.get("lr", 0.01), betas=(0.5, 0.999))
    generator_opt = optim.Adam(
        net_g.parameters(), lr=config.get("lr", 0.01), betas=(0.5, 0.999))
    return discriminator_opt, generator_opt

class CustomOperator(TrainingOperator):
    def setup(self, config):
        net_d = Discriminator()
        net_g = Generator()

        d_opt = optim.Adam(
            net_d.parameters(), lr=config.get("lr", 0.01), betas=(0.5, 0.999))
        g_opt = optim.Adam(
            net_g.parameters(), lr=config.get("lr", 0.01), betas=(0.5, 0.999))

        # Setup data loaders, loss, schedulers here.
        ...

        # Register all the components.
        self.models, self.optimizers, ... = self.register(models=(net_d, net_g), optimizers=(d_opt, g_opt), ...)
        self.register_data(...)

    def train_epoch(self, iterator, info):
        result = {}
        for i, (model, optimizer) in enumerate(
                zip(self.models, self.optimizers)):
            result["model_{}".format(i)] = train(
                model=model,
                criterion=self.criterion,
                optimizer=optimizer,
                dataloader=iterator)
        return result

trainer = TorchTrainer(training_operator_cls=CustomOperator)

stats = trainer.train()

Benchmarks

RaySGD TorchTrainer provides comparable or better performance than other existing solutions for parallel or distributed training.

Multi-GPU (Single Node) benchmarks:

# Images per second for ResNet50
# Batch size per worker = 128
# GPU Type = V100
# Run on AWS us-east-1c, p3dn.24xlarge instance.


Number   DataParallel  Ray (PyTorch)  DataParallel  Ray (PyTorch)
of GPUs                               + Apex        + Apex
=======  ============  =============  ============  ==============
1        355.5         356            776           770
2        656           701            1303          1346
4        1289          1401           2606          2695
8        2521          2795           4795          5862

Multi-node benchmarks:

# Images per second for ResNet50
# Batch size per worker = 128
# GPU Type = V100
# Run on AWS us-east-1c, p3dn.24xlarge instances.

Number   Horovod  Ray (PyTorch)  Horovod  Ray (PyTorch)
of GPUs                          + Apex   + Apex
=======  =======  =============  =======  ==============
1 * 8    2769.7   2962.7         5143     6172
2 * 8    5492.2   5886.1         9463     10052.8
4 * 8    10733.4  11705.9        18807    20319.5
8 * 8    21872.5  23317.9        36911.8  38642

You can see more details in the benchmarking README.

DISCLAIMER: RaySGD does not provide any custom communication primitives. If you see any performance issues, you may need to file them on the PyTorch github repository.

Debugging/Tips

Here’s some simple tips on how to debug the TorchTrainer.

My TorchTrainer implementation is erroring after I ported things over from my previous code.

Try using ipdb and num_workers=1. This will provide you introspection what is being called and when.

# first run pip install ipdb

from ray.util.sgd.torch import TrainingOperator

class CustomOperator(TrainingOperator):
    def setup(self, config):
        import ipdb; ipdb.set_trace()
        ...

    def train_batch(self, batch, batch_idx):
        import ipdb; ipdb.set_trace()
        ... # press 'n' or 's' to navigate the session
        ... # custom code if exists?
        ... # or super(CustomOperator, self).train_batch(batch, batch_idx)


trainer = TorchTrainer(
    training_operator_cls=GANOperator,
    num_workers=1,
)

My TorchTrainer implementation is super slow.

Try using a profiler. Either use:

trainer.train(profile=True)
trainer.validate(profile=True)

or use Python profiling.

My setup function downloads data, and I don’t want multiple processes downloading to the same path at once.

Use FileLock to create locks for critical regions. For example:

import os
import tempfile
from filelock import FileLock

def create_dataset(config):
    dataset_path = config["dataset_path"]

    # Create a critical region of the code
    # This will take a longer amount of time to download the data at first.
    # Other processes will block at the ``with`` statement.
    # After downloading, this code block becomes very fast.
    with FileLock(os.path.join(tempfile.gettempdir(), "download_data.lock")):
        if not os.path.exists(dataset_path):
            download_data(dataset_path)

    # load_data is assumed to safely support concurrent reads.
    data = load_data(dataset_path)
    return DataLoader(data)

I get a ‘socket timeout’ error during training.

Try increasing the length of the NCCL timeout. The current timeout is 10 seconds.

NCCL_TIMEOUT_S=1000 python ray_training_script.py

# or

NCCL_TIMEOUT_S=1000 ray start [--head | --address]

Feature Requests

Have features that you’d really like to see in RaySGD? Feel free to open an issue.

TorchTrainer Examples

Here are some examples of using RaySGD for training PyTorch models. If you’d like to contribute an example, feel free to create a pull request here.