{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "0d385409", "metadata": {}, "source": [ "(lightgbm-example-ref)=\n", "\n", "# Training a model with distributed LightGBM\n", "\n", "\n", " \"try-anyscale-quickstart\"\n", "\n", "

\n", "\n", "In this example we will train a model in Ray Train using distributed LightGBM." ] }, { "attachments": {}, "cell_type": "markdown", "id": "07d92cee", "metadata": {}, "source": [ "Let's start with installing our dependencies:" ] }, { "cell_type": "code", "execution_count": 1, "id": "86131abe", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip available: \u001b[0m\u001b[31;49m22.3.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m23.1.2\u001b[0m\n", "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n" ] } ], "source": [ "!pip install -qU \"ray[data,train]\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "135fc884", "metadata": {}, "source": [ "Then we need some imports:" ] }, { "cell_type": "code", "execution_count": 2, "id": "102ef1ac", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", " from .autonotebook import tqdm as notebook_tqdm\n", "2023-07-07 14:34:14,951\tINFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n", "2023-07-07 14:34:15,892\tINFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.\n" ] } ], "source": [ "from typing import Tuple\n", "\n", "import ray\n", "from ray.data import Dataset, Preprocessor\n", "from ray.data.preprocessors import Categorizer, StandardScaler\n", "from ray.train.lightgbm import LightGBMTrainer\n", "from ray.train import Result, ScalingConfig" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c7d102bd", "metadata": {}, "source": [ "Next we define a function to load our train, validation, and test datasets." ] }, { "cell_type": "code", "execution_count": 3, "id": "f1f35cd7", "metadata": {}, "outputs": [], "source": [ "def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:\n", " dataset = ray.data.read_csv(\"s3://anonymous@air-example-data/breast_cancer_with_categorical.csv\")\n", " train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)\n", " test_dataset = valid_dataset.drop_columns(cols=[\"target\"])\n", " return train_dataset, valid_dataset, test_dataset" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8f7afbce", "metadata": {}, "source": [ "The following function will create a LightGBM trainer, train it, and return the result." ] }, { "cell_type": "code", "execution_count": 4, "id": "fefcbc8a", "metadata": {}, "outputs": [], "source": [ "def train_lightgbm(num_workers: int, use_gpu: bool = False) -> Result:\n", " train_dataset, valid_dataset, _ = prepare_data()\n", "\n", " # Scale some random columns, and categorify the categorical_column,\n", " # allowing LightGBM to use its built-in categorical feature support\n", " scaler = StandardScaler(columns=[\"mean radius\", \"mean texture\"])\n", " categorizer = Categorizer([\"categorical_column\"])\n", "\n", " train_dataset = categorizer.fit_transform(scaler.fit_transform(train_dataset))\n", " valid_dataset = categorizer.transform(scaler.transform(valid_dataset))\n", "\n", " # LightGBM specific params\n", " params = {\n", " \"objective\": \"binary\",\n", " \"metric\": [\"binary_logloss\", \"binary_error\"],\n", " }\n", "\n", " trainer = LightGBMTrainer(\n", " scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),\n", " label_column=\"target\",\n", " params=params,\n", " datasets={\"train\": train_dataset, \"valid\": valid_dataset},\n", " num_boost_round=100,\n", " metadata = {\"scaler_pkl\": scaler.serialize(), \"categorizer_pkl\": categorizer.serialize()}\n", " )\n", " result = trainer.fit()\n", " print(result.metrics)\n", "\n", " return result" ] }, { "attachments": {}, "cell_type": "markdown", "id": "04d278ae", "metadata": {}, "source": [ "Once we have the result, we can do batch inference on the obtained model. Let's define a utility function for this." ] }, { "cell_type": "code", "execution_count": 5, "id": "3f1d0c19", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from ray.train import Checkpoint\n", "\n", "\n", "class Predict:\n", "\n", " def __init__(self, checkpoint: Checkpoint):\n", " self.model = LightGBMTrainer.get_model(checkpoint)\n", " self.scaler = Preprocessor.deserialize(checkpoint.get_metadata()[\"scaler_pkl\"])\n", " self.categorizer = Preprocessor.deserialize(checkpoint.get_metadata()[\"categorizer_pkl\"])\n", "\n", " def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:\n", " preprocessed_batch = self.categorizer.transform_batch(self.scaler.transform_batch(batch))\n", " return {\"predictions\": self.model.predict(preprocessed_batch)}\n", "\n", "\n", "def predict_lightgbm(result: Result):\n", " _, _, test_dataset = prepare_data()\n", "\n", " scores = test_dataset.map_batches(\n", " Predict, \n", " fn_constructor_args=[result.checkpoint], \n", " concurrency=1, \n", " batch_format=\"pandas\"\n", " )\n", " \n", " predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format=\"pandas\")\n", " print(f\"PREDICTED LABELS\")\n", " predicted_labels.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2bb0e5df", "metadata": {}, "source": [ "Now we can run the training:" ] }, { "cell_type": "code", "execution_count": 6, "id": "8244ff3c", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Tune Status

\n", " \n", "\n", "\n", "\n", "\n", "\n", "
Current time:2023-07-07 14:34:34
Running for: 00:00:06.06
Memory: 12.2/64.0 GiB
\n", "
\n", "
\n", "
\n", "

System Info

\n", " Using FIFO scheduling algorithm.
Logical resource usage: 4.0/10 CPUs, 0/0 GPUs\n", "
\n", " \n", "
\n", "
\n", "
\n", "

Trial Status

\n", " \n", "\n", "\n", "\n", "\n", "\n", "\n", "
Trial name status loc iter total time (s) train-binary_logloss train-binary_error valid-binary_logloss
LightGBMTrainer_0c5ae_00000TERMINATED127.0.0.1:10027 101 4.5829 0.000202293 0 0.130232
\n", "
\n", "
\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(get_pd_value_counts)]\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)] -> AllToAllOperator[Aggregate]\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", " \n", "\u001b[A\n", "\u001b[A\n", "\n", "\u001b[A\u001b[A\n", "\n", "(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:00 TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)->MapBatches(StandardScaler._transform_pandas)]\n", "\n", "\u001b[A\n", "\n", "(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 7.59it/s]\n", "\u001b[A \n", "\n", "\u001b[A\u001b[A \n", "\n", "\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", "\n", "\u001b[A\n", "\n", "(pid=10027) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 7%|▋ | 1/14 [00:00<00:01, 6.59it/s]\n", "\u001b[A \n", "\n", "\u001b[A\u001b[A \n", "\n", "\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", "\n", "\u001b[A\n", "\n", " \n", "\u001b[A\n", "\n", "\u001b[A\u001b[A\n", "\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(Categorizer._transform_pandas)->MapBatches(StandardScaler._transform_pandas)]\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", "\u001b[2m\u001b[36m(LightGBMTrainer pid=10027)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Trying to bind port 51134...\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Binding port 51134 succeeded\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Listening...\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m [LightGBM] [Warning] Connecting to rank 1 failed, waiting for 200 milliseconds\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Connected to rank 0\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Info] Local rank: 1, total number of machines: 2\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10063)\u001b[0m [LightGBM] [Warning] num_threads is set=2, n_jobs=-1 will be ignored. Current value: num_threads=2\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m /Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/lightgbm/basic.py:1780: UserWarning: Overriding the parameters from Reference Dataset.\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m _log_warning('Overriding the parameters from Reference Dataset.')\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m /Users/balaji/Documents/GitHub/ray/.venv/lib/python3.11/site-packages/lightgbm/basic.py:1513: UserWarning: categorical_column in param dict is overridden.\n", "\u001b[2m\u001b[36m(_RemoteRayLightGBMActor pid=10062)\u001b[0m _log_warning(f'{cat_alias} in param dict is overridden.')\n", "2023-07-07 14:34:34,087\tINFO tune.py:1148 -- Total run time: 7.18 seconds (6.05 seconds for the tuning loop).\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'train-binary_logloss': 0.00020229312743896637, 'train-binary_error': 0.0, 'valid-binary_logloss': 0.13023245107091222, 'valid-binary_error': 0.023529411764705882, 'time_this_iter_s': 0.021785974502563477, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '0c5ae_00000', 'date': '2023-07-07_14-34-34', 'timestamp': 1688765674, 'time_total_s': 4.582904100418091, 'pid': 10027, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.582904100418091, 'iterations_since_restore': 101, 'experiment_tag': '0'}\n" ] } ], "source": [ "result = train_lightgbm(num_workers=2, use_gpu=False)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d7155d9b", "metadata": {}, "source": [ "And perform inference on the obtained model:" ] }, { "cell_type": "code", "execution_count": 7, "id": "871c9be6", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2023-07-07 14:34:36,769\tINFO read_api.py:374 -- To satisfy the requested parallelism of 20, each read task output will be split into 20 smaller blocks.\n", "2023-07-07 14:34:38,655\tWARNING plan.py:567 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune\n", "2023-07-07 14:34:38,668\tINFO dataset.py:2180 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n", "2023-07-07 14:34:38,674\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches()->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches()]\n", "2023-07-07 14:34:38,674\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", "2023-07-07 14:34:38,676\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", "2023-07-07 14:34:38,701\tINFO actor_pool_map_operator.py:117 -- MapBatches()->MapBatches(Predict): Waiting for 1 pool actors to start...\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "PREDICTED LABELS\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r" ] } ], "source": [ "predict_lightgbm(result)" ] } ], "metadata": { "jupytext": { "cell_metadata_filter": "-all", "main_language": "python", "notebook_metadata_filter": "-all" }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.2" }, "orphan": true, "vscode": { "interpreter": { "hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f" } } }, "nbformat": 4, "nbformat_minor": 5 }