Processing NYC taxi data using Ray Datasets

The NYC Taxi dataset is a popular tabular dataset. In this example, we demonstrate some basic data processing on this dataset using Ray Datasets.

Overview

This tutorial will cover:

  • Reading Parquet data

  • Inspecting the metadata and first few rows of a large Ray Dataset

  • Calculating some common global and grouped statistics on the dataset

  • Dropping columns and rows

  • Adding a derived column

  • Shuffling the dataset

  • Sharding the dataset and feeding it to parallel consumers (trainers)

  • Applying batch (offline) inference to the data

Walkthrough

Let’s start by importing Ray and initializing a local Ray cluster.

# Import ray and initialize a local Ray cluster.
import ray
ray.init()
2022-05-18 18:37:54,818	INFO services.py:1484 -- View the Ray dashboard at http://127.0.0.1:8266
RayContext(dashboard_url='127.0.0.1:8266', python_version='3.7.13', ray_version='2.0.0.dev0', ray_commit='{{RAY_COMMIT_SHA}}', address_info={'node_ip_address': '172.31.46.244', 'raylet_ip_address': '172.31.46.244', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-05-18_18-37-50_553007_794791/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-05-18_18-37-50_553007_794791/sockets/raylet', 'webui_url': '127.0.0.1:8266', 'session_dir': '/tmp/ray/session_2022-05-18_18-37-50_553007_794791', 'metrics_export_port': 49419, 'gcs_address': '172.31.46.244:58837', 'address': '172.31.46.244:58837', 'node_id': '6ef10d33a5b9227b41e857b3a9488bcb958a092fef0538798a800e97'})

Reading and Inspecting the Data

Next, we read a few of the files from the dataset. This read is semi-lazy: the first file is read eagerly, while reads of the remaining files are delayed until the underlying data is needed by downstream operations (e.g. consuming the data with ds.take(), or transforming it with ds.map_batches()).

We could process the entire Dataset in a streaming fashion using pipelining or all of it in parallel using a multi-node Ray cluster, but we’ll save that for our large-scale examples. :)

# Read two Parquet files in parallel.
ds = ray.data.read_parquet([
    "s3://ursa-labs-taxi-data/2009/01/data.parquet",
    "s3://ursa-labs-taxi-data/2009/02/data.parquet",
])
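
As mentioned above, we could also stream over this data via pipelining. Here is a minimal sketch, assuming the Dataset.window() API available in this Ray version; it is illustrative only and not executed in this tutorial.

# Illustrative pipelining sketch: stream over the files window-by-window
# instead of materializing the whole Dataset in memory at once.
pipe = ray.data.read_parquet([
    "s3://ursa-labs-taxi-data/2009/01/data.parquet",
    "s3://ursa-labs-taxi-data/2009/02/data.parquet",
]).window(blocks_per_window=1)
pipe = pipe.map_batches(lambda df: df)  # Placeholder per-window transform.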

We can easily inspect the schema of this dataset. For Parquet files, we don’t even have to read the actual data to get the schema; we can read it from the lightweight Parquet metadata!

# Fetch the schema from the underlying Parquet metadata.
ds.schema()
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 2527

Parquet even stores the number of rows per file in the Parquet metadata, so we can get the number of rows in ds without triggering a full data read.

ds.count()
27472535

We can get a nice, cheap summary of the Dataset by leveraging its informative repr:

# Display some metadata about the dataset.
ds
Dataset(num_blocks=2, num_rows=27472535, schema={vendor_id: string, pickup_at: timestamp[us], dropoff_at: timestamp[us], passenger_count: int8, trip_distance: float, pickup_longitude: float, pickup_latitude: float, rate_code_id: null, store_and_fwd_flag: string, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float})

We can also poke at the actual data, taking a peek at a single row. Since this is only returning a row from the first file, reading of the second file is not triggered yet.

ds.take(1)
[ArrowRow({'vendor_id': 'VTS',
           'pickup_at': datetime.datetime(2009, 1, 4, 2, 52),
           'dropoff_at': datetime.datetime(2009, 1, 4, 3, 2),
           'passenger_count': 1,
           'trip_distance': 2.630000114440918,
           'pickup_longitude': -73.99195861816406,
           'pickup_latitude': 40.72156524658203,
           'rate_code_id': None,
           'store_and_fwd_flag': None,
           'dropoff_longitude': -73.99380493164062,
           'dropoff_latitude': 40.6959228515625,
           'payment_type': 'CASH',
           'fare_amount': 8.899999618530273,
           'extra': 0.5,
           'mta_tax': None,
           'tip_amount': 0.0,
           'tolls_amount': 0.0,
           'total_amount': 9.399999618530273})]

To get a better sense of the data size, we can calculate the size in bytes of the full dataset. Note that for Parquet files, this size-in-bytes will be pulled from the Parquet metadata (not triggering a data read) and will therefore be the on-disk size of the data; this might be significantly smaller than the in-memory size!

ds.size_bytes()
897130464

In order to get the in-memory size, we can trigger full reading of the dataset and inspect the size in bytes.

ds.fully_executed().size_bytes()
Read progress: 100%|██████████| 2/2 [00:04<00:00,  2.25s/it]
2263031675

Advanced Aside - Reading Partitioned Parquet Datasets

In addition to reading lists of individual files, ray.data.read_parquet() (as well as the other ray.data.read_*() APIs) can read directories containing multiple Parquet files. For Parquet in particular, reading datasets partitioned on a particular column is supported, allowing for path-based (zero-read) partition filtering and, optionally, including the partition column values encoded in the file paths directly in the read table data.

For the NYC taxi dataset, instead of reading individual per-month Parquet files, we can read the entire 2009 directory.

Warning

This will be a lot of data (~5.6 GB on disk, ~14 GB in memory), so be careful triggering full reads on a limited-memory machine! This is one place where Datasets’ semi-lazy reading comes in handy: Datasets will only read one file eagerly, which allows us to inspect a subset of the data without having to read the entire dataset.

# Read all Parquet data for the year 2009.
year_ds = ray.data.read_parquet("s3://ursa-labs-taxi-data/2009")

Printing the Dataset repr is guaranteed not to trigger reads of all files; data such as the row count and the schema is pulled directly from the Parquet metadata.

year_ds
Dataset(num_blocks=12, num_rows=170896055, schema={vendor_id: string, pickup_at: timestamp[us], dropoff_at: timestamp[us], passenger_count: int8, trip_distance: float, pickup_longitude: float, pickup_latitude: float, rate_code_id: null, store_and_fwd_flag: string, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float})

That’s a lot of rows! Since we’re not going to use this full-year dataset, let’s now delete this dataset to free up some memory in our Ray cluster.

del year_ds
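
Before moving on, here is a hedged sketch of the path-based partition filtering mentioned above. The bucket layout and the year/month partition columns below are hypothetical (the ursa-labs bucket is not Hive-partitioned); they are only meant to illustrate the shape of the API.

import pyarrow as pa

# Hypothetical Hive-partitioned layout:
#   s3://my-bucket/taxi/year=2009/month=01/data.parquet, ...
# Because "year" is encoded in the directory names, this filter can be
# evaluated from the file paths alone, without reading the Parquet row data.
partition_filtered_ds = ray.data.read_parquet(
    "s3://my-bucket/taxi",
    filter=(pa.dataset.field("year") == 2009),
)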

Data Exploration and Cleaning

Let’s calculate some stats to get a better picture of our data.

# What's the longest trip distance, largest tip amount, and highest passenger count?
ds.max(["trip_distance", "tip_amount", "passenger_count"])
Read: 100%|██████████| 2/2 [00:06<00:00,  3.13s/it]
Shuffle Map: 100%|██████████| 2/2 [00:00<00:00,  6.27it/s]
Shuffle Reduce: 100%|██████████| 1/1 [00:00<00:00, 63.47it/s]
ArrowRow({'max(trip_distance)': 50.0,
          'max(tip_amount)': 100.0,
          'max(passenger_count)': 113})

Whoa, there was a trip with 113 people in the taxi!? Let’s check out these many-passenger records by filtering down to just those records with our ds.map_batches() batch mapping API.

Note

Our filtering UDF receives a Pandas DataFrame, which is the default batch format for tabular data, and returns a Pandas DataFrame, which keeps the Dataset in a tabular format.

# Whoa, 113 passengers? I need to see this record and other ones with lots of passengers.
ds.map_batches(lambda df: df[df["passenger_count"] > 10]).take()
Read->Map_Batches: 100%|██████████| 2/2 [00:15<00:00,  7.80s/it]
[PandasRow({'vendor_id': 'VTS',
            'pickup_at': Timestamp('2009-01-22 11:47:00'),
            'dropoff_at': Timestamp('2009-01-22 12:00:00'),
            'passenger_count': 113,
            'trip_distance': 0.0,
            'pickup_longitude': 3555.912841796875,
            'pickup_latitude': 935.5253295898438,
            'rate_code_id': None,
            'store_and_fwd_flag': None,
            'dropoff_longitude': -74.01129913330078,
            'dropoff_latitude': 1809.957763671875,
            'payment_type': 'CASH',
            'fare_amount': 13.300000190734863,
            'extra': 0.0,
            'mta_tax': nan,
            'tip_amount': 0.0,
            'tolls_amount': 0.0,
            'total_amount': 13.300000190734863})]

That seems weird; it’s probably bad data, or at least data points we’re not interested in. Let’s filter these out!

# Filter out all records with over 10 passengers.
ds = ds.map_batches(lambda df: df[df["passenger_count"] <= 10])
Read->Map_Batches: 100%|██████████| 2/2 [00:49<00:00, 24.63s/it]

We don’t have any use for the store_and_fwd_flag or mta_tax columns, so let’s drop those.

# Drop some columns.
ds = ds.map_batches(lambda df: df.drop(columns=["store_and_fwd_flag", "mta_tax"]))
Map_Batches: 100%|██████████| 2/2 [00:47<00:00, 23.77s/it]
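
The overview also listed adding a derived column; the same batch-mapping API covers that. The sketch below derives a hypothetical trip_duration column (the column name is our own) from the pickup and dropoff timestamps; we assign the result to a new Dataset so the outputs shown later, which don’t include this column, stay consistent.

# Sketch: add a derived trip_duration column (in seconds) computed from the
# timestamps in each Pandas batch.
ds_with_duration = ds.map_batches(
    lambda df: df.assign(
        trip_duration=(df["dropoff_at"] - df["pickup_at"]).dt.total_seconds()
    )
)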

Let’s say we want to know how many trips there are for each passenger count.

ds.groupby("passenger_count").count().take()
Sort Sample: 100%|██████████| 2/2 [00:04<00:00,  2.15s/it]
Shuffle Map: 100%|██████████| 2/2 [03:36<00:00, 108.13s/it]
Shuffle Reduce: 100%|██████████| 2/2 [00:00<00:00, 112.32it/s]
[ArrowRow({'passenger_count': -127, 'count()': 2}),
 ArrowRow({'passenger_count': -48, 'count()': 45}),
 ArrowRow({'passenger_count': 0, 'count()': 794}),
 ArrowRow({'passenger_count': 1, 'count()': 18634337}),
 ArrowRow({'passenger_count': 2, 'count()': 4503747}),
 ArrowRow({'passenger_count': 3, 'count()': 1196381}),
 ArrowRow({'passenger_count': 4, 'count()': 559279}),
 ArrowRow({'passenger_count': 5, 'count()': 2452176}),
 ArrowRow({'passenger_count': 6, 'count()': 125773})]

Again, it looks like there are some more nonsensical passenger counts, i.e. the negative ones. Let’s filter those out too.

# Filter out records with negative passenger counts.
ds = ds.map_batches(lambda df: df[df["passenger_count"] > 0])
Map_Batches: 100%|██████████| 2/2 [00:47<00:00, 23.69s/it]

Advanced Aside - Projection and Filter Pushdown

Note that Ray Datasets’ Parquet reader supports projection (column selection) and row filter pushdown, allowing us to push the above column selection and row-based filter down into the Parquet read. If we specify column selection at Parquet read time, the unselected columns won’t even be read from disk!

The row-based filter is specified via Arrow’s dataset field expressions. See the feature guide for reading Parquet data for more information.

# Only read the passenger_count and trip_distance columns.
import pyarrow as pa
filter_expr = (
    (pa.dataset.field("passenger_count") <= 10)
    & (pa.dataset.field("passenger_count") > 0)
)

pushdown_ds = ray.data.read_parquet(
    [
        "s3://ursa-labs-taxi-data/2009/01/data.parquet",
        "s3://ursa-labs-taxi-data/2009/02/data.parquet",
    ],
    columns=["passenger_count", "trip_distance"],
    filter=filter_expr,
)

# Force full execution of both of the file reads.
pushdown_ds = pushdown_ds.fully_executed()
pushdown_ds
Read progress: 100%|██████████| 2/2 [00:00<00:00,  2.76it/s]
Dataset(num_blocks=2, num_rows=27471693, schema={passenger_count: int8, trip_distance: float})
# Delete the pushdown dataset. Deleting the Dataset object
# will release the underlying memory in the cluster.
del pushdown_ds

Does the passenger count influence the typical trip distance?

# Mean trip distance grouped by passenger count.
ds.groupby("passenger_count").mean("trip_distance").take()
Sort Sample: 100%|██████████| 2/2 [00:04<00:00,  2.24s/it]
Shuffle Map: 100%|██████████| 2/2 [03:28<00:00, 104.24s/it]
Shuffle Reduce: 100%|██████████| 2/2 [00:00<00:00, 123.35it/s]
[ArrowRow({'passenger_count': 1, 'mean(trip_distance)': 2.5442271984282017}),
 ArrowRow({'passenger_count': 2, 'mean(trip_distance)': 2.701997813992574}),
 ArrowRow({'passenger_count': 3, 'mean(trip_distance)': 2.624621515664268}),
 ArrowRow({'passenger_count': 4, 'mean(trip_distance)': 2.6351745332066048}),
 ArrowRow({'passenger_count': 5, 'mean(trip_distance)': 2.628660744359485}),
 ArrowRow({'passenger_count': 6, 'mean(trip_distance)': 2.5804354108726586})]
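
Multiple aggregations can also be computed in a single pass over the grouped data; here is a minimal sketch, assuming the ray.data.aggregate helpers available in this Ray version.

from ray.data.aggregate import Max, Mean

# Compute the mean and max trip distance per passenger count in one pass.
ds.groupby("passenger_count").aggregate(
    Mean("trip_distance"),
    Max("trip_distance"),
).take()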

See the feature guides for transforming data and ML preprocessing for more information on how we can process our data with Ray Datasets.

Ingesting into Model Trainers

Now that we’ve learned more about our data and cleaned up our dataset a bit, let’s look at how we can feed this dataset into some dummy model trainers.

First, let’s do a full global random shuffle of the dataset to decorrelate these samples.

ds = ds.random_shuffle()
Shuffle Map: 100%|██████████| 2/2 [00:21<00:00, 10.92s/it]
Shuffle Reduce: 100%|██████████| 2/2 [00:35<00:00, 17.59s/it]

We define a dummy Trainer actor; each trainer will consume a dataset shard in batches and simulate model training.

Note

In a real training workflow, we would feed ds to Ray Train, which would do this sharding and creation of training actors for us, under the hood.

@ray.remote
class Trainer:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()

trainers = [Trainer.remote(i) for i in range(4)]
trainers
[Actor(Trainer, 6d81e32e1d1582f89ca75e3c01000000),
 Actor(Trainer, 84887785bc1a9d5b697728be01000000),
 Actor(Trainer, b57750338c40513819fe4d8301000000),
 Actor(Trainer, a393b1c25a8a1b42754959cf01000000)]

Next, we split the dataset into len(trainers) shards, ensuring that the shards are of equal size, and providing the trainer actor handles to Ray Datasets as locality hints, so Datasets can try to colocate shard data with trainers in order to decrease data movement.

shards = ds.split(n=len(trainers), equal=True, locality_hints=trainers)
shards
[Dataset(num_blocks=1, num_rows=6867923, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
 Dataset(num_blocks=1, num_rows=6867923, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
 Dataset(num_blocks=1, num_rows=6867923, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
 Dataset(num_blocks=1, num_rows=6867923, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32})]

Finally, we simulate training, passing each shard to the corresponding trainer. The number of rows per shard is returned.

ray.get([w.train.remote(s) for w, s in zip(trainers, shards)])
[6867923, 6867923, 6867923, 6867923]
# Delete trainer actor handle references, which should terminate the actors.
del trainers

Parallel Batch Inference

After we’ve trained a model, we may want to perform batch (offline) inference on such a tabular dataset. With Ray Datasets, this is as easy as a ds.map_batches() call!

First, we define a callable class that will cache the loading of the model in its constructor.

import pandas as pd

def load_model():
    # A dummy model.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})
    
    return model

class BatchInferModel:
    def __init__(self):
        self.model = load_model()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)

BatchInferModel’s constructor will only be called once per actor worker when using the actor pool compute strategy in ds.map_batches().

ds.map_batches(BatchInferModel, batch_size=2048, compute="actors").take()
Map Progress (8 actors 2 pending):  50%|█████     | 1/2 [00:14<00:14, 14.75s/it]
Map Progress (8 actors 2 pending): 100%|██████████| 2/2 [00:28<00:00, 14.36s/it]
[PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False})]

If you want to perform batch inference on GPUs, simply specify the number of GPUs you wish to provision for each batch inference worker.

Warning

This will only run successfully if your cluster has nodes with GPUs!

ds.map_batches(
    BatchInferModel,
    batch_size=256,
    #num_gpus=1,  # Uncomment this to run this on GPUs!
    compute="actors",
).take()
Map Progress (8 actors 2 pending):   0%|          | 0/2 [00:06<?, ?it/s]
Map Progress (8 actors 2 pending):  50%|█████     | 1/2 [01:31<01:31, 91.10s/it]
Map Progress (8 actors 2 pending): 100%|██████████| 2/2 [03:00<00:00, 90.33s/it]
[PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False})]

We can also configure the autoscaling actor pool that this inference stage uses, setting upper and lower bounds on the actor pool size, and even tweaking the batch prefetching vs. inference task queueing tradeoff.

from ray.data import ActorPoolStrategy

# The actor pool will have at least 2 workers and at most 8 workers.
strategy = ActorPoolStrategy(min_size=2, max_size=8)

ds.map_batches(
    BatchInferModel,
    batch_size=256,
    #num_gpus=1,  # Uncomment this to run this on GPUs!
    compute=strategy,
).take()
Map Progress (8 actors 0 pending): 100%|██████████| 2/2 [02:56<00:00, 88.40s/it]
[PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False})]
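
The queueing tradeoff mentioned above can also be tuned on the actor pool strategy. The max_tasks_in_flight_per_actor argument below is an assumption about this Ray version’s ActorPoolStrategy; treat it as a sketch rather than a guaranteed API.

from ray.data import ActorPoolStrategy

# Hedged sketch: allow each inference actor to queue up to 4 in-flight tasks,
# trading extra memory for better overlap of batch fetching and inference.
# (Assumes ActorPoolStrategy accepts max_tasks_in_flight_per_actor.)
strategy = ActorPoolStrategy(
    min_size=2,
    max_size=8,
    max_tasks_in_flight_per_actor=4,
)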