XGBoost Dynamic Resources Example
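
This example shows how to use ``ResourceChangingScheduler`` together with ``ASHAScheduler`` to dynamically reallocate CPUs to XGBoost trials during a Ray Tune run. The same workflow is implemented twice below: once with the function API (``train_breast_cancer``) and once with the Trainable (class) API (``BreastCancerTrainable``).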

from typing import Union, Dict, Any, Optional
import sklearn.datasets
import sklearn.metrics
import os
from sklearn.model_selection import train_test_split
import xgboost as xgb
from xgboost.core import Booster
import pickle

import ray
from ray import air, tune
from ray.tune.schedulers import ResourceChangingScheduler, ASHAScheduler
from ray.tune import Trainable
from ray.tune.resources import Resources
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.experiment import Trial
from ray.tune.execution import trial_runner
from ray.tune.integration.xgboost import TuneReportCheckpointCallback

CHECKPOINT_FILENAME = "model.xgb"


def get_best_model_checkpoint(best_result: "ray.air.Result"):
    best_bst = xgb.Booster()

    with best_result.checkpoint.as_directory() as checkpoint_dir:
        to_load = os.path.join(checkpoint_dir, CHECKPOINT_FILENAME)

        if not os.path.exists(to_load):
            # Checkpoint written by the class (Trainable) API: it pickles
            # ``(config, nthread, raw_model)`` instead of saving ``model.xgb``.
            with open(os.path.join(checkpoint_dir, "checkpoint"), "rb") as f:
                _, _, raw_model = pickle.load(f)
            to_load = bytearray(raw_model)

        best_bst.load_model(to_load)

    # ``eval-logloss`` is the metric the run optimizes; report it directly
    # (it is available for both the function and class trainables).
    logloss = best_result.metrics["eval-logloss"]
    print(f"Best model parameters: {best_result.config}")
    print(f"Best model eval-logloss: {logloss:.4f}")
    return best_bst


# FUNCTION API EXAMPLE


# Our train function needs to be able to checkpoint
# in order to work with ResourceChangingScheduler.
def train_breast_cancer(config: dict, checkpoint_dir=None):
    # This is a simple training function to be passed into Tune
    # Load dataset
    data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(data, labels, test_size=0.25)
    # Build input matrices for XGBoost
    train_set = xgb.DMatrix(train_x, label=train_y)
    test_set = xgb.DMatrix(test_x, label=test_y)

    # Checkpointing needs to be set up in order for dynamic
    # resource allocation to work as intended
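    # (when the ResourceChangingScheduler changes a trial's resources, the
    # trial is paused and then resumed from its most recent checkpoint)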
    xgb_model = None
    if checkpoint_dir:
        xgb_model = xgb.Booster()
        xgb_model.load_model(os.path.join(checkpoint_dir, CHECKPOINT_FILENAME))

    # We can obtain the current trial resources through
    # ``tune.get_trial_resources()`` and use them to set ``nthread``,
    # so that XGBoost uses every CPU currently allocated to the trial.
    config["nthread"] = int(tune.get_trial_resources().head_cpus)
    print(f"nthreads: {config['nthread']} xgb_model: {xgb_model}")
    # Train the classifier, using the Tune callback
    xgb.train(
        config,
        train_set,
        evals=[(test_set, "eval")],
        verbose_eval=False,
        xgb_model=xgb_model,
        callbacks=[
            TuneReportCheckpointCallback(
                filename=CHECKPOINT_FILENAME,
                # checkpointing should happen every iteration
                # with dynamic resource allocation
                frequency=1,
            )
        ],
    )


# TRAINABLE (CLASS) API EXAMPLE
class BreastCancerTrainable(Trainable):
    def setup(self, config):
        self.config = config
        self.nthread = config.pop("nthread", 1)
        self.model: Optional[xgb.Booster] = None
        # Load dataset
        data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
        # Split into train and test set
        train_x, test_x, train_y, test_y = train_test_split(
            data, labels, test_size=0.25
        )
        # Build input matrices for XGBoost
        self.train_set = xgb.DMatrix(train_x, label=train_y)
        self.test_set = xgb.DMatrix(test_x, label=test_y)

    def step(self):
        # You can also obtain the current trial resources through
        # ``self.trial_resources``. Depending on how resources were requested,
        # this is either a ``PlacementGroupFactory`` or a legacy ``Resources``
        # object, so handle both cases.
        current_resources = self.trial_resources
        if isinstance(current_resources, PlacementGroupFactory):
            self.nthread = current_resources.head_cpus
        else:
            self.nthread = current_resources.cpu

        results = {}
        config = self.config.copy()
        config["nthread"] = int(self.nthread)
        self.model = xgb.train(
            config,
            self.train_set,
            evals=[(self.test_set, "eval")],
            verbose_eval=False,
            xgb_model=self.model,
            evals_result=results,
            num_boost_round=1,
        )
        print(config, results)
        return {"eval-logloss": results["eval"]["logloss"][-1], "nthread": self.nthread}

    def save_checkpoint(self, checkpoint_dir):
        path = os.path.join(checkpoint_dir, "checkpoint")
        with open(path, "wb") as output_file:
            pickle.dump((self.config, self.nthread, self.model.save_raw()), output_file)
        return path

    def load_checkpoint(self, checkpoint_path):
        with open(checkpoint_path, "rb") as input_file:
            self.config, self.nthread, raw_model = pickle.load(input_file)
        self.model = Booster()
        self.model.load_model(bytearray(raw_model))
        data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
        # Split into train and test set
        train_x, test_x, train_y, test_y = train_test_split(
            data, labels, test_size=0.25
        )
        # Build input matrices for XGBoost
        self.train_set = xgb.DMatrix(train_x, label=train_y)
        self.test_set = xgb.DMatrix(test_x, label=test_y)


def tune_xgboost(use_class_trainable=True):
    search_space = {
        # You can mix constants with search space objects.
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "max_depth": 9,
        "learning_rate": 1,
        "min_child_weight": tune.grid_search([2, 3]),
        "subsample": tune.grid_search([0.8, 0.9]),
        "colsample_bynode": tune.grid_search([0.8, 0.9]),
        "random_state": 1,
        "num_parallel_tree": 2000,
    }
    # This will enable aggressive early stopping of bad trials.
    base_scheduler = ASHAScheduler(
        max_t=16,  # 16 training iterations
        grace_period=1,
        reduction_factor=2,
    )

    def example_resources_allocation_function(
        trial_runner: "trial_runner.TrialRunner",
        trial: Trial,
        result: Dict[str, Any],
        scheduler: "ResourceChangingScheduler",
    ) -> Optional[Union[PlacementGroupFactory, Resources]]:
        """This is a basic example of a resource allocating function.

        The function naively balances available CPUs over live trials.

        This function returns a new ``PlacementGroupFactory`` with updated
        resource requirements, or None. If the returned
        ``PlacementGroupFactory`` is equal by value to the one the
        trial has currently, the scheduler will skip the update process
        internally (same with None).

        See :class:`DistributeResources` for a more complex,
        robust approach.

        Args:
            trial_runner: Trial runner for this Tune run.
                Can be used to obtain information about other trials.
            trial: The trial to allocate new resources to.
            result: The latest results of trial.
            scheduler: The scheduler calling the function.
        """

        # Get base trial resources as defined in
        # ``tune.with_resources``
        base_trial_resource = scheduler._base_trial_resources

        # Don't bother if this is just the first iteration
        if result["training_iteration"] < 1:
            return None

        # Default to a single CPU if no resources were specified
        # through ``tune.with_resources``
        if base_trial_resource is None:
            base_trial_resource = PlacementGroupFactory([{"CPU": 1, "GPU": 0}])

        # Assume that the number of CPUs cannot go below what was
        # specified in ``tune.with_resources``.
        min_cpu = base_trial_resource.required_resources.get("CPU", 0)

        # Get the number of CPUs available in total (not just free)
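        # NOTE: this reaches into Tune internals (``trial_executor`` and
        # ``_resource_updater``), so the exact attribute path may differ
        # between Ray versions.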
        total_available_cpus = (
            trial_runner.trial_executor._resource_updater.get_num_cpus()
        )

        # Divide the total CPUs evenly among all live trials
        cpu_to_use = max(
            min_cpu, total_available_cpus // len(trial_runner.get_live_trials())
        )

        # Assign new CPUs to the trial in a PlacementGroupFactory
        return PlacementGroupFactory([{"CPU": cpu_to_use, "GPU": 0}])

    # You can either define your own resources_allocation_function, or
    # use the default one - DistributeResources

    # from ray.tune.schedulers.resource_changing_scheduler import \
    #    DistributeResources

    scheduler = ResourceChangingScheduler(
        base_scheduler=base_scheduler,
        resources_allocation_function=example_resources_allocation_function
        # resources_allocation_function=DistributeResources()  # default
    )

    if use_class_trainable:
        fn = BreastCancerTrainable
    else:
        fn = train_breast_cancer

    tuner = tune.Tuner(
        tune.with_resources(
            fn, resources=PlacementGroupFactory([{"CPU": 1, "GPU": 0}])
        ),
        tune_config=tune.TuneConfig(
            metric="eval-logloss",
            mode="min",
            num_samples=1,
            scheduler=scheduler,
        ),
        run_config=air.RunConfig(
            checkpoint_config=air.CheckpointConfig(
                checkpoint_at_end=use_class_trainable,
            )
        ),
        param_space=search_space,
    )
    results = tuner.fit()

    if use_class_trainable:
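        # Sanity check: with dynamic resource allocation, at least one trial
        # should have been scaled up beyond its initial single CPU.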
        assert results.get_dataframe()["nthread"].max() > 1

    return results.get_best_result()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--class-trainable",
        action="store_true",
        default=False,
        help="set to use the Trainable (class) API instead of functional one",
    )
    parser.add_argument(
        "--test",
        action="store_true",
        default=False,
        help="set to run both functional and Trainable APIs",
    )
    args, _ = parser.parse_known_args()

    ray.init(num_cpus=8)

    if args.test:
        best_result = tune_xgboost(use_class_trainable=True)
        best_bst = get_best_model_checkpoint(best_result)

    best_result = tune_xgboost(use_class_trainable=args.class_trainable)

    best_bst = get_best_model_checkpoint(best_result)

    # You could now do further predictions with
    # best_bst.predict(...)
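#
# A minimal sketch (assuming the same breast cancer dataset used above):
#
#   data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
#   predictions = best_bst.predict(xgb.DMatrix(data))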