Get Started with Distributed Training using Hugging Face Accelerate

The TorchTrainer launches your Accelerate training code across a distributed Ray cluster.

You only need to wrap your existing training code in a function and pass it to a TorchTrainer. The final code looks like this:

from accelerate import Accelerator

def train_func(config):
    # Instantiate the accelerator
    accelerator = Accelerator(...)

    model = ...
    optimizer = ...
    train_dataloader = ...
    eval_dataloader = ...
    lr_scheduler = ...

    # Prepare everything for distributed training
    (
        model,
        optimizer,
        train_dataloader,
        eval_dataloader,
        lr_scheduler,
    ) = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    # Start training
    ...

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()

Tip

Model and data preparation for distributed training is completely handled by the Accelerator object and its Accelerator.prepare() method.

Unlike with native PyTorch, don’t call any additional Ray Train utilities like prepare_model() or prepare_data_loader() in your training function.

Configure Accelerate

In Ray Train, you can set configurations through the accelerate.Accelerator object in your training function. Below are starter examples for configuring Accelerate.

For example, to run DeepSpeed with Accelerate, create a DeepSpeedPlugin from a dictionary:

from accelerate import Accelerator, DeepSpeedPlugin

DEEPSPEED_CONFIG = {
    "fp16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": False
        },
        "overlap_comm": True,
        "contiguous_gradients": True,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "gather_16bit_weights_on_model_save": True,
        "round_robin_gradients": True
    },
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "steps_per_print": 10,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": False
}

def train_func(config):
    # Create a DeepSpeedPlugin from config dict
    ds_plugin = DeepSpeedPlugin(hf_ds_config=DEEPSPEED_CONFIG)

    # Initialize Accelerator
    accelerator = Accelerator(
        ...,
        deepspeed_plugin=ds_plugin,
    )

    # Start training
    ...

from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    run_config=RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()
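
Several entries in the DeepSpeed dictionary above are set to "auto". As far as we understand, Accelerate fills these in when you call Accelerator.prepare(), based on the objects you pass to it, so they only take effect if those objects are actually prepared. The sketch below is a plain-Python helper that lists which fields are still placeholders. The function name find_auto_fields and the trimmed example_config are hypothetical and not part of Accelerate or DeepSpeed.

```python
def find_auto_fields(config, prefix=""):
    """Return the dotted paths of all entries whose value is the string "auto"."""
    paths = []
    for key, value in config.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections such as "zero_optimization".
            paths.extend(find_auto_fields(value, path))
        elif value == "auto":
            paths.append(path)
    return paths


# Trimmed-down stand-in for the DEEPSPEED_CONFIG dictionary (hypothetical).
example_config = {
    "zero_optimization": {"stage": 3, "reduce_bucket_size": "auto"},
    "gradient_clipping": "auto",
    "train_batch_size": "auto",
    "steps_per_print": 10,
}

auto_fields = find_auto_fields(example_config)
print(auto_fields)
```

Running a check like this on your own configuration before training is one way to see which values you are delegating to Accelerate and which you have pinned yourself.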

For PyTorch FSDP, create a FullyShardedDataParallelPlugin and pass it to the Accelerator.

from torch.distributed.fsdp.fully_sharded_data_parallel import FullOptimStateDictConfig, FullStateDictConfig
from accelerate import Accelerator, FullyShardedDataParallelPlugin

def train_func(config):
    fsdp_plugin = FullyShardedDataParallelPlugin(
        state_dict_config=FullStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False
        ),
        optim_state_dict_config=FullOptimStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False
        )
    )

    # Initialize accelerator
    accelerator = Accelerator(
        ...,
        fsdp_plugin=fsdp_plugin,
    )

    # Start training
    ...

from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    run_config=RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()

Note that Accelerate also provides CLI tools: "accelerate config" generates a configuration file, and "accelerate launch" starts your training job with it. Neither is necessary here, because Ray's TorchTrainer already sets up the Torch distributed environment and launches the training function on all workers.
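
To make "sets up the Torch distributed environment" more concrete: torch.distributed conventionally identifies each worker through environment variables such as RANK, LOCAL_RANK, and WORLD_SIZE. The sketch below reads them with single-process defaults. The helper name read_torch_distributed_env is hypothetical, and whether Ray Train exports exactly these variables on every worker is an assumption rather than something this guide states.

```python
import os


def read_torch_distributed_env(environ=None):
    """Read the conventional torch.distributed variables, with single-process defaults."""
    env = os.environ if environ is None else environ
    return {
        "rank": int(env.get("RANK", 0)),              # global index of this worker
        "local_rank": int(env.get("LOCAL_RANK", 0)),  # index of this worker on its node
        "world_size": int(env.get("WORLD_SIZE", 1)),  # total number of workers
        "master_addr": env.get("MASTER_ADDR"),        # rendezvous host, if set
        "master_port": env.get("MASTER_PORT"),        # rendezvous port, if set
    }


# Simulated environment for the third of four workers (illustrative values).
info = read_torch_distributed_env({"RANK": "2", "LOCAL_RANK": "0", "WORLD_SIZE": "4"})
print(info["rank"], info["world_size"])
```

Inside a real training function, prefer accelerator.process_index and accelerator.num_processes over reading the environment directly.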

The two end-to-end examples below show the complete workflow. The first feeds the model with Ray Data, and the second uses a PyTorch DataLoader:

"""
Minimal Ray Train and Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py

Fine-tune a BERT model with Hugging Face Accelerate and Ray Train and Ray Data
"""

from tempfile import TemporaryDirectory

import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from torch.optim import AdamW
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)

import ray
import ray.train
from ray.train import Checkpoint, DataConfig, ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    """Your training function that launches on each worker."""

    # Unpack training configs
    lr = config["lr"]
    seed = config["seed"]
    num_epochs = config["num_epochs"]
    train_batch_size = config["train_batch_size"]
    eval_batch_size = config["eval_batch_size"]
    train_ds_size = config["train_dataset_size"]

    set_seed(seed)

    # Initialize accelerator
    accelerator = Accelerator()

    # Load datasets and metrics
    metric = evaluate.load("glue", "mrpc")

    # Prepare Ray Data loaders
    # ====================================================
    train_ds = ray.train.get_dataset_shard("train")
    eval_ds = ray.train.get_dataset_shard("validation")

    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def collate_fn(batch):
        outputs = tokenizer(
            list(batch["sentence1"]),
            list(batch["sentence2"]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        outputs["labels"] = torch.LongTensor(batch["label"])
        outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
        return outputs

    train_dataloader = train_ds.iter_torch_batches(
        batch_size=train_batch_size, collate_fn=collate_fn
    )
    eval_dataloader = eval_ds.iter_torch_batches(
        batch_size=eval_batch_size, collate_fn=collate_fn
    )
    # ====================================================

    # Instantiate the model, optimizer, lr_scheduler
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", return_dict=True
    )

    optimizer = AdamW(params=model.parameters(), lr=lr)

    steps_per_epoch = train_ds_size // (accelerator.num_processes * train_batch_size)
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=100,
        num_training_steps=(steps_per_epoch * num_epochs),
    )

    # Prepare everything with accelerator
    model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)

    for epoch in range(num_epochs):
        # Training
        model.train()
        for batch in train_dataloader:
            outputs = model(**batch)
            loss = outputs.loss
            accelerator.backward(loss)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        # Evaluation
        model.eval()
        for batch in eval_dataloader:
            with torch.no_grad():
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)

            predictions, references = accelerator.gather_for_metrics(
                (predictions, batch["labels"])
            )
            metric.add_batch(
                predictions=predictions,
                references=references,
            )

        eval_metric = metric.compute()
        accelerator.print(f"epoch {epoch}:", eval_metric)

        # Report checkpoint and metrics to Ray Train
        # ==========================================
        with TemporaryDirectory() as tmpdir:
            if accelerator.is_main_process:
                unwrapped_model = accelerator.unwrap_model(model)
                accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
                checkpoint = Checkpoint.from_directory(tmpdir)
            else:
                checkpoint = None
            ray.train.report(metrics=eval_metric, checkpoint=checkpoint)


if __name__ == "__main__":
    config = {
        "lr": 2e-5,
        "num_epochs": 3,
        "seed": 42,
        "train_batch_size": 16,
        "eval_batch_size": 32,
    }

    # Prepare Ray Datasets
    hf_datasets = load_dataset("glue", "mrpc")
    ray_datasets = {
        "train": ray.data.from_huggingface(hf_datasets["train"]),
        "validation": ray.data.from_huggingface(hf_datasets["validation"]),
    }
    config["train_dataset_size"] = ray_datasets["train"].count()

    trainer = TorchTrainer(
        train_func,
        train_loop_config=config,
        datasets=ray_datasets,
        dataset_config=DataConfig(datasets_to_split=["train", "validation"]),
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        # If running in a multi-node cluster, this is where you
        # should configure the run's persistent storage that is accessible
        # across all worker nodes.
        # run_config=ray.train.RunConfig(storage_path="s3://..."),
    )

    result = trainer.fit()
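
The Ray Data example above sizes its learning-rate schedule from the dataset size, the number of workers, and the per-worker batch size. The sketch below reproduces that arithmetic on its own. The function name schedule_steps is hypothetical, and the figure of 3668 training rows for GLUE MRPC is an assumption; the example itself obtains the real value from ray_datasets["train"].count().

```python
def schedule_steps(train_dataset_size, num_workers, per_worker_batch_size, num_epochs):
    """Mirror the steps_per_epoch and num_training_steps arithmetic of the example."""
    # Each optimizer step consumes one batch on every worker.
    global_batch_size = num_workers * per_worker_batch_size
    # Integer division: a trailing partial global batch is not counted.
    steps_per_epoch = train_dataset_size // global_batch_size
    return steps_per_epoch, steps_per_epoch * num_epochs


# 3668 rows is an assumed size for the MRPC training split.
steps_per_epoch, total_steps = schedule_steps(
    train_dataset_size=3668,
    num_workers=4,
    per_worker_batch_size=16,
    num_epochs=3,
)
print(steps_per_epoch, total_steps)  # 57 171
```

Under these assumptions the scheduler is configured with 171 total steps, of which num_warmup_steps=100 are warmup, so it is worth revisiting the warmup length if you change the number of workers or the batch size.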

"""
Minimal Ray Train + Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py

Fine-tune a BERT model with Hugging Face Accelerate and Ray Train
"""

from tempfile import TemporaryDirectory

import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)

import ray.train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    """Your training function that will be launched on each worker."""

    # Unpack training configs
    lr = config["lr"]
    seed = config["seed"]
    num_epochs = config["num_epochs"]
    train_batch_size = config["train_batch_size"]
    eval_batch_size = config["eval_batch_size"]

    set_seed(seed)

    # Initialize accelerator
    accelerator = Accelerator()

    # Load datasets and metrics
    metric = evaluate.load("glue", "mrpc")

    # Prepare PyTorch DataLoaders
    # ====================================================
    hf_datasets = load_dataset("glue", "mrpc")

    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def collate_fn(batch):
        outputs = tokenizer(
            [sample["sentence1"] for sample in batch],
            [sample["sentence2"] for sample in batch],
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        outputs["labels"] = torch.LongTensor([sample["label"] for sample in batch])
        outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
        return outputs

    # Instantiate dataloaders.
    train_dataloader = DataLoader(
        hf_datasets["train"],
        shuffle=True,
        collate_fn=collate_fn,
        batch_size=train_batch_size,
        drop_last=True,
    )
    eval_dataloader = DataLoader(
        hf_datasets["validation"],
        shuffle=False,
        collate_fn=collate_fn,
        batch_size=eval_batch_size,
        drop_last=True,
    )
    # ====================================================

    # Instantiate the model, optimizer, lr_scheduler
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", return_dict=True
    )

    optimizer = AdamW(params=model.parameters(), lr=lr)

    steps_per_epoch = len(train_dataloader)
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=100,
        num_training_steps=(steps_per_epoch * num_epochs),
    )

    # Prepare everything with accelerator
    (
        model,
        optimizer,
        train_dataloader,
        eval_dataloader,
        lr_scheduler,
    ) = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    for epoch in range(num_epochs):
        # Training
        model.train()
        for batch in train_dataloader:
            outputs = model(**batch)
            loss = outputs.loss
            accelerator.backward(loss)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        # Evaluation
        model.eval()
        for batch in eval_dataloader:
            with torch.no_grad():
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)

            predictions, references = accelerator.gather_for_metrics(
                (predictions, batch["labels"])
            )
            metric.add_batch(
                predictions=predictions,
                references=references,
            )

        eval_metric = metric.compute()
        accelerator.print(f"epoch {epoch}:", eval_metric)

        # Report Checkpoint and metrics to Ray Train
        # ==========================================
        with TemporaryDirectory() as tmpdir:
            if accelerator.is_main_process:
                unwrapped_model = accelerator.unwrap_model(model)
                accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
                checkpoint = Checkpoint.from_directory(tmpdir)
            else:
                checkpoint = None
            ray.train.report(metrics=eval_metric, checkpoint=checkpoint)


if __name__ == "__main__":
    config = {
        "lr": 2e-5,
        "num_epochs": 3,
        "seed": 42,
        "train_batch_size": 16,
        "eval_batch_size": 32,
    }

    trainer = TorchTrainer(
        train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        # If running in a multi-node cluster, this is where you
        # should configure the run's persistent storage that is accessible
        # across all worker nodes.
        # run_config=ray.train.RunConfig(storage_path="s3://..."),
    )

    result = trainer.fit()

See also

If you’re looking for more advanced use cases, check out this Llama-2 fine-tuning example:

You may also find these user guides helpful:

AccelerateTrainer Migration Guide

Before Ray 2.7, Ray Train’s AccelerateTrainer API was the recommended way to run Accelerate code. As a subclass of TorchTrainer, the AccelerateTrainer takes in a configuration file generated by accelerate config and applies it to all workers. Aside from that, the functionality of AccelerateTrainer is identical to TorchTrainer.

However, this caused confusion about whether AccelerateTrainer was the only way to run Accelerate code. Because the combination of Accelerator and TorchTrainer can express the full Accelerate functionality, the plan is to deprecate AccelerateTrainer in Ray 2.8. We recommend running your Accelerate code directly with TorchTrainer.