{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "0d385409",
"metadata": {},
"source": [
"(lightgbm-example-ref)=\n",
"\n",
"# Training a model with distributed LightGBM\n",
"\n",
"\n",
"
\n",
"\n",
"
\n",
"\n",
"In this example we will train a model in Ray Train using distributed LightGBM."
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "07d92cee",
"metadata": {},
"source": [
"Let's start with installing our dependencies:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "86131abe",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip available: \u001b[0m\u001b[31;49m22.3.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m23.1.2\u001b[0m\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n"
]
}
],
"source": [
"!pip install -qU \"ray[data,train]\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "135fc884",
"metadata": {},
"source": [
"Then we need some imports:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "102ef1ac",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
" from .autonotebook import tqdm as notebook_tqdm\n",
"2023-07-07 14:34:14,951\tINFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n",
"2023-07-07 14:34:15,892\tINFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n"
]
}
],
"source": [
"from typing import Tuple\n",
"\n",
"import ray\n",
"from ray.data import Dataset, Preprocessor\n",
"from ray.data.preprocessors import Categorizer, StandardScaler\n",
"from ray.train.lightgbm import LightGBMTrainer\n",
"from ray.train import Result, ScalingConfig"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c7d102bd",
"metadata": {},
"source": [
"Next we define a function to load our train, validation, and test datasets."
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "f1f35cd7",
"metadata": {},
"outputs": [],
"source": [
"def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:\n",
" dataset = ray.data.read_csv(\"s3://anonymous@air-example-data/breast_cancer_with_categorical.csv\")\n",
" train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)\n",
" test_dataset = valid_dataset.drop_columns(cols=[\"target\"])\n",
" return train_dataset, valid_dataset, test_dataset"
]
},
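{
 "attachments": {},
 "cell_type": "markdown",
 "id": "3a7e9c10",
 "metadata": {},
 "source": [
  "As a quick sanity check, we can count the rows in each split and inspect the schema. This cell is an illustrative sketch and was not executed as part of the original example:"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "7c2d4b8e",
 "metadata": {},
 "outputs": [],
 "source": [
  "# Sketch: inspect the splits returned by `prepare_data`.\n",
  "# `count()` triggers execution of the CSV read from S3.\n",
  "train_ds, valid_ds, test_ds = prepare_data()\n",
  "print(train_ds.count(), valid_ds.count(), test_ds.count())\n",
  "print(train_ds.schema())"
 ]
},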
{
"attachments": {},
"cell_type": "markdown",
"id": "8f7afbce",
"metadata": {},
"source": [
"The following function will create a LightGBM trainer, train it, and return the result."
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "fefcbc8a",
"metadata": {},
"outputs": [],
"source": [
"def train_lightgbm(num_workers: int, use_gpu: bool = False) -> Result:\n",
" train_dataset, valid_dataset, _ = prepare_data()\n",
"\n",
" # Scale some random columns, and categorify the categorical_column,\n",
" # allowing LightGBM to use its built-in categorical feature support\n",
" scaler = StandardScaler(columns=[\"mean radius\", \"mean texture\"])\n",
" categorizer = Categorizer([\"categorical_column\"])\n",
"\n",
" train_dataset = categorizer.fit_transform(scaler.fit_transform(train_dataset))\n",
" valid_dataset = categorizer.transform(scaler.transform(valid_dataset))\n",
"\n",
" # LightGBM specific params\n",
" params = {\n",
" \"objective\": \"binary\",\n",
" \"metric\": [\"binary_logloss\", \"binary_error\"],\n",
" }\n",
"\n",
" trainer = LightGBMTrainer(\n",
" scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),\n",
" label_column=\"target\",\n",
" params=params,\n",
" datasets={\"train\": train_dataset, \"valid\": valid_dataset},\n",
" num_boost_round=100,\n",
" metadata = {\"scaler_pkl\": scaler.serialize(), \"categorizer_pkl\": categorizer.serialize()}\n",
" )\n",
" result = trainer.fit()\n",
" print(result.metrics)\n",
"\n",
" return result"
]
},
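{
 "attachments": {},
 "cell_type": "markdown",
 "id": "b91f3e52",
 "metadata": {},
 "source": [
  "Note that we serialize the fitted preprocessors into the checkpoint `metadata`. This way, inference workers can later reconstruct exactly the preprocessing used during training without refitting it. A minimal sketch of the retrieval side (the `Predict` class below does this in practice):"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "d4a61c9f",
 "metadata": {},
 "outputs": [],
 "source": [
  "from ray.train import Checkpoint\n",
  "\n",
  "\n",
  "def load_preprocessors(checkpoint: Checkpoint):\n",
  "    # Recover the fitted preprocessors that `train_lightgbm` stored in the\n",
  "    # checkpoint metadata above.\n",
  "    metadata = checkpoint.get_metadata()\n",
  "    scaler = Preprocessor.deserialize(metadata[\"scaler_pkl\"])\n",
  "    categorizer = Preprocessor.deserialize(metadata[\"categorizer_pkl\"])\n",
  "    return scaler, categorizer"
 ]
},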
{
"attachments": {},
"cell_type": "markdown",
"id": "04d278ae",
"metadata": {},
"source": [
"Once we have the result, we can do batch inference on the obtained model. Let's define a utility function for this."
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "3f1d0c19",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from ray.train import Checkpoint\n",
"\n",
"\n",
"class Predict:\n",
"\n",
" def __init__(self, checkpoint: Checkpoint):\n",
" self.model = LightGBMTrainer.get_model(checkpoint)\n",
" self.scaler = Preprocessor.deserialize(checkpoint.get_metadata()[\"scaler_pkl\"])\n",
" self.categorizer = Preprocessor.deserialize(checkpoint.get_metadata()[\"categorizer_pkl\"])\n",
"\n",
" def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:\n",
" preprocessed_batch = self.categorizer.transform_batch(self.scaler.transform_batch(batch))\n",
" return {\"predictions\": self.model.predict(preprocessed_batch)}\n",
"\n",
"\n",
"def predict_lightgbm(result: Result):\n",
" _, _, test_dataset = prepare_data()\n",
"\n",
" scores = test_dataset.map_batches(\n",
" Predict, \n",
" fn_constructor_args=[result.checkpoint], \n",
" concurrency=1, \n",
" batch_format=\"pandas\"\n",
" )\n",
" \n",
" predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format=\"pandas\")\n",
" print(f\"PREDICTED LABELS\")\n",
" predicted_labels.show()"
]
},
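{
 "attachments": {},
 "cell_type": "markdown",
 "id": "e8c05d17",
 "metadata": {},
 "source": [
  "For a quantitative check rather than printed labels, the same pattern can score the model against the labeled validation split. The `score_lightgbm` helper below is a hedged sketch, not part of the original example; note that `to_pandas()` pulls the data to the driver, which is fine for a dataset this small:"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "f27b6a34",
 "metadata": {},
 "outputs": [],
 "source": [
  "def score_lightgbm(result: Result) -> float:\n",
  "    # Score the trained model on the labeled validation split.\n",
  "    _, valid_dataset, _ = prepare_data()\n",
  "    labels = valid_dataset.select_columns([\"target\"]).to_pandas()[\"target\"]\n",
  "\n",
  "    scores = valid_dataset.drop_columns([\"target\"]).map_batches(\n",
  "        Predict,\n",
  "        fn_constructor_args=[result.checkpoint],\n",
  "        concurrency=1,\n",
  "        batch_format=\"pandas\",\n",
  "    )\n",
  "    preds = (scores.to_pandas()[\"predictions\"] > 0.5).astype(int)\n",
  "    # Fraction of validation rows classified correctly.\n",
  "    return float((preds.values == labels.values).mean())"
 ]
},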
{
"attachments": {},
"cell_type": "markdown",
"id": "2bb0e5df",
"metadata": {},
"source": [
"Now we can run the training:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "8244ff3c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"
\n",
"
\n",
"
Tune Status
\n",
"
\n",
"\n",
"Current time: | 2023-07-07 14:34:34 |
\n",
"Running for: | 00:00:06.06 |
\n",
"Memory: | 12.2/64.0 GiB |
\n",
"\n",
"
\n",
"
\n",
"
\n",
"
\n",
"
System Info
\n",
" Using FIFO scheduling algorithm.
Logical resource usage: 4.0/10 CPUs, 0/0 GPUs\n",
" \n",
" \n",
"
\n",
"
\n",
"
\n",
"
Trial Status
\n",
"
\n",
"\n",
"Trial name | status | loc | iter | total time (s) | train-binary_logloss | train-binary_error | valid-binary_logloss |
\n",
"\n",
"\n",
"LightGBMTrainer_0c5ae_00000 | TERMINATED | 127.0.0.1:10027 | 101 | 4.5829 | 0.000202293 | 0 | 0.130232 |
\n",
"\n",
"
\n",
"
\n",
"
\n",
"\n"
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(get_pd_value_counts)]\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)] -> AllToAllOperator[Aggregate]\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
" \n",
"\u001b[A\n",
"\u001b[A\n",
"\n",
"\u001b[A\u001b[A\n",
"\n",
"(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:00, ?it/s] \n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 9.53it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)->MapBatches(StandardScaler._transform_pandas)]\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 7.59it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 6.59it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
"\n",
"\u001b[A\n",
"\n",
" \n",
"\u001b[A\n",
"\n",
"\u001b[A\u001b[A\n",
"\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)->MapBatches(StandardScaler._transform_pandas)]\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Trying to bind port 51134...\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Binding port 51134 succeeded\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Listening...\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m [LightGBM] [Warning] Connecting to rank 1 failed, waiting for 200 milliseconds\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Connected to rank 0\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Local rank: 1, total number of machines: 2\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Warning] num_threads is set=2, n_jobs=-1 will be ignored. Current value: num_threads=2\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m /Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/lightgbm/basic.py:1780: UserWarning: Overriding the parameters from Reference Dataset.\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m _log_warning('Overriding the parameters from Reference Dataset.')\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m /Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/lightgbm/basic.py:1513: UserWarning: categorical_column in param dict is overridden.\n",
"\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m _log_warning(f'{cat_alias} in param dict is overridden.')\n",
"2023-07-07 14:34:34,087\tINFO tune.py:1148 -- Total run time: 7.18 seconds (6.05 seconds for the tuning loop).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'train-binary_logloss': 0.00020229312743896637, 'train-binary_error': 0.0, 'valid-binary_logloss': 0.13023245107091222, 'valid-binary_error': 0.023529411764705882, 'time_this_iter_s': 0.021785974502563477, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '0c5ae_00000', 'date': '2023-07-07_14-34-34', 'timestamp': 1688765674, 'time_total_s': 4.582904100418091, 'pid': 10027, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.582904100418091, 'iterations_since_restore': 101, 'experiment_tag': '0'}\n"
]
}
],
"source": [
"result = train_lightgbm(num_workers=2, use_gpu=False)"
]
},
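{
 "attachments": {},
 "cell_type": "markdown",
 "id": "a5d93f08",
 "metadata": {},
 "source": [
  "The returned `Result` object carries the final reported metrics and the checkpoint. For example, matching the metrics dictionary printed above (a sketch, not executed here):"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "id": "c6e14b72",
 "metadata": {},
 "outputs": [],
 "source": [
  "# Sketch: pull individual fields off the training result.\n",
  "print(result.metrics[\"valid-binary_logloss\"])\n",
  "print(result.metrics[\"valid-binary_error\"])\n",
  "print(result.checkpoint)  # the Checkpoint used for inference below"
 ]
},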
{
"attachments": {},
"cell_type": "markdown",
"id": "d7155d9b",
"metadata": {},
"source": [
"And perform inference on the obtained model:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "871c9be6",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-07-07 14:34:36,769\tINFO read_api.py:374 -- To satisfy the requested parallelism of 20, each read task output will be split into 20 smaller blocks.\n",
"2023-07-07 14:34:38,655\tWARNING plan.py:567 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune\n",
"2023-07-07 14:34:38,668\tINFO dataset.py:2180 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n",
"2023-07-07 14:34:38,674\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches()->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches()]\n",
"2023-07-07 14:34:38,674\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"2023-07-07 14:34:38,676\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
"2023-07-07 14:34:38,701\tINFO actor_pool_map_operator.py:117 -- MapBatches()->MapBatches(Predict): Waiting for 1 pool actors to start...\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"PREDICTED LABELS\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n"
]
}
],
"source": [
"predict_lightgbm(result)"
]
}
],
"metadata": {
"jupytext": {
"cell_metadata_filter": "-all",
"main_language": "python",
"notebook_metadata_filter": "-all"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
},
"orphan": true,
"vscode": {
"interpreter": {
"hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}