Get Started with Distributed Training using XGBoost and LightGBM#
Ray Train has built-in support for XGBoost and LightGBM.
Quickstart#
import ray
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer
# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    params={
        # XGBoost specific params (see the `xgboost.train` API reference)
        "objective": "binary:logistic",
        # uncomment this and set `use_gpu=True` to use GPU for training
        # "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)
import ray
from ray.train import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer
# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
trainer = LightGBMTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    params={
        # LightGBM specific params
        "objective": "binary",
        "metric": ["binary_logloss", "binary_error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)
Basic training with tree-based models in Train#
Just as with the original xgboost.train() and lightgbm.train() functions, you pass the training parameters as the params dictionary.
Ray-specific parameters, such as scaling_config, datasets, and run_config, are passed to the trainer constructor.
Save and load XGBoost and LightGBM checkpoints#
Because each boosting round trains a new tree, you can save a checkpoint to snapshot the training progress so far. XGBoostTrainer and LightGBMTrainer both implement checkpointing out of the box. You can load these checkpoints into memory with the static methods XGBoostTrainer.get_model and LightGBMTrainer.get_model.
The only required change is to configure CheckpointConfig to set the checkpointing frequency. For example, the following configuration saves a checkpoint on every boosting round and keeps only the latest checkpoint:
from ray.train import RunConfig, CheckpointConfig
run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        # Checkpoint every iteration.
        checkpoint_frequency=1,
        # Only keep the latest checkpoint and delete the others.
        num_to_keep=1,
    )
)
# from ray.train.xgboost import XGBoostTrainer
# trainer = XGBoostTrainer(..., run_config=run_config)
Tip
Once you enable checkpointing, you can follow this guide to enable fault tolerance.
How to scale out training?#
The benefit of using Ray Train is that you can seamlessly scale up your training by adjusting the ScalingConfig.
Note
Ray Train doesn’t modify or otherwise alter the working of the underlying XGBoost or LightGBM distributed training algorithms. Ray only provides orchestration, data ingest and fault tolerance. For more information on GBDT distributed training, refer to XGBoost documentation and LightGBM documentation.
The following are examples of common use cases:
Setup: 4 nodes with 8 CPUs each.
Use-case: To utilize all resources in multi-node training.
scaling_config = ScalingConfig(
    num_workers=4,
    resources_per_worker={"CPU": 8},
)
Setup: 1 node with 8 CPUs and 4 GPUs.
Use-case: If you have a single node with multiple GPUs, you need to use distributed training to leverage all GPUs.
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
)
Setup: 4 nodes with 8 CPUs and 4 GPUs each.
Use-case: If you have multiple nodes with multiple GPUs, you need to schedule one worker per GPU.
scaling_config = ScalingConfig(
    num_workers=16,
    use_gpu=True,
)
Note that you just have to adjust the number of workers. Ray handles everything else automatically.
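The three setups above follow one rule of thumb: one worker per node with all of its CPUs for CPU-only training, and one worker per GPU otherwise. The following is a minimal sketch of that rule for a homogeneous cluster. The helper name plan_workers is hypothetical and not part of Ray; it only returns a plain dictionary whose keys mirror the ScalingConfig arguments used above.

```python
def plan_workers(num_nodes, cpus_per_node, gpus_per_node=0):
    """Return ScalingConfig-style keyword arguments for a homogeneous cluster.

    Hypothetical helper for illustration only; it is not part of Ray.
    """
    if gpus_per_node > 0:
        # GPU training: schedule one worker per GPU across the whole cluster.
        return {"num_workers": num_nodes * gpus_per_node, "use_gpu": True}
    # CPU-only training: one worker per node, each using all of the node's CPUs.
    return {
        "num_workers": num_nodes,
        "resources_per_worker": {"CPU": cpus_per_node},
    }


# The three setups described above.
print(plan_workers(num_nodes=4, cpus_per_node=8))
print(plan_workers(num_nodes=1, cpus_per_node=8, gpus_per_node=4))
print(plan_workers(num_nodes=4, cpus_per_node=8, gpus_per_node=4))
```

Assuming the dictionary keys match your Ray version, you could pass the result on as ScalingConfig(**plan_workers(...)).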
Warning
Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters, but it is required for multi-node clusters. Using a local path will raise an error during checkpointing for multi-node clusters.
trainer = XGBoostTrainer(
    ..., run_config=ray.train.RunConfig(storage_path="s3://...")
)
How many remote actors should you use?#
This depends on your workload and your cluster setup. Generally, there is no inherent benefit to running more than one remote actor per node for CPU-only training, because XGBoost can already leverage multiple CPUs with threading.
However, in some cases, you should consider starting more than one actor per node:
For multi-GPU training, each GPU should have a separate remote actor. Thus, if your machine has 24 CPUs and 4 GPUs, you want to start 4 remote actors with 6 CPUs and 1 GPU each.
In a heterogeneous cluster, you might want to find the greatest common divisor for the number of CPUs. For example, for a cluster with three nodes of 4, 8, and 12 CPUs, respectively, you should set the number of actors to 6 and the CPUs per actor to 4.
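The arithmetic behind both cases can be sketched in a few lines of plain Python. The function names below are hypothetical helpers for illustration, not Ray APIs, and the sketch assumes that CPUs divide evenly among actors.

```python
from functools import reduce
from math import gcd


def plan_multi_gpu(cpus, gpus):
    """One actor per GPU, with the machine's CPUs split evenly among actors."""
    return gpus, cpus // gpus  # (number of actors, CPUs per actor)


def plan_heterogeneous(cpus_per_node):
    """Size actors by the greatest common divisor of the per-node CPU counts."""
    cpus_per_actor = reduce(gcd, cpus_per_node)
    num_actors = sum(cpus_per_node) // cpus_per_actor
    return num_actors, cpus_per_actor


# A machine with 24 CPUs and 4 GPUs.
print(plan_multi_gpu(cpus=24, gpus=4))
# Three nodes with 4, 8, and 12 CPUs.
print(plan_heterogeneous([4, 8, 12]))
```

Using the greatest common divisor means every node hosts a whole number of equally sized actors, so no CPUs are left idle.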
How to use GPUs for training?#
Ray Train enables multi-GPU training for XGBoost and LightGBM. The core backends automatically leverage NCCL2 for cross-device communication. All you have to do is start one actor per GPU and set GPU-compatible parameters, for example, by setting XGBoost's tree_method to gpu_hist. See the XGBoost documentation for more details.
For instance, if you have 2 machines with 4 GPUs each, you want to start 8 workers and set use_gpu=True. There is usually no benefit in allocating less than one GPU (for example, 0.5) or more than one GPU per actor.
You should divide the CPUs evenly across the actors on each machine. If your machines have 16 CPUs in addition to the 4 GPUs, each actor should have 4 CPUs to use.
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# train_dataset and valid_dataset are the Ray Datasets created in the Quickstart.
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism (one per GPU).
        num_workers=2,
        # Whether to use GPU acceleration.
        use_gpu=True,
    ),
    params={
        # XGBoost specific params
        "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    label_column="target",
    num_boost_round=20,
    datasets={"train": train_dataset, "valid": valid_dataset},
)
How to preprocess data for training?#
Particularly for tabular data, Ray Data comes with out-of-the-box preprocessors that implement common feature preprocessing operations. You can use these with Ray Train Trainers by applying them to the dataset before passing it to a Trainer. For example:
import ray
from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer
from ray.train import ScalingConfig
train_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(0, 32, 3)])
valid_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(1, 32, 3)])
preprocessor = MinMaxScaler(["x"])
preprocessor.fit(train_dataset)
train_dataset = preprocessor.transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)
trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataset, "valid": valid_dataset},
)
result = trainer.fit()
How to optimize XGBoost memory usage?#
XGBoost uses a compute-optimized data structure, the DMatrix, to hold training data. When converting a dataset to a DMatrix, XGBoost creates intermediate copies and ends up holding a complete copy of the full data. XGBoost converts the data into the local data format. On a 64-bit system the format is 64-bit floats. Depending on the system and original dataset dtype, this matrix can thus occupy more memory than the original dataset.
The peak memory usage for CPU-based training is at least 3x the dataset size, assuming dtype float32 on a 64-bit system, plus about 400,000 KiB for other resources, like operating system requirements and storage of intermediate results.
Example
Machine type: AWS m5.xlarge (4 vCPUs, 16 GiB RAM)
Usable RAM: ~15,350,000 KiB
Dataset: 1,250,000 rows with 1024 features, dtype float32. Total size: 5,000,000 KiB
XGBoost DMatrix size: ~10,000,000 KiB
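This dataset just fits on this node for training: the dataset and its DMatrix together need about 15,000,000 KiB, which leaves roughly 350,000 KiB for other resources.
Note that the DMatrix size might be lower on a 32-bit system.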
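The figures in this example can be reproduced with some quick arithmetic. The sketch below uses only the numbers stated above; the 3x factor is the rough lower bound from this section, not a measured value, and the function name is a hypothetical helper.

```python
KIB = 1024  # bytes per KiB


def dataset_size_kib(rows, features, bytes_per_value=4):
    """Size of a dense numeric dataset in KiB (float32 uses 4 bytes per value)."""
    return rows * features * bytes_per_value // KIB


dataset_kib = dataset_size_kib(rows=1_250_000, features=1024)
# Rough lower bound from this section: dataset plus a 64-bit DMatrix copy.
peak_kib = 3 * dataset_kib
usable_kib = 15_350_000  # usable RAM on the m5.xlarge in the example
headroom_kib = usable_kib - peak_kib  # what remains for everything else

print(f"dataset: {dataset_kib:,} KiB")
print(f"3x estimate: {peak_kib:,} KiB")
print(f"headroom: {headroom_kib:,} KiB")
```

The remaining headroom is close to the roughly 400,000 KiB that this section budgets for other resources, which is why the example is such a tight fit.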
GPUs
Generally, the same memory requirements exist for GPU-based training. Additionally, the GPU must have enough memory to hold the dataset.
In the preceding example, the GPU must have at least 10,000,000 KiB (about 9.6 GiB) of memory. However, empirical data shows that using a DeviceQuantileDMatrix seems to result in about 10% more peak GPU memory usage, possibly for intermediate storage when loading data.
Best practices
To reduce peak memory usage, consider the following suggestions:
Store data as float32 or less. You often don't need more precision, and keeping data in a smaller format helps reduce peak memory usage for initial data loading.
Pass the dtype when loading data from CSV. Otherwise, floating point values are loaded as np.float64 by default, increasing peak memory usage by 33%.
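One way to read the 33% figure is with a simplified model in which peak memory is the source data plus a DMatrix copy stored as 64-bit floats. This model is an assumption made for illustration, consistent with the 3x estimate above, and the function name is hypothetical.

```python
def peak_multiple(source_bytes_per_value, dmatrix_bytes_per_value=8):
    """Peak memory as a multiple of the float32 dataset size.

    Assumed model: one copy of the source data plus one 64-bit DMatrix copy.
    """
    float32_bytes = 4
    return (source_bytes_per_value + dmatrix_bytes_per_value) / float32_bytes


float32_peak = peak_multiple(4)  # source held as float32
float64_peak = peak_multiple(8)  # source loaded as float64
increase = float64_peak / float32_peak - 1

print(f"float32 source: {float32_peak:.0f}x, float64 source: {float64_peak:.0f}x")
print(f"increase: {increase:.0%}")
```

Under this model, loading the source as float64 raises the peak from 3x to 4x the float32 dataset size, which is an increase of about one third.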