Training a model with Sklearn
In this example we will train a model in Ray AIR using a Sklearn classifier.
Let's start by installing our dependencies:
!pip install -qU "ray[tune]" scikit-learn
Then we need some imports:
from typing import Tuple
import ray
from ray.data.dataset import Dataset
from ray.train.batch_predictor import BatchPredictor
from ray.train.sklearn import SklearnPredictor
from ray.data.preprocessors import Chain, OrdinalEncoder, StandardScaler
from ray.air.result import Result
from ray.train.sklearn import SklearnTrainer
from ray.air.config import ScalingConfig
from sklearn.ensemble import RandomForestClassifier
try:
    from cuml.ensemble import RandomForestClassifier as cuMLRandomForestClassifier
except ImportError:
    cuMLRandomForestClassifier = None
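The example below lets Ray start implicitly on the first data call, but if you want to attach to an already running cluster you can initialize Ray yourself first. This step is optional and not part of the original example:

# Optional: start Ray explicitly, or connect to a running cluster.
# ray.data.read_csv() below would otherwise initialize Ray automatically.
ray.init()  # or ray.init(address="auto") to attach to an existing cluster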
Next we define a function to load our train, validation, and test datasets.
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer_with_categorical.csv")
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    test_dataset = valid_dataset.drop_columns(["target"])
    return train_dataset, valid_dataset, test_dataset
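Note that the runs below emit a warning that the dataset consists of a single block, which caps read parallelism at 1. On a larger cluster you could follow the warning's advice and repartition after reading. A minimal sketch (the block count of 8 is an arbitrary choice, not from the original example):

def prepare_data_repartitioned(num_blocks: int = 8) -> Tuple[Dataset, Dataset, Dataset]:
    train_dataset, valid_dataset, test_dataset = prepare_data()
    # Split each dataset into more blocks so downstream tasks can run concurrently.
    return (
        train_dataset.repartition(num_blocks),
        valid_dataset.repartition(num_blocks),
        test_dataset.repartition(num_blocks),
    )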
The following function will create a Sklearn trainer, train it, and return the result.
def train_sklearn(num_cpus: int, use_gpu: bool = False) -> Result:
    if use_gpu and not cuMLRandomForestClassifier:
        raise RuntimeError("cuML must be installed for GPU enabled sklearn estimators.")

    train_dataset, valid_dataset, _ = prepare_data()

    # Scale some random columns
    columns_to_scale = ["mean radius", "mean texture"]
    preprocessor = Chain(
        OrdinalEncoder(["categorical_column"]), StandardScaler(columns=columns_to_scale)
    )

    if use_gpu:
        trainer_resources = {"CPU": 1, "GPU": 1}
        estimator = cuMLRandomForestClassifier()
    else:
        trainer_resources = {"CPU": num_cpus}
        estimator = RandomForestClassifier()

    trainer = SklearnTrainer(
        estimator=estimator,
        label_column="target",
        datasets={"train": train_dataset, "valid": valid_dataset},
        preprocessor=preprocessor,
        cv=5,
        scaling_config=ScalingConfig(trainer_resources=trainer_resources),
    )
    result = trainer.fit()
    print(result.metrics)

    return result
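Because SklearnTrainer accepts any scikit-learn compatible estimator, you can set hyperparameters on the estimator directly before passing it in. A small sketch (the values here are illustrative, not from the original example):

# Hypothetical hyperparameters, shown only to illustrate the standard sklearn API.
estimator = RandomForestClassifier(n_estimators=200, random_state=42)
# cuML mirrors the scikit-learn constructor API, so the GPU branch
# can be parameterized the same way.

This works because the trainer only requires a fit/predict interface; any configuration you would do in plain scikit-learn carries over unchanged.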
Once we have the result, we can do batch inference on the obtained model. Let's define a utility function for this.
def predict_sklearn(result: Result, use_gpu: bool = False):
    _, _, test_dataset = prepare_data()

    batch_predictor = BatchPredictor.from_checkpoint(
        result.checkpoint, SklearnPredictor
    )

    predicted_labels = (
        batch_predictor.predict(
            test_dataset,
            num_gpus_per_worker=int(use_gpu),
        )
        .map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    )
    print("PREDICTED LABELS")
    predicted_labels.show()
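The map_batches step binarizes the model's raw outputs at a fixed 0.5 cutoff. The cutoff is just a number in the lambda, so choosing a different operating point is a one-line change inside predict_sklearn. A sketch with a hypothetical 0.7 threshold (not from the original example):

threshold = 0.7  # hypothetical stricter cutoff; the example above uses 0.5
predicted_labels = batch_predictor.predict(test_dataset).map_batches(
    lambda df: (df > threshold).astype(int), batch_format="pandas"
)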
Now we can run the training:
result = train_sklearn(num_cpus=2, use_gpu=False)
2022-06-22 17:27:37,741 INFO services.py:1477 -- View the Ray dashboard at http://127.0.0.1:8269
2022-06-22 17:27:39,822 WARNING read_api.py:260 -- The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 44.05it/s]
== Status ==
Current time: 2022-06-22 17:27:59 (running for 00:00:18.31)
Memory usage on this node: 10.7/31.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/8 CPUs, 0/0 GPUs, 0.0/12.9 GiB heap, 0.0/6.45 GiB objects
Result logdir: /home/ubuntu/ray_results/SklearnTrainer_2022-06-22_17-27-40
Number of trials: 1/1 (1 TERMINATED)
+----------------------------+------------+-----------------------+--------+------------------+------------+
| Trial name                 | status     | loc                   |   iter |   total time (s) |   fit_time |
|----------------------------+------------+-----------------------+--------+------------------+------------|
| SklearnTrainer_9dec8_00000 | TERMINATED | 172.31.43.110:1492629 |      1 |          15.6842 |    2.31571 |
+----------------------------+------------+-----------------------+--------+------------------+------------+
(SklearnTrainer pid=1492629) 2022-06-22 17:27:45,647 WARNING pool.py:591 -- The 'context' argument is not supported using ray. Please refer to the documentation for how to control ray initialization.
Result for SklearnTrainer_9dec8_00000:
cv:
fit_time:
- 2.221003770828247
- 2.215489387512207
- 2.2075674533843994
- 2.222351312637329
- 2.312389612197876
fit_time_mean: 2.235760307312012
fit_time_std: 0.03866614559685742
score_time:
- 0.022464990615844727
- 0.0230865478515625
- 0.02564835548400879
- 0.029137849807739258
- 0.021221637725830078
score_time_mean: 0.02431187629699707
score_time_std: 0.0028120522003997595
test_score:
- 0.9625
- 0.9125
- 0.9875
- 1.0
- 0.9367088607594937
test_score_mean: 0.9598417721518986
test_score_std: 0.032128186960552516
date: 2022-06-22_17-27-59
done: false
experiment_id: f8215019c10e4a81ba2187c38e875365
fit_time: 2.3157050609588623
hostname: ip-172-31-43-110
iterations_since_restore: 1
node_ip: 172.31.43.110
pid: 1492629
should_checkpoint: true
time_since_restore: 15.684244871139526
time_this_iter_s: 15.684244871139526
time_total_s: 15.684244871139526
timestamp: 1655918879
timesteps_since_restore: 0
training_iteration: 1
trial_id: 9dec8_00000
valid:
score_time: 0.03549623489379883
test_score: 0.9532163742690059
warmup_time: 0.0057866573333740234
Result for SklearnTrainer_9dec8_00000:
cv:
fit_time:
- 2.221003770828247
- 2.215489387512207
- 2.2075674533843994
- 2.222351312637329
- 2.312389612197876
fit_time_mean: 2.235760307312012
fit_time_std: 0.03866614559685742
score_time:
- 0.022464990615844727
- 0.0230865478515625
- 0.02564835548400879
- 0.029137849807739258
- 0.021221637725830078
score_time_mean: 0.02431187629699707
score_time_std: 0.0028120522003997595
test_score:
- 0.9625
- 0.9125
- 0.9875
- 1.0
- 0.9367088607594937
test_score_mean: 0.9598417721518986
test_score_std: 0.032128186960552516
date: 2022-06-22_17-27-59
done: true
experiment_id: f8215019c10e4a81ba2187c38e875365
experiment_tag: '0'
fit_time: 2.3157050609588623
hostname: ip-172-31-43-110
iterations_since_restore: 1
node_ip: 172.31.43.110
pid: 1492629
should_checkpoint: true
time_since_restore: 15.684244871139526
time_this_iter_s: 15.684244871139526
time_total_s: 15.684244871139526
timestamp: 1655918879
timesteps_since_restore: 0
training_iteration: 1
trial_id: 9dec8_00000
valid:
score_time: 0.03549623489379883
test_score: 0.9532163742690059
warmup_time: 0.0057866573333740234
2022-06-22 17:27:59,333 INFO tune.py:734 -- Total run time: 19.09 seconds (18.31 seconds for the tuning loop).
{'valid': {'score_time': 0.03549623489379883, 'test_score': 0.9532163742690059}, 'cv': {'fit_time': array([2.22100377, 2.21548939, 2.20756745, 2.22235131, 2.31238961]), 'score_time': array([0.02246499, 0.02308655, 0.02564836, 0.02913785, 0.02122164]), 'test_score': array([0.9625 , 0.9125 , 0.9875 , 1. , 0.93670886]), 'fit_time_mean': 2.235760307312012, 'fit_time_std': 0.03866614559685742, 'score_time_mean': 0.02431187629699707, 'score_time_std': 0.0028120522003997595, 'test_score_mean': 0.9598417721518986, 'test_score_std': 0.032128186960552516}, 'fit_time': 2.3157050609588623, 'time_this_iter_s': 15.684244871139526, 'should_checkpoint': True, 'done': True, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 1, 'trial_id': '9dec8_00000', 'experiment_id': 'f8215019c10e4a81ba2187c38e875365', 'date': '2022-06-22_17-27-59', 'timestamp': 1655918879, 'time_total_s': 15.684244871139526, 'pid': 1492629, 'hostname': 'ip-172-31-43-110', 'node_ip': '172.31.43.110', 'config': {}, 'time_since_restore': 15.684244871139526, 'timesteps_since_restore': 0, 'iterations_since_restore': 1, 'warmup_time': 0.0057866573333740234, 'experiment_tag': '0'}
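Result.metrics is a plain dictionary, so individual values from the report above can be read out directly, for example:

metrics = result.metrics
print(metrics["valid"]["test_score"])    # 0.9532163742690059
print(metrics["cv"]["test_score_mean"])  # 0.9598417721518986
checkpoint = result.checkpoint           # the checkpoint predict_sklearn() consumes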
And perform inference on the obtained model:
predict_sklearn(result, use_gpu=False)
2022-06-22 17:27:59,658 WARNING read_api.py:260 -- The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 64.73it/s]
Map Progress (1 actors 1 pending): 100%|██████████| 1/1 [00:01<00:00,  1.60s/it]
Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 71.41it/s]
PREDICTED LABELS
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
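The predictions inside predict_sklearn are themselves a Ray Dataset. If you change the utility to return predicted_labels instead of only printing it (a one-line tweak, not shown in the original example), you can pull the results into pandas for further analysis, assuming they fit in driver memory:

predictions = predict_sklearn(result, use_gpu=False)  # assumes the return tweak above
df = predictions.to_pandas()
print(df["predictions"].value_counts())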