GPT-J-6B Fine-Tuning with Ray AIR and DeepSpeed

In this example, we will showcase how to use Ray AIR for GPT-J fine-tuning. GPT-J is a GPT-2-like causal language model trained on the Pile dataset. This particular model has 6 billion parameters. For more information on GPT-J, click here.

We will use Ray AIR (with the 🤗 Transformers integration) and a pretrained model from the Hugging Face Hub. Note that you can easily adapt this example to use other similar models.

This example focuses more on the performance and distributed computing aspects of Ray AIR. If you are looking for a more beginner-friendly introduction to Ray AIR 🤗 Transformers integration, see this example.

It is highly recommended to read Ray AIR Key Concepts and Ray Data Key Concepts before starting this example.

Note

To run this example, make sure your Ray cluster has access to at least one GPU with 16 GB or more of memory. The required amount of memory depends on the model. This notebook is tested with 16 g4dn.4xlarge instances (including the head node). If you wish to use a CPU head node, turn on cloud checkpointing to avoid OOM errors that may happen due to the default behavior of syncing the checkpoint files to the head node.

In this notebook, we will:

  1. Set up Ray

  2. Load the dataset

  3. Preprocess the dataset with Ray AIR

  4. Run the training with Ray AIR

  5. Generate text from prompt with Ray AIR

Uncomment and run the following line in order to install all the necessary dependencies (this notebook is being tested with transformers==4.26.0):

#! pip install "datasets" "evaluate" "accelerate==0.18.0" "transformers>=4.26.0" "torch>=1.12.0" "deepspeed==0.8.3"
import numpy as np
import pandas as pd
import os

Set up Ray

First, let’s set some global variables. We will use 16 workers, each being assigned 1 GPU and 8 CPUs.

model_name = "EleutherAI/gpt-j-6B"
use_gpu = True
num_workers = 16
cpus_per_worker = 8

We will use ray.init() to initialize a local cluster. By default, this cluster will consist only of the machine you are running this notebook on. You can also run this notebook on an Anyscale cluster.

We define a runtime environment to ensure that the Ray workers have access to all the necessary packages. You can omit the runtime_env argument if you have all of the packages already installed on each node in your cluster.

import ray

ray.init(
    runtime_env={
        "pip": [
            "datasets",
            "evaluate",
            # Latest combination of accelerate==0.19.0 and transformers==4.29.0
            # seems to have issues with DeepSpeed process group initialization,
            # and will result in a batch_size validation problem.
            # TODO(jungong) : get rid of the pins once the issue is fixed.
            "accelerate==0.16.0",
            "transformers==4.26.0",
            "torch>=1.12.0",
            "deepspeed==0.9.2",
        ]
    }
)
# THIS SHOULD BE HIDDEN IN DOCS AND ONLY RUN IN CI
# Download the model from our S3 mirror as it's faster

import ray
import subprocess
import ray.util.scheduling_strategies


def force_on_node(node_id: str, remote_func_or_actor_class):
    scheduling_strategy = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=node_id, soft=False
    )
    options = {"scheduling_strategy": scheduling_strategy}
    return remote_func_or_actor_class.options(**options)


def run_on_every_node(remote_func_or_actor_class, **remote_kwargs):
    refs = []
    for node in ray.nodes():
        if node["Alive"] and node["Resources"].get("GPU", None):
            refs.append(
                force_on_node(node["NodeID"], remote_func_or_actor_class).remote(
                    **remote_kwargs
                )
            )
    return ray.get(refs)


@ray.remote(num_gpus=1)
def download_model():
    from transformers.utils.hub import TRANSFORMERS_CACHE

    path = os.path.expanduser(
        os.path.join(TRANSFORMERS_CACHE, "models--EleutherAI--gpt-j-6B")
    )
    subprocess.run(["mkdir", "-p", os.path.join(path, "snapshots", "main")])
    subprocess.run(["mkdir", "-p", os.path.join(path, "refs")])
    if os.path.exists(os.path.join(path, "refs", "main")):
        return
    subprocess.run(
        [
            "aws",
            "s3",
            "sync",
            "--quiet",
            "s3://large-dl-models-mirror/models--EleutherAI--gpt-j-6B/main/",
            os.path.join(path, "snapshots", "main"),
        ]
    )
    with open(os.path.join(path, "snapshots", "main", "hash"), "r") as f:
        f_hash = f.read().strip()
    with open(os.path.join(path, "refs", "main"), "w") as f:
        f.write(f_hash)
    os.rename(
        os.path.join(path, "snapshots", "main"), os.path.join(path, "snapshots", f_hash)
    )


_ = run_on_every_node(download_model)

Loading the dataset

We will be fine-tuning the model on the tiny_shakespeare dataset, which consists of 40,000 lines from a variety of Shakespeare’s plays. The aim is to make the GPT-J model better at generating text in the style of Shakespeare.

from datasets import load_dataset

print("Loading tiny_shakespeare dataset")
current_dataset = load_dataset("tiny_shakespeare")
current_dataset
Loading tiny_shakespeare dataset
Found cached dataset tiny_shakespeare (/home/ray/.cache/huggingface/datasets/tiny_shakespeare/default/1.0.0/b5b13969f09fe8707337f6cb296314fbe06960bd9a868dca39e713e163d27b5e)
DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 1
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 1
    })
    test: Dataset({
        features: ['text'],
        num_rows: 1
    })
})

We will use Ray Data for distributed preprocessing and data ingestion. We can easily convert the dataset obtained from Hugging Face Hub to Ray Data by using ray.data.from_huggingface().

import ray.data

ray_datasets = ray.data.from_huggingface(current_dataset)
ray_datasets
{'train': Dataset(num_blocks=1, num_rows=1, schema={text: string}),
 'validation': Dataset(num_blocks=1, num_rows=1, schema={text: string}),
 'test': Dataset(num_blocks=1, num_rows=1, schema={text: string})}

Because the dataset is represented by a single large string, we will need to do some preprocessing. For that, we will define two Ray AIR Preprocessors using the BatchMapper API, allowing us to define functions that will be applied on batches of data.

The split_text function will take the single string and split it into separate lines, removing empty lines and character names ending with ‘:’ (e.g. ‘ROMEO:’). The tokenize function will take the lines and tokenize them using the 🤗 Tokenizer associated with the model, ensuring each entry has the same length (block_size) by padding and truncating. This is necessary for training.
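To make the line-filtering rule concrete, here is a minimal sketch in plain Python that applies the same rule to a short sample. The helper name `split_lines` and the sample text are made up for illustration and are not part of the notebook.

```python
from typing import List


def split_lines(text: str) -> List[str]:
    """Keep non-empty lines that do not end with ':' (character names)."""
    lines = []
    for raw in text.split("\n"):
        line = raw.strip()
        if line and not line.endswith(":"):
            lines.append(line)
    return lines


# Hypothetical sample in the same shape as the dataset text.
sample = (
    "ROMEO:\n"
    "But, soft! what light through yonder window breaks?\n"
    "\n"
    "JULIET:\n"
    "O Romeo, Romeo!\n"
)
kept = split_lines(sample)
print(kept)
```

Only the two spoken lines remain; the speaker names and the empty line are dropped.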

Note

This preprocessing can be done in other ways. A common pattern is to tokenize first, and then split the obtained tokens into equally-sized blocks.
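As a rough illustration of the alternative pattern mentioned in the note, the sketch below concatenates already tokenized lines and cuts the result into equally sized blocks. The function name `group_into_blocks` and the toy token ids are assumptions for illustration; this notebook does not use this approach.

```python
from typing import Dict, List


def group_into_blocks(
    token_lists: List[List[int]], block_size: int
) -> Dict[str, List[List[int]]]:
    # Concatenate all token ids into one long stream.
    stream = [tok for tokens in token_lists for tok in tokens]
    # Drop the remainder so that every block has exactly block_size tokens.
    usable = (len(stream) // block_size) * block_size
    blocks = [stream[i : i + block_size] for i in range(0, usable, block_size)]
    # For causal language modeling, the labels are a copy of the inputs.
    return {"input_ids": blocks, "labels": [list(b) for b in blocks]}


toy_tokens = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]  # made-up token ids
grouped = group_into_blocks(toy_tokens, block_size=4)
print(grouped["input_ids"])
```

With this pattern no padding is needed, at the cost of discarding the final partial block and mixing text from neighbouring lines in one example.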

We will use the splitter and tokenizer Preprocessors below.

block_size = 512
from transformers import AutoTokenizer

from ray.data.preprocessors import BatchMapper


def split_text(batch: pd.DataFrame) -> pd.DataFrame:
    text = list(batch["text"])
    flat_text = "".join(text)
    # Keep non-empty lines that do not end with ":" (character names).
    lines = [
        x.strip()
        for x in flat_text.split("\n")
        if x.strip() and not x.strip().endswith(":")
    ]
    return pd.DataFrame(lines, columns=["text"])


def tokenize(batch: pd.DataFrame) -> dict:
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
    tokenizer.pad_token = tokenizer.eos_token
    ret = tokenizer(
        list(batch["text"]),
        truncation=True,
        max_length=block_size,
        padding="max_length",
        return_tensors="np",
    )
    ret["labels"] = ret["input_ids"].copy()
    return dict(ret)


splitter = BatchMapper(split_text, batch_format="pandas")
tokenizer = BatchMapper(tokenize, batch_format="pandas")

Fine-tuning the model with Ray AIR

We can now configure Ray AIR’s TransformersTrainer to perform distributed fine-tuning of the model. In order to do that, we specify a trainer_init_per_worker function, which creates a 🤗 Transformers Trainer that will be distributed by Ray using Distributed Data Parallelism (using PyTorch Distributed backend internally). This means that each worker will have its own copy of the model but operate on different data. At the end of each step, all the workers will sync gradients.
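The gradient synchronization idea can be illustrated with a toy simulation in plain Python. This is only a sketch of the arithmetic, not how PyTorch Distributed implements it: the one-parameter model `y = w * x`, the data, and all names below are made up for illustration.

```python
from typing import List, Tuple


def grad_mse(w: float, data: List[Tuple[float, float]]) -> float:
    """Gradient of the mean squared error for the model y = w * x."""
    return sum(2 * (w * x - y) * x for x, y in data) / len(data)


w = 0.5
data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
toy_num_workers = 2

# Each simulated worker receives an equally sized shard of the batch.
shards = [data[i::toy_num_workers] for i in range(toy_num_workers)]
local_grads = [grad_mse(w, shard) for shard in shards]

# "Sync": average the local gradients so every worker applies the same update.
synced_grad = sum(local_grads) / toy_num_workers
full_grad = grad_mse(w, data)
print(local_grads, synced_grad, full_grad)
```

With equally sized shards, the averaged per-worker gradient matches the gradient computed over the whole batch, which is why the workers stay in sync while seeing different data.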

Because GPT-J is a relatively large model, it may not be possible to fit it on smaller GPU types (16 GB of GPU memory, GRAM, or less). To deal with that issue, we can use DeepSpeed, a library to optimize the training process and allow us to (among other things) offload and partition optimizer and parameter states, reducing GRAM usage. Furthermore, DeepSpeed ZeRO Stage 3 allows us to load large models without running out of memory.
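A back-of-the-envelope estimate shows why partitioning matters. The sketch below assumes the commonly cited mixed-precision Adam footprint of 16 bytes per parameter (2 for fp16 weights, 2 for fp16 gradients, 12 for fp32 master weights and the two Adam states) and ignores activations, buffers, and fragmentation, so treat the numbers as rough. The configuration used in this notebook additionally offloads optimizer and parameter states to CPU, which moves part of this footprint from GRAM to RAM.

```python
PARAMS = 6.05e9  # parameter count reported in the training log
BYTES_FP16_WEIGHTS = 2
BYTES_FP16_GRADS = 2
BYTES_FP32_OPTIMIZER = 12  # fp32 master weights + Adam momentum + Adam variance


def model_state_gb(num_gpus: int, partitioned: bool) -> float:
    """Approximate model-state memory per GPU in GB (1 GB = 1e9 bytes)."""
    per_param = BYTES_FP16_WEIGHTS + BYTES_FP16_GRADS + BYTES_FP32_OPTIMIZER
    total_gb = PARAMS * per_param / 1e9
    # Plain data parallelism replicates everything on every GPU;
    # ZeRO Stage 3 partitions weights, gradients, and optimizer states.
    return total_gb / num_gpus if partitioned else total_gb


replicated_gb = model_state_gb(16, partitioned=False)
zero3_gb = model_state_gb(16, partitioned=True)
print(f"replicated: {replicated_gb:.1f} GB per GPU")
print(f"ZeRO-3 over 16 GPUs: {zero3_gb:.2f} GB per GPU")
```

Under these assumptions, the replicated model states alone far exceed a 16 GB GPU, while the partitioned share fits with room left for activations.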

🤗 Transformers and Ray AIR’s integration (TransformersTrainer) allow you to easily configure and use DDP and DeepSpeed. All you need to do is specify the DeepSpeed configuration in the TrainingArguments object.

Tip

There are many DeepSpeed settings that allow you to trade off speed for memory usage. The settings used below are tailored to the cluster setup used (16 g4dn.4xlarge nodes) and per device batch size of 16. Some things to keep in mind:

  • If your GPUs support bfloat16, use that instead of float16 mixed precision to get better performance and prevent overflows. Replace fp16=True with bf16=True in TrainingArguments.

  • If you are running out of GRAM, try reducing the batch size (defined in the cell below the next one) and set "overlap_comm": False in the DeepSpeed config.

  • If you are running out of RAM, add more nodes to your cluster, use nodes with more RAM, set "pin_memory": False in the DeepSpeed config, reduce the batch size, and remove "offload_param" from the DeepSpeed config.

For more information on DeepSpeed configuration, refer to Hugging Face documentation and DeepSpeed documentation.
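The memory-related tips above can be sketched as a small helper that edits a DeepSpeed config dictionary. The helper name `reduce_memory_pressure` and its flags are hypothetical; only the config keys come from the tips, and the example dictionary is a trimmed-down stand-in for the full configuration.

```python
import copy


def reduce_memory_pressure(
    ds_config: dict, low_gram: bool = False, low_ram: bool = False
) -> dict:
    """Return a copy of a DeepSpeed config with the memory tips applied."""
    cfg = copy.deepcopy(ds_config)
    zero = cfg.setdefault("zero_optimization", {})
    if low_gram:
        # Running out of GPU memory: disable communication overlap.
        zero["overlap_comm"] = False
    if low_ram:
        # Running out of host RAM: stop pinning memory and keep
        # parameters on the GPU instead of offloading them.
        if "offload_optimizer" in zero:
            zero["offload_optimizer"]["pin_memory"] = False
        zero.pop("offload_param", None)
    return cfg


example_config = {
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        "offload_param": {"device": "cpu", "pin_memory": True},
        "overlap_comm": True,
    }
}
tuned = reduce_memory_pressure(example_config, low_gram=True, low_ram=True)
print(tuned)
```

The helper works on a copy, so the original dictionary stays untouched and the two variants can be compared side by side.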

Additionally, if you prefer a lower-level API, the logic below can be expressed as an Accelerate training loop distributed by a Ray AIR TorchTrainer.

Training speed

As we are using data parallelism, each worker operates on its own shard of the data. The batch size set in TrainingArguments is the per device batch size (per worker batch size). By changing the number of workers, we can change the effective batch size and thus the time needed for training to complete. The effective batch size is then calculated as per device batch size * number of workers * number of gradient accumulation steps. As we add more workers, the effective batch size rises and thus we need less time to complete a full epoch. While the speedup is not exactly linear due to extra communication overheads, in many cases it can be close to linear.

The preprocessed dataset has 1348 examples. We have set per device batch size to 16.

  • With 16 g4dn.4xlarge nodes, the effective batch size was 256, which corresponds to 85 steps per epoch. One epoch took ~2440 seconds (including initialization time).

  • With 32 g4dn.4xlarge nodes, the effective batch size was 512, which corresponds to 43 steps per epoch. One epoch took ~1280 seconds (including initialization time).
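The effective batch size formula and the observed speedup can be checked with a few lines of arithmetic. The function name `effective_batch_size` is made up for illustration; the batch size, worker counts, and epoch times are the ones reported above, with one gradient accumulation step as shown in the training log.

```python
def effective_batch_size(
    per_device: int, num_workers: int, grad_accum_steps: int = 1
) -> int:
    """per device batch size * number of workers * gradient accumulation steps."""
    return per_device * num_workers * grad_accum_steps


bs_16_nodes = effective_batch_size(per_device=16, num_workers=16)
bs_32_nodes = effective_batch_size(per_device=16, num_workers=32)

# Epoch times in seconds, as reported for the two cluster sizes.
speedup = 2440 / 1280
print(bs_16_nodes, bs_32_nodes, round(speedup, 2))
```

Doubling the number of nodes doubled the effective batch size and shortened the epoch by a factor of roughly 1.9, which is close to, but not exactly, linear.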

import evaluate
from transformers import Trainer, TrainingArguments
from transformers import (
    GPTJForCausalLM,
    AutoTokenizer,
    default_data_collator,
)
from transformers.utils.logging import disable_progress_bar, enable_progress_bar
import torch

from ray.air import session


def trainer_init_per_worker(train_dataset, eval_dataset=None, **config):
    # Use the actual number of CPUs assigned by Ray
    os.environ["OMP_NUM_THREADS"] = str(
        session.get_trial_resources().bundles[-1].get("CPU", 1)
    )
    # Enable tf32 for better performance
    torch.backends.cuda.matmul.allow_tf32 = True

    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)

    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True,
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True,
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print("Preparing training arguments")
    training_args = TrainingArguments(
        "output",
        per_device_train_batch_size=batch_size,
        logging_steps=1,
        save_strategy="no",
        per_device_eval_batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        label_names=["input_ids", "attention_mask"],
        num_train_epochs=epochs,
        push_to_hub=False,
        disable_tqdm=True,  # declutter the output a little
        fp16=True,
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )
    disable_progress_bar()

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    print("Loading model")

    model = GPTJForCausalLM.from_pretrained(model_name, use_cache=False)
    model.resize_token_embeddings(len(tokenizer))

    print("Model loaded")

    enable_progress_bar()

    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )
    return trainer

With our trainer_init_per_worker complete, we can now instantiate the TransformersTrainer. Aside from the function, we set the scaling_config, controlling the number of workers and resources used, and the datasets we will use for training and evaluation.

We pass the preprocessors we have defined earlier as an argument, wrapped in a Chain. The preprocessor will be included with the returned Checkpoint, meaning it will also be applied during inference.

Note

If you want to upload checkpoints to cloud storage (e.g. S3), set air.RunConfig(storage_path). See Run Configuration in Train (RunConfig) for an example. Using cloud storage is highly recommended, especially for production.

from ray.train.huggingface import TransformersTrainer
from ray.air.config import ScalingConfig
from ray.data.preprocessors import Chain


trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    trainer_init_config={
        "batch_size": 16,  # per device
        "epochs": 1,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker={"GPU": 1, "CPU": cpus_per_worker},
    ),
    datasets={"train": ray_datasets["train"], "evaluation": ray_datasets["validation"]},
    preprocessor=Chain(splitter, tokenizer),
)

Finally, we call the fit() method to start training with Ray AIR. We will save the Result object to a variable so we can access metrics and checkpoints.

results = trainer.fit()

Tune Status

Current time: 2023-03-06 17:18:41
Running for: 00:43:11.46
Memory: 31.9/62.0 GiB

System Info

Using FIFO scheduling algorithm.
Resources requested: 0/256 CPUs, 0/16 GPUs, 0.0/675.29 GiB heap, 0.0/291.99 GiB objects (0.0/16.0 accelerator_type:T4)

Trial Status

Trial name                       status      loc                iter  total time (s)  loss    learning_rate  epoch
TransformersTrainer_f623d_00000  TERMINATED  10.0.30.196:30861  85    2579.3          0.0715  4.70588e-07    1
(RayTrainWorker pid=31281) 2023-03-06 16:36:00,447	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=1964, ip=10.0.26.83) /tmp/ray/session_2023-03-06_15-55-37_997701_162/runtime_resources/py_modules_files/_ray_pkg_f864ba6869d6802c/ray/train/_internal/dataset_iterator.py:64: UserWarning: session.get_dataset_shard returns a ray.data.DataIterator instead of a Dataset/DatasetPipeline as of Ray v2.3. Use iter_torch_batches(), to_tf(), or iter_batches() to iterate over one epoch. See https://docs.ray.io/en/latest/data/api/dataset_iterator.html for full DataIterator docs.
(RayTrainWorker pid=1964, ip=10.0.26.83)   warnings.warn(
(RayTrainWorker pid=1964, ip=10.0.26.83) 2023-03-06 16:36:00,453	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=1963, ip=10.0.54.163) /tmp/ray/session_2023-03-06_15-55-37_997701_162/runtime_resources/py_modules_files/_ray_pkg_f864ba6869d6802c/ray/train/_internal/dataset_iterator.py:64: UserWarning: session.get_dataset_shard returns a ray.data.DataIterator instead of a Dataset/DatasetPipeline as of Ray v2.3. Use iter_torch_batches(), to_tf(), or iter_batches() to iterate over one epoch. See https://docs.ray.io/en/latest/data/api/dataset_iterator.html for full DataIterator docs.
(RayTrainWorker pid=1963, ip=10.0.54.163)   warnings.warn(
(RayTrainWorker pid=1963, ip=10.0.54.163) 2023-03-06 16:36:00,452	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=1954, ip=10.0.15.115) /tmp/ray/session_2023-03-06_15-55-37_997701_162/runtime_resources/py_modules_files/_ray_pkg_f864ba6869d6802c/ray/train/_internal/dataset_iterator.py:64: UserWarning: session.get_dataset_shard returns a ray.data.DataIterator instead of a Dataset/DatasetPipeline as of Ray v2.3. Use iter_torch_batches(), to_tf(), or iter_batches() to iterate over one epoch. See https://docs.ray.io/en/latest/data/api/dataset_iterator.html for full DataIterator docs.
(RayTrainWorker pid=1954, ip=10.0.15.115)   warnings.warn(
(RayTrainWorker pid=1954, ip=10.0.15.115) 2023-03-06 16:36:00,452	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=1955, ip=10.0.58.255) /tmp/ray/session_2023-03-06_15-55-37_997701_162/runtime_resources/py_modules_files/_ray_pkg_f864ba6869d6802c/ray/train/_internal/dataset_iterator.py:64: UserWarning: session.get_dataset_shard returns a ray.data.DataIterator instead of a Dataset/DatasetPipeline as of Ray v2.3. Use iter_torch_batches(), to_tf(), or iter_batches() to iterate over one epoch. See https://docs.ray.io/en/latest/data/api/dataset_iterator.html for full DataIterator docs.
(RayTrainWorker pid=1955, ip=10.0.58.255)   warnings.warn(
(RayTrainWorker pid=1955, ip=10.0.58.255) 2023-03-06 16:36:00,453	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=1942, ip=10.0.57.85) 2023-03-06 16:36:00,452	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=1963, ip=10.0.29.205) 2023-03-06 16:36:00,452	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=1942, ip=10.0.51.113) 2023-03-06 16:36:00,454	INFO bulk_executor.py:41 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[BatchMapper]
(RayTrainWorker pid=31281) Preparing training arguments
(RayTrainWorker pid=31281) Loading model
(RayTrainWorker pid=31281) [2023-03-06 16:37:21,252] [INFO] [partition_parameters.py:415:__exit__] finished initializing model with 6.05B parameters
(RayTrainWorker pid=31281) Model loaded
(RayTrainWorker pid=31281) Using cuda_amp half precision backend
(RayTrainWorker pid=31281) [2023-03-06 16:38:03,431] [INFO] [logging.py:75:log_dist] [Rank 0] DeepSpeed info: version=0.8.1, git-hash=unknown, git-branch=unknown
(RayTrainWorker pid=31281) [2023-03-06 16:38:03,450] [INFO] [logging.py:75:log_dist] [Rank 0] DeepSpeed Flops Profiler Enabled: False
(RayTrainWorker pid=31281) ***** Running training *****
(RayTrainWorker pid=31281)   Num examples = 1348
(RayTrainWorker pid=31281)   Num Epochs = 1
(RayTrainWorker pid=31281)   Instantaneous batch size per device = 16
(RayTrainWorker pid=31281)   Total train batch size (w. parallel, distributed & accumulation) = 256
(RayTrainWorker pid=31281)   Gradient Accumulation steps = 1
(RayTrainWorker pid=31281)   Total optimization steps = 85
(RayTrainWorker pid=31281)   Number of trainable parameters = 0
(RayTrainWorker pid=31281) /home/ray/anaconda3/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py:2387: UserWarning: torch.distributed._all_gather_base is a private function and will be deprecated. Please use torch.distributed.all_gather_into_tensor instead.
(RayTrainWorker pid=31281)   warnings.warn(
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,024] [INFO] [logging.py:75:log_dist] [Rank 0] DeepSpeed Final Optimizer = adamw
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,024] [INFO] [logging.py:75:log_dist] [Rank 0] DeepSpeed using client callable to create LR scheduler
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,025] [INFO] [logging.py:75:log_dist] [Rank 0] DeepSpeed LR Scheduler = <torch.optim.lr_scheduler.LambdaLR object at 0x7f10a01d7ee0>
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,025] [INFO] [logging.py:75:log_dist] [Rank 0] step=0, skipped=0, lr=[2e-05], mom=[[0.9, 0.999]]
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,025] [INFO] [config.py:1009:print] DeepSpeedEngine configuration:
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,026] [INFO] [config.py:1013:print]   activation_checkpointing_config  {
(RayTrainWorker pid=31281)     "partition_activations": false, 
(RayTrainWorker pid=31281)     "contiguous_memory_optimization": false, 
(RayTrainWorker pid=31281)     "cpu_checkpointing": false, 
(RayTrainWorker pid=31281)     "number_checkpoints": null, 
(RayTrainWorker pid=31281)     "synchronize_checkpoint_boundary": false, 
(RayTrainWorker pid=31281)     "profile": false
(RayTrainWorker pid=31281) }
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,026] [INFO] [config.py:1013:print]   aio_config ................... {'block_size': 1048576, 'queue_depth': 8, 'thread_count': 1, 'single_submit': False, 'overlap_events': True}
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,026] [INFO] [config.py:1013:print]   amp_enabled .................. False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,026] [INFO] [config.py:1013:print]   amp_params ................... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   autotuning_config ............ {
(RayTrainWorker pid=31281)     "enabled": false, 
(RayTrainWorker pid=31281)     "start_step": null, 
(RayTrainWorker pid=31281)     "end_step": null, 
(RayTrainWorker pid=31281)     "metric_path": null, 
(RayTrainWorker pid=31281)     "arg_mappings": null, 
(RayTrainWorker pid=31281)     "metric": "throughput", 
(RayTrainWorker pid=31281)     "model_info": null, 
(RayTrainWorker pid=31281)     "results_dir": "autotuning_results", 
(RayTrainWorker pid=31281)     "exps_dir": "autotuning_exps", 
(RayTrainWorker pid=31281)     "overwrite": true, 
(RayTrainWorker pid=31281)     "fast": true, 
(RayTrainWorker pid=31281)     "start_profile_step": 3, 
(RayTrainWorker pid=31281)     "end_profile_step": 5, 
(RayTrainWorker pid=31281)     "tuner_type": "gridsearch", 
(RayTrainWorker pid=31281)     "tuner_early_stopping": 5, 
(RayTrainWorker pid=31281)     "tuner_num_trials": 50, 
(RayTrainWorker pid=31281)     "model_info_path": null, 
(RayTrainWorker pid=31281)     "mp_size": 1, 
(RayTrainWorker pid=31281)     "max_train_batch_size": null, 
(RayTrainWorker pid=31281)     "min_train_batch_size": 1, 
(RayTrainWorker pid=31281)     "max_train_micro_batch_size_per_gpu": 1.024000e+03, 
(RayTrainWorker pid=31281)     "min_train_micro_batch_size_per_gpu": 1, 
(RayTrainWorker pid=31281)     "num_tuning_micro_batch_sizes": 3
(RayTrainWorker pid=31281) }
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   bfloat16_enabled ............. False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   checkpoint_parallel_write_pipeline  False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   checkpoint_tag_validation_enabled  True
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   checkpoint_tag_validation_fail  False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   comms_config ................. <deepspeed.comm.config.DeepSpeedCommsConfig object at 0x7f1102c55910>
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   communication_data_type ...... None
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   compression_config ........... {'weight_quantization': {'shared_parameters': {'enabled': False, 'quantizer_kernel': False, 'schedule_offset': 0, 'quantize_groups': 1, 'quantize_verbose': False, 'quantization_type': 'symmetric', 'quantize_weight_in_forward': False, 'rounding': 'nearest', 'fp16_mixed_quantize': False, 'quantize_change_ratio': 0.001}, 'different_groups': {}}, 'activation_quantization': {'shared_parameters': {'enabled': False, 'quantization_type': 'symmetric', 'range_calibration': 'dynamic', 'schedule_offset': 1000}, 'different_groups': {}}, 'sparse_pruning': {'shared_parameters': {'enabled': False, 'method': 'l1', 'schedule_offset': 1000}, 'different_groups': {}}, 'row_pruning': {'shared_parameters': {'enabled': False, 'method': 'l1', 'schedule_offset': 1000}, 'different_groups': {}}, 'head_pruning': {'shared_parameters': {'enabled': False, 'method': 'topk', 'schedule_offset': 1000}, 'different_groups': {}}, 'channel_pruning': {'shared_parameters': {'enabled': False, 'method': 'l1', 'schedule_offset': 1000}, 'different_groups': {}}, 'layer_reduction': {'enabled': False}}
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   curriculum_enabled_legacy .... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   curriculum_params_legacy ..... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   data_efficiency_config ....... {'enabled': False, 'seed': 1234, 'data_sampling': {'enabled': False, 'num_epochs': 1000, 'num_workers': 0, 'curriculum_learning': {'enabled': False}}, 'data_routing': {'enabled': False, 'random_ltd': {'enabled': False, 'layer_token_lr_schedule': {'enabled': False}}}}
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   data_efficiency_enabled ...... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   dataloader_drop_last ......... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   disable_allgather ............ False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   dump_state ................... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   dynamic_loss_scale_args ...... {'init_scale': 256, 'scale_window': 1000, 'delayed_shift': 2, 'min_scale': 1}
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_enabled ........... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_gas_boundary_resolution  1
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_layer_name ........ bert.encoder.layer
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_layer_num ......... 0
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_max_iter .......... 100
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_stability ......... 1e-06
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_tol ............... 0.01
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   eigenvalue_verbose ........... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   elasticity_enabled ........... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   flops_profiler_config ........ {
(RayTrainWorker pid=31281)     "enabled": false, 
(RayTrainWorker pid=31281)     "profile_step": 1, 
(RayTrainWorker pid=31281)     "module_depth": -1, 
(RayTrainWorker pid=31281)     "top_modules": 1, 
(RayTrainWorker pid=31281)     "detailed": true, 
(RayTrainWorker pid=31281)     "output_file": null
(RayTrainWorker pid=31281) }
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   fp16_auto_cast ............... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   fp16_enabled ................. True
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   fp16_master_weights_and_gradients  False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   global_rank .................. 0
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   grad_accum_dtype ............. None
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,027] [INFO] [config.py:1013:print]   gradient_accumulation_steps .. 1
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   gradient_clipping ............ 1.0
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   gradient_predivide_factor .... 1.0
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   initial_dynamic_scale ........ 256
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   load_universal_checkpoint .... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   loss_scale ................... 0
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   memory_breakdown ............. False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   monitor_config ............... tensorboard=TensorBoardConfig(enabled=False, output_path='', job_name='DeepSpeedJobName') wandb=WandbConfig(enabled=False, group=None, team=None, project='deepspeed') csv_monitor=CSVConfig(enabled=False, output_path='', job_name='DeepSpeedJobName') enabled=False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   nebula_config ................ {
(RayTrainWorker pid=31281)     "enabled": false, 
(RayTrainWorker pid=31281)     "persistent_storage_path": null, 
(RayTrainWorker pid=31281)     "persistent_time_interval": 100, 
(RayTrainWorker pid=31281)     "num_of_version_in_retention": 2, 
(RayTrainWorker pid=31281)     "enable_nebula_load": true, 
(RayTrainWorker pid=31281)     "load_path": null
(RayTrainWorker pid=31281) }
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   optimizer_legacy_fusion ...... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   optimizer_name ............... adamw
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   optimizer_params ............. {'lr': 2e-05, 'betas': [0.9, 0.999], 'eps': 1e-08}
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   pipeline ..................... {'stages': 'auto', 'partition': 'best', 'seed_layers': False, 'activation_checkpoint_interval': 0}
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   pld_enabled .................. False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   pld_params ................... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   prescale_gradients ........... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   scheduler_name ............... None
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   scheduler_params ............. None
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   sparse_attention ............. None
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   sparse_gradients_enabled ..... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   steps_per_print .............. 10
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   train_batch_size ............. 256
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   train_micro_batch_size_per_gpu  16
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   use_node_local_storage ....... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   wall_clock_breakdown ......... False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   world_size ................... 16
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   zero_allow_untested_optimizer  False
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   zero_config .................. stage=3 contiguous_gradients=True reduce_scatter=True reduce_bucket_size=16777216 allgather_partitions=True allgather_bucket_size=500,000,000 overlap_comm=True load_from_fp32_weights=True elastic_checkpoint=False offload_param=DeepSpeedZeroOffloadParamConfig(device='cpu', nvme_path=None, buffer_count=5, buffer_size=100,000,000, max_in_cpu=1,000,000,000, pin_memory=True) offload_optimizer=DeepSpeedZeroOffloadOptimizerConfig(device='cpu', nvme_path=None, buffer_count=4, pin_memory=True, pipeline=False, pipeline_read=False, pipeline_write=False, fast_init=False) sub_group_size=1,000,000,000 cpu_offload_param=None cpu_offload_use_pin_memory=None cpu_offload=None prefetch_bucket_size=15099494 param_persistence_threshold=40960 model_persistence_threshold=sys.maxsize max_live_parameters=1,000,000,000 max_reuse_distance=1,000,000,000 gather_16bit_weights_on_model_save=True stage3_gather_fp16_weights_on_model_save=False ignore_unused_parameters=True legacy_stage1=False round_robin_gradients=True
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   zero_enabled ................. True
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,028] [INFO] [config.py:1013:print]   zero_optimization_stage ...... 3
(RayTrainWorker pid=31281) [2023-03-06 16:38:25,029] [INFO] [config.py:998:print_user_config]   json = {
(RayTrainWorker pid=31281)     "fp16": {
(RayTrainWorker pid=31281)         "enabled": true, 
(RayTrainWorker pid=31281)         "initial_scale_power": 8
(RayTrainWorker pid=31281)     }, 
(RayTrainWorker pid=31281)     "bf16": {
(RayTrainWorker pid=31281)         "enabled": false
(RayTrainWorker pid=31281)     }, 
(RayTrainWorker pid=31281)     "optimizer": {
(RayTrainWorker pid=31281)         "type": "AdamW", 
(RayTrainWorker pid=31281)         "params": {
(RayTrainWorker pid=31281)             "lr": 2e-05, 
(RayTrainWorker pid=31281)             "betas": [0.9, 0.999], 
(RayTrainWorker pid=31281)             "eps": 1e-08
(RayTrainWorker pid=31281)         }
(RayTrainWorker pid=31281)     }, 
(RayTrainWorker pid=31281)     "zero_optimization": {
(RayTrainWorker pid=31281)         "stage": 3, 
(RayTrainWorker pid=31281)         "offload_optimizer": {
(RayTrainWorker pid=31281)             "device": "cpu", 
(RayTrainWorker pid=31281)             "pin_memory": true
(RayTrainWorker pid=31281)         }, 
(RayTrainWorker pid=31281)         "offload_param": {
(RayTrainWorker pid=31281)             "device": "cpu", 
(RayTrainWorker pid=31281)             "pin_memory": true
(RayTrainWorker pid=31281)         }, 
(RayTrainWorker pid=31281)         "overlap_comm": true, 
(RayTrainWorker pid=31281)         "contiguous_gradients": true, 
(RayTrainWorker pid=31281)         "reduce_bucket_size": 1.677722e+07, 
(RayTrainWorker pid=31281)         "stage3_prefetch_bucket_size": 1.509949e+07, 
(RayTrainWorker pid=31281)         "stage3_param_persistence_threshold": 4.096000e+04, 
(RayTrainWorker pid=31281)         "gather_16bit_weights_on_model_save": true, 
(RayTrainWorker pid=31281)         "round_robin_gradients": true
(RayTrainWorker pid=31281)     }, 
(RayTrainWorker pid=31281)     "gradient_accumulation_steps": 1, 
(RayTrainWorker pid=31281)     "gradient_clipping": 1.0, 
(RayTrainWorker pid=31281)     "steps_per_print": 10, 
(RayTrainWorker pid=31281)     "train_batch_size": 256, 
(RayTrainWorker pid=31281)     "train_micro_batch_size_per_gpu": 16, 
(RayTrainWorker pid=31281)     "wall_clock_breakdown": false
(RayTrainWorker pid=31281) }
(RayTrainWorker pid=31281) Model weights saved in output/checkpoint-85/pytorch_model.bin
(RayTrainWorker pid=31281) tokenizer config file saved in output/checkpoint-85/tokenizer_config.json
(RayTrainWorker pid=31281) Special tokens file saved in output/checkpoint-85/special_tokens_map.json
(RayTrainWorker pid=31281) [2023-03-06 17:18:13,320] [INFO] [engine.py:3516:save_16bit_model] Saving model weights to output/checkpoint-85/pytorch_model.bin
(RayTrainWorker pid=31281) [2023-03-06 17:18:13,320] [INFO] [torch_checkpoint_engine.py:15:save] [Torch] Saving output/checkpoint-85/pytorch_model.bin...
(RayTrainWorker pid=31281) [2023-03-06 17:18:29,075] [INFO] [torch_checkpoint_engine.py:17:save] [Torch] Saved output/checkpoint-85/pytorch_model.bin.
(RayTrainWorker pid=31281) [2023-03-06 17:18:29,087] [INFO] [logging.py:75:log_dist] [Rank 0] [Torch] Checkpoint global_step85 is begin to save!
(RayTrainWorker pid=31281) [2023-03-06 17:18:29,109] [INFO] [logging.py:75:log_dist] [Rank 0] Saving model checkpoint: output/checkpoint-85/global_step85/zero_pp_rank_0_mp_rank_00_model_states.pt
(RayTrainWorker pid=31281) [2023-03-06 17:18:29,109] [INFO] [torch_checkpoint_engine.py:15:save] [Torch] Saving output/checkpoint-85/global_step85/zero_pp_rank_0_mp_rank_00_model_states.pt...
(RayTrainWorker pid=31281) [2023-03-06 17:18:37,982] [INFO] [torch_checkpoint_engine.py:17:save] [Torch] Saved output/checkpoint-85/global_step85/zero_pp_rank_0_mp_rank_00_optim_states.pt.
(RayTrainWorker pid=31281) [2023-03-06 17:18:37,984] [INFO] [engine.py:3407:_save_zero_checkpoint] zero checkpoint saved output/checkpoint-85/global_step85/zero_pp_rank_0_mp_rank_00_optim_states.pt
(RayTrainWorker pid=31281) 
(RayTrainWorker pid=31281) 
(RayTrainWorker pid=31281) Training completed. Do not forget to share your model on huggingface.co/models =)
(RayTrainWorker pid=31281) 
(RayTrainWorker pid=31281) 
(RayTrainWorker pid=31281) [2023-03-06 17:18:38,143] [INFO] [torch_checkpoint_engine.py:27:commit] [Torch] Checkpoint global_step85 is ready now!
(RayTrainWorker pid=31281) {'train_runtime': 2413.1243, 'train_samples_per_second': 0.559, 'train_steps_per_second': 0.035, 'train_loss': 0.32492108064539293, 'epoch': 1.0}
2023-03-06 17:18:41,018	INFO tune.py:825 -- Total run time: 2591.59 seconds (2591.46 seconds for the tuning loop).
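
The batch-size and throughput figures in the log above can be cross-checked with a little arithmetic. The sketch below copies the values from the printed DeepSpeed config and the final metrics line. The helper name `effective_batch_size` is hypothetical, and the step count of 85 is taken from the `checkpoint-85` directory name in the log.

```python
# Values copied from the DeepSpeed config printed in the log above.
train_micro_batch_size_per_gpu = 16
gradient_accumulation_steps = 1
world_size = 16  # one training process per GPU worker


def effective_batch_size(micro_batch: int, grad_accum: int, world: int) -> int:
    """Number of samples consumed across all workers per optimizer step."""
    return micro_batch * grad_accum * world


global_batch = effective_batch_size(
    train_micro_batch_size_per_gpu, gradient_accumulation_steps, world_size
)
print(global_batch)  # matches train_batch_size (256) in the log

# Values copied from the final metrics line and the checkpoint name.
total_steps = 85
train_runtime_s = 2413.1243
steps_per_second = total_steps / train_runtime_s
print(round(steps_per_second, 3))  # matches train_steps_per_second (0.035)
```

If you change `num_workers` or the per-device batch size, the global batch size changes proportionally, so you may need to adjust the learning rate to match.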

You can use the returned Result object to access metrics and the Ray AIR Checkpoint associated with the last iteration.

checkpoint = results.checkpoint
checkpoint
TransformersCheckpoint(local_path=/home/ray/ray_results/TransformersTrainer_2023-03-06_16-35-29/TransformersTrainer_f623d_00000_0_2023-03-06_16-35-30/checkpoint_000000)

Generate text from prompt

We can use the TransformersPredictor to generate predictions from our fine-tuned model.

Tip

For large-scale batch inference, consider configuring cloud checkpointing and then passing the cloud-backed Checkpoint to BatchPredictor. More information here.

Because the TransformersPredictor uses a 🤗 Transformers pipeline under the hood, we disable the tokenizer AIR Preprocessor that we used for training and let the pipeline tokenize the data itself.

checkpoint.set_preprocessor(None)

We also set the task to "text-generation" and pass device_map="auto" so that the model is automatically placed on the right device. The predict method forwards its keyword arguments (here the sampling and length settings) to the 🤗 Transformers pipeline call.

from ray.train.huggingface import TransformersPredictor
import pandas as pd
import torch  # needed for torch.float16 below

prompts = pd.DataFrame(["Romeo and Juliet", "Romeo", "Juliet"], columns=["text"])

# Predict on the head node.
predictor = TransformersPredictor.from_checkpoint(
    checkpoint=checkpoint,
    task="text-generation",
    torch_dtype=torch.float16 if use_gpu else None,
    device_map="auto",
    use_gpu=use_gpu,
)
prediction = predictor.predict(
    prompts,
    do_sample=True,
    temperature=0.9,
    min_length=32,
    max_length=128,
)
prediction
generated_text
0 Romeo and Juliet, they are married: and it is ...
1 Romeo, thou art Romeo and a Montague; for only...
2 Juliet's name; but I do not sound an ear to na...
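
The output above shows only the generated text, truncated by the notebook display. To read each completion next to its prompt, you can join the two DataFrames on their index. This is a minimal sketch: the `prediction` frame below is a stand-in built from the truncated strings shown above (not the full model output), and it assumes the predictor returns rows in the same order as the prompts with a default integer index.

```python
import pandas as pd

prompts = pd.DataFrame(["Romeo and Juliet", "Romeo", "Juliet"], columns=["text"])

# Stand-in for the DataFrame returned by predictor.predict(...).
# The strings are the truncated previews from the output above.
prediction = pd.DataFrame(
    {
        "generated_text": [
            "Romeo and Juliet, they are married: and it is ...",
            "Romeo, thou art Romeo and a Montague; for only...",
            "Juliet's name; but I do not sound an ear to na...",
        ]
    }
)

# Join on the shared positional index so each prompt sits next to its output.
paired = prompts.join(prediction)

for row in paired.itertuples(index=False):
    print(f"{row.text!r} -> {row.generated_text}")
```

Because `do_sample=True` is set in the predict call, the generated text differs between runs.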