Processing NYC taxi data using Ray DatasetsΒΆ

The NYC Taxi dataset is a popular tabular dataset. In this example, we demonstrate some basic data processing on this dataset using Ray Datasets.

OverviewΒΆ

This tutorial will cover:

  • Reading Parquet data

  • Inspecting the metadata and first few rows of a large Ray Dataset

  • Calculating some common global and grouped statistics on the dataset

  • Dropping columns and rows

  • Adding a derived column

  • Shuffling the dataset

  • Sharding the dataset and feeding it to parallel consumers (trainers)

  • Applying batch (offline) inference to the data

WalkthroughΒΆ

Let’s start by importing Ray and initializing a local Ray cluster.

# Import ray and initialize a local Ray cluster.
import ray
ray.init()

Reading and Inspecting the DataΒΆ

Next, we read a few of the files from the dataset. This read is semi-lazy, where reading of the first file is eagerly executed, but reading of all other files is delayed until the underlying data is needed by downstream operations (e.g. consuming the data with ds.take(), or transforming the data with ds.map_batches()).

We could process the entire Dataset in a streaming fashion using pipelining or all of it in parallel using a multi-node Ray cluster, but we’ll save that for our large-scale examples. :)

# Read two Parquet files in parallel.
ds = ray.data.read_parquet([
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_01_data.parquet",
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_02_data.parquet"
])

We can easily inspect the schema of this dataset. For Parquet files, we don’t even have to read the actual data to get the schema; we can read it from the lightweight Parquet metadata!

# Fetch the schema from the underlying Parquet metadata.
ds.schema()
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 2524

Parquet even stores the number of rows per file in the Parquet metadata, so we can get the number of rows in ds without triggering a full data read.

ds.count()
2749936

We can get a nice, cheap summary of the Dataset by leveraging it’s informative repr:

# Display some metadata about the dataset.
ds
Dataset(num_blocks=2, num_rows=2749936, schema={vendor_id: string, pickup_at: timestamp[us], dropoff_at: timestamp[us], passenger_count: int8, trip_distance: float, pickup_longitude: float, pickup_latitude: float, rate_code_id: null, store_and_fwd_flag: string, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float})

We can also poke at the actual data, taking a peek at a single row. Since this is only returning a row from the first file, reading of the second file is not triggered yet.

ds.take(1)
[ArrowRow({'vendor_id': 'VTS',
           'pickup_at': datetime.datetime(2009, 1, 21, 14, 58),
           'dropoff_at': datetime.datetime(2009, 1, 21, 15, 3),
           'passenger_count': 1,
           'trip_distance': 0.5299999713897705,
           'pickup_longitude': -73.99270629882812,
           'pickup_latitude': 40.7529411315918,
           'rate_code_id': None,
           'store_and_fwd_flag': None,
           'dropoff_longitude': -73.98814392089844,
           'dropoff_latitude': 40.75956344604492,
           'payment_type': 'CASH',
           'fare_amount': 4.5,
           'extra': 0.0,
           'mta_tax': None,
           'tip_amount': 0.0,
           'tolls_amount': 0.0,
           'total_amount': 4.5})]

To get a better sense of the data size, we can calculate the size in bytes of the full dataset. Note that for Parquet files, this size-in-bytes will be pulled from the Parquet metadata (not triggering a data read), and therefore might be significantly different than the in-memory size!

ds.size_bytes()
427503965

In order to get the in-memory size, we can trigger full reading of the dataset and inspect the size in bytes.

ds.fully_executed().size_bytes()
Read progress: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00,  2.50it/s]
226524489

Advanced Aside - Reading Partitioned Parquet DatasetsΒΆ

In addition to being able to read lists of individual files, ray.data.read_parquet() (as well as other ray.data.read_*() APIs) can read directories containing multiple Parquet files. For Parquet in particular, reading Parquet datasets partitioned by a particular column is supported, allowing for path-based (zero-read) partition filtering and (optionally) including the partition column value specified in the file paths directly in the read table data.

For the NYC taxi dataset, instead of reading individual per-month Parquet files, we can read the entire 2009 directory.

Warning

This could be a lot of data (downsampled with 0.01 ratio leads to ~50.2 MB on disk, ~147 MB in memory), so be careful triggering full reads on a limited-memory machine! This is one place where Datasets’ semi-lazy reading comes in handy: Datasets will only read one file eagerly, which allows us to inspect a subset of the data without having to read the entire dataset.

# Read all Parquet data for the year 2009.
year_ds = ray.data.read_parquet("s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet")

The metadata that Datasets prints in its repr is guaranteed to not trigger reads of all files; data such as the row count and the schema is pulled directly from the Parquet metadata.

year_ds.count()
1710629

That’s a lot of rows! Since we’re not going to use this full-year dataset, let’s now delete this dataset to free up some memory in our Ray cluster.

del year_ds

Data Exploration and CleaningΒΆ

Let’s calculate some stats to get a better picture of our data.

# What's the longets trip distance, largest tip amount, and most number of passengers?
ds.max(["trip_distance", "tip_amount", "passenger_count"])
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00, 50.69it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00, 114.04it/s]
ArrowRow({'max(trip_distance)': 50.0,
          'max(tip_amount)': 100.0,
          'max(passenger_count)': 6})

We don’t have any use for the store_and_fwd_flag or mta_tax columns, so let’s drop those.

# Drop some columns.
ds = ds.drop_columns(["store_and_fwd_flag", "mta_tax"])
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:03<00:00,  1.59s/it]

Let’s say we want to know how many trips there are for each passenger count.

ds.groupby("passenger_count").count().take()
Sort Sample: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00,  5.01it/s]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [03:21<00:00, 100.61s/it]
Shuffle Reduce:   0%|          | 0/2 [00:00<?, ?it/s](map pid=9272) E0725 14:35:16.665638988    9301 chttp2_transport.cc:1103]   Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:01<00:00,  1.97it/s]
[PandasRow({'passenger_count': -48, 'count()': 3}),
 PandasRow({'passenger_count': 0, 'count()': 91}),
 PandasRow({'passenger_count': 1, 'count()': 1865548}),
 PandasRow({'passenger_count': 2, 'count()': 451452}),
 PandasRow({'passenger_count': 3, 'count()': 119406}),
 PandasRow({'passenger_count': 4, 'count()': 55547}),
 PandasRow({'passenger_count': 5, 'count()': 245332}),
 PandasRow({'passenger_count': 6, 'count()': 12557})]

Again, it looks like there are some more nonsensical passenger counts, i.e. the negative ones. Let’s filter those out too.

# Filter our records with negative passenger counts.
ds = ds.map_batches(lambda df: df[df["passenger_count"] > 0])
Map_Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:03<00:00,  1.60s/it]

Do the passenger counts influences the typical trip distance?

# Mean trip distance grouped by passenger count.
ds.groupby("passenger_count").mean("trip_distance").take()
Sort Sample: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00,  4.57it/s]
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [03:23<00:00, 101.59s/it]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00, 178.79it/s]
[PandasRow({'passenger_count': 1, 'mean(trip_distance)': 2.543288084787955}),
 PandasRow({'passenger_count': 2, 'mean(trip_distance)': 2.7043459216040686}),
 PandasRow({'passenger_count': 3, 'mean(trip_distance)': 2.6233412684454716}),
 PandasRow({'passenger_count': 4, 'mean(trip_distance)': 2.642096445352584}),
 PandasRow({'passenger_count': 5, 'mean(trip_distance)': 2.6286944833939314}),
 PandasRow({'passenger_count': 6, 'mean(trip_distance)': 2.5848625579855855})]

See the feature guides for transforming data and ML preprocessing for more information on how we can process our data with Ray Datasets.

Advanced Aside - Projection and Filter PushdownΒΆ

Note that Ray Datasets’ Parquet reader supports projection (column selection) and row filter pushdown, where we can push the above column selection and the row-based filter to the Parquet read. If we specify column selection at Parquet read time, the unselected columns won’t even be read from disk!

The row-based filter is specified via Arrow’s dataset field expressions. See the feature guide for reading Parquet data for more information.

# Only read the passenger_count and trip_distance columns.
import pyarrow as pa
filter_expr = (
    (pa.dataset.field("passenger_count") <= 10)
    & (pa.dataset.field("passenger_count") > 0)
)

pushdown_ds = ray.data.read_parquet(
    [
        "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_01_data.parquet",
        "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_02_data.parquet",
    ],
    columns=["passenger_count", "trip_distance"],
    filter=filter_expr,
)

# Force full execution of both of the file reads.
pushdown_ds = pushdown_ds.fully_executed()
pushdown_ds
⚠️  The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Read progress: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:00<00:00,  9.19it/s]
Dataset(num_blocks=2, num_rows=2749842, schema={passenger_count: int8, trip_distance: float})
# Delete the pushdown dataset. Deleting the Dataset object
# will release the underlying memory in the cluster.
del pushdown_ds

Ingesting into Model TrainersΒΆ

Now that we’ve learned more about our data and we have cleaned up our dataset a bit, we now look at how we can feed this dataset into some dummy model trainers.

First, let’s do a full global random shuffle of the dataset to decorrelate these samples.

ds = ds.random_shuffle()
Shuffle Map: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:01<00:00,  1.34it/s]
Shuffle Reduce: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:01<00:00,  1.09it/s]

We define a dummy Trainer actor, where each trainer will consume a dataset shard in batches and simulate model training.

Note

In a real training workflow, we would feed ds to Ray Train, which would do this sharding and creation of training actors for us, under the hood.

@ray.remote
class Trainer:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()

trainers = [Trainer.remote(i) for i in range(4)]
trainers
[Actor(Trainer, 9326d43345699213608f324003000000),
 Actor(Trainer, f0ce2ce44528fbf748c9c1a103000000),
 Actor(Trainer, 7ba39c8f82ebd78c68e92ec903000000),
 Actor(Trainer, b95fe3494b7bc2d8f42abbba03000000)]

Next, we split the dataset into len(trainers) shards, ensuring that the shards are of equal size.

shards = ds.split(n=len(trainers), equal=True)
shards
[Dataset(num_blocks=1, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
 Dataset(num_blocks=1, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
 Dataset(num_blocks=2, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32}),
 Dataset(num_blocks=1, num_rows=687460, schema={vendor_id: object, pickup_at: datetime64[ns], dropoff_at: datetime64[ns], passenger_count: int8, trip_distance: float32, pickup_longitude: float32, pickup_latitude: float32, rate_code_id: object, dropoff_longitude: float32, dropoff_latitude: float32, payment_type: object, fare_amount: float32, extra: float32, tip_amount: float32, tolls_amount: float32, total_amount: float32})]

Finally, we simulate training, passing each shard to the corresponding trainer. The number of rows per shard is returned.

ray.get([w.train.remote(s) for w, s in zip(trainers, shards)])
[687460, 687460, 687460, 687460]
# Delete trainer actor handle references, which should terminate the actors.
del trainers

Parallel Batch InferenceΒΆ

After we’ve trained a model, we may want to perform batch (offline) inference on such a tabular dataset. With Ray Datasets, this is as easy as a ds.map_batches() call!

First, we define a callable class that will cache the loading of the model in its constructor.

import pandas as pd

def load_model():
    # A dummy model.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})
    
    return model

class BatchInferModel:
    def __init__(self):
        self.model = load_model()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)

BatchInferModel’s constructor will only be called once per actor worker when using the actor pool compute strategy in ds.map_batches().

ds.map_batches(BatchInferModel, batch_size=2048, compute="actors").take()
Map Progress (2 actors 1 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:05<00:00,  2.57s/it]
[PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False})]

If wanting to perform batch inference on GPUs, simply specify the number of GPUs you wish to provision for each batch inference worker.

Warning

This will only run successfully if your cluster has nodes with GPUs!

ds.map_batches(
    BatchInferModel,
    batch_size=256,
    #num_gpus=1,  # Uncomment this to run this on GPUs!
    compute="actors",
).take()
Map Progress (15 actors 4 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:21<00:00, 10.67s/it]
[PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False})]

We can also configure the autoscaling actor pool that this inference stage uses, setting upper and lower bounds on the actor pool size, and even tweak the batch prefetching vs. inference task queueing tradeoff.

from ray.data import ActorPoolStrategy

# The actor pool will have at least 2 workers and at most 8 workers.
strategy = ActorPoolStrategy(min_size=2, max_size=8)

ds.map_batches(
    BatchInferModel,
    batch_size=256,
    #num_gpus=1,  # Uncomment this to run this on GPUs!
    compute=strategy,
).take()
Map Progress (8 actors 0 pending): 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 2/2 [00:21<00:00, 10.71s/it]
[PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': False}),
 PandasRow({'score': True}),
 PandasRow({'score': False})]