Transforming Datasets

Datasets transformations take in datasets and produce new datasets. For example, map is a transformation that applies a user-defined function (UDF) to each record of a dataset and returns a new dataset as the result. Datasets transformations can be composed to express a chain of computations.

Transformations

There are two main types of transformations (see the sketch after this list):

  • One-to-one: each input block will contribute to only one output block, such as ds.map_batches().

  • All-to-all: input blocks can contribute to multiple output blocks, such as ds.random_shuffle().
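
To make the distinction concrete, here is a minimal sketch comparing block counts before and after each kind of transformation (block counts shown are illustrative):

import ray

ds = ray.data.range(1000).repartition(4)
ds.num_blocks()
# -> 4

# One-to-one: each input block produces exactly one output block.
ds.map_batches(lambda batch: batch).num_blocks()
# -> 4

# All-to-all: output blocks are assembled from many input blocks.
ds.random_shuffle().num_blocks()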

Here is a table listing some common transformations supported by Ray Datasets.

Common Ray Datasets transformations:

Transformation                   Type         Description
ds.map_batches()                 One-to-one   Apply a given function to batches of records of this dataset.
ds.add_column()                  One-to-one   Apply a given function to batches of records to create a new column.
ds.drop_columns()                One-to-one   Drop the given columns from the dataset.
ds.split()                       One-to-one   Split the dataset into N disjoint pieces.
ds.repartition(shuffle=False)    One-to-one   Repartition the dataset into N blocks, without shuffling the data.
ds.repartition(shuffle=True)     All-to-all   Repartition the dataset into N blocks, shuffling the data during repartition.
ds.random_shuffle()              All-to-all   Randomly shuffle the elements of this dataset.
ds.sort()                        All-to-all   Sort the dataset by a sort key.
ds.groupby()                     All-to-all   Group the dataset by a group key.

Tip

Datasets also provides the convenience transformation methods ds.map(), ds.flat_map(), and ds.filter(), which are not vectorized (slower than ds.map_batches()), but may be useful for development.
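
For instance, a quick sketch of these per-row APIs on a small integer dataset:

import ray

ds = ray.data.range(5)

# Per-row map: double each record.
ds.map(lambda x: x * 2).take()
# -> [0, 2, 4, 6, 8]

# Per-row flat_map: emit two records for every input record.
ds.flat_map(lambda x: [x, x + 100]).take()
# -> [0, 100, 1, 101, 2, 102, 3, 103, 4, 104]

# Per-row filter: keep only the even records.
ds.filter(lambda x: x % 2 == 0).take()
# -> [0, 2, 4]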

The following example uses these transformation APIs to process the Iris dataset.

import ray
import pandas

# Create a dataset from file with Iris data.
# Tip: "example://" is a convenient protocol to access the
# python/ray/data/examples/data directory.
ds = ray.data.read_csv("example://iris.csv")
# Dataset(num_blocks=1, num_rows=150,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})
ds.show(3)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.7, 'sepal.width': 3.2,
#     'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}

# Repartition the dataset to 5 blocks.
ds = ds.repartition(5)
# Dataset(num_blocks=5, num_rows=150,
#         schema={sepal.length: double, sepal.width: double,
#                 petal.length: double, petal.width: double, variety: string})

# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]

# Apply the transformation to batches of the dataset.
ds.map_batches(transform_batch).show()
# -> {'sepal.length': 5.2, 'sepal.width': 2.7,
#     'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 5.4, 'sepal.width': 3.0,
#     'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
# -> {'sepal.length': 4.9, 'sepal.width': 2.5,
#     'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}

# Split the dataset into 2 datasets
ds.split(2)
# -> [Dataset(num_blocks=3, num_rows=90,
#             schema={sepal.length: double, sepal.width: double,
#                     petal.length: double, petal.width: double, variety: string}),
#     Dataset(num_blocks=2, num_rows=60,
#             schema={sepal.length: double, sepal.width: double,
#                     petal.length: double, petal.width: double, variety: string})]

# Sort the dataset by sepal.length.
ds = ds.sort("sepal.length")
ds.show(3)
# -> {'sepal.length': 4.3, 'sepal.width': 3.0,
#     'petal.length': 1.1, 'petal.width': 0.1, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 2.9,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 3.0,
#     'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}

# Shuffle the dataset.
ds = ds.random_shuffle()
ds.show(3)
# -> {'sepal.length': 6.7, 'sepal.width': 3.1,
#     'petal.length': 4.4, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 6.7, 'sepal.width': 3.3,
#     'petal.length': 5.7, 'petal.width': 2.1, 'variety': 'Virginica'}
# -> {'sepal.length': 4.5, 'sepal.width': 2.3,
#     'petal.length': 1.3, 'petal.width': 0.3, 'variety': 'Setosa'}

# Group by the variety.
ds.groupby("variety").count().show()
# -> {'variety': 'Setosa', 'count()': 50}
# -> {'variety': 'Versicolor', 'count()': 50}
# -> {'variety': 'Virginica', 'count()': 50}

Writing UDFs

User-defined functions (UDFs) are routines that apply to one row (e.g., .map()) or a batch of rows (e.g., .map_batches()) of a dataset. UDFs let you express customized business logic in transformations. Here we focus on .map_batches(), as it's the primary mapping API in Datasets.

Here are the basics that you need to know about UDFs:

Callable Class UDFs

When using the actor compute strategy, per-row and per-batch UDFs can also be callable classes, i.e., classes that implement the __call__ magic method. The constructor of the class can be used for stateful setup and will only be invoked once per worker actor.

Note

These transformation APIs take the uninstantiated callable class as an argument, not an instance of the class.

import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a callable class on Pandas DataFrame batches.
class ModelUDF:
    def __init__(self):
        # Dummy "model": flag rows whose sepal.length exceeds 6.5.
        self.model = lambda df: df["sepal.length"] > 6.5

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        # Filter rows.
        df = df[df["variety"] == "Versicolor"]
        # Apply model.
        df["output"] = self.model(df)
        return df

ds.map_batches(ModelUDF, compute="actors").show(2)
# -> {'sepal.length': 7.0, 'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'output': True}
# -> {'sepal.length': 6.4, 'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'output': False}

UDF Input Batch Format

Choose the batch format of the data given to UDFs by setting the batch_format option of .map_batches(). Here is an overview of the available batch formats:

The “native” batch format presents data as follows for each Dataset type:

  • Tabular Datasets: Each batch will be a pandas.DataFrame. This may incur a conversion cost if the underlying Dataset block is not zero-copy convertible from an Arrow table.

  • Tensor Datasets (single-column): Each batch will be a single numpy.ndarray containing the single tensor column for this batch.

  • Simple Datasets: Each batch will be a Python list.

import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
    # Filter rows.
    df = df[df["variety"] == "Versicolor"]
    # Add derived column.
    df["normalized.sepal.length"] =  df["sepal.length"] / df["sepal.length"].max()
    # Drop column.
    df = df.drop(columns=["sepal.length"])
    return df

ds.map_batches(pandas_transform).show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}

The "pandas" batch format presents batches in pandas.DataFrame format. If converting a simple dataset to Pandas DataFrame batches, a single-column dataframe with the column "__value__" will be created.

import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
    # Filter rows.
    df = df[df["variety"] == "Versicolor"]
    # Add derived column.
    df["normalized.sepal.length"] =  df["sepal.length"] / df["sepal.length"].max()
    # Drop column.
    df = df.drop(columns=["sepal.length"])
    return df

ds.map_batches(pandas_transform, batch_format="pandas").show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
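
As a minimal sketch of the simple-dataset case described above (using ray.data.range to create a simple dataset of integers; output shown approximately):

import ray
import pandas as pd

# Simple datasets arrive at "pandas"-format UDFs as a single-column
# DataFrame with the column "__value__".
simple_ds = ray.data.range(3)

def double(df: pd.DataFrame) -> pd.DataFrame:
    df["__value__"] = df["__value__"] * 2
    return df

simple_ds.map_batches(double, batch_format="pandas").show()
# -> {'__value__': 0}
# -> {'__value__': 2}
# -> {'__value__': 4}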

The "pyarrow" batch format presents batches in pyarrow.Table format. If converting a simple dataset to Arrow Table batches, a single-column table with the column "__value__" will be created.

import ray
import pyarrow as pa
import pyarrow.compute as pac

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on Arrow Table batches.
def pyarrow_transform(batch: pa.Table) -> pa.Table:
    batch = batch.filter(pac.equal(batch["variety"], "Versicolor"))
    batch = batch.append_column(
        "normalized.sepal.length",
        pac.divide(batch["sepal.length"], pac.max(batch["sepal.length"])),
    )
    return batch.drop(["sepal.length"])

ds.map_batches(pyarrow_transform, batch_format="pyarrow").show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}

The "numpy" batch format presents batches in numpy.ndarray format as follows:

  • Tabular Datasets: Each batch will be a dictionary of NumPy ndarrays (Dict[str, np.ndarray]), with each key-value pair representing a column in the table.

  • Tensor Datasets (single-column): Each batch will be a single numpy.ndarray containing the single tensor column for this batch.

  • Simple Datasets: Each batch will be a single NumPy ndarray, where Datasets will attempt to convert each list-batch to an ndarray.

import ray
import numpy as np

# Load dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")

# UDF as a function on NumPy ndarray batches.
def normalize(arr: np.ndarray) -> np.ndarray:
    # Normalizes each image to [0, 1] range.
    mins = arr.min((1, 2))[:, np.newaxis, np.newaxis]
    maxes = arr.max((1, 2))[:, np.newaxis, np.newaxis]
    range_ = maxes - mins
    idx = np.where(range_ == 0)
    mins[idx] = 0
    range_[idx] = 1
    return (arr - mins) / range_

ds.map_batches(normalize, batch_format="numpy")
# -> Dataset(
#        num_blocks=1,
#        num_rows=3,
#        schema={__value__: <ArrowTensorType: shape=(28, 28), dtype=double>}
#    )
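
For the tabular case listed above, here is a minimal sketch using the same Iris CSV (column names follow the earlier examples; output shown approximately):

import ray
import numpy as np
from typing import Dict

# Tabular datasets arrive as Dict[str, np.ndarray] batches.
iris = ray.data.read_csv("example://iris.csv")

def sepal_ratio(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    # Keep two numeric columns and add a derived ratio column.
    return {
        "sepal.length": batch["sepal.length"],
        "sepal.width": batch["sepal.width"],
        "sepal.ratio": batch["sepal.length"] / batch["sepal.width"],
    }

iris.map_batches(sepal_ratio, batch_format="numpy").show(1)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5, 'sepal.ratio': 1.457...}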

Tip

Prefer using vectorized operations on the pandas.DataFrame, pyarrow.Table, and numpy.ndarray types for better performance. For example, suppose you want to compute the sum of a column in pandas.DataFrame: instead of iterating over each row of a batch and summing up values of that column, use df_batch["col_foo"].sum().
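
For example, a sketch contrasting the two approaches on a pandas batch (the column name is taken from the Iris examples above):

import pandas as pd

def slow_sum(df: pd.DataFrame) -> pd.DataFrame:
    # Anti-pattern: Python-level loop over every row of the batch.
    total = 0.0
    for _, row in df.iterrows():
        total += row["sepal.length"]
    return pd.DataFrame({"sepal_length_sum": [total]})

def fast_sum(df: pd.DataFrame) -> pd.DataFrame:
    # Preferred: a single vectorized column operation per batch.
    return pd.DataFrame({"sepal_length_sum": [df["sepal.length"].sum()]})

# Both return one row per batch, but fast_sum avoids the per-row Python loop:
# ds.map_batches(fast_sum)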

Batch UDF Output Types

The following output types are allowed for batch UDFs (e.g., ds.map_batches()); each is interpreted as described below to create the transformation result:

Returning pd.DataFrame creates a Tabular dataset as the transformation result:

import ray
import pandas as pd
from typing import List

# Load dataset.
ds = ray.data.read_text("example://sms_spam_collection_subset.txt")
# -> Dataset(num_blocks=1, num_rows=10, schema=<class 'str'>)

# Convert to Pandas.
def convert_to_pandas(text: List[str]) -> pd.DataFrame:
    return pd.DataFrame({"text": text})

ds = ds.map_batches(convert_to_pandas)
# -> Dataset(num_blocks=1, num_rows=10, schema={text: object})

ds.show(2)
# -> {
#        'text': (
#            'ham\tGo until jurong point, crazy.. Available only in bugis n great '
#            'world la e buffet... Cine there got amore wat...'
#        ),
#    }
# -> {'text': 'ham\tOk lar... Joking wif u oni...'}

Returning pa.Table creates a Tabular dataset as the transformation result:

import ray
import pyarrow as pa
from typing import List

# Load dataset.
ds = ray.data.read_text("example://sms_spam_collection_subset.txt")
# -> Dataset(num_blocks=1, num_rows=10, schema=<class 'str'>)

# Convert to Arrow.
def convert_to_arrow(text: List[str]) -> pa.Table:
    return pa.table({"text": text})

ds = ds.map_batches(convert_to_arrow)
# -> Dataset(num_blocks=1, num_rows=10, schema={text: object})

ds.show(2)
# -> {
#        'text': (
#            'ham\tGo until jurong point, crazy.. Available only in bugis n great '
#            'world la e buffet... Cine there got amore wat...'
#        ),
#    }
# -> {'text': 'ham\tOk lar... Joking wif u oni...'}

Returning np.ndarray creates a single-column Tensor dataset as the transformation result:

import ray
import pandas as pd
import numpy as np
from typing import Dict

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert to NumPy.
def convert_to_numpy(df: pd.DataFrame) -> np.ndarray:
    return df[["sepal.length", "sepal.width"]].to_numpy()

ds = ds.map_batches(convert_to_numpy)
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={__value__: <ArrowTensorType: shape=(2,), dtype=double>},
#    )

ds.show(2)
# -> [5.1 3.5]
#    [4.9 3. ]

Returning Dict[str, np.ndarray] creates a multi-column Tensor dataset as the transformation result.

If a column tensor is 1-dimensional, then the native Arrow 1D list type is used; if a column tensor has 2 or more dimensions, then the Dataset tensor extension type is used to embed these n-dimensional tensors in the Arrow table.

import ray
import pandas as pd
import numpy as np
from typing import Dict

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert to dict of NumPy ndarrays.
def convert_to_numpy(df: pd.DataFrame) -> Dict[str, np.ndarray]:
    return {
        "sepal_len_and_width": df[["sepal.length", "sepal.width"]].to_numpy(),
        "petal_len": df["petal.length"].to_numpy(),
        "petal_width": df["petal.width"].to_numpy(),
    }

ds = ds.map_batches(convert_to_numpy)
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal_len_and_width: <ArrowTensorType: shape=(2,), dtype=double>,
#            petal_len: double,
#            petal_width: double,
#        },
#    )

ds.show(2)
# -> {'sepal_len_and_width': array([5.1, 3.5]), 'petal_len': 1.4, 'petal_width': 0.2}
# -> {'sepal_len_and_width': array([4.9, 3. ]), 'petal_len': 1.4, 'petal_width': 0.2}

Returning list creates a simple Python object dataset as the transformation result:

import ray
import pandas as pd
from typing import List

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert to list of dicts.
def convert_to_list(df: pd.DataFrame) -> List[dict]:
    return df.to_dict("records")

ds = ds.map_batches(convert_to_list)
# -> Dataset(num_blocks=1, num_rows=150, schema=<class 'dict'>)

ds.show(2)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5, 'petal.length': 1.4, 'petal.width': 0.2,
#     'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0, 'petal.length': 1.4, 'petal.width': 0.2,
#     'variety': 'Setosa'}

Row UDF Output Types

The following output types are allowed for per-row UDFs (e.g., ds.map()):

Returning a dict of Arrow-compatible data types creates a Tabular dataset as the transformation result. If any dict values are not Arrow-compatible, then a simple Python object dataset will be created:

import ray
import pandas as pd
from typing import Dict

# Load dataset.
ds = ray.data.range(10)
# -> Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)

# Convert row to dict.
def row_to_dict(row: int) -> Dict[str, int]:
    return {"foo": row}

ds = ds.map(row_to_dict)
# -> Dataset(num_blocks=10, num_rows=10, schema={foo: int64})

ds.show(2)
# -> {'foo': 0}
# -> {'foo': 1}
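
A minimal sketch of the fallback case mentioned above, where one dict value is not Arrow-compatible (the schema repr shown is approximate):

import ray

class Opaque:
    """An arbitrary Python object that Arrow cannot represent."""
    pass

ds = ray.data.range(10).map(lambda i: {"foo": i, "obj": Opaque()})
# -> Dataset(num_blocks=10, num_rows=10, schema=<class 'dict'>)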

Returning np.ndarray creates a single-column Tensor dataset as the transformation result:

import ray
import numpy as np
from typing import Dict

# Load dataset.
ds = ray.data.range(10)
# -> Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)

# Convert row to NumPy ndarray.
def row_to_numpy(row: int) -> np.ndarray:
    return np.full(shape=(2, 2), fill_value=row)

ds = ds.map(row_to_numpy)
# -> Dataset(
#        num_blocks=10,
#        num_rows=10,
#        schema={__value__: <ArrowTensorType: shape=(2, 2), dtype=int64>},
#    )

ds.show(2)
# -> [[0 0]
#     [0 0]]
#    [[1 1]
#     [1 1]]

Other return row types will create a simple Python object dataset as the transformation result:

import ray
from ray.data.row import TableRow

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert row to simple (opaque) row.
def map_row(row: TableRow) -> tuple:
    return tuple(row.items())

ds = ds.map(map_row)
# -> Dataset(num_blocks=1, num_rows=150, schema=<class 'tuple'>)

ds.show(2)
# -> (('sepal.length', 5.1), ('sepal.width', 3.5), ('petal.length', 1.4),
#     ('petal.width', 0.2), ('variety', 'Setosa'))
# -> (('sepal.length', 4.9), ('sepal.width', 3.0), ('petal.length', 1.4),
#     ('petal.width', 0.2), ('variety', 'Setosa'))

Compute Strategy

Datasets transformations are executed by either Ray tasks or Ray actors across a Ray cluster. By default, Ray tasks are used (with compute="tasks"). For transformations that require expensive setup, it's preferable to use Ray actors, which are stateful and allow setup to be reused for efficiency. You can specify compute=ray.data.ActorPoolStrategy(min, max), and Ray will use an autoscaling actor pool of min to max actors to execute your transforms. For a fixed-size actor pool, specify ActorPoolStrategy(n, n).

The following example uses both the Ray tasks and Ray actors compute strategies for batch inference:

import ray
import pandas
import numpy
from ray.data import ActorPoolStrategy

# Dummy model to predict Iris variety.
def predict_iris(df: pandas.DataFrame) -> pandas.DataFrame:
    conditions = [
        (df["sepal.length"] < 5.0),
        (df["sepal.length"] >= 5.0) & (df["sepal.length"] < 6.0),
        (df["sepal.length"] >= 6.0)
    ]
    values = ["Setosa", "Versicolor", "Virginica"]
    return pandas.DataFrame({"predicted_variety": numpy.select(conditions, values)})

class IrisInferModel:
    def __init__(self):
        self._model = predict_iris

    def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
        return self._model(batch)

ds = ray.data.read_csv("example://iris.csv").repartition(10)

# Batch inference processing with Ray tasks (the default compute strategy).
predicted = ds.map_batches(predict_iris)

# Batch inference processing with Ray actors. Autoscale the actors between 3 and 10.
predicted = ds.map_batches(
    IrisInferModel, compute=ActorPoolStrategy(3, 10), batch_size=10)