Distributed Training and Inference with XGBoost and LightGBM on Ray#
In this tutorial, you’ll discover how to scale out data preprocessing, training, and inference with XGBoost and LightGBM on Ray.
To run this tutorial, we need to install the following dependencies:
pip install -qU "ray[data,train]" xgboost lightgbm
Then, we need some imports:
from typing import Tuple
import pandas as pd
import xgboost
import ray
from ray.data import Dataset, Preprocessor
from ray.data.preprocessors import StandardScaler
from ray.train import Checkpoint, CheckpointConfig, Result, RunConfig, ScalingConfig
from ray.train.xgboost import XGBoostTrainer
Next we define a function to load our train, validation, and test datasets.
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
"""Load and split the dataset into train, validation, and test sets."""
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
test_dataset = valid_dataset.drop_columns(["target"])
return train_dataset, valid_dataset, test_dataset
How to preprocess data for training?#
Preprocessing is a crucial step in preparing your data for training, especially for tabular datasets. Ray Data offers built-in preprocessors that simplify common feature preprocessing tasks especially for tabular data. These can be seamlessly integrated with Ray Datasets, allowing you to preprocess your data in a fault-tolerant and distributed way before training. Here’s how:
# Load and split the dataset
train_dataset, valid_dataset, _test_dataset = prepare_data()
# pick some dataset columns to scale
columns_to_scale = ["mean radius", "mean texture"]
# Initialize the preprocessor
preprocessor = StandardScaler(columns=columns_to_scale)
# train the preprocessor on the training set
preprocessor.fit(train_dataset)
# apply the preprocessor to the training and validation sets
train_dataset = preprocessor.transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)
Show code cell output
2025-02-07 16:30:44,905 INFO worker.py:1841 -- Started a local Ray instance.
2025-02-07 16:30:45,596 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:45,596 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AggregateNumRows[AggregateNumRows]
2025-02-07 16:30:46,367 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:46,367 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
2025-02-07 16:30:46,729 INFO dataset.py:2704 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2025-02-07 16:30:46,730 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:46,730 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=1]
Save and load XGBoost and LightGBM checkpoints#
Checkpointing is a powerful feature.
It is particularly useful for long-running training sessions, as it enables you to resume training from the last checkpoint in case of interruptions.
XGBoostTrainer
and
LightGBMTrainer
both implement checkpointing out of the box. These checkpoints can be loaded into memory
using static methods XGBoostTrainer.get_model
and LightGBMTrainer.get_model
.
The only required change is to configure CheckpointConfig
to set the checkpointing frequency. For example, the following configuration
saves a checkpoint on every boosting round and only keeps the latest checkpoint.
# Configure checkpointing to save progress during training
run_config = RunConfig(
checkpoint_config=CheckpointConfig(
# Checkpoint every 10 iterations.
checkpoint_frequency=10,
# Only keep the latest checkpoint and delete the others.
num_to_keep=1,
)
## If running in a multi-node cluster, this is where you
## should configure the run's persistent storage that is accessible
## across all worker nodes with `storage_path="s3://..."`
)
Basic training with tree-based models in Train#
Just as in the original xgboost.train()
and lightgbm.train()
functions, the
training parameters are passed as the params
dictionary.
XGBoost Example#
# Set up the XGBoost trainer with the specified configuration
trainer = XGBoostTrainer(
# see "How to scale out training?" for more details
scaling_config=ScalingConfig(
# Number of workers to use for data parallelism.
num_workers=2,
# Whether to use GPU acceleration. Set to True to schedule GPU workers.
use_gpu=False,
),
label_column="target",
num_boost_round=20,
# XGBoost specific params (see the `xgboost.train` API reference)
params={
"objective": "binary:logistic",
# uncomment this and set `use_gpu=True` to use GPU for training
# "tree_method": "gpu_hist",
"eval_metric": ["logloss", "error"],
},
datasets={"train": train_dataset, "valid": valid_dataset},
# store the preprocessor in the checkpoint for inference later
metadata={"preprocessor_pkl": preprocessor.serialize()},
run_config=run_config,
)
result = trainer.fit()
Show code cell output
2025-02-07 16:32:31,783 INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949
== Status ==
Current time: 2025-02-07 16:32:31 (running for 00:00:00.11)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/12 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-02-07_16-30-44_167214_9631/artifacts/2025-02-07_16-32-31/XGBoostTrainer_2025-02-07_16-32-31/driver_artifacts
Number of trials: 1/1 (1 PENDING)
2025-02-07 16:32:34,045 WARNING experiment_state.py:206 -- Experiment state snapshotting has been triggered multiple times in the last 5.0 seconds and may become a bottleneck. A snapshot is forced if `CheckpointConfig(num_to_keep)` is set, and a trial has checkpointed >= `num_to_keep` times since the last snapshot.
You may want to consider increasing the `CheckpointConfig(num_to_keep)` or decreasing the frequency of saving checkpoints.
You can suppress this warning by setting the environment variable TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S to a smaller value than the current threshold (5.0). Set it to 0 to completely suppress this warning.
2025-02-07 16:32:34,105 WARNING experiment_state.py:206 -- Experiment state snapshotting has been triggered multiple times in the last 5.0 seconds and may become a bottleneck. A snapshot is forced if `CheckpointConfig(num_to_keep)` is set, and a trial has checkpointed >= `num_to_keep` times since the last snapshot.
You may want to consider increasing the `CheckpointConfig(num_to_keep)` or decreasing the frequency of saving checkpoints.
You can suppress this warning by setting the environment variable TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S to a smaller value than the current threshold (5.0). Set it to 0 to completely suppress this warning.
2025-02-07 16:32:35,137 INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/Users/rdecal/ray_results/XGBoostTrainer_2025-02-07_16-32-31' in 0.0110s.
2025-02-07 16:32:35,140 INFO tune.py:1041 -- Total run time: 3.36 seconds (3.34 seconds for the tuning loop).
== Status ==
Current time: 2025-02-07 16:32:35 (running for 00:00:03.35)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/12 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-02-07_16-30-44_167214_9631/artifacts/2025-02-07_16-32-31/XGBoostTrainer_2025-02-07_16-32-31/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)
We can now view the model’s metrics:
print(result.metrics)
This should output something like:
{'train-logloss': 0.00587594546605992, 'train-error': 0.0, 'valid-logloss': 0.06215000962556052, 'valid-error': 0.02941176470588235, 'time_this_iter_s': 0.0101318359375, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '40fed_00000', 'date': '2023-07-06_18-33-25', 'timestamp': 1688693605, 'time_total_s': 4.901317834854126, 'pid': 40725, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.901317834854126, 'iterations_since_restore': 101, 'experiment_tag': '0'}
Tip
Once you enable checkpointing, you can follow this guide to enable fault tolerance. :::
LightGBM Example#
Modifying this example to use LightGBM instead of XGBoost is straightforward. You just have to change the trainer class and the model-specific parameters:
- from ray.train.xgboost import XGBoostTrainer
+ from ray.train.lightgbm import LightGBMTrainer
- trainer = XGBoostTrainer(
+ trainer = LightGBMTrainer(
- "objective": "binary:logistic",
+ "objective": "binary",
- "eval_metric": ["logloss", "error"],
+ "metric": ["binary_logloss", "binary_error"],
Running inference with a trained tree-based model#
Now that we have a trained model, we can use it to make predictions on new data. Let’s define a utility function to perform streaming and distributed batch inference with our trained model.
class Predict:
def __init__(self, checkpoint: Checkpoint):
self.model = XGBoostTrainer.get_model(checkpoint)
# extract the preprocessor from the checkpoint metadata
self.preprocessor = Preprocessor.deserialize(
checkpoint.get_metadata()["preprocessor_pkl"]
)
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
preprocessed_batch = self.preprocessor.transform_batch(batch)
dmatrix = xgboost.DMatrix(preprocessed_batch)
return {"predictions": self.model.predict(dmatrix)}
def predict_xgboost(result: Result):
_, _, test_dataset = prepare_data()
scores = test_dataset.map_batches(
Predict,
fn_constructor_args=[result.checkpoint],
concurrency=1,
batch_format="pandas",
)
predicted_labels = scores.map_batches(
lambda df: (df > 0.5).astype(int), batch_format="pandas"
)
print("PREDICTED LABELS")
predicted_labels.show()
We can now get the predictions from the model on the test set:
predict_xgboost(result)
Show code cell output
2025-02-07 16:30:52,878 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:52,878 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> AggregateNumRows[AggregateNumRows]
2025-02-07 16:30:53,241 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:53,241 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
2025-02-07 16:30:53,559 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-07_16-30-44_167214_9631/logs/ray-data
2025-02-07 16:30:53,559 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(drop_columns)->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches(<lambda>)] -> LimitOperator[limit=20]
PREDICTED LABELS
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
This should output something like:
PREDICTED LABELS
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
How to scale out training?#
One of the key advantages of using Ray Train is its ability to effortlessly scale your training workloads.
By adjusting the ScalingConfig
,
you can optimize resource utilization and reduce training time, making it ideal for large-scale machine learning tasks.
Note
Ray Train doesn’t modify or otherwise alter the working of the underlying XGBoost or LightGBM distributed training algorithms. Ray only provides orchestration, data ingest and fault tolerance. For more information on GBDT distributed training, refer to XGBoost documentation and LightGBM documentation.
Multi-node CPU Example#
Setup: 4 nodes with 8 CPUs each.
Use-case: To utilize all resources in multi-node training.
scaling_config = ScalingConfig(
num_workers=4,
resources_per_worker={"CPU": 8},
)
Single-node multi-GPU Example#
Setup: 1 node with 8 CPUs and 4 GPUs.
Use-case: If you have a single node with multiple GPUs, you need to use distributed training to leverage all GPUs.
scaling_config = ScalingConfig(
num_workers=4,
use_gpu=True,
)
Multi-node multi-GPU Example#
Setup: 4 nodes with 8 CPUs and 4 GPUs each.
Use-case: If you have multiple nodes with multiple GPUs, you need to schedule one worker per GPU.
scaling_config = ScalingConfig(
num_workers=16,
use_gpu=True,
)
Note that you just have to adjust the number of workers. Ray handles everything else automatically.
Warning
Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters, but it is required for multi-node clusters. Using a local path will raise an error during checkpointing for multi-node clusters.
trainer = XGBoostTrainer(
..., run_config=ray.train.RunConfig(storage_path="s3://...")
)
How many remote actors should you use?#
This depends on your workload and your cluster setup. Generally there is no inherent benefit of running more than one remote actor per node for CPU-only training. This is because XGBoost can already leverage multiple CPUs with threading.
However, in some cases, you should consider some starting more than one actor per node:
For multi GPU training, each GPU should have a separate remote actor. Thus, if your machine has 24 CPUs and 4 GPUs, you want to start 4 remote actors with 6 CPUs and 1 GPU each
In a heterogeneous cluster, you might want to find the greatest common divisor for the number of CPUs. For example, for a cluster with three nodes of 4, 8, and 12 CPUs, respectively, you should set the number of actors to 6 and the CPUs per actor to 4.
How to use GPUs for training?#
Ray Train enables multi-GPU training for XGBoost and LightGBM. The core backends automatically leverage NCCL2 for cross-device communication. All you have to do is to start one actor per GPU and set GPU-compatible parameters. For example, XGBoost’s tree_method
to gpu_hist
. See XGBoost documentation for more details.
For instance, if you have 2 machines with 4 GPUs each, you want to start 8 workers, and set use_gpu=True
. There is usually no benefit in allocating less (for example, 0.5) or more than one GPU per actor.
You should divide the CPUs evenly across actors per machine, so if your machines have 16 CPUs in addition to the 4 GPUs, each actor should have 4 CPUs to use.
trainer = XGBoostTrainer(
scaling_config=ScalingConfig(
# Number of workers to use for data parallelism.
num_workers=2,
# Whether to use GPU acceleration.
use_gpu=True,
),
params={
# XGBoost specific params
"tree_method": "gpu_hist",
"eval_metric": ["logloss", "error"],
},
...
)
How to optimize XGBoost memory usage?#
XGBoost uses a compute-optimized data structure called DMatrix
to store training data.
However, converting a dataset to a DMatrix
involves storing a complete copy of the data
as well as intermediate conversions.
On a 64-bit system the format is 64-bit floats. Depending on the system and original dataset dtype,
this matrix can thus occupy more memory than the original dataset.
The peak memory usage for CPU-based training is at least 3x the dataset size, assuming dtype float32
on a 64-bit system, plus about 400,000 KiB for other resources, like operating system requirements and storing of intermediate results.
Example#
Machine type: AWS m5.xlarge (4 vCPUs, 16 GiB RAM)
Usable RAM: ~15,350,000 KiB
Dataset: 1,250,000 rows with 1024 features, dtype float32. Total size: 5,000,000 KiB
XGBoost DMatrix size: ~10,000,000 KiB
This dataset fits exactly on this node for training.
Note that the DMatrix size might be lower on a 32 bit system.
GPUs#
Generally, the same memory requirements exist for GPU-based training. Additionally, the GPU must have enough memory to hold the dataset.
In the preceding example, the GPU must have at least 10,000,000 KiB (about 9.6 GiB) memory. However, empirical data shows that using a DeviceQuantileDMatrix
seems to result in more peak GPU memory usage, possibly for intermediate storage when loading data (about 10%).
Best practices#
In order to reduce peak memory usage, consider the following suggestions:
Store data as
float32
or less. You often don’t need more precision is often, and keeping data in a smaller format helps reduce peak memory usage for initial data loading.Pass the
dtype
when loading data from CSV. Otherwise, floating point values are loaded asnp.float64
per default, increasing peak memory usage by 33%.