{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "(mmt-core)=\n", "\n", "# Batch Training with Ray Core" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{tip}\n", "The workload showcased in this notebook can be expressed using different Ray components, such as Ray Data, Ray Tune and Ray Core.\n", "For best practices, see {ref}`ref-use-cases-mmt`.\n", "```\n", "\n", "Batch training and tuning are common tasks in simple machine learning use-cases such as time series forecasting. They require fitting of simple models on multiple data batches corresponding to locations, products, etc. This notebook showcases how to conduct batch training on the [NYC Taxi Dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) using only Ray Core and stateless Ray tasks." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Batch training in the context of this notebook is understood as creating the same model(s) for different and separate datasets or subsets of a dataset. This task is naively parallelizable and can be easily scaled with Ray.\n", "\n", "![Batch training diagram](./images/batch-training.svg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Contents\n", "In this tutorial, we will walk through the following steps:\n", " 1. Reading parquet data,\n", " 2. Using Ray tasks to preprocess, train and evaluate data batches,\n", " 3. Dividing data into batches and spawning a Ray task for each batch to be run in parallel,\n", " 4. Starting batch training,\n", " 5. [Optional] Optimizing for runtime over memory with centralized data loading.\n", "\n", "# Walkthrough\n", "\n", "We want to analyze the relationship between the dropoff location and the trip duration. The relationship will be very different for each pickup location, therefore we need to have a separate model for each of those. Furthermore, the relationship can change with time. Therefore, our task is to create separate models for each pickup location-month combination. The dataset we are using is already partitioned into months (each file being equal to one), and we can use the `pickup_location_id` column in the dataset to group it into data batches. We will then fit models for each batch and choose the best one.\n", "\n", "Let’s start by importing Ray and initializing a local Ray cluster." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from typing import Callable, Optional, List, Union, Tuple, Iterable\n", "import time\n", "import numpy as np\n", "import pandas as pd\n", "\n", "from sklearn.base import BaseEstimator\n", "from sklearn.model_selection import train_test_split\n", "from sklearn.metrics import mean_absolute_error\n", "\n", "import pyarrow as pa\n", "from pyarrow import fs\n", "from pyarrow import dataset as ds\n", "from pyarrow import parquet as pq\n", "import pyarrow.compute as pc" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "

Ray

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", "
Python version:3.8.13
Ray version: 2.5.0
Dashboard:http://console.anyscale-staging.com/api/v2/sessions/ses_ZmHebxHaZpYkw9x9efJ5wBVX/services?redirect_to=dashboard
\n", "
\n", "
\n" ], "text/plain": [ "RayContext(dashboard_url='console.anyscale-staging.com/api/v2/sessions/ses_ZmHebxHaZpYkw9x9efJ5wBVX/services?redirect_to=dashboard', python_version='3.8.13', ray_version='2.5.0', ray_commit='d1aa5608979891e3dd859c07fa919fa01cfead5f', address_info={'node_ip_address': '172.31.106.108', 'raylet_ip_address': '172.31.106.108', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-10-17_11-31-27_282878_162/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-10-17_11-31-27_282878_162/sockets/raylet', 'webui_url': 'console.anyscale-staging.com/api/v2/sessions/ses_ZmHebxHaZpYkw9x9efJ5wBVX/services?redirect_to=dashboard', 'session_dir': '/tmp/ray/session_2022-10-17_11-31-27_282878_162', 'metrics_export_port': 60885, 'gcs_address': '172.31.106.108:9031', 'address': '172.31.106.108:9031', 'dashboard_agent_listen_port': 52365, 'node_id': 'cb2e7fdcf259260c3e594f270b4b24e6ddb741362ac2535c546e56ed'})" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import ray\n", "\n", "ray.init(ignore_reinit_error=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For benchmarking purposes, we can print the times of various operations. In order to reduce clutter in the output, this is set to False by default." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "PRINT_TIMES = False\n", "\n", "\n", "def print_time(msg: str):\n", " if PRINT_TIMES:\n", " print(msg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To speed things up, we'll only use a small subset of the full dataset consisting of two last months of 2019. You can choose to use the full dataset for 2018-2019 by setting the `SMOKE_TEST` variable to False." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "SMOKE_TEST = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reading parquet data\n", "\n", "The `read_data` function reads a Parquet file and uses a push-down predicate to extract the data batch we want to fit a model on using the provided index to group the rows. By having each task read the data and extract batches separately, we ensure that memory utilization is minimal - as opposed to requiring each task to load the entire partition into memory first.\n", "\n", "We are using PyArrow to read the file, as it supports push-down predicates to be applied during file reading. This lets us avoid having to load an entire file into memory, which could cause an OOM error with a large dataset. After the dataset is loaded, we convert it to pandas so that it can be used for training with scikit-learn." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def read_data(file: str, pickup_location_id: int) -> pd.DataFrame:\n", " return pq.read_table(\n", " file,\n", " filters=[(\"pickup_location_id\", \"=\", pickup_location_id)],\n", " columns=[\n", " \"pickup_at\",\n", " \"dropoff_at\",\n", " \"pickup_location_id\",\n", " \"dropoff_location_id\",\n", " ],\n", " ).to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating Ray tasks to preprocess, train and evaluate data batches\n", "\n", "As we will be using the NYC Taxi dataset, we define a simple batch transformation function to set correct data types, calculate the trip duration and fill missing values." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "def transform_batch(df: pd.DataFrame) -> pd.DataFrame:\n", " df[\"pickup_at\"] = pd.to_datetime(\n", " df[\"pickup_at\"], format=\"%Y-%m-%d %H:%M:%S\"\n", " )\n", " df[\"dropoff_at\"] = pd.to_datetime(\n", " df[\"dropoff_at\"], format=\"%Y-%m-%d %H:%M:%S\"\n", " )\n", " df[\"trip_duration\"] = (df[\"dropoff_at\"] - df[\"pickup_at\"]).dt.seconds\n", " df[\"pickup_location_id\"] = df[\"pickup_location_id\"].fillna(-1)\n", " df[\"dropoff_location_id\"] = df[\"dropoff_location_id\"].fillna(-1)\n", " return df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will be fitting scikit-learn models on data batches. We define a Ray task `fit_and_score_sklearn` that fits the model and calculates mean absolute error on the validation set. We will be treating this as a simple regression problem where we want to predict the relationship between the drop-off location and the trip duration." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# Ray task to fit and score a scikit-learn model.\n", "@ray.remote\n", "def fit_and_score_sklearn(\n", " train: pd.DataFrame, test: pd.DataFrame, model: BaseEstimator\n", ") -> Tuple[BaseEstimator, float]:\n", " train_X = train[[\"dropoff_location_id\"]]\n", " train_y = train[\"trip_duration\"]\n", " test_X = test[[\"dropoff_location_id\"]]\n", " test_y = test[\"trip_duration\"]\n", "\n", " # Start training.\n", " model = model.fit(train_X, train_y)\n", " pred_y = model.predict(test_X)\n", " error = mean_absolute_error(test_y, pred_y)\n", " return model, error" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we will define a `train_and_evaluate` Ray task which contains all logic necessary to load a data batch, transform it, split it into train and test and fit and evaluate models on it. 
We make sure to return the file and location id used so that we can map the fitted models back to them.\n", "\n", "For data loading and processing, we are using the `read_data` and `transform_batch` functions we have defined earlier.\n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def train_and_evaluate_internal(\n", "    df: pd.DataFrame, models: List[BaseEstimator], pickup_location_id: int = 0\n", ") -> List[Tuple[BaseEstimator, float]]:\n", "    # We need at least 4 rows to create a train / test split.\n", "    if len(df) < 4:\n", "        print(\n", "            f\"Dataframe for LocID: {pickup_location_id} is empty or smaller than 4\"\n", "        )\n", "        return None\n", "\n", "    # Train / test split.\n", "    train, test = train_test_split(df)\n", "\n", "    # We put the train & test dataframes into the Ray object store\n", "    # so that they can be reused by all models fitted here.\n", "    # https://docs.ray.io/en/master/ray-core/patterns/pass-large-arg-by-value.html\n", "    train_ref = ray.put(train)\n", "    test_ref = ray.put(test)\n", "\n", "    # Launch a fit and score task for each model.\n", "    results = ray.get(\n", "        [\n", "            fit_and_score_sklearn.remote(train_ref, test_ref, model)\n", "            for model in models\n", "        ]\n", "    )\n", "    results.sort(key=lambda x: x[1])  # sort by error\n", "    return results\n", "\n", "\n", "@ray.remote\n", "def train_and_evaluate(\n", "    file_name: str,\n", "    pickup_location_id: int,\n", "    models: List[BaseEstimator],\n", ") -> Tuple[str, str, List[Tuple[BaseEstimator, float]]]:\n", "    start_time = time.time()\n", "    data = read_data(file_name, pickup_location_id)\n", "    data_loading_time = time.time() - start_time\n", "    print_time(\n", "        f\"Data loading time for LocID: {pickup_location_id}: {data_loading_time}\"\n", "    )\n", "\n", "    # Perform transformation\n", "    start_time = time.time()\n", "    data = transform_batch(data)\n", "    transform_time = time.time() - start_time\n", "    print_time(\n", "        f\"Data transform time for LocID: {pickup_location_id}: {transform_time}\"\n", "    )\n", "\n", "    # Perform training & evaluation for each model\n", "    start_time = time.time()\n", "    results = train_and_evaluate_internal(data, models, pickup_location_id)\n", "    training_time = time.time() - start_time\n", "    print_time(\n", "        f\"Training time for LocID: {pickup_location_id}: {training_time}\"\n", "    )\n", "\n", "    return (\n", "        file_name,\n", "        pickup_location_id,\n", "        results,\n", "    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dividing data into batches and spawning a Ray task for each batch to be run in parallel\n", "\n", "The `run_batch_training` driver function generates tasks for each Parquet file it receives (with each file corresponding to one month). We define the function to take in a list of models, so that we can evaluate them all and choose the best one for each batch. The function blocks when it reaches `ray.get()` and waits for tasks to return their results."
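, "\n", "\n", "If we did not want to block on all of the tasks at once, a minimal sketch of an alternative (not used in this tutorial) would be to consume results incrementally with `ray.wait()`:\n", "\n", "```python\n", "# Hypothetical variation: process task results one by one as they finish,\n", "# instead of a single blocking ray.get() over all task references.\n", "unfinished = list(task_refs)\n", "results = []\n", "while unfinished:\n", "    finished, unfinished = ray.wait(unfinished, num_returns=1)\n", "    results.extend(ray.get(finished))\n", "```"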
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "def run_batch_training(files: List[str], models: List[BaseEstimator]):\n", " print(\"Starting run...\")\n", " start = time.time()\n", "\n", " # Store task references\n", " task_refs = []\n", " for file in files:\n", " try:\n", " locdf = pq.read_table(file, columns=[\"pickup_location_id\"])\n", " except Exception:\n", " continue\n", " pickup_location_ids = locdf[\"pickup_location_id\"].unique()\n", "\n", " for pickup_location_id in pickup_location_ids:\n", " # Cast PyArrow scalar to Python if needed.\n", " try:\n", " pickup_location_id = pickup_location_id.as_py()\n", " except Exception:\n", " pass\n", " task_refs.append(\n", " train_and_evaluate.remote(file, pickup_location_id, models)\n", " )\n", "\n", " # Block to obtain results from each task\n", " results = ray.get(task_refs)\n", "\n", " taken = time.time() - start\n", " count = len(results)\n", " # If result is None, then it means there weren't enough records to train\n", " results_not_none = [x for x in results if x is not None]\n", " count_not_none = len(results_not_none)\n", "\n", " # Sleep a moment for nicer output\n", " time.sleep(1)\n", " print(\"\", flush=True)\n", " print(f\"Number of pickup locations: {count}\")\n", " print(\n", " f\"Number of pickup locations with enough records to train: {count_not_none}\"\n", " )\n", " print(f\"Number of models trained: {count_not_none * len(models)}\")\n", " print(f\"TOTAL TIME TAKEN: {taken:.2f} seconds\")\n", " return results" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Starting batch training\n", "\n", "We can now tie everything together! First, we obtain the partitions of the dataset from an S3 bucket so that we can pass them to `run`. The dataset is partitioned by year and month, meaning each file represents one month." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Obtained 2 files!\n" ] } ], "source": [ "# Obtain the dataset. Each month is a separate file.\n", "dataset = ds.dataset(\n", " \"s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/\",\n", " partitioning=[\"year\", \"month\"],\n", ")\n", "starting_idx = -2 if SMOKE_TEST else 0\n", "files = [f\"s3://anonymous@{file}\" for file in dataset.files][starting_idx:]\n", "print(f\"Obtained {len(files)} files!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now run our script. The output is a list of tuples in the following format: `(file name, partition id, list of models and their MAE scores)`. For brevity, we will print out the first 10 tuples." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting run...\n", "(train_and_evaluate pid=3658) Dataframe for LocID: 214 is empty or smaller than 4\n", "(train_and_evaluate pid=2027) Dataframe for LocID: 204 is empty or smaller than 4\n", "(train_and_evaluate pid=3658) Dataframe for LocID: 176 is empty or smaller than 4\n", "\n", "Number of pickup locations: 522\n", "Number of pickup locations with enough records to train: 522\n", "Number of models trained: 522\n", "TOTAL TIME TAKEN: 139.27 seconds\n", "[('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 145, ([(LinearRegression(), 811.1991448011532)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 161, ([(LinearRegression(), 753.8173175448575)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 163, ([(LinearRegression(), 735.7208096221053)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 193, ([(LinearRegression(), 915.8790566477112)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 260, ([(LinearRegression(), 626.6908388606766)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 56, ([(LinearRegression(), 902.6575414213821)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 79, ([(LinearRegression(), 710.7781383724797)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 90, ([(LinearRegression(), 667.0555322496516)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 162, ([(LinearRegression(), 700.0288733783458)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 50, ([(LinearRegression(), 697.2487742278146)],))]\n" ] } ], "source": [ "from sklearn.linear_model import LinearRegression\n", "\n", "results = run_batch_training(files, models=[LinearRegression()])\n", "print(results[:10])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using the output we've gotten, we can now tie each model to the given file (month)-pickup location combination and see their predictive power, as measured by the error. At this stage, we can carry on with further analysis if necessary or use them for inference.\n", "\n", "We can also provide multiple scikit-learn models to our `run` function and the best one will be chosen for each batch. A common use-case here would be to define several models of the same type with different hyperparameters." 
] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting run...\n", "(train_and_evaluate pid=21437) Dataframe for LocID: 214 is empty or smaller than 4\n", "(train_and_evaluate pid=21888) Dataframe for LocID: 204 is empty or smaller than 4\n", "(train_and_evaluate pid=22358) Dataframe for LocID: 176 is empty or smaller than 4\n", "\n", "Number of pickup locations: 522\n", "Number of pickup locations with enough records to train: 522\n", "Number of models trained: 1566\n", "TOTAL TIME TAKEN: 247.80 seconds\n", "[('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 145, ([(DecisionTreeRegressor(splitter='random'), 586.3557158021763), (DecisionTreeRegressor(), 587.4490404009856), (LinearRegression(), 867.6406607489587)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 161, ([(DecisionTreeRegressor(), 598.902261739656), (DecisionTreeRegressor(splitter='random'), 598.9147196919863), (LinearRegression(), 760.6576436185691)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 163, ([(DecisionTreeRegressor(splitter='random'), 573.8896116905775), (DecisionTreeRegressor(), 573.8983618518819), (LinearRegression(), 738.3486584028989)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 193, ([(DecisionTreeRegressor(splitter='random'), 743.5483210338866), (DecisionTreeRegressor(), 744.3629120390056), (LinearRegression(), 953.6672220167137)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 260, ([(DecisionTreeRegressor(splitter='random'), 498.29219023609505), (DecisionTreeRegressor(), 501.13978495420673), (LinearRegression(), 665.543426962402)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 56, ([(LinearRegression(), 1516.8825665745849), (DecisionTreeRegressor(), 1572.7744375553175), (DecisionTreeRegressor(splitter='random'), 1572.7744375553175)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 79, ([(DecisionTreeRegressor(), 567.3130440323552), (DecisionTreeRegressor(splitter='random'), 567.3722846337118), (LinearRegression(), 701.2370802810619)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 90, ([(DecisionTreeRegressor(splitter='random'), 513.5831366488217), (DecisionTreeRegressor(), 513.6235175626782), (LinearRegression(), 666.2786163862434)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 162, ([(DecisionTreeRegressor(splitter='random'), 557.7537740834588), (DecisionTreeRegressor(), 557.7568050908675), (LinearRegression(), 701.2237669363365)],)), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 50, ([(DecisionTreeRegressor(), 563.7371119126768), (DecisionTreeRegressor(splitter='random'), 563.8079887794675), (LinearRegression(), 714.1553440667034)],))]\n" ] } ], 
"source": [ "from sklearn.tree import DecisionTreeRegressor\n", "\n", "results = run_batch_training(\n", " files,\n", " models=[\n", " LinearRegression(),\n", " DecisionTreeRegressor(),\n", " DecisionTreeRegressor(splitter=\"random\"),\n", " ],\n", ")\n", "print(results[:10])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## [Optional] Optimizing for runtime over memory with centralized data loading\n", "\n", "In order to ensure that the data can always fit in memory, each task reads the files independently and extracts the desired data batch. This, however, negatively impacts the runtime. If we have sufficient memory in our Ray cluster, we can instead load each partition once, extract the batches, and save them in the [Ray object store](objects-in-ray), reducing time required dramatically at a cost of higher memory usage. In other words, we perform centralized data loading using Ray object store as opposed to distributed data loading.\n", "\n", "Notice we do not call `ray.get()` on the references of the `read_into_object_store`. Instead, we pass the reference itself as the argument to the `train_and_evaluate.remote` dispatch, [allowing for the data to stay in the object store until it is actually needed](unnecessary-ray-get). This avoids a situation where all the data would be loaded into the memory of the process calling `ray.get()`.\n", "\n", "You can use the Ray Dashboard to compare the memory usage between the previous approach and this one." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "# Redefine the train_and_evaluate task to use in-memory data.\n", "# We still keep file_name and pickup_location_id for identification purposes.\n", "@ray.remote\n", "def train_and_evaluate(\n", " pickup_location_id_and_data: Tuple[int, pd.DataFrame],\n", " file_name: str,\n", " models: List[BaseEstimator],\n", ") -> Tuple[str, str, List[Tuple[BaseEstimator, float]]]:\n", " pickup_location_id, data = pickup_location_id_and_data\n", " # Perform transformation\n", " start_time = time.time()\n", " # The underlying numpy arrays are stored in the Ray object\n", " # store for efficient access, making them immutable. 
We therefore\n", " # copy the DataFrame to obtain a mutable copy we can transform.\n", " data = data.copy()\n", " data = transform_batch(data)\n", " transform_time = time.time() - start_time\n", " print_time(\n", " f\"Data transform time for LocID: {pickup_location_id}: {transform_time}\"\n", " )\n", "\n", " return (\n", " file_name,\n", " pickup_location_id,\n", " train_and_evaluate_internal(data, models, pickup_location_id),\n", " )\n", "\n", "\n", "# This allows us to create a Ray Task that is also a generator, returning object references.\n", "@ray.remote(num_returns=\"dynamic\")\n", "def read_into_object_store(file: str) -> ray.ObjectRefGenerator:\n", " print(f\"Loading {file}\")\n", " # Read the entire file into memory.\n", " try:\n", " locdf = pq.read_table(\n", " file,\n", " columns=[\n", " \"pickup_at\",\n", " \"dropoff_at\",\n", " \"pickup_location_id\",\n", " \"dropoff_location_id\",\n", " ],\n", " )\n", " except Exception:\n", " return []\n", "\n", " pickup_location_ids = locdf[\"pickup_location_id\"].unique()\n", "\n", " for pickup_location_id in pickup_location_ids:\n", " # Each id-data batch tuple will be put as a separate object into the Ray object store.\n", "\n", " # Cast PyArrow scalar to Python if needed.\n", " try:\n", " pickup_location_id = pickup_location_id.as_py()\n", " except Exception:\n", " pass\n", "\n", " yield (\n", " pickup_location_id,\n", " locdf.filter(\n", " pc.equal(locdf[\"pickup_location_id\"], pickup_location_id)\n", " ).to_pandas(),\n", " )\n", "\n", "\n", "def run_batch_training_with_object_store(\n", " files: List[str], models: List[BaseEstimator]\n", "):\n", " print(\"Starting run...\")\n", " start = time.time()\n", "\n", " # Store task references\n", " task_refs = []\n", "\n", " # Use a SPREAD scheduling strategy to load each\n", " # file on a separate node as an OOM safeguard.\n", " # This is not foolproof though! 
We can also specify a resource\n", " # requirement for memory, if we know what is the maximum\n", " # memory requirement for a single file.\n", " read_into_object_store_spread = read_into_object_store.options(\n", " scheduling_strategy=\"SPREAD\"\n", " )\n", "\n", " # Dictionary of references to read tasks with file names as keys\n", " read_tasks_by_file = {\n", " files[file_id]: read_into_object_store_spread.remote(file)\n", " for file_id, file in enumerate(files)\n", " }\n", "\n", " for file, read_task_ref in read_tasks_by_file.items():\n", " # We iterate over references and pass them to the tasks directly\n", " for pickup_location_id_and_data_batch_ref in iter(ray.get(read_task_ref)):\n", " task_refs.append(\n", " train_and_evaluate.remote(\n", " pickup_location_id_and_data_batch_ref, file, models\n", " )\n", " )\n", "\n", " # Block to obtain results from each task\n", " results = ray.get(task_refs)\n", "\n", " taken = time.time() - start\n", " count = len(results)\n", " # If result is None, then it means there weren't enough records to train\n", " results_not_none = [x for x in results if x is not None]\n", " count_not_none = len(results_not_none)\n", "\n", " # Sleep a moment for nicer output\n", " time.sleep(1)\n", " print(\"\", flush=True)\n", " print(f\"Number of pickup locations: {count}\")\n", " print(\n", " f\"Number of pickup locations with enough records to train: {count_not_none}\"\n", " )\n", " print(f\"Number of models trained: {count_not_none * len(models)}\")\n", " print(f\"TOTAL TIME TAKEN: {taken:.2f} seconds\")\n", " return results" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting run...\n", "(read_into_object_store pid=22584) Loading s3://air-example-data/ursa-labs-taxi-data/by_year/2019/06/data.parquet/ab5b9d2b8cc94be19346e260b543ec35_000000.parquet\n", "(read_into_object_store pid=22586) Loading s3://air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet\n", "(train_and_evaluate pid=22584) Dataframe for LocID: 214 is empty or smaller than 4\n", "(train_and_evaluate pid=23204) Dataframe for LocID: 204 is empty or smaller than 4\n", "(train_and_evaluate pid=23204) Dataframe for LocID: 176 is empty or smaller than 4\n", "\n", "Number of pickup locations: 522\n", "Number of pickup locations with enough records to train: 522\n", "Number of models trained: 522\n", "TOTAL TIME TAKEN: 19.47 seconds\n", "[('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 145, [(LinearRegression(), 851.6540137470965)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 161, [(LinearRegression(), 759.3457730674915)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 163, [(LinearRegression(), 743.6905538807495)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 193, [(LinearRegression(), 857.6903787276541)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 260, [(LinearRegression(), 646.4703728065817)]), 
('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 56, [(LinearRegression(), 1372.6945225983686)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 79, [(LinearRegression(), 701.0097453726145)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 90, [(LinearRegression(), 650.179758287182)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 162, [(LinearRegression(), 706.316835556958)]), ('s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet', 50, [(LinearRegression(), 694.0467262859878)])]\n" ] } ], "source": [ "results = run_batch_training_with_object_store(\n", "    files, models=[LinearRegression()]\n", ")\n", "print(results[:10])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that this approach allowed us to finish training much faster, but it would not have been possible if the dataset were too large to fit into our cluster memory. Therefore, this pattern is only recommended if the data you are working with is small. Otherwise, it is recommended to load the data inside the tasks right before it is used. As always, your mileage may vary - we recommend you try both approaches for your workload and see what works best for you!" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.8.10 ('venv': venv)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f" } } }, "nbformat": 4, "nbformat_minor": 2 }