Batch Training with Ray Datasets#

Batch training and tuning are common tasks in simple machine learning use-cases such as time series forecasting. They require fitting of simple models on data batches corresponding to different locations, products, etc. Batch training can take less time to process all the data at once, but only if those batches can run in parallel!

This notebook showcases how to conduct batch training regression algorithms from XGBoost and Scikit-learn with Ray Datasets. XGBoost is a popular open-source library used for regression and classification. Scikit-learn is a popular open-source library with a vast assortment of well-known ML algorithms.


The workload showcased in this notebook can be expressed using different Ray components, such as Ray Data, Ray Tune and Ray Core. For more information, including best practices, see Many Model Training.

Batch training diagram

For the data, we will use the NYC Taxi dataset. This popular tabular dataset contains historical taxi pickups by timestamp and location in NYC.

For the training, we will train separate regression models to predict trip_duration, with a different model for each dropoff location in NYC. Specifically, we will conduct an experiment for each dropoff_location_id, to find the best either XGBoost or Scikit-learn model, per location.


In this this tutorial, you will learn about:

  1. Creating a Ray Dataset

  2. Filtering a Ray Dataset on Read

  3. Inspecting a Ray Dataset

  4. Transforming a Ray Dataset in parallel

  5. Batch training with Ray Datasets in parallel

  6. Load a saved model and perform batch prediction


Let us start by importing a few required libraries, including open-source Ray itself!

import os
num_cpu = os.cpu_count()

print(f"Number of CPUs in this system: {num_cpu}")
from typing import Tuple, List, Union, Optional, Callable
import time
import pandas as pd
import numpy as np

print(f"numpy: {np.__version__}")
import pyarrow
import pyarrow.parquet as pq
import pyarrow.dataset as pds

print(f"pyarrow: {pyarrow.__version__}")
from import Dataset
Number of CPUs in this system: 8
numpy: 1.23.3
pyarrow: 6.0.1
import ray

if ray.is_initialized():
2022-12-08 17:04:06,689	INFO -- Using address localhost:9031 set in the environment variable RAY_ADDRESS
2022-12-08 17:04:06,691	INFO -- Connecting to existing Ray cluster at address:
2022-12-08 17:04:06,700	INFO -- Connected to Ray cluster. View the dashboard at 
{'CPU': 8.0, 'object_store_memory': 9093674188.0, 'memory': 18187348379.0, 'node:': 1.0}
# For benchmarking purposes, we can print the times of various operations.
# In order to reduce clutter in the output, this is set to False by default.

def print_time(msg: str):

# To speed things up, we’ll only use a small subset of the full dataset consisting of two last months of 2019.
# You can choose to use the full dataset for 2018-2019 by setting the SMOKE_TEST variable to False.

Creating Ray Dataset #


Ray Datasets uses PyArrow dataset and table for reading or writing large parquet files. Its native multithreaded C++ adpater is faster than pandas read_parquet, even using engine='pyarrow'. For more details see Ray Datasets User Guide.

Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. We will use the Ray Dataset APIs to read the data and quickly inspect it.

First, we will define some global variables we will use throughout the notebook, such as the list of S3 links to the files making up the dataset and the possible location IDs.

# Define some global variables.
TARGET = "trip_duration"
s3_partitions = pds.dataset(
    "s3://[email protected]/ursa-labs-taxi-data/by_year/",
    partitioning=["year", "month"],
s3_files = [f"s3://[email protected]{file}" for file in s3_partitions.files]

# Obtain all location IDs
location_ids = (
    pq.read_table(s3_files[0], columns=["dropoff_location_id"])["dropoff_location_id"]

# Use smoke testing or not.
starting_idx = -1 if SMOKE_TEST else 0
# drop location 199 to test error-handling before final git checkin
sample_locations = [141, 229, 173] if SMOKE_TEST else location_ids

# Display what data will be used.
s3_files = s3_files[starting_idx:]
print(f"NYC Taxi using {len(s3_files)} file(s)!")
print(f"s3_files: {s3_files}")
print(f"Locations: {sample_locations}")
NYC Taxi using 1 file(s)!
s3_files: ['s3://[email protected]/ursa-labs-taxi-data/by_year/2019/06/data.parquet/ab5b9d2b8cc94be19346e260b543ec35_000000.parquet']
Locations: [141, 229, 173]

The easiest way to create a ray dataset is to use to read parquet files in parallel onto the Ray cluster.

Uncomment the cell below if you want to try it out.

# # This cell is commented out because it can take a long time!
# # In the next section "Filtering Read" we make it faster.

# # Read everything in the files list into a ray dataset.
# start = time.time()
# ds =
# print(f"Data loading time: {data_loading_time:.2f} seconds")
# ds

Filtering a Ray Dataset on Read #

Normally there is some last-mile data processing required before training. Let’s just assume we know the data processing steps are:

  • Drop negative trip distances, 0 fares, 0 passengers.

  • Drop 2 unknown zones: ['264', '265'].

  • Calculate trip duration and add it as a new column.

  • Drop trip durations smaller than 1 minute and greater than 24 hours.

Instead of blindly reading all the data, it would be better if we only read the data we needed. This is similar concept to SQL SELECT only rows, columns you need vs SELECT *.


Best practice is to filter as much as you can directly in the Ray Dataset read_parquet().

Note that Ray Datasets’ Parquet reader supports projection (column selection) and row filter pushdown, where we can push the above column selection and the row-based filter to the Parquet read. If we specify column selection at Parquet read time, the unselected columns won’t even be read from disk. This can save a lot of memory, especially with big datasets, and allow us to avoid OOM issues.

The row-based filter is specified via Arrow’s dataset field expressions.

def pushdown_read_data(files_list: list, sample_ids: list) -> Dataset:
    start = time.time()

    filter_expr = (
        (pds.field("passenger_count") > 0)
        & (pds.field("trip_distance") > 0)
        & (pds.field("fare_amount") > 0)
        & (~pds.field("pickup_location_id").isin([264, 265]))
        & (~pds.field("dropoff_location_id").isin([264, 265]))
        & (pds.field("dropoff_location_id").isin(sample_ids))

    dataset =

    data_loading_time = time.time() - start
    print_time(f"Data loading time: {data_loading_time:.2f} seconds")

    return dataset
# Test the pushdown_read_data function
ds_raw = pushdown_read_data(s3_files, sample_locations)
2022-12-08 17:04:09,202	WARNING -- ⚠️  The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.

Inspecting a Ray Dataset #

Let’s get some basic statistics about our newly created Ray Dataset.

As our Ray Dataset is backed by Parquet, we can obtain the number of rows from the metadata without triggering a full data read.

print(f"Number of rows: {ds_raw.count()}")
Number of rows: 6941024

Similarly, we can obtain the Dataset size (in bytes) from the metadata.

print(f"Size bytes (from parquet metadata): {ds_raw.size_bytes()}")
Size bytes (from parquet metadata): 925892280

Let’s fetch and inspect the schema of the underlying Parquet files.

print("\nSchema data types:")
data_types = list(zip(ds_raw.schema().names, ds_raw.schema().types))
for s in data_types:
    print(f"{s[0]}: {s[1]}")
Schema data types:
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
pickup_location_id: int32
dropoff_location_id: int32
passenger_count: int8
trip_distance: float
fare_amount: float

Transforming a Ray Dataset in parallel using custom functions #

Ray Datasets allows you to specify custom data transform functions. These user defined functions (UDFs) can be called using Dataset.map_batches(my_function). The transformation will be conducted in parallel for each data batch.


You may need to call Dataset.repartition(n) first to split the Dataset into more blocks internally. By default, each block corresponds to one file. The upper bound of parallelism is the number of blocks.

You can specify the data format you are using in the batch_format parameter. The dataset will be divided into batches and those batches converted into the specified format. Available data formats you can specify in the batch_format paramater include "pandas", "pyarrow", "numpy". Tabular data will be passed into your UDF by default as a pandas DataFrame. Tensor data will be passed into your UDF as a numpy array.

Here, we will use batch_format="pandas" explicitly for clarity.

# A pandas DataFrame UDF for transforming the Dataset in parallel.
def transform_df(input_df: pd.DataFrame) -> pd.DataFrame:
    df = input_df.copy()

    # calculate trip_duration
    df["trip_duration"] = (df["dropoff_at"] - df["pickup_at"]).dt.seconds
    # filter trip_durations > 1 minute and less than 24 hours
    df = df[df["trip_duration"] > 60]
    df = df[df["trip_duration"] < 24 * 60 * 60]
    # keep only necessary columns
        ["dropoff_at", "pickup_at", "pickup_location_id", "fare_amount"],
    df["dropoff_location_id"] = df["dropoff_location_id"].fillna(-1)
    return df

# Test the transform UDF.
print(f"Number of rows before transformation: {ds_raw.count()}")

# Repartition the dataset to allow for higher parallelism.
# Best practice: repartition to all available cpu except a few, with a cap
num_partitions = min(num_cpu - 2, 32)
ds = ds_raw.repartition(num_partitions)

# .map_batches applies a UDF to each partition of the data in parallel.
ds = ds.map_batches(transform_df, batch_format="pandas")

# Verify row count.
print(f"Number of rows after transformation: {ds.count()}")
Number of rows before transformation: 6941024
Read: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:01<00:00,  1.97s/it]
Repartition: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 6/6 [00:02<00:00,  2.87it/s]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 6/6 [00:02<00:00,  2.90it/s]
Number of rows after transformation: 285323
CPU times: user 320 ms, sys: 114 ms, total: 434 ms
Wall time: 6.19 s

Batch training with Ray Datasets #

Now that we have learned more about our data and written a pandas UDF to transform our data, we are ready to train a model on batches of this data in parallel.

  1. We will use the dropoff_location_id column in the dataset to group the dataset into data batches.

  2. Then we will fit a separate model for each batch to predict trip_duration.

# import standard sklearn libraries
import sklearn
from sklearn.base import BaseEstimator
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_absolute_error

print(f"sklearn: {sklearn.__version__}")
import xgboost as xgb

print(f"xgboost: {xgb.__version__}")
# set global random seed for sklearn models
sklearn: 1.1.2
xgboost: 1.3.3
/home/ray/anaconda3/lib/python3.8/site-packages/xgboost/ FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
  from pandas import MultiIndex, Int64Index

Define search space for training#

In this notebook, we will run parallel training jobs per data batch, drop-off location. The training jobs will be defined using a search space and simple grid search. Depending on your need, fancier search spaces and search algorithms are possible with Ray Tune.

Below, we define our search space consists of:

  • Different algorithms, either:

    • Linear Regression or XGBoost Tree Regression.

We want to train using every algorithm in the search space. What this means is every algorithm will be applied to every NYC Taxi drop-off location.


Define training functions#

We want to fit a linear regression model to the trip duration for each drop-off location. For scoring, we will calculate mean absolute error on the validation set, and report that as model error per drop-off location.

The fit_and_score_sklearn function contains the logic necessary to fit a scikit-learn model and evaluate it using mean absolute error.

def fit_and_score_sklearn(
    train_df: pd.DataFrame, test_df: pd.DataFrame, model: BaseEstimator
) -> pd.DataFrame:

    # Assemble train/test pandas dfs
    train_X = train_df[["passenger_count", "trip_distance"]]
    train_y = train_df[TARGET]
    test_X = test_df[["passenger_count", "trip_distance"]]
    test_y = test_df[TARGET]

    # Start training.
    model =, train_y)
    pred_y = model.predict(test_X)

    # Evaluate.
    error = sklearn.metrics.mean_absolute_error(test_y, pred_y)
    if error is None:
        error = 10000.0

    # Assemble return as a pandas dataframe.
    return_df = pd.DataFrame({"model": [model], "error": [error]})

    # return model, error
    return return_df

The train_and_evaluate function contains the logic for train-test splitting and fitting of a model using the fit_and_score_sklearn function.

As an input, this function takes in a pandas DataFrame. When we call Dataset.map_batches or Dataset.groupby().map_groups(), the Dataset will be batched into multiple pandas DataFrames and this function will run for each batch in parallel. We will return the model and its error. Those results will be collected back into a Ray Dataset.

def train_and_evaluate(
    df: pd.DataFrame, models: List[BaseEstimator], location_id: int
) -> pd.DataFrame:

    # We need at least 4 rows to create a train / test split.
    if len(df) < 4:
            f"Data batch for LocID {location_id} is empty or smaller than 4 rows"
        return None

    start = time.time()

    # Train / test split
    # Randomly split the data into 80/20 train/test.
    train_df, test_df = train_test_split(df, test_size=0.2, shuffle=True)

    # Launch a fit and score task for each model.
    # results is a list of pandas dataframes, one per model
    results = [fit_and_score_sklearn(train_df, test_df, model) for model in models]

    # Assemble location_id, name of model, and metrics in a pandas DataFrame
    results_df = pd.concat(results, axis=0, join="inner", ignore_index=True)
    results_df.insert(0, column="location_id", value=location_id)

    training_time = time.time() - start
    print_time(f"Training time for LocID {location_id}: {training_time:.2f} seconds")

    return results_df

Recall how we wrote a data transform transform_batch UDF? It was called with pattern:

  • Dataset.map_batches(transform_batch, batch_format="pandas")

Similarly, we can write a custom groupy-aggregate function agg_func which will run for each Ray Dataset group-by group in parallel. The usage pattern is:

  • Dataset.groupby(column).map_groups(agg_func, batch_format="pandas").

In the cell below, we define our custom agg_func.

# A Pandas DataFrame aggregation function for processing
# grouped batches of Ray Dataset data.
def agg_func(df: pd.DataFrame) -> pd.DataFrame:
    location_id = df["dropoff_location_id"][0]

    # Handle errors in data groups
        # Transform the input pandas AND fit_and_evaluate the transformed pandas
        results_df = train_and_evaluate(df, ALGORITHMS, location_id)
        assert results_df is not None
    except Exception:
        # assemble a null entry
        print(f"Failed on LocID {location_id}!")
        results_df = pd.DataFrame(
            [[location_id, None, 10000.0]],
            columns=["location_id", "model", "error"],
            dtypes=["int32", BaseEstimator, "float64"],

    return results_df

Run batch training using map_groups#

The main β€œdriver code” reads each Parquet file (where each file corresponds to one month of NYC taxi data) into a Ray Dataset ds.

Then we use Ray Dataset group-by to map each group into a batch of data and run agg_func on each grouping in parallel by calling ds.groupby("dropoff_location_id").map_groups(agg_func, batch_format="pandas").

# Driver code to run this.

start = time.time()

# Read data into Ray Dataset
# ds = pushdown_read_data(s3_files, sample_locations)\
#                         .repartition(14)\
#                         .ds.map_batches(transform_df, batch_format="pandas")

# Use Ray Dataset groupby.map_groups() to process each group in parallel and return a Ray Dataset.
results = ds.groupby("dropoff_location_id").map_groups(agg_func, batch_format="pandas")

total_time_taken = time.time() - start
print(f"Total number of models: {results.count()}")
print(f"TOTAL TIME TAKEN: {total_time_taken:.2f} seconds")
Sort Sample: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 6/6 [00:01<00:00,  4.17it/s]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 6/6 [00:01<00:00,  3.67it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 6/6 [00:01<00:00,  3.61it/s]
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 6/6 [01:43<00:00, 17.31s/it]
Total number of models: 6
TOTAL TIME TAKEN: 108.69 seconds

Finally, we can inspect the models we have trained and their errors.

Dataset(num_blocks=6, num_rows=6, schema={location_id: int32, model: object, error: float64})
# sort values by location id
results_df = results.to_pandas()
results_df.sort_values(by=["location_id"], ascending=True, inplace=True)
location_id model error
0 141 LinearRegression() 535.858862
1 141 XGBRegressor(base_score=0.5, booster='gbtree',... 527.156189
2 173 LinearRegression() 1279.122424
3 173 XGBRegressor(base_score=0.5, booster='gbtree',... 1377.166627
4 229 LinearRegression() 556.860355
5 229 XGBRegressor(base_score=0.5, booster='gbtree',... 559.876944
location_id      int32
model           object
error          float64
dtype: object
# Keep only 1 model per location_id with minimum error
final_df = results_df.copy()
final_df = final_df.loc[(final_df.error > 0), :]
final_df = final_df.loc[final_df.groupby("location_id")["error"].idxmin()]
final_df.sort_values(by=["error"], inplace=True)
final_df.set_index("location_id", inplace=True, drop=True)
model     object
error    float64
dtype: object
model error
141 XGBRegressor(base_score=0.5, booster='gbtree',... 527.156189
229 LinearRegression() 556.860355
173 LinearRegression() 1279.122424
LinearRegression()                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      0.666667
XGBRegressor(base_score=0.5, booster='gbtree', colsample_bylevel=1,\n             colsample_bynode=1, colsample_bytree=1, gamma=0, gpu_id=-1,\n             importance_type='gain', interaction_constraints='',\n             learning_rate=0.300000012, max_delta_step=0, max_depth=4,\n             min_child_weight=1, missing=nan, monotone_constraints='()',\n             n_estimators=100, n_jobs=8, num_parallel_tree=1, random_state=0,\n             reg_alpha=0, reg_lambda=1, scale_pos_weight=1, subsample=1,\n             tree_method='exact', validate_parameters=1, verbosity=None)    0.333333
dtype: float64

Re-load a model and perform batch prediction #

We will restore a regression model and demonstrate it can be used for prediction.

# Choose a dropoff location
sample_location_id = final_df.index[0]
# Get the algorithm used
sample_algorithm = final_df.loc[[sample_location_id]].model.values[0]
print(f"algorithm type:: {type(sample_algorithm)}")

# Get the saved model directly from the pandas dataframe of results
sample_model = final_df.model[sample_location_id]
print(f"sample_model type:: {type(sample_model)}")
algorithm type:: <class 'xgboost.sklearn.XGBRegressor'>
sample_model type:: <class 'xgboost.sklearn.XGBRegressor'>
# Create some test data
df = ds.to_pandas(limit=ds.count())
df = df.loc[(df.dropoff_location_id == sample_location_id), :]
_, test_df = train_test_split(df, test_size=0.2, shuffle=True)
test_X = test_df[["passenger_count", "trip_distance"]]
test_y = np.array(test_df[TARGET])  # actual values
# Perform batch prediction using restored model
pred_y = sample_model.predict(test_X)

# Zip together predictions and actuals to evaluate
pd.DataFrame(zip(pred_y, test_y), columns=["pred_y", "trip_duration"])[0:10]
/home/ray/anaconda3/lib/python3.8/site-packages/xgboost/ FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
  from pandas import MultiIndex, Int64Index
pred_y trip_duration
0 1175.119019 1174
1 381.193146 299
2 1099.755737 1206
3 260.620178 566
4 684.046021 630
5 1038.442139 852
6 1581.762817 1596
7 533.471680 801
8 1618.910889 1363
9 695.661072 715

Compare validation and test error.

During model training we reported error on β€œvalidation” data (random sample). Below, we will report error on a pretend β€œtest” data set (a different random sample).

Do a quick validation that both errors are reasonably close together.

# Evaluate restored model on test data.
error = sklearn.metrics.mean_absolute_error(test_y, pred_y)
print(f"Test error: {error}")
Test error: 930.7620476282492
# Compare test error with training validation error
print(f"Validation error: {final_df.error[sample_location_id]}")

# Validation and test errors should be reasonably close together.
Validation error: 527.1561889430844