Get Started with Distributed Training using XGBoost#

This tutorial walks through the process of converting an existing XGBoost script to use Ray Train.

Learn how to:

  1. Configure a training function to report metrics and save checkpoints.

  2. Configure scaling and CPU or GPU resource requirements for a training job.

  3. Launch a distributed training job with an XGBoostTrainer.

Quickstart#

For reference, the final code will look something like this:

import ray.train
from ray.train.xgboost import XGBoostTrainer

def train_func():
    # Your XGBoost training code here.
    ...

scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 4})
trainer = XGBoostTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
  1. train_func is the Python code that executes on each distributed training worker.

  2. ScalingConfig defines the number of distributed training workers and whether to use GPUs.

  3. XGBoostTrainer launches the distributed training job.

Compare an XGBoost training script with and without Ray Train.

XGBoost + Ray Train

import xgboost

import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback

# 1. Load your data as a Ray Data Dataset.
train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/train")
eval_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris/val")


def train_func():
    # 2. Load your data shard as an `xgboost.DMatrix`.

    # Get dataset shards for this worker
    train_shard = ray.train.get_dataset_shard("train")
    eval_shard = ray.train.get_dataset_shard("eval")

    # Convert shards to pandas DataFrames
    train_df = train_shard.materialize().to_pandas()
    eval_df = eval_shard.materialize().to_pandas()

    train_X = train_df.drop("target", axis=1)
    train_y = train_df["target"]
    eval_X = eval_df.drop("target", axis=1)
    eval_y = eval_df["target"]

    dtrain = xgboost.DMatrix(train_X, label=train_y)
    deval = xgboost.DMatrix(eval_X, label=eval_y)

    # 3. Define your xgboost model training parameters.
    params = {
        "tree_method": "approx",
        "objective": "reg:squarederror",
        "eta": 1e-4,
        "subsample": 0.5,
        "max_depth": 2,
    }

    # 4. Do distributed data-parallel training.
    # Ray Train sets up the necessary coordinator processes and
    # environment variables for your workers to communicate with each other.
    bst = xgboost.train(
        params,
        dtrain=dtrain,
        evals=[(deval, "validation")],
        num_boost_round=10,
        # Optional: Use the `RayTrainReportCallback` to save and report checkpoints.
        callbacks=[RayTrainReportCallback()],
    )


# 5. Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, resources_per_worker={"CPU": 2})

# 6. Launch distributed training job.
trainer = XGBoostTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": train_dataset, "eval": eval_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()

# 7. Load the trained model
import os

with result.checkpoint.as_directory() as checkpoint_dir:
    model_path = os.path.join(checkpoint_dir, RayTrainReportCallback.CHECKPOINT_NAME)
    model = xgboost.Booster()
    model.load_model(model_path)
XGBoost

import pandas as pd
import xgboost

# 1. Load your data as an `xgboost.DMatrix`.
train_df = pd.read_csv("s3://ray-example-data/iris/train/1.csv")
eval_df = pd.read_csv("s3://ray-example-data/iris/val/1.csv")

train_X = train_df.drop("target", axis=1)
train_y = train_df["target"]
eval_X = eval_df.drop("target", axis=1)
eval_y = eval_df["target"]

dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(eval_X, label=eval_y)

# 2. Define your xgboost model training parameters.
params = {
    "tree_method": "approx",
    "objective": "reg:squarederror",
    "eta": 1e-4,
    "subsample": 0.5,
    "max_depth": 2,
}

# 3. Do non-distributed training.
bst = xgboost.train(
    params,
    dtrain=dtrain,
    evals=[(deval, "validation")],
    num_boost_round=10,
)

Set up a training function#

First, update your training code to support distributed training. Begin by wrapping your XGBoost training code, whether it uses the native xgboost.train API or the scikit-learn estimator API, in a training function:

def train_func():
    # Your native XGBoost training code here.
    dmatrix = ...
    xgboost.train(...)

Each distributed training worker executes this function.
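The same pattern applies if you use the scikit-learn estimator API. Below is a minimal sketch, where load_data() is a hypothetical helper that returns this worker's in-memory features and labels:

import xgboost

def train_func():
    # Hypothetical helper that loads this worker's data into memory.
    train_X, train_y, eval_X, eval_y = load_data()

    # Your scikit-learn estimator XGBoost training code here.
    model = xgboost.XGBRegressor(objective="reg:squarederror", max_depth=2)
    model.fit(train_X, train_y, eval_set=[(eval_X, eval_y)])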

You can also pass inputs to train_func as a dictionary via the Trainer's train_loop_config argument. For example:

def train_func(config):
    label_column = config["label_column"]
    num_boost_round = config["num_boost_round"]
    ...

config = {"label_column": "y", "num_boost_round": 10}
trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)

Warning

Avoid passing large data objects through train_loop_config to reduce the serialization and deserialization overhead. Instead, initialize large objects (e.g. datasets, models) directly in train_func.

 def load_dataset():
     # Return a large in-memory dataset
     ...

 def load_model():
     # Return a large in-memory model instance
     ...

-config = {"data": load_dataset(), "model": load_model()}

 def train_func(config):
-    data = config["data"]
-    model = config["model"]

+    data = load_dataset()
+    model = load_model()
     ...

-trainer = ray.train.xgboost.XGBoostTrainer(train_func, train_loop_config=config, ...)
+trainer = ray.train.xgboost.XGBoostTrainer(train_func, ...)

Ray Train automatically sets up the worker communication needed for distributed XGBoost training.

Report metrics and save checkpoints#

To persist your checkpoints and monitor training progress, add a ray.train.xgboost.RayTrainReportCallback utility callback to the xgboost.train call inside your training function:

import xgboost
from ray.train.xgboost import RayTrainReportCallback

def train_func():
    ...
    bst = xgboost.train(
        ...,
        callbacks=[
            RayTrainReportCallback(
                metrics=["eval-logloss"], frequency=1
            )
        ],
    )
    ...

Reporting metrics and checkpoints to Ray Train enables fault-tolerant training and the integration with Ray Tune.
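RayTrainReportCallback covers the common case. If you need custom reporting logic, you can also call ray.train.report directly from your own XGBoost callback. The following is a minimal sketch that reports metrics only, without saving checkpoints; the reported metric names depend on the evals names you pass to xgboost.train:

import xgboost
import ray.train

class CustomReportCallback(xgboost.callback.TrainingCallback):
    # Report the latest value of every evaluation metric to Ray Train
    # after each boosting round.
    def after_iteration(self, model, epoch, evals_log):
        metrics = {
            f"{data_name}-{metric_name}": values[-1]
            for data_name, metric_log in evals_log.items()
            for metric_name, values in metric_log.items()
        }
        ray.train.report(metrics)
        return False  # Returning False continues training.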

Loading data#

When running distributed XGBoost training, each worker should use a different shard of the dataset.

def get_train_dataset(world_rank: int) -> xgboost.DMatrix:
    # Define logic to get the DMatrix shard for this worker rank
    ...

def get_eval_dataset(world_rank: int) -> xgboost.DMatrix:
    # Define logic to get the DMatrix for each worker
    ...

def train_func():
    rank = ray.train.get_context().get_world_rank()
    dtrain = get_train_dataset(rank)
    deval = get_eval_dataset(rank)
    ...

A common way to do this is to pre-shard the dataset and then assign each worker a different set of files to read.
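For example, a round-robin assignment of pre-sharded files to workers might look like the following sketch, where the file paths and the "target" label column are hypothetical:

import pandas as pd
import xgboost
import ray.train

# Hypothetical layout: the training data is pre-split into one CSV file per shard.
TRAIN_FILES = [f"s3://my-bucket/train/part-{i}.csv" for i in range(16)]

def get_train_dataset(world_rank: int) -> xgboost.DMatrix:
    world_size = ray.train.get_context().get_world_size()
    # Round-robin: worker `world_rank` reads every `world_size`-th file.
    worker_files = TRAIN_FILES[world_rank::world_size]
    df = pd.concat([pd.read_csv(f) for f in worker_files])
    X, y = df.drop("target", axis=1), df["target"]
    return xgboost.DMatrix(X, label=y)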

However, pre-sharding is not very flexible with respect to changes in the number of workers: if the shards don't divide evenly across workers, some workers are assigned more data than others. For more flexibility, Ray Data provides a way to shard the dataset at runtime.

Use Ray Data to shard the dataset#

Ray Data is a distributed data processing library that allows you to easily shard and distribute your data across multiple workers.

First, load your entire dataset as a Ray Data Dataset. Reference the Ray Data Quickstart for more details on how to load and preprocess data from different sources.

train_dataset = ray.data.read_parquet("s3://path/to/entire/train/dataset/dir")
eval_dataset = ray.data.read_parquet("s3://path/to/entire/eval/dataset/dir")

In the training function, access the dataset shard for this worker with ray.train.get_dataset_shard() and convert it into a native xgboost.DMatrix.

def get_dmatrix(dataset_name: str) -> xgboost.DMatrix:
    shard = ray.train.get_dataset_shard(dataset_name)
    df = shard.materialize().to_pandas()
    X, y = df.drop("target", axis=1), df["target"]
    return xgboost.DMatrix(X, label=y)

def train_func():
    dtrain = get_dmatrix("train")
    deval = get_dmatrix("eval")
    ...

Finally, pass the datasets to the Trainer, which automatically shards them across the workers. The dictionary keys must match the names used when calling get_dataset_shard in the training function.

trainer = XGBoostTrainer(..., datasets={"train": train_dataset, "eval": eval_dataset})
trainer.fit()

For more details, see Data Loading and Preprocessing.

Configure scale and GPUs#

Outside of your training function, create a ScalingConfig object to configure:

  1. num_workers - The number of distributed training worker processes.

  2. use_gpu - Whether each worker should use a GPU (or CPU).

  3. resources_per_worker - The number of CPUs or GPUs per worker.

from ray.train import ScalingConfig

# 4 training workers, each using 8 CPUs (for example, one per node on 4 nodes with 8 CPUs each).
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 8})

Note

When using Ray Data with Ray Train, be careful not to request all available CPUs in your cluster with the resources_per_worker parameter. Ray Data needs CPU resources to execute data preprocessing operations in parallel. If all CPUs are allocated to training workers, Ray Data operations may be bottlenecked, leading to reduced performance. A good practice is to leave some portion of CPU resources available for Ray Data operations.

For example, if your cluster has 8 CPUs per node, you might allocate 6 CPUs to training workers and leave 2 CPUs for Ray Data:

# Allocate 6 CPUs per worker, leaving resources for Ray Data operations
scaling_config = ScalingConfig(num_workers=4, resources_per_worker={"CPU": 6})

To use GPUs, set the use_gpu parameter to True in your ScalingConfig. This requests and assigns one GPU per worker.

# 16 training workers, each assigned 1 GPU (for example, 4 nodes with 4 GPUs each).
scaling_config = ScalingConfig(num_workers=16, use_gpu=True)

When using GPUs, you also need to update your training function to use the assigned GPU by setting the "device" parameter to "cuda". For more details on XGBoost's GPU support, see the XGBoost GPU documentation.

  def train_func():
      ...

      params = {
          ...,
+         "device": "cuda",
      }

      bst = xgboost.train(
          params,
          ...
      )

Configure persistent storage#

Create a RunConfig object to specify the path where results (including checkpoints and artifacts) will be saved.

from ray.train import RunConfig

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")

Warning

Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters, but it is required for multi-node clusters. Using a local path will raise an error during checkpointing for multi-node clusters.

For more details, see Configuring Persistent Storage.

Launch a training job#

Tying this all together, you can now launch a distributed training job with an XGBoostTrainer.

from ray.train.xgboost import XGBoostTrainer

trainer = XGBoostTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()

Access training results#

After training completes, trainer.fit() returns a Result object that contains information about the training run, including the metrics and checkpoints reported during training.

result.metrics     # The metrics reported during training.
result.checkpoint  # The latest checkpoint reported during training.
result.path        # The path where logs are stored.
result.error       # The exception that was raised, if training failed.
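For example, here is a small sketch of consuming the result, reusing the checkpoint-loading pattern from the quickstart above (it assumes the RayTrainReportCallback saved the model during training):

import os
import xgboost
from ray.train.xgboost import RayTrainReportCallback

if result.error is None:
    print(result.metrics)  # For example, the last reported "validation-logloss".

    # Load the checkpointed model saved by `RayTrainReportCallback`.
    with result.checkpoint.as_directory() as checkpoint_dir:
        booster = xgboost.Booster()
        booster.load_model(
            os.path.join(checkpoint_dir, RayTrainReportCallback.CHECKPOINT_NAME)
        )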

For more usage examples, see Inspecting Training Results.

Next steps#

After you have converted your XGBoost training script to use Ray Train:

  • See User Guides to learn more about how to perform specific tasks.

  • Browse the Examples for end-to-end examples of how to use Ray Train.

  • Consult the API Reference for more details on the classes and methods from this tutorial.