from typing import Dict, Any, Optional
import sklearn.datasets
import sklearn.metrics
import os
from sklearn.model_selection import train_test_split
import xgboost as xgb
import pickle
import ray
from ray import air, tune
from ray.tune.schedulers import ResourceChangingScheduler, ASHAScheduler
from ray.tune import Trainable
from ray.tune.execution.placement_groups import PlacementGroupFactory
from ray.tune.experiment import Trial
from ray.tune.execution import trial_runner
from ray.tune.integration.xgboost import TuneReportCheckpointCallback
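# This example demonstrates dynamic resource allocation in Ray Tune:
# a ResourceChangingScheduler wraps ASHA and reassigns CPUs to live
# trials between iterations, using either the function API or the
# class (Trainable) API.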
CHECKPOINT_FILENAME = "model.xgb"
def get_best_model_checkpoint(best_result: "ray.air.Result"):
best_bst = xgb.Booster()
with best_result.checkpoint.as_directory() as checkpoint_dir:
to_load = os.path.join(checkpoint_dir, CHECKPOINT_FILENAME)
if not os.path.exists(to_load):
            # The checkpoint was written by the class (Trainable) API,
            # which pickles ``(config, nthread, raw_model)``.
with open(os.path.join(checkpoint_dir, "checkpoint"), "rb") as f:
_, _, raw_model = pickle.load(f)
to_load = bytearray(raw_model)
best_bst.load_model(to_load)
    # ``eval-logloss`` is the metric this example optimizes on; report it
    # directly rather than mislabeling ``1 - logloss`` as accuracy.
    logloss = best_result.metrics["eval-logloss"]
    print(f"Best model parameters: {best_result.config}")
    print(f"Best model eval-logloss: {logloss:.4f}")
return best_bst
# FUNCTION API EXAMPLE
# Our train function needs to checkpoint so that trials can be paused
# and resumed with new resources by the ResourceChangingScheduler.
def train_breast_cancer(config: dict, checkpoint_dir=None):
# This is a simple training function to be passed into Tune
# Load dataset
data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
# Split into train and test set
train_x, test_x, train_y, test_y = train_test_split(data, labels, test_size=0.25)
# Build input matrices for XGBoost
train_set = xgb.DMatrix(train_x, label=train_y)
test_set = xgb.DMatrix(test_x, label=test_y)
# Checkpointing needs to be set up in order for dynamic
# resource allocation to work as intended
xgb_model = None
if checkpoint_dir:
xgb_model = xgb.Booster()
xgb_model.load_model(os.path.join(checkpoint_dir, CHECKPOINT_FILENAME))
    # We can obtain the current trial resources through
    # ``tune.get_trial_resources()``.
config["nthread"] = int(tune.get_trial_resources().head_cpus)
print(f"nthreads: {config['nthread']} xgb_model: {xgb_model}")
# Train the classifier, using the Tune callback
xgb.train(
config,
train_set,
evals=[(test_set, "eval")],
verbose_eval=False,
xgb_model=xgb_model,
callbacks=[
TuneReportCheckpointCallback(
filename=CHECKPOINT_FILENAME,
# checkpointing should happen every iteration
# with dynamic resource allocation
frequency=1,
)
],
)
# TRAINABLE (CLASS) API EXAMPLE
class BreastCancerTrainable(Trainable):
def setup(self, config):
self.config = config
self.nthread = config.pop("nthread", 1)
        self.model: Optional[xgb.Booster] = None
# Load dataset
data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
# Split into train and test set
train_x, test_x, train_y, test_y = train_test_split(
data, labels, test_size=0.25
)
# Build input matrices for XGBoost
self.train_set = xgb.DMatrix(train_x, label=train_y)
self.test_set = xgb.DMatrix(test_x, label=test_y)
def step(self):
        # You can also obtain the current trial resources:
current_resources = self.trial_resources
if isinstance(current_resources, PlacementGroupFactory):
self.nthread = current_resources.head_cpus
else:
self.nthread = current_resources.cpu
results = {}
config = self.config.copy()
config["nthread"] = int(self.nthread)
self.model = xgb.train(
config,
self.train_set,
evals=[(self.test_set, "eval")],
verbose_eval=False,
xgb_model=self.model,
evals_result=results,
num_boost_round=1,
)
print(config, results)
return {"eval-logloss": results["eval"]["logloss"][-1], "nthread": self.nthread}
    def save_checkpoint(self, checkpoint_dir):
        path = os.path.join(checkpoint_dir, "checkpoint")
        # Persist everything needed to resume: the hyperparameters, the
        # thread count, and the raw serialized booster.
        with open(path, "wb") as output_file:
            pickle.dump(
                (self.config, self.nthread, self.model.save_raw()), output_file
            )
        return path
    def load_checkpoint(self, checkpoint_path):
        with open(checkpoint_path, "rb") as input_file:
            self.config, self.nthread, raw_model = pickle.load(input_file)
        self.model = xgb.Booster()
        self.model.load_model(bytearray(raw_model))
data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
# Split into train and test set
train_x, test_x, train_y, test_y = train_test_split(
data, labels, test_size=0.25
)
# Build input matrices for XGBoost
self.train_set = xgb.DMatrix(train_x, label=train_y)
self.test_set = xgb.DMatrix(test_x, label=test_y)
def tune_xgboost(use_class_trainable=True):
search_space = {
# You can mix constants with search space objects.
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
"max_depth": 9,
"learning_rate": 1,
"min_child_weight": tune.grid_search([2, 3]),
"subsample": tune.grid_search([0.8, 0.9]),
"colsample_bynode": tune.grid_search([0.8, 0.9]),
"random_state": 1,
"num_parallel_tree": 2000,
}
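    # The three ``grid_search`` axes above (2 * 2 * 2 values) expand into
    # 8 trials, so with ``ray.init(num_cpus=8)`` below every trial starts
    # with a single CPU and gains more as ASHA stops the weaker ones.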
# This will enable aggressive early stopping of bad trials.
    base_scheduler = ASHAScheduler(
        max_t=16,  # 16 training iterations
        grace_period=1,
        reduction_factor=2,
    )
def example_resources_allocation_function(
trial_runner: "trial_runner.TrialRunner",
trial: Trial,
result: Dict[str, Any],
scheduler: "ResourceChangingScheduler",
) -> Optional[PlacementGroupFactory]:
"""This is a basic example of a resource allocating function.
The function naively balances available CPUs over live trials.
This function returns a new ``PlacementGroupFactory`` with updated
resource requirements, or None. If the returned
``PlacementGroupFactory`` is equal by value to the one the
trial has currently, the scheduler will skip the update process
internally (same with None).
See :class:`DistributeResources` for a more complex,
robust approach.
Args:
trial_runner: Trial runner for this Tune run.
Can be used to obtain information about other trials.
trial: The trial to allocate new resources to.
result: The latest results of trial.
scheduler: The scheduler calling the function.
"""
        # Get the base trial resources as defined in
        # ``tune.with_resources``.
        base_trial_resource = scheduler.base_trial_resources
# Don't bother if this is just the first iteration
if result["training_iteration"] < 1:
return None
        # Default values if no resources were specified via ``tune.with_resources``
if base_trial_resource is None:
base_trial_resource = PlacementGroupFactory([{"CPU": 1, "GPU": 0}])
        # Assume that the number of CPUs cannot go below what was
        # specified in ``tune.with_resources``.
min_cpu = base_trial_resource.required_resources.get("CPU", 0)
# Get the number of CPUs available in total (not just free)
total_available_cpus = (
trial_runner.trial_executor._resource_updater.get_num_cpus()
)
# Divide the free CPUs among all live trials
cpu_to_use = max(
min_cpu, total_available_cpus // len(trial_runner.get_live_trials())
)
# Assign new CPUs to the trial in a PlacementGroupFactory
return PlacementGroupFactory([{"CPU": cpu_to_use, "GPU": 0}])
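    # As a worked example (illustrative numbers): with 8 total CPUs and
    # 4 live trials, the function above re-allocates
    # max(min_cpu, 8 // 4) = 2 CPUs to each trial; once only 2 trials
    # remain, each survivor gets 4.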
    # You can either define your own resources_allocation_function, or
    # use the default one, ``DistributeResources``:
    # from ray.tune.schedulers.resource_changing_scheduler import \
    #     DistributeResources
scheduler = ResourceChangingScheduler(
base_scheduler=base_scheduler,
resources_allocation_function=example_resources_allocation_function
# resources_allocation_function=DistributeResources() # default
)
if use_class_trainable:
fn = BreastCancerTrainable
else:
fn = train_breast_cancer
tuner = tune.Tuner(
tune.with_resources(
fn, resources=PlacementGroupFactory([{"CPU": 1, "GPU": 0}])
),
tune_config=tune.TuneConfig(
metric="eval-logloss",
mode="min",
num_samples=1,
scheduler=scheduler,
),
run_config=air.RunConfig(
checkpoint_config=air.CheckpointConfig(
checkpoint_at_end=use_class_trainable,
)
),
param_space=search_space,
)
results = tuner.fit()
if use_class_trainable:
assert results.get_dataframe()["nthread"].max() > 1
return results.get_best_result()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--class-trainable",
action="store_true",
default=False,
help="set to use the Trainable (class) API instead of functional one",
)
parser.add_argument(
"--test",
action="store_true",
default=False,
help="set to run both functional and Trainable APIs",
)
args, _ = parser.parse_known_args()
ray.init(num_cpus=8)
if args.test:
best_result = tune_xgboost(use_class_trainable=True)
best_bst = get_best_model_checkpoint(best_result)
best_result = tune_xgboost(use_class_trainable=args.class_trainable)
best_bst = get_best_model_checkpoint(best_result)
# You could now do further predictions with
# best_bst.predict(...)
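    # A minimal sketch of such a prediction (it reuses the full training
    # feature matrix purely for illustration; in practice you would score
    # held-out data):
    data, _ = sklearn.datasets.load_breast_cancer(return_X_y=True)
    predictions = best_bst.predict(xgb.DMatrix(data))
    print(
        f"Predicted probabilities for {len(predictions)} rows, "
        f"first: {predictions[0]:.4f}"
    )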