XGBoost on Ray

This library adds a new backend for XGBoost utilizing Ray.

Please note that this is an early version and both the API and the behavior can change without prior notice.

We’ll switch to a release-based development process once the implementation has all the features needed for the first real-world use cases.

Installation

You can install XGBoost on Ray (xgboost_ray) like this:

git clone https://github.com/ray-project/xgboost_ray.git
cd xgboost_ray
pip install -e .
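
To verify the editable install, importing the package from a Python shell is enough, for example:

python -c "import xgboost_ray; print(xgboost_ray.__file__)"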

Usage

After installation, you can import XGBoost on Ray in either of two ways:

import xgboost_ray
# or
import ray.util.xgboost

xgboost_ray provides a drop-in replacement for XGBoost’s train function. To pass data, instead of using xgb.DMatrix you will have to use ray.util.xgboost.RayDMatrix.

Here is a simplified example:

from sklearn import datasets
from sklearn.model_selection import train_test_split

from xgboost_ray import RayDMatrix, RayParams, train


def main():
    # Load dataset
    data, labels = datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=0.25)

    train_set = RayDMatrix(train_x, train_y)
    test_set = RayDMatrix(test_x, test_y)

    # Set config
    config = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "max_depth": 3,
    }

    evals_result = {}

    # Train the classifier
    bst = train(
        config,
        train_set,
        evals=[(test_set, "eval")],
        evals_result=evals_result,
        ray_params=RayParams(max_actor_restarts=1, num_actors=1),
        verbose_eval=False)

    bst.save_model("simple.xgb")
    print("Final validation error: {:.4f}".format(
        evals_result["eval"]["error"][-1]))


Data loading

Data is passed to xgboost_ray via a RayDMatrix object.

The RayDMatrix lazily loads data and stores it sharded in the Ray object store. The Ray XGBoost actors then access these shards to run their training.

A RayDMatrix supports various data and file types, such as Pandas DataFrames, NumPy arrays, CSV files, and Parquet files.

Example loading multiple Parquet files:

import glob
from ray.util.xgboost import RayDMatrix, RayFileType

# We can also pass a list of files
path = list(sorted(glob.glob("/data/nyc-taxi/*/*/*.parquet")))

# This argument will be passed to pd.read_parquet()
columns = [
    "passenger_count",
    "trip_distance", "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude",
    "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "total_amount"
]

dtrain = RayDMatrix(
    path,
    label="passenger_count",  # Will select this column as the label
    columns=columns,
    filetype=RayFileType.PARQUET)
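
In-memory data works the same way. Here is a small sketch passing a Pandas DataFrame directly (the column names are made up); for DataFrames the label can simply be given as a column name and no filetype argument is needed:

import pandas as pd
from ray.util.xgboost import RayDMatrix

# A tiny in-memory DataFrame; "feature_a", "feature_b" and "target"
# are placeholder names for this sketch.
df = pd.DataFrame({
    "feature_a": [0.1, 0.4, 0.3, 0.8],
    "feature_b": [1.0, 0.2, 0.7, 0.5],
    "target": [0, 1, 0, 1],
})

dtrain = RayDMatrix(df, label="target")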

Hyperparameter Tuning

xgboost_ray integrates with Ray Tune to provide distributed hyperparameter tuning for your distributed XGBoost models. You can run multiple xgboost_ray training runs in parallel, each with a different hyperparameter configuration, with each individual training run itself parallelized.

First, move your training code into a function. This function should take in a config argument which specifies the hyperparameters for the xgboost model.

num_cpus_per_actor = 1
num_actors = 1


def train_model(config):
    # Load dataset
    data, labels = datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=0.25)

    train_set = RayDMatrix(train_x, train_y)
    test_set = RayDMatrix(test_x, test_y)

    evals_result = {}
    bst = train(
        params=config,
        dtrain=train_set,
        evals=[(test_set, "eval")],
        evals_result=evals_result,
        verbose_eval=False,
        ray_params=RayParams(
            num_actors=num_actors, cpus_per_actor=num_cpus_per_actor))
    bst.save_model("model.xgb")


Then, you import tune and use tune’s search primitives to define a hyperparameter search space.

    from ray import tune

    # Set config
    config = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9)
    }

Finally, you call tune.run passing in the training function and the config. Internally, tune will resolve the hyperparameter search space and invoke the training function multiple times, each with different hyperparameters.

    analysis = tune.run(
        train_model,
        config=config,
        metric="eval-error",
        mode="min",
        num_samples=4,
        resources_per_trial={
            "cpu": 1,
            "extra_cpu": num_actors * num_cpus_per_actor
        })

    # Load the best model checkpoint
    import xgboost as xgb
    import os

    # Load in the best performing model.
    best_bst = xgb.Booster()
    best_bst.load_model(os.path.join(analysis.best_logdir, "model.xgb"))

    accuracy = 1. - analysis.best_result["eval-error"]
    print(f"Best model parameters: {analysis.best_config}")
    print(f"Best model total accuracy: {accuracy:.4f}")

Make sure you set the extra_cpu field appropriately so tune is aware of the total number of resources each trial requires.

Resources

By default, xgboost_ray tries to determine the number of CPUs available and distributes them evenly across actors.

In the case of very large clusters or clusters with many different machine sizes, it makes sense to limit the number of CPUs per actor by setting the cpus_per_actor argument. Consider always setting this explicitly.

The number of XGBoost actors always has to be set manually with the num_actors argument.
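
For example, to train with eight actors that each use four CPUs (the numbers are illustrative; pick them to match your cluster):

from xgboost_ray import RayParams

# 8 actors x 4 CPUs each = 32 CPUs reserved for this training run.
ray_params = RayParams(
    num_actors=8,        # always has to be set explicitly
    cpus_per_actor=4)    # limit the CPUs used by each actor

This RayParams object is then passed to train() via the ray_params argument, as in the examples above.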

Memory usage

XGBoost uses a compute-optimized data structure, the DMatrix, to hold training data. When converting a dataset to a DMatrix, XGBoost creates intermediate copies and ends up holding a complete copy of the full data. The data will be converted into the local data format (on a 64-bit system these are 64-bit floats). Depending on the system and the original dataset's dtype, this matrix can thus occupy more memory than the original dataset.

The peak memory usage for CPU-based training is at least 3x the dataset size (assuming dtype float32 on a 64-bit system) plus about 400,000 KiB for other resources, such as operating system requirements and storage of intermediate results.

Example

  • Machine type: AWS m5.xlarge (4 vCPUs, 16 GiB RAM)

  • Usable RAM: ~15,350,000 KiB

  • Dataset: 1,250,000 rows with 1024 features, dtype float32. Total size: 5,000,000 KiB

  • XGBoost DMatrix size: ~10,000,000 KiB

This dataset will fit exactly on this node for training.

Note that the DMatrix size might be lower on a 32-bit system.
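
The numbers above follow from the rule of thumb; a quick back-of-the-envelope check in plain Python (no xgboost_ray involved):

# Peak memory estimate for the example dataset above.
rows, features = 1_250_000, 1024
bytes_per_value = 4  # float32

dataset_kib = rows * features * bytes_per_value / 1024  # 5,000,000 KiB
dmatrix_kib = 2 * dataset_kib                           # ~10,000,000 KiB (64-bit floats)
peak_kib = 3 * dataset_kib + 400_000                    # ~15,400,000 KiB

print(f"Dataset:        {dataset_kib:,.0f} KiB")
print(f"DMatrix:        {dmatrix_kib:,.0f} KiB")
print(f"Estimated peak: {peak_kib:,.0f} KiB (usable RAM: ~15,350,000 KiB)")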

GPUs

Generally, the same memory requirements exist for GPU-based training. Additionally, the GPU must have enough memory to hold the dataset.

In the example above, the GPU must have at least 10,000,000 KiB (about 9.6 GiB) of memory. However, we found empirically that using a DeviceQuantileDMatrix results in roughly 10% higher peak GPU memory usage, possibly due to intermediate storage when loading data.

Best practices

In order to reduce peak memory usage, consider the following suggestions:

  • Store data as float32 or less. More precision is often not needed, and keeping data in a smaller format will help reduce peak memory usage for initial data loading.

  • Pass the dtype when loading data from CSV. Otherwise, floating point values will be loaded as np.float64 by default, increasing peak memory usage by 33%. See the sketch below.
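
For example, when reading a CSV with pandas before passing it to a RayDMatrix, the dtype can be fixed at load time (the file name and label column here are placeholders):

import numpy as np
import pandas as pd
from xgboost_ray import RayDMatrix

# "train.csv" and "target" are placeholder names for this sketch.
# Forcing float32 at load time avoids the default float64 columns.
df = pd.read_csv("train.csv", dtype=np.float32)

dtrain = RayDMatrix(df, label="target")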

Placement Strategies

xgboost_ray leverages Ray’s Placement Group API to implement placement strategies for better fault tolerance.

By default, a SPREAD strategy is used for training, which attempts to spread all of the training workers across the nodes in a cluster on a best-effort basis. This improves fault tolerance since it minimizes the number of worker failures when a node goes down, but comes at the cost of increased inter-node communication. To disable this strategy, set the USE_SPREAD_STRATEGY environment variable to 0. If disabled, no particular placement strategy will be used.

Note that this strategy is used only when elastic_training is not used. If elastic_training is set to True, no placement strategy is used.

When xgboost_ray is used with Ray Tune for hyperparameter tuning, a PACK strategy is used. This strategy attempts to place all workers for each trial on the same node on a best-effort basis. This means that if a node goes down, it will be less likely to impact multiple trials.

When placement strategies are used, xgboost_ray will wait for 100 seconds for the required resources to become available, and will fail if the required resources cannot be reserved and the cluster cannot autoscale to increase the number of resources. You can change the PLACEMENT_GROUP_TIMEOUT_S environment variable to modify how long this timeout should be.
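
Both settings are plain environment variables, so they can be exported in the shell or set at the top of the training script before training starts, for example:

import os

# Disable the SPREAD placement strategy (non-elastic training only).
os.environ["USE_SPREAD_STRATEGY"] = "0"

# Wait up to 600 seconds instead of 100 for the placement group resources.
os.environ["PLACEMENT_GROUP_TIMEOUT_S"] = "600"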

More examples

For complete end-to-end examples, please have a look at the examples folder: