Processing NYC taxi data using Ray Data#
The NYC Taxi dataset is a popular tabular dataset. In this example, we demonstrate some basic data processing on this dataset using Ray Data.
Overview#
This tutorial will cover:
Reading Parquet data
Inspecting the metadata and first few rows of a large Ray Dataset
Calculating some common global and grouped statistics on the dataset
Dropping columns and rows
Adding a derived column
Shuffling the data
Sharding the data and feeding it to parallel consumers (trainers)
Applying batch (offline) inference to the data
Walkthrough#
Let’s start by importing Ray and initializing a local Ray cluster.
# Import ray and initialize a local Ray cluster.
import ray
ray.init()
Reading and Inspecting the Data#
Next, we read a few of the files from the dataset. This read is lazy: reading and all future transformations are delayed until a downstream operation triggers execution (e.g., consuming the data with ds.take()).
# Read two Parquet files in parallel.
ds = ray.data.read_parquet([
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_01_data.parquet",
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_02_data.parquet",
])
We can easily inspect the schema of this dataset. For Parquet files, we don’t even have to read the actual data to get the schema; we can read it from the lightweight Parquet metadata!
# Fetch the schema from the underlying Parquet metadata.
ds.schema()
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 2524
Parquet even stores the number of rows per file in the Parquet metadata, so we can get the number of rows in ds without triggering a full data read.
ds.count()
2749936
We can get a nice, cheap summary of the Dataset by leveraging its informative repr:
# Display some metadata about the dataset.
ds
Dataset(num_blocks=2, num_rows=2749936, schema={vendor_id: string, pickup_at: timestamp[us], dropoff_at: timestamp[us], passenger_count: int8, trip_distance: float, pickup_longitude: float, pickup_latitude: float, rate_code_id: null, store_and_fwd_flag: string, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float})
We can also poke at the actual data, taking a peek at a single row. Since this is only returning a row from the first file, reading of the second file is not triggered yet.
ds.take(1)
[ArrowRow({'vendor_id': 'VTS',
'pickup_at': datetime.datetime(2009, 1, 21, 14, 58),
'dropoff_at': datetime.datetime(2009, 1, 21, 15, 3),
'passenger_count': 1,
'trip_distance': 0.5299999713897705,
'pickup_longitude': -73.99270629882812,
'pickup_latitude': 40.7529411315918,
'rate_code_id': None,
'store_and_fwd_flag': None,
'dropoff_longitude': -73.98814392089844,
'dropoff_latitude': 40.75956344604492,
'payment_type': 'CASH',
'fare_amount': 4.5,
'extra': 0.0,
'mta_tax': None,
'tip_amount': 0.0,
'tolls_amount': 0.0,
'total_amount': 4.5})]
To get a better sense of the data size, we can calculate the size in bytes of the full dataset. Note that for Parquet files, this size-in-bytes will be pulled from the Parquet metadata (not triggering a data read), and therefore might be significantly different than the in-memory size!
ds.size_bytes()
427503965
In order to get the in-memory size, we can trigger full reading of the dataset and inspect the size in bytes.
ds.materialize().size_bytes()
Read progress: 100%|██████████| 2/2 [00:00<00:00, 2.50it/s]
226524489
Advanced Aside - Reading Partitioned Parquet Datasets#
In addition to reading lists of individual files, ray.data.read_parquet() (as well as the other ray.data.read_*() APIs) can read directories containing multiple Parquet files. For Parquet in particular, reading datasets partitioned by a particular column is supported, allowing for path-based (zero-read) partition filtering and, optionally, including the partition column value encoded in the file paths directly in the read table data.
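As a hedged sketch (the bucket path and partition layout below are illustrative, not part of this dataset), reading a Hive-partitioned layout such as .../year=2009/... while filtering on the partition column means that files in non-matching partitions are never read:
import pyarrow.dataset

# Hypothetical Hive-partitioned layout: only files under year=2009/ are read,
# because the filter can be evaluated from the file paths alone.
partitioned_ds = ray.data.read_parquet(
    "s3://my-bucket/taxi-partitioned/",            # illustrative path
    filter=pyarrow.dataset.field("year") == 2009,  # partition-column filter
)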
For the NYC taxi dataset, instead of reading individual per-month Parquet files, we can read the entire 2009 directory.
Warning
This could be a lot of data (the downsampled data, at a 0.01 sampling ratio, is ~50.2 MB on disk and ~147 MB in memory), so be careful about triggering full reads on a limited-memory machine! This is one place where Dataset's lazy reading comes in handy: Dataset won't execute any read tasks eagerly and will execute only the minimum number of file reads needed to satisfy downstream operations, which lets us inspect a subset of the data without having to read the entire dataset.
# Read all Parquet data for the year 2009.
year_ds = ray.data.read_parquet("s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet")
The metadata that Dataset prints in its repr is guaranteed to not trigger reads of all files; data such as the row count and the schema is pulled directly from the Parquet metadata.
year_ds.count()
1710629
That’s a lot of rows! Since we’re not going to use this full-year data, let’s now delete this dataset to free up some memory in our Ray cluster.
del year_ds
Data Exploration and Cleaning#
Let’s calculate some stats to get a better picture of our data.
# What's the longest trip distance, largest tip amount, and highest passenger count?
ds.max(["trip_distance", "tip_amount", "passenger_count"])
Shuffle Map: 100%|██████████| 2/2 [00:00<00:00, 50.69it/s]
Shuffle Reduce: 100%|██████████| 1/1 [00:00<00:00, 114.04it/s]
ArrowRow({'max(trip_distance)': 50.0,
'max(tip_amount)': 100.0,
'max(passenger_count)': 6})
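If we want several statistics in one pass, Dataset.aggregate can combine multiple aggregations; here is a minimal sketch (the choice of columns is just for illustration):
from ray.data.aggregate import Max, Mean

# Compute several aggregations over the dataset together.
ds.aggregate(Max("trip_distance"), Mean("trip_distance"), Max("tip_amount"))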
We don't have any use for the store_and_fwd_flag or mta_tax columns, so let's drop those.
# Drop some columns.
ds = ds.drop_columns(["store_and_fwd_flag", "mta_tax"])
Map_Batches: 100%|██████████| 2/2 [00:03<00:00, 1.59s/it]
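The overview also mentioned adding a derived column. Here is a minimal sketch of one way to do that with Dataset.add_column, computing a trip duration in seconds from the pickup and dropoff timestamps (the trip_duration column name and the separate ds_with_duration variable are illustrative, and the rest of the walkthrough doesn't use them):
# Add a derived column computed from each pandas batch.
ds_with_duration = ds.add_column(
    "trip_duration",
    lambda batch: (batch["dropoff_at"] - batch["pickup_at"]).dt.total_seconds(),
)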
Let’s say we want to know how many trips there are for each passenger count.
ds.groupby("passenger_count").count().take()
Sort Sample: 100%|██████████| 2/2 [00:00<00:00, 5.01it/s]
Shuffle Map: 100%|██████████| 2/2 [03:21<00:00, 100.61s/it]
Shuffle Reduce: 100%|██████████| 2/2 [00:01<00:00, 1.97it/s]
[PandasRow({'passenger_count': -48, 'count()': 3}),
PandasRow({'passenger_count': 0, 'count()': 91}),
PandasRow({'passenger_count': 1, 'count()': 1865548}),
PandasRow({'passenger_count': 2, 'count()': 451452}),
PandasRow({'passenger_count': 3, 'count()': 119406}),
PandasRow({'passenger_count': 4, 'count()': 55547}),
PandasRow({'passenger_count': 5, 'count()': 245332}),
PandasRow({'passenger_count': 6, 'count()': 12557})]
It looks like there are some nonsensical passenger counts, namely the negative ones. Let's filter those out.
# Filter out records with negative passenger counts.
ds = ds.map_batches(lambda df: df[df["passenger_count"] > 0])
Map_Batches: 100%|██████████| 2/2 [00:03<00:00, 1.60s/it]
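For reference, the same filter can also be expressed row-wise with Dataset.filter; this is a sketch and is typically slower than the vectorized map_batches call above:
# Row-wise version of the same filter (usually slower than map_batches).
ds_filtered = ds.filter(lambda row: row["passenger_count"] > 0)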
Does the passenger count influence the typical trip distance?
# Mean trip distance grouped by passenger count.
ds.groupby("passenger_count").mean("trip_distance").take()
Sort Sample: 100%|██████████| 2/2 [00:00<00:00, 4.57it/s]
Shuffle Map: 100%|██████████| 2/2 [03:23<00:00, 101.59s/it]
Shuffle Reduce: 100%|██████████| 2/2 [00:00<00:00, 178.79it/s]
[PandasRow({'passenger_count': 1, 'mean(trip_distance)': 2.543288084787955}),
PandasRow({'passenger_count': 2, 'mean(trip_distance)': 2.7043459216040686}),
PandasRow({'passenger_count': 3, 'mean(trip_distance)': 2.6233412684454716}),
PandasRow({'passenger_count': 4, 'mean(trip_distance)': 2.642096445352584}),
PandasRow({'passenger_count': 5, 'mean(trip_distance)': 2.6286944833939314}),
PandasRow({'passenger_count': 6, 'mean(trip_distance)': 2.5848625579855855})]
See Transforming Data for more information on how we can process our data with Ray Data.
Advanced Aside - Projection and Filter Pushdown#
Note that Ray Data's Parquet reader supports projection (column selection) and row filter pushdown, meaning that we can push the above column selection and row-based filter down into the Parquet read. If we specify column selection at Parquet read time, the unselected columns won't even be read from disk!
The row-based filter is specified via Arrow’s dataset field expressions. See the Parquet row pruning tips for more information.
# Only read the passenger_count and trip_distance columns.
import pyarrow as pa
import pyarrow.dataset  # ensure the pyarrow.dataset submodule is loaded

filter_expr = (
    (pa.dataset.field("passenger_count") <= 10)
    & (pa.dataset.field("passenger_count") > 0)
)

pushdown_ds = ray.data.read_parquet(
    [
        "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_01_data.parquet",
        "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_02_data.parquet",
    ],
    columns=["passenger_count", "trip_distance"],
    filter=filter_expr,
)
# Force full execution of both of the file reads.
pushdown_ds = pushdown_ds.materialize()
pushdown_ds
⚠️ The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Read progress: 100%|██████████| 2/2 [00:00<00:00, 9.19it/s]
Dataset(num_blocks=2, num_rows=2749842, schema={passenger_count: int8, trip_distance: float})
# Delete the pushdown dataset. Deleting the Dataset object
# will release the underlying memory in the cluster.
del pushdown_ds
Ingesting into Model Trainers#
Now that we've explored and cleaned up our dataset a bit, let's look at how to feed it into some dummy model trainers.
First, let’s do a full global random shuffle of the dataset to decorrelate these samples.
ds = ds.random_shuffle()
Shuffle Map: 100%|██████████| 2/2 [00:01<00:00, 1.34it/s]
Shuffle Reduce: 100%|██████████| 2/2 [00:01<00:00, 1.09it/s]
We define a dummy Trainer actor, where each trainer will consume a dataset shard in batches and simulate model training.
Note
In a real training workflow, we would feed ds to Ray Train, which would do this sharding and creation of training actors for us under the hood (see the sketch at the end of this section).
@ray.remote
class Trainer:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()
trainers = [Trainer.remote(i) for i in range(4)]
trainers
[Actor(Trainer, 9326d43345699213608f324003000000),
Actor(Trainer, f0ce2ce44528fbf748c9c1a103000000),
Actor(Trainer, 7ba39c8f82ebd78c68e92ec903000000),
Actor(Trainer, b95fe3494b7bc2d8f42abbba03000000)]
Next, we split the dataset into len(trainers) shards, ensuring that the shards are of equal size.
shards = ds.split(n=len(trainers), equal=True)
shards
[Dataset(num_blocks=1, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
Dataset(num_blocks=1, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
Dataset(num_blocks=2, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
Dataset(num_blocks=1, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32})]
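As a side note, if you want each shard to be placed near the actor that will consume it, Dataset.split also accepts locality hints; a minimal sketch (placement is best-effort):
# Pass the consuming actors as locality hints so that Ray Data tries to place
# each shard on the node where its trainer actor runs (best-effort).
shards = ds.split(n=len(trainers), equal=True, locality_hints=trainers)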
Finally, we simulate training, passing each shard to the corresponding trainer. The number of rows per shard is returned.
ray.get([w.train.remote(s) for w, s in zip(trainers, shards)])
[687460, 687460, 687460, 687460]
# Delete trainer actor handle references, which should terminate the actors.
del trainers
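For reference, here is a minimal sketch of the same pattern using Ray Train's TorchTrainer instead of hand-rolled actors. The exact imports and helper names vary across Ray versions, so treat this as an assumption-laden outline rather than a drop-in snippet; train_func is an illustrative name.
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    # Ray Train shards the "train" dataset and hands this worker its shard.
    shard = ray.train.get_dataset_shard("train")
    for batch in shard.iter_batches(batch_size=256):
        pass  # A real loop would run a training step on each batch.

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4),
    datasets={"train": ds},
)
# result = trainer.fit()  # Uncomment to actually launch training.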
Parallel Batch Inference#
Tip
Refer to the blog on Model Batch Inference in Ray for an overview of batch inference strategies in Ray and additional examples.
After we've trained a model, we may want to perform batch (offline) inference on such a tabular dataset. With Ray Data, this is as easy as a ds.map_batches() call!
First, we define a callable class that will cache the loading of the model in its constructor.
import pandas as pd

def load_model():
    # A dummy model.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})

    return model

class BatchInferModel:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)
BatchInferModel's constructor will only be called once per actor worker when using the actor pool compute strategy in ds.map_batches().
ds.map_batches(BatchInferModel, batch_size=2048, compute=ray.data.ActorPoolStrategy()).take()
Map Progress (2 actors 1 pending): 100%|██████████| 2/2 [00:05<00:00, 2.57s/it]
[PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False})]
If you want to perform batch inference on GPUs, simply specify the number of GPUs you wish to provision for each batch inference worker.
Warning
This will only run successfully if your cluster has nodes with GPUs!
ds.map_batches(
    BatchInferModel,
    batch_size=256,
    # num_gpus=1,  # Uncomment this to run this on GPUs!
    compute=ray.data.ActorPoolStrategy(),
).take()
Map Progress (15 actors 4 pending): 100%|██████████| 2/2 [00:21<00:00, 10.67s/it]
[PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False})]
We can also configure the autoscaling actor pool that this inference stage uses, setting upper and lower bounds on the actor pool size, and even tweak the batch prefetching vs. inference task queueing tradeoff.
from ray.data import ActorPoolStrategy

# The actor pool will have at least 2 workers and at most 8 workers.
strategy = ActorPoolStrategy(min_size=2, max_size=8)

ds.map_batches(
    BatchInferModel,
    batch_size=256,
    # num_gpus=1,  # Uncomment this to run this on GPUs!
    compute=strategy,
).take()
Map Progress (8 actors 0 pending): 100%|██████████| 2/2 [00:21<00:00, 10.71s/it]
[PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': False}),
PandasRow({'score': True}),
PandasRow({'score': False})]