Training a model with Sklearn

In this example, we will train a model in Ray AIR using a Sklearn (scikit-learn) classifier.

Let’s start by installing our dependencies:

!pip install -qU "ray[tune]" scikit-learn

Then we need some imports:

from typing import Tuple

import ray
from ray.data.dataset import Dataset
from ray.data.preprocessors import Chain, OrdinalEncoder, StandardScaler
from ray.air.config import ScalingConfig
from ray.air.result import Result
from ray.train.batch_predictor import BatchPredictor
from ray.train.sklearn import SklearnPredictor, SklearnTrainer

from sklearn.ensemble import RandomForestClassifier

try:
    # cuML provides GPU-accelerated, scikit-learn-compatible estimators.
    from cuml.ensemble import RandomForestClassifier as cuMLRandomForestClassifier
except ImportError:
    # Fall back to None so the CPU-only path still works without cuML.
    cuMLRandomForestClassifier = None

Next we define a function to load our train, validation, and test datasets.

def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    dataset = ray.data.read_csv(
        "s3://anonymous@air-example-data/breast_cancer_with_categorical.csv"
    )
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    # The test set is the validation set with the label column removed.
    test_dataset = valid_dataset.drop_columns(["target"])
    return train_dataset, valid_dataset, test_dataset
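
As a quick sanity check (a minimal sketch, assuming the example S3 bucket is reachable from your environment), you can materialize the splits and inspect them:

train_ds, valid_ds, test_ds = prepare_data()

# Expect roughly a 70/30 row split between train and validation.
print(train_ds.count(), valid_ds.count())
# The test split should contain every column except "target".
print(test_ds.schema())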

The following function will create a Sklearn trainer, train it, and return the result.

def train_sklearn(num_cpus: int, use_gpu: bool = False) -> Result:
    if use_gpu and not cuMLRandomForestClassifier:
        raise RuntimeError("cuML must be installed for GPU-enabled sklearn estimators.")

    train_dataset, valid_dataset, _ = prepare_data()

    # Ordinal-encode the categorical column, then standardize two of the
    # numeric feature columns.
    columns_to_scale = ["mean radius", "mean texture"]
    preprocessor = Chain(
        OrdinalEncoder(["categorical_column"]), StandardScaler(columns=columns_to_scale)
    )

    if use_gpu:
        # cuML trains on the GPU, so the trainer only needs a single CPU slot.
        trainer_resources = {"CPU": 1, "GPU": 1}
        estimator = cuMLRandomForestClassifier()
    else:
        # Give the trainer num_cpus CPU slots for scikit-learn training.
        trainer_resources = {"CPU": num_cpus}
        estimator = RandomForestClassifier()

    trainer = SklearnTrainer(
        estimator=estimator,
        label_column="target",
        datasets={"train": train_dataset, "valid": valid_dataset},
        preprocessor=preprocessor,
        cv=5,  # also report 5-fold cross-validation metrics on the train set
        scaling_config=ScalingConfig(trainer_resources=trainer_resources),
    )
    result = trainer.fit()
    print(result.metrics)

    return result
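
If you want to see what the preprocessor does before training, here is a minimal, standalone sketch (not part of the training flow above) that fits it on the train split and looks at one transformed row:

train_ds, _, _ = prepare_data()
preprocessor = Chain(
    OrdinalEncoder(["categorical_column"]),
    StandardScaler(columns=["mean radius", "mean texture"]),
)
# fit_transform computes the encoding/scaling statistics on the dataset
# and returns a new Dataset with the transformed columns.
print(preprocessor.fit_transform(train_ds).take(1))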

Once we have the result, we can run batch inference with the trained model. Let’s define a utility function for this.

def predict_sklearn(result: Result, use_gpu: bool = False):
    _, _, test_dataset = prepare_data()

    batch_predictor = BatchPredictor.from_checkpoint(
        result.checkpoint, SklearnPredictor
    )

    predicted_labels = (
        batch_predictor.predict(
            test_dataset,
            num_gpus_per_worker=int(use_gpu),
        )
        # Threshold the raw model outputs to obtain binary class labels.
        .map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    )
    print("PREDICTED LABELS")
    predicted_labels.show()
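
For a small in-memory batch, you can also skip BatchPredictor and use SklearnPredictor directly. A sketch, assuming the result from the training run below:

# Restore a predictor from the checkpoint; the stored preprocessor is
# applied automatically before the estimator is called.
predictor = SklearnPredictor.from_checkpoint(result.checkpoint)
_, _, test_dataset = prepare_data()
sample_batch = test_dataset.limit(5).to_pandas()
print(predictor.predict(sample_batch))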

Now we can run the training:

result = train_sklearn(num_cpus=2, use_gpu=False)
2022-06-22 17:27:37,741	INFO services.py:1477 -- View the Ray dashboard at http://127.0.0.1:8269
2022-06-22 17:27:39,822	WARNING read_api.py:260 -- The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 44.05it/s]
== Status ==
Current time: 2022-06-22 17:27:59 (running for 00:00:18.31)
Memory usage on this node: 10.7/31.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/8 CPUs, 0/0 GPUs, 0.0/12.9 GiB heap, 0.0/6.45 GiB objects
Result logdir: /home/ubuntu/ray_results/SklearnTrainer_2022-06-22_17-27-40
Number of trials: 1/1 (1 TERMINATED)
+----------------------------+------------+-----------------------+--------+------------------+------------+
| Trial name                 | status     | loc                   |   iter |   total time (s) |   fit_time |
|----------------------------+------------+-----------------------+--------+------------------+------------|
| SklearnTrainer_9dec8_00000 | TERMINATED | 172.31.43.110:1492629 |      1 |          15.6842 |    2.31571 |
+----------------------------+------------+-----------------------+--------+------------------+------------+


(SklearnTrainer pid=1492629) 2022-06-22 17:27:45,647	WARNING pool.py:591 -- The 'context' argument is not supported using ray. Please refer to the documentation for how to control ray initialization.
Result for SklearnTrainer_9dec8_00000:
  cv:
    fit_time:
    - 2.221003770828247
    - 2.215489387512207
    - 2.2075674533843994
    - 2.222351312637329
    - 2.312389612197876
    fit_time_mean: 2.235760307312012
    fit_time_std: 0.03866614559685742
    score_time:
    - 0.022464990615844727
    - 0.0230865478515625
    - 0.02564835548400879
    - 0.029137849807739258
    - 0.021221637725830078
    score_time_mean: 0.02431187629699707
    score_time_std: 0.0028120522003997595
    test_score:
    - 0.9625
    - 0.9125
    - 0.9875
    - 1.0
    - 0.9367088607594937
    test_score_mean: 0.9598417721518986
    test_score_std: 0.032128186960552516
  date: 2022-06-22_17-27-59
  done: true
  experiment_id: f8215019c10e4a81ba2187c38e875365
  experiment_tag: '0'
  fit_time: 2.3157050609588623
  hostname: ip-172-31-43-110
  iterations_since_restore: 1
  node_ip: 172.31.43.110
  pid: 1492629
  should_checkpoint: true
  time_since_restore: 15.684244871139526
  time_this_iter_s: 15.684244871139526
  time_total_s: 15.684244871139526
  timestamp: 1655918879
  timesteps_since_restore: 0
  training_iteration: 1
  trial_id: 9dec8_00000
  valid:
    score_time: 0.03549623489379883
    test_score: 0.9532163742690059
  warmup_time: 0.0057866573333740234
  
2022-06-22 17:27:59,333	INFO tune.py:734 -- Total run time: 19.09 seconds (18.31 seconds for the tuning loop).
{'valid': {'score_time': 0.03549623489379883, 'test_score': 0.9532163742690059}, 'cv': {'fit_time': array([2.22100377, 2.21548939, 2.20756745, 2.22235131, 2.31238961]), 'score_time': array([0.02246499, 0.02308655, 0.02564836, 0.02913785, 0.02122164]), 'test_score': array([0.9625    , 0.9125    , 0.9875    , 1.        , 0.93670886]), 'fit_time_mean': 2.235760307312012, 'fit_time_std': 0.03866614559685742, 'score_time_mean': 0.02431187629699707, 'score_time_std': 0.0028120522003997595, 'test_score_mean': 0.9598417721518986, 'test_score_std': 0.032128186960552516}, 'fit_time': 2.3157050609588623, 'time_this_iter_s': 15.684244871139526, 'should_checkpoint': True, 'done': True, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 1, 'trial_id': '9dec8_00000', 'experiment_id': 'f8215019c10e4a81ba2187c38e875365', 'date': '2022-06-22_17-27-59', 'timestamp': 1655918879, 'time_total_s': 15.684244871139526, 'pid': 1492629, 'hostname': 'ip-172-31-43-110', 'node_ip': '172.31.43.110', 'config': {}, 'time_since_restore': 15.684244871139526, 'timesteps_since_restore': 0, 'iterations_since_restore': 1, 'warmup_time': 0.0057866573333740234, 'experiment_tag': '0'}
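
The same metrics are available programmatically on the returned Result. For example, to pull out the validation accuracy and the mean cross-validation score (keys as printed above):

print(result.metrics["valid"]["test_score"])    # 0.9532... in this run
print(result.metrics["cv"]["test_score_mean"])  # 0.9598... in this run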

And perform inference with the trained model:

predict_sklearn(result, use_gpu=False)
2022-06-22 17:27:59,658	WARNING read_api.py:260 -- The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 64.73it/s]
Map Progress (1 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.60s/it]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 71.41it/s]
PREDICTED LABELS
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
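
Finally, if cuML is installed, the same pipeline can run on a GPU simply by flipping the flag (a sketch; without cuML, train_sklearn raises a RuntimeError, as shown in its definition above):

result = train_sklearn(num_cpus=2, use_gpu=True)
predict_sklearn(result, use_gpu=True)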