XGBoost-Ray with Modin

This notebook includes an example workflow using XGBoost-Ray and Modin for distributed model training and prediction.

Cluster Setup

First, we’ll set up our Ray Cluster. The provided modin_xgboost.yaml cluster config can be used to set up an AWS cluster with 64 CPUs.

The following steps assume you are in a directory with both modin_xgboost.yaml and this file saved as modin_xgboost.ipynb.

Step 1: Bring up the Ray cluster.

$ pip install ray boto3
$ ray up modin_xgboost.yaml

Step 2: Move modin_xgboost.ipynb to the cluster and start Jupyter.

$ ray rsync_up modin_xgboost.yaml "./modin_xgboost.ipynb" \
  "~/modin_xgboost.ipynb"
$ ray exec modin_xgboost.yaml --port-forward=9999 "jupyter notebook \
  --port=9999"

You can then access this notebook at the URL that is output: http://localhost:9999/?token=<token>

Python Setup

First, we’ll import all the libraries we’ll be using. This step also helps us verify that the environment is configured correctly. If any of the imports are missing, an exception will be raised.

import argparse
import time

import modin.pandas as pd
from modin.experimental.sklearn.model_selection import train_test_split
from xgboost_ray import RayDMatrix, RayParams, train, predict

import ray

Next, let’s parse some arguments. These are used when this example is executed as a .py script, but they have no effect in the .ipynb. If you are using the interactive notebook, you can override the arguments manually instead.

parser = argparse.ArgumentParser()
parser.add_argument(
    "--address", type=str, default="auto", help="The address to use for Ray.")
parser.add_argument(
    "--smoke-test",
    action="store_true",
    help="Read a smaller dataset for quick testing purposes.")
parser.add_argument(
    "--num-actors",
    type=int,
    default=4,
    help="Sets number of actors for training.")
parser.add_argument(
    "--cpus-per-actor",
    type=int,
    default=8,
    help="The number of CPUs per actor for training.")
parser.add_argument(
    "--num-actors-inference",
    type=int,
    default=16,
    help="Sets number of actors for inference.")
parser.add_argument(
    "--cpus-per-actor-inference",
    type=int,
    default=2,
    help="The number of CPUs per actor for inference.")
# Ignore -f from ipykernel_launcher
args, _ = parser.parse_known_args()

Override these arguments as needed:

address = args.address
smoke_test = args.smoke_test
num_actors = args.num_actors
cpus_per_actor = args.cpus_per_actor
num_actors_inference = args.num_actors_inference
cpus_per_actor_inference = args.cpus_per_actor_inference
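
If you are running this notebook interactively, you can override the parsed defaults here instead. The values below are purely illustrative; leave them commented out to keep the defaults from above.

# Illustrative overrides for a quick interactive run (adjust as needed):
# smoke_test = True
# num_actors = 2
# cpus_per_actor = 1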

Connecting to the Ray cluster

Now, let’s connect our Python script to this newly deployed Ray cluster!

if not ray.is_initialized():
    ray.init(address=address)
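
Once connected, it can help to verify that the cluster sees the resources we expect. ray.cluster_resources() is a standard Ray call; with the provided cluster config, the reported CPU count should be around 64.

# Sanity check: print the resources available to the cluster.
# With the provided 64-CPU config, the "CPU" entry should be about 64.
print(ray.cluster_resources())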

Data Preparation

We will use the HIGGS dataset from the UCI Machine Learning Repository. The HIGGS dataset consists of 11,000,000 samples and 28 attributes, which is large enough to show the benefits of distributed computation.

LABEL_COLUMN = "label"
if smoke_test:
    # Test dataset with only 10,000 records.
    FILE_URL = "https://ray-ci-higgs.s3.us-west-2.amazonaws.com/simpleHIGGS" \
                ".csv"
else:
    # Full dataset. This may take a couple of minutes to load.
    FILE_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases" \
                "/00280/HIGGS.csv.gz"

colnames = [LABEL_COLUMN] + ["feature-%02d" % i for i in range(1, 29)]
load_data_start_time = time.time()

df = pd.read_csv(FILE_URL, names=colnames)

load_data_end_time = time.time()
load_data_duration = load_data_end_time - load_data_start_time
print(f"Dataset loaded in {load_data_duration} seconds.")

Split data into training and validation.

df_train, df_validation = train_test_split(df)
print(df_train, df_validation)

Distributed Training

The train_xgboost function contains all of the logic necessary for training using XGBoost-Ray.

Distributed training not only speeds up the process, but also allows you to use datasets that are too large to fit in the memory of a single node. With distributed training, the dataset is sharded across different actors running on separate nodes. Those actors communicate with each other to create the final model.

First, the dataframes are wrapped in RayDMatrix objects, which handle data sharding across the cluster. Then, the train function is called. The evaluation scores will be saved to the evals_result dictionary. The function returns a tuple of the trained model (booster) and the evaluation scores.

The ray_params argument expects a RayParams object that contains Ray-specific settings, such as the number of actors and the CPUs per actor.

def train_xgboost(config, train_df, test_df, target_column, ray_params):
    train_set = RayDMatrix(train_df, target_column)
    test_set = RayDMatrix(test_df, target_column)

    evals_result = {}

    train_start_time = time.time()

    # Train the classifier
    bst = train(
        params=config,
        dtrain=train_set,
        evals=[(test_set, "eval")],
        evals_result=evals_result,
        verbose_eval=False,
        num_boost_round=100,
        ray_params=ray_params)

    train_end_time = time.time()
    train_duration = train_end_time - train_start_time
    print(f"Total time taken: {train_duration} seconds.")

    model_path = "model.xgb"
    bst.save_model(model_path)
    print("Final validation error: {:.4f}".format(
        evals_result["eval"]["error"][-1]))

    return bst, evals_result

We can now pass our Modin dataframes and run the function. We will use RayParams to specify the number of actors and the number of CPUs per actor to use for training.

# standard XGBoost config for classification
config = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

bst, evals_result = train_xgboost(
    config, df_train, df_validation, LABEL_COLUMN,
    RayParams(cpus_per_actor=cpus_per_actor, num_actors=num_actors))
print(f"Results: {evals_result}")

Prediction

With the model trained, we can now predict on unseen data. For the purposes of this example, we will use the same dataset for prediction as for training.

Since prediction is embarrassingly parallel, distributing it over multiple actors can measurably reduce the time required.

inference_df = RayDMatrix(df, ignore=[LABEL_COLUMN, "partition"])
results = predict(
    bst,
    inference_df,
    ray_params=RayParams(
        cpus_per_actor=cpus_per_actor_inference,
        num_actors=num_actors_inference))

print(results)
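
Because train_xgboost also saved the booster to model.xgb, the model can be reloaded later and used for the same distributed prediction without retraining. This is a minimal sketch; it assumes the xgboost package (a dependency of xgboost_ray) is available.

import xgboost as xgb

# Reload the booster saved by train_xgboost() and run distributed prediction.
loaded_bst = xgb.Booster(model_file="model.xgb")
loaded_results = predict(
    loaded_bst,
    inference_df,
    ray_params=RayParams(
        cpus_per_actor=cpus_per_actor_inference,
        num_actors=num_actors_inference))
print(loaded_results)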
