Transforming Datasets#

Datasets transformations take in datasets and produce new datasets. For example, map is a transformation that applies a user-defined function on each dataset record and returns a new dataset as the result. Datasets transformations can be composed to express a chain of computations.

Tip

If you’re performing common ML transformations like normalization and label encoding, create a Preprocessor instead. To learn more, read Using Preprocessors.
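For instance, a minimal sketch of that workflow, assuming the StandardScaler preprocessor described in Using Preprocessors:

import ray
from ray.data.preprocessors import StandardScaler

# Toy dataset with a single numeric column.
ds = ray.data.from_items([{"value": float(i)} for i in range(10)])

# Fit the scaler on the dataset, then apply it to produce standardized values.
scaler = StandardScaler(columns=["value"])
standardized = scaler.fit_transform(ds)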

Transformations#

There are two main types of transformations:

  • One-to-one: each input block will contribute to only one output block, such as ds.map_batches().

  • All-to-all: input blocks can contribute to multiple output blocks, such as ds.random_shuffle().
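For example, a minimal sketch contrasting the two types on a simple integer dataset:

import ray

ds = ray.data.range(1000)

# One-to-one: each input block is processed independently into one output block.
doubled = ds.map_batches(lambda batch: [x * 2 for x in batch])

# All-to-all: rows from any input block may end up in any output block.
shuffled = ds.random_shuffle()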

Here is a table listing some common transformations supported by Ray Datasets.

Common Ray Datasets transformations.#

Transformation                 | Type       | Description
ds.map_batches()               | One-to-one | Apply a given function to batches of records of this dataset.
ds.add_column()                | One-to-one | Apply a given function to batches of records to create a new column.
ds.drop_columns()              | One-to-one | Drop the given columns from the dataset.
ds.split()                     | One-to-one | Split the dataset into N disjoint pieces.
ds.repartition(shuffle=False)  | One-to-one | Repartition the dataset into N blocks, without shuffling the data.
ds.repartition(shuffle=True)   | All-to-all | Repartition the dataset into N blocks, shuffling the data during repartition.
ds.random_shuffle()            | All-to-all | Randomly shuffle the elements of this dataset.
ds.sort()                      | All-to-all | Sort the dataset by a sort key.
ds.groupby()                   | All-to-all | Group the dataset by a group key.

Tip

Datasets also provides the convenience transformation methods ds.map(), ds.flat_map(), and ds.filter(), which are not vectorized (slower than ds.map_batches()), but may be useful for development.
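For example, a minimal sketch of these row-based APIs on a simple integer dataset:

import ray

ds = ray.data.range(8)

# Per-row (non-vectorized) transformations: convenient for prototyping,
# but slower than the equivalent ds.map_batches() call.
ds.filter(lambda x: x % 2 == 0).map(lambda x: x * 10).take()
# Expected: [0, 20, 40, 60]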

The following example uses these transformation APIs to process the Iris dataset.

import ray
import pandas

# Create a dataset from file with Iris data.
# Tip: "example://" is a convenient protocol to access the
# python/ray/data/examples/data directory.
ds = ray.data.read_csv("example://iris.csv")
# Dataset(num_blocks=1, num_rows=150,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})
ds.show(3)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.7, 'sepal.width': 3.2,
#     'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}

# Repartition the dataset to 5 blocks.
ds = ds.repartition(5)
# -> Repartition
#    +- Dataset(num_blocks=1, num_rows=150,
#               schema={sepal.length: float64, sepal.width: float64,
#                       petal.length: float64, petal.width: float64, variety: object})

# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]

# Map processing the dataset.
ds.map_batches(transform_batch).show()
# -> {'sepal.length': 5.2, 'sepal.width': 2.7,
#     'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 5.4, 'sepal.width': 3.0,
#     'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
# -> {'sepal.length': 4.9, 'sepal.width': 2.5,
#     'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}

# Split the dataset into 2 datasets
ds.split(2)
# -> [Dataset(num_blocks=3, num_rows=90,
#             schema={sepal.length: double, sepal.width: double,
#                     petal.length: double, petal.width: double, variety: string}),
#     Dataset(num_blocks=2, num_rows=60,
#             schema={sepal.length: double, sepal.width: double,
#                     petal.length: double, petal.width: double, variety: string})]

# Sort the dataset by sepal.length.
ds = ds.sort("sepal.length")
ds.show(3)
# -> {'sepal.length': 4.3, 'sepal.width': 3.0,
#     'petal.length': 1.1, 'petal.width': 0.1, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 2.9,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 3.0,
#     'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}

# Shuffle the dataset.
ds = ds.random_shuffle()
ds.show(3)
# -> {'sepal.length': 6.7, 'sepal.width': 3.1,
#     'petal.length': 4.4, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 6.7, 'sepal.width': 3.3,
#     'petal.length': 5.7, 'petal.width': 2.1, 'variety': 'Virginica'}
# -> {'sepal.length': 4.5, 'sepal.width': 2.3,
#     'petal.length': 1.3, 'petal.width': 0.3, 'variety': 'Setosa'}

# Group by the variety.
ds.groupby("variety").count().show()
# -> {'variety': 'Setosa', 'count()': 50}
# -> {'variety': 'Versicolor', 'count()': 50}
# -> {'variety': 'Virginica', 'count()': 50}

Writing User-defined Functions (UDFs)#

User-defined functions (UDFs) are routines that apply to one row (e.g., .map()) or a batch of rows (e.g., .map_batches()) of a dataset. UDFs let you express custom business logic in transformations. Here we will focus on .map_batches(), as it’s the primary mapping API in Datasets.

Here are the basics that you need to know about UDFs:

  • A UDF can be either a function, a generator, or if using the actor compute strategy, a callable class.

  • Select the UDF input batch format using the batch_format argument.

  • The UDF output type determines the Dataset schema of the transformation result.

Types of UDFs#

There are three types of UDFs that you can use with Ray Data: Function UDFs, Callable Class UDFs, and Generator UDFs.

The most basic UDFs are functions that take in a batch or row as input and return a batch or row as output. See UDF Input Batch Format for the supported batch formats.

import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
print(ds.default_batch_format())
# <class 'pandas.core.frame.DataFrame'>

# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df_batch: pd.DataFrame) -> pd.DataFrame:
    # Filter rows.
    df_batch = df_batch[df_batch["variety"] == "Versicolor"]
    # Add derived column.
    # Notice here that `df["sepal.length"].max()` is only the max value of the column
    # within a given batch (instead of globally)!!
    df_batch.loc[:, "normalized.sepal.length"] = df_batch["sepal.length"] / df_batch["sepal.length"].max()
    # Drop column.
    df_batch = df_batch.drop(columns=["sepal.length"])
    return df_batch

ds.map_batches(pandas_transform).show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}

With the actor compute strategy, you can use callable classes as per-row and per-batch UDFs, i.e., classes that implement the __call__ magic method. Use the class constructor for stateful setup; it is invoked only once per worker actor.

Callable classes are useful if you need to load expensive state (such as a model) for the UDF. By using an actor class, you only need to load the state once in the beginning, rather than for each batch.

Note

These transformation APIs take the uninstantiated callable class as an argument, not an instance of the class.

import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a callable class on Pandas DataFrame batches.
class ModelUDF:
    def __init__(self):
        # Dummy "model": flags rows whose sepal.length exceeds 6.5.
        self.model = lambda df: df["sepal.length"] > 6.5

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        # Filter rows.
        df = df[df["variety"] == "Versicolor"]
        # Apply model.
        df["output"] = self.model(df)
        return df

ds.map_batches(ModelUDF, compute="actors").show(2)
# -> {'sepal.length': 7.0, 'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'output': True}
# -> {'sepal.length': 6.4, 'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'output': False}

UDFs can also be written as Python generators, yielding multiple outputs for a batch or row instead of a single item. Generator UDFs are useful when returning large objects: instead of returning one very large output batch, fn can yield the output batch in chunks to avoid excessive heap memory usage.

Warning

When applying a generator UDF on individual rows, make sure to use the .flat_map() API and not the .map() API.

import ray
import pandas as pd
from typing import Iterator

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF to repeat the dataframe 100 times, in chunks of 20.
def repeat_dataframe(df: pd.DataFrame) -> Iterator[pd.DataFrame]:
    for _ in range(5):
        yield pd.concat([df]*20)

ds.map_batches(repeat_dataframe).show(2)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
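For the per-row case mentioned in the warning above, here is a minimal sketch (on a simple integer dataset) that uses .flat_map() with a generator UDF to emit multiple output rows per input row:

import ray

ds = ray.data.range(3)

# Generator UDF on individual rows: each input row yields two output rows.
def duplicate_row(x: int):
    yield x
    yield x * 10

ds.flat_map(duplicate_row).take()
# Expected: [0, 0, 1, 10, 2, 20]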

UDF Input Batch Format#

Choose the batch format of the data given to UDFs by setting the batch_format option of .map_batches(). Here is an overview of the available batch formats:

The “default” batch format presents data as follows for each Dataset type:

  • Tabular Datasets: Each batch will be a pandas.DataFrame. This may incur a conversion cost if the underlying Dataset block is not zero-copy convertible from an Arrow table.

    import ray
    import pandas as pd
    
    # Load dataset.
    ds = ray.data.read_csv("example://iris.csv")
    print(ds.default_batch_format())
    # <class 'pandas.core.frame.DataFrame'>
    
    # UDF as a function on Pandas DataFrame batches.
    def pandas_transform(df_batch: pd.DataFrame) -> pd.DataFrame:
        # Filter rows.
        df_batch = df_batch[df_batch["variety"] == "Versicolor"]
        # Add derived column.
        # Notice here that `df["sepal.length"].max()` is only the max value of the column
        # within a given batch (instead of globally)!!
        df_batch.loc[:, "normalized.sepal.length"] = df_batch["sepal.length"] / df_batch["sepal.length"].max()
        # Drop column.
        df_batch = df_batch.drop(columns=["sepal.length"])
        return df_batch
    
    ds.map_batches(pandas_transform).show(2)
    # -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
    #     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
    # -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
    #     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
    
  • Tensor Datasets (single-column): Each batch will be a single numpy.ndarray containing the single tensor column for this batch.

    import ray
    import numpy as np
    
    # Load dataset.
    ds = ray.data.range_tensor(1000, shape=(2, 2))
    print(ds.default_batch_format())
    # <class 'numpy.ndarray'>
    
    # UDF as a function on NumPy ndarray batches.
    def tensor_transform(arr: np.ndarray) -> np.ndarray:
        # Notice here that the ndarray is of shape (batch_size, 2, 2)
        # Multiply each element in the ndarray by a factor of 2
        return arr * 2
    
    ds.map_batches(tensor_transform).show(2)
    # [array([[0, 0],
    #         [0, 0]]),
    # array([[2, 2],
    #         [2, 2]])]
    
    
  • Simple Datasets: Each batch will be a Python list.

    import ray
    
    # Load dataset.
    ds = ray.data.range(1000)
    print(ds.default_batch_format())
    # <class 'list'>
    
    # UDF as a function on Python list batches.
    def list_transform(batch: list) -> list:
        # Notice here that the batch is a Python list of length batch_size.
        # Multiply each element in the list by a factor of 2.
        return [x * 2 for x in batch]
    
    ds.map_batches(list_transform).show(2)
    # 0
    # 2
    
    

The "pandas" batch format presents batches in pandas.DataFrame format. If converting a simple dataset to Pandas DataFrame batches, a single-column dataframe with the column "__value__" will be created.

import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
    # Filter rows.
    df = df[df["variety"] == "Versicolor"]
    # Add derived column.
    df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
    # Drop column.
    df = df.drop(columns=["sepal.length"])
    return df

ds.map_batches(pandas_transform).show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
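As a minimal sketch of the simple-dataset case described above (where the batch arrives as a single-column DataFrame named "__value__"):

import ray
import pandas as pd

# Simple dataset of integers.
ds = ray.data.range(4)

def double(df: pd.DataFrame) -> pd.DataFrame:
    # Per the description above, the simple-dataset batch arrives as a
    # single-column DataFrame whose column is named "__value__".
    df["__value__"] = df["__value__"] * 2
    return df

doubled = ds.map_batches(double, batch_format="pandas")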

The "pyarrow" batch format presents batches in pyarrow.Table format. If converting a simple dataset to Arrow Table batches, a single-column table with the column "__value__" will be created.

import ray
import pyarrow as pa
import pyarrow.compute as pac

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on Arrow Table batches.
def pyarrow_transform(batch: pa.Table) -> pa.Table:
    batch = batch.filter(pac.equal(batch["variety"], "Versicolor"))
    batch = batch.append_column(
        "normalized.sepal.length",
        pac.divide(batch["sepal.length"], pac.max(batch["sepal.length"])),
    )
    return batch.drop(["sepal.length"])

ds.map_batches(pyarrow_transform, batch_format="pyarrow").show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}

The "numpy" batch format presents batches in numpy.ndarray format as follows:

  • Tabular Datasets: Each batch will be a dictionary of NumPy ndarrays (Dict[str, np.ndarray]), with each key-value pair representing a column in the table.

  • Tensor Datasets (single-column): Each batch will be a single numpy.ndarray containing the single tensor column for this batch.

  • Simple Datasets: Each batch will be a single NumPy ndarray, where Datasets will attempt to convert each list-batch to an ndarray.

import ray
import numpy as np

# Load dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")

# UDF as a function on NumPy ndarray batches.
def normalize(arr: np.ndarray) -> np.ndarray:
    # Normalizes each image to [0, 1] range.
    mins = arr.min((1, 2))[:, np.newaxis, np.newaxis]
    maxes = arr.max((1, 2))[:, np.newaxis, np.newaxis]
    range_ = maxes - mins
    idx = np.where(range_ == 0)
    mins[idx] = 0
    range_[idx] = 1
    return (arr - mins) / range_

ds = ds.map_batches(normalize, batch_format="numpy")
# -> MapBatches(normalize)
#    +- Dataset(num_blocks=1, num_rows=3,
#               schema={__value__: <ArrowTensorType: shape=(28, 28), dtype=double>})
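And a minimal sketch of the tabular case, where each "numpy" batch is a Dict[str, np.ndarray] keyed by column name (column names here follow the iris examples above):

import ray
import numpy as np
from typing import Dict

ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on dict-of-ndarray batches.
def add_ratio(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    # Add a derived column computed from two existing columns.
    batch["sepal.ratio"] = batch["sepal.length"] / batch["sepal.width"]
    return batch

ds = ds.map_batches(add_ratio, batch_format="numpy")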

Converting between the underlying Datasets data representations (Arrow, Pandas, and Python lists) and the requested batch format ("default", "pandas", "pyarrow", "numpy") may incur data copies. The table below shows which conversions require copying data:

Data Format Conversion Costs#

Dataset Format \ Batch Format | "default" | "pandas"  | "numpy"    | "pyarrow" | None
"pandas"                      | Zero-copy | Zero-copy | Copy*      | Copy*     | Zero-copy
"arrow"                       | Copy*     | Copy*     | Zero-copy* | Zero-copy | Zero-copy
"simple"                      | Copy      | Copy      | Copy       | Copy      | Copy

Note

* No copies occur when converting between the Arrow, Pandas, and NumPy formats for columns represented in our tensor extension type (unless the data is boolean). Copies always occur when converting boolean data between Arrow and Pandas/NumPy, since Arrow bitpacks boolean data while Pandas and NumPy do not.

Tip

Prefer using vectorized operations on the pandas.DataFrame, pyarrow.Table, and numpy.ndarray types for better performance. For example, suppose you want to compute the sum of a column in pandas.DataFrame: instead of iterating over each row of a batch and summing up values of that column, use df_batch["col_foo"].sum().
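For example, a small sketch contrasting a per-row Python loop with the vectorized equivalent (using the iris column names from the examples above):

import pandas as pd

def slow_sum(df_batch: pd.DataFrame) -> pd.DataFrame:
    # Anti-pattern: iterate over rows in Python.
    total = 0.0
    for _, row in df_batch.iterrows():
        total += row["sepal.length"]
    return pd.DataFrame({"sum.sepal.length": [total]})

def fast_sum(df_batch: pd.DataFrame) -> pd.DataFrame:
    # Vectorized: pandas sums the whole column in one call.
    return pd.DataFrame({"sum.sepal.length": [df_batch["sepal.length"].sum()]})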

Tip

If the UDF for ds.map_batches() does not mutate its input, we can prevent an unnecessary data batch copy by specifying zero_copy_batch=True, which will provide the UDF with zero-copy, read-only batches. See the ds.map_batches() docstring for more information.
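For instance, a minimal sketch (assuming your Ray version supports the zero_copy_batch argument described above) with a UDF that never mutates its input batch:

import ray
import pyarrow as pa
import pyarrow.compute as pac

ds = ray.data.read_csv("example://iris.csv")

# Read-only UDF: builds a new filtered table instead of mutating `batch`.
def versicolor_only(batch: pa.Table) -> pa.Table:
    return batch.filter(pac.equal(batch["variety"], "Versicolor"))

# Safe to request zero-copy, read-only batches here.
ds.map_batches(versicolor_only, batch_format="pyarrow", zero_copy_batch=True)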

Batch UDF Output Types#

The following output types are allowed for batch UDFs (e.g., ds.map_batches()). Each is interpreted as follows to create the transformation result:

Returning pd.DataFrame creates a Tabular dataset as the transformation result:

import ray
import pandas as pd
from typing import List

# Load dataset.
ds = ray.data.from_items(["test", "string", "teststring"])
# -> Dataset(num_blocks=1, num_rows=3, schema=<class 'str'>)

# Convert to Pandas.
def convert_to_pandas(text: List[str]) -> pd.DataFrame:
    return pd.DataFrame({"text": text}, dtype="string")

ds = ds.map_batches(convert_to_pandas)
# -> MapBatches(convert_to_pandas)
#    +- Dataset(num_blocks=3, num_rows=3, schema=<class 'str'>)

ds.show(2)
# -> {'text': 'test'}
# -> {'text': 'string'}

print(ds)
# -> Dataset(num_blocks=3, num_rows=3, schema={text: string})

Returning pa.Table creates a Tabular dataset as the transformation result:

import ray
import pyarrow as pa
from typing import List

# Load dataset.
ds = ray.data.from_items(["test", "string", "teststring"])
# -> Dataset(num_blocks=1, num_rows=3, schema=<class 'str'>)

# Convert to Arrow.
def convert_to_arrow(text: List[str]) -> pa.Table:
    return pa.table({"text": text})

ds = ds.map_batches(convert_to_arrow)
# -> MapBatches(convert_to_arrow)
#    +- Dataset(num_blocks=1, num_rows=3, schema=<class 'str'>)

ds.show(2)
# -> {'text': 'test'}
# -> {'text': 'string'}

print(ds)
# -> Dataset(num_blocks=3, num_rows=3, schema={text: string})

Returning np.ndarray creates a single-column Tensor dataset as the transformation result:

import ray
import pandas as pd
import numpy as np
from typing import Dict

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert to NumPy.
def convert_to_numpy(df: pd.DataFrame) -> np.ndarray:
    return df[["sepal.length", "sepal.width"]].to_numpy()

ds = ds.map_batches(convert_to_numpy)
# -> MapBatches(convert_to_numpy)
#    +- Dataset(
#           num_blocks=1,
#           num_rows=150,
#           schema={
#               sepal.length: double,
#               sepal.width: double,
#               petal.length: double,
#               petal.width: double,
#               variety: string,
#           },
#      )

ds.show(2)
# -> [5.1 3.5]
#    [4.9 3. ]

Returning Dict[str, np.ndarray] creates a multi-column Tensor dataset as the transformation result.

If a column tensor is 1-dimensional, then the native Arrow 1D list type is used; if a column tensor has 2 or more dimensions, then the Dataset tensor extension type is used to embed these n-dimensional tensors in the Arrow table.

import ray
import pandas as pd
import numpy as np
from typing import Dict

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert to dict of NumPy ndarrays.
def convert_to_numpy(df: pd.DataFrame) -> Dict[str, np.ndarray]:
    return {
        "sepal_len_and_width": df[["sepal.length", "sepal.width"]].to_numpy(),
        "petal_len": df["petal.length"].to_numpy(),
        "petal_width": df["petal.width"].to_numpy(),
    }

ds = ds.map_batches(convert_to_numpy)
# -> MapBatches(convert_to_numpy)
#    +- Dataset(
#           num_blocks=1,
#           num_rows=150,
#           schema={
#               sepal.length: double,
#               sepal.width: double,
#               petal.length: double,
#               petal.width: double,
#               variety: string,
#           },
#      )

ds.show(2)
# -> {'sepal_len_and_width': array([5.1, 3.5]), 'petal_len': 1.4, 'petal_width': 0.2}
# -> {'sepal_len_and_width': array([4.9, 3. ]), 'petal_len': 1.4, 'petal_width': 0.2}

Returning list creates a simple Python object dataset as the transformation result:

import ray
import pandas as pd
from typing import List

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert to list of dicts.
def convert_to_list(df: pd.DataFrame) -> List[dict]:
    return df.to_dict("records")

ds = ds.map_batches(convert_to_list)
# -> MapBatches(convert_to_list)
#    +- Dataset(
#           num_blocks=1,
#           num_rows=150,
#           schema={
#               sepal.length: double,
#               sepal.width: double,
#               petal.length: double,
#               petal.width: double,
#               variety: string,
#           },
#      )

ds.show(2)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5, 'petal.length': 1.4, 'petal.width': 0.2,
#     'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0, 'petal.length': 1.4, 'petal.width': 0.2,
#     'variety': 'Setosa'}

Row UDF Output Types#

The following output types are allowed for per-row UDFs (e.g., ds.map()):

Returning a dict of Arrow-compatible data types creates a Tabular dataset as the transformation result. If any dict values are not Arrow-compatible, then a simple Python object dataset will be created:

import ray
import pandas as pd
from typing import Dict

# Load dataset.
ds = ray.data.range(10)
# -> Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)

# Convert row to dict.
def row_to_dict(row: int) -> Dict[str, int]:
    return {"foo": row}

ds = ds.map(row_to_dict)
# -> Map
#    +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)

ds.show(2)
# -> {'foo': 0}
# -> {'foo': 1}

Returning np.ndarray creates a single-column Tensor dataset as the transformation result:

import ray
import numpy as np
from typing import Dict

# Load dataset.
ds = ray.data.range(10)
# -> Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)

# Convert row to NumPy ndarray.
def row_to_numpy(row: int) -> np.ndarray:
    return np.full(shape=(2, 2), fill_value=row)

ds = ds.map(row_to_numpy)
# -> Map
#    +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)

ds.show(2)
# -> [[0 0]
#     [0 0]]
#    [[1 1]
#     [1 1]]

Other return row types will create a simple Python object dataset as the transformation result:

import ray
from ray.data.row import TableRow
from typing import List

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        },
#   )

# Convert row to simple (opaque) row.
def map_row(row: TableRow) -> tuple:
    return tuple(row.items())

ds = ds.map(map_row)
# -> Map
#    +- Dataset(
#           num_blocks=1,
#           num_rows=150,
#           schema={
#               sepal.length: double,
#               sepal.width: double,
#               petal.length: double,
#               petal.width: double,
#               variety: string,
#          },
#     )

ds.show(2)
# -> (('sepal.length', 5.1), ('sepal.width', 3.5), ('petal.length', 1.4),
#     ('petal.width', 0.2), ('variety', 'Setosa'))
# -> (('sepal.length', 4.9), ('sepal.width', 3.0), ('petal.length', 1.4),
#     ('petal.width', 0.2), ('variety', 'Setosa'))

Configuring Batch Size#

ds.map_batches() is the canonical parallel transformation API for Datasets: it launches parallel tasks over the underlying Datasets blocks and maps UDFs over data batches within those tasks, allowing the UDF to implement vectorized operations on batches. An important parameter to set is batch_size, which controls the size of the batches provided to the UDF.

import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
    # Filter rows.
    df = df[df["variety"] == "Versicolor"]
    # Add derived column.
    df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
    # Drop column.
    df = df.drop(columns=["sepal.length"])
    return df

# Have each batch that pandas_transform receives contain 10 rows.
ds = ds.map_batches(pandas_transform, batch_size=10)
# -> MapBatches(pandas_transform)
#    +- Dataset(
#           num_blocks=1,
#           num_rows=150,
#           schema={
#               sepal.length: double,
#               sepal.width: double,
#               petal.length: double,
#               petal.width: double,
#               variety: string,
#           },
#      )

ds.show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}

Increasing batch_size can speed up execution by better leveraging vectorized operations and hardware, reducing batch slicing and concatenation overhead, and improving overall CPU/GPU saturation, but it also increases memory utilization, which can lead to out-of-memory failures. If you encounter out-of-memory failures, decreasing batch_size may help.

Note

The default batch_size of 4096 may be too large for datasets with large rows (e.g. tables with many columns or a collection of large images).

If you specify a batch_size that’s larger than your Dataset blocks, Datasets will bundle multiple blocks together for a single task in order to better satisfy batch_size. If batch_size is much larger than your Dataset blocks (e.g., if the dataset was created with too high a parallelism, or batch_size is set too large for your dataset), the number of parallel tasks may be less than expected.

If your Dataset blocks are smaller than your batch_size and you want to increase ds.map_batches() parallelism, decrease your batch_size to prevent this block bundling. If you think that your Dataset blocks are too small, try decreasing parallelism during the read to create larger blocks.

Note

The size of the batches provided to the UDF may be smaller than the provided batch_size if batch_size doesn’t evenly divide the block(s) sent to a given task.

Note

Block bundling (processing multiple blocks in a single task) will not occur if batch_size is not set; instead, each task will receive a single block. If a block is smaller than the default batch_size (4096), then the batch provided to the UDF in that task will be the same size as the block, and will therefore be smaller than the default batch_size.

Compute Strategy#

Datasets transformations are executed by either Ray tasks or Ray actors across a Ray cluster. By default, Ray tasks are used (with compute="tasks"). For transformations that require expensive setup, it’s preferable to use Ray actors, which are stateful and allow setup to be reused for efficiency. You can specify compute=ray.data.ActorPoolStrategy(min, max), and Ray will use an autoscaling actor pool of min to max actors to execute your transforms. For a fixed-size actor pool, specify ActorPoolStrategy(n, n).

The following is an example of using the Ray tasks and actors compute strategy for batch inference:

import ray
import pandas
import numpy
from ray.data import ActorPoolStrategy

# Dummy model to predict Iris variety.
def predict_iris(df: pandas.DataFrame) -> pandas.DataFrame:
    conditions = [
        (df["sepal.length"] < 5.0),
        (df["sepal.length"] >= 5.0) & (df["sepal.length"] < 6.0),
        (df["sepal.length"] >= 6.0)
    ]
    values = ["Setosa", "Versicolor", "Virginica"]
    return pandas.DataFrame({"predicted_variety": numpy.select(conditions, values)})

class IrisInferModel:
    def __init__(self):
        self._model = predict_iris

    def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
        return self._model(batch)

ds = ray.data.read_csv("example://iris.csv").repartition(10)

# Batch inference processing with Ray tasks (the default compute strategy).
predicted = ds.map_batches(predict_iris)

# Batch inference processing with Ray actors. Autoscale the actors between 3 and 10.
predicted = ds.map_batches(
    IrisInferModel, compute=ActorPoolStrategy(3, 10), batch_size=10)

Group-bys and aggregations#

Unlike mapping operations, groupbys and aggregations are global. Grouped aggregations are executed lazily. Global aggregations are executed eagerly and block until the aggregation has been computed.

import ray

ds: ray.data.Dataset = ray.data.from_items([
    {"A": x % 3, "B": 2 * x, "C": 3 * x}
    for x in range(10)])

# Group by the A column and calculate the per-group mean for B and C columns.
agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).materialize()
# -> Sort Sample: 100%|███████████████████████████████████████| 10/10 [00:01<00:00,  9.04it/s]
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 23.66it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 937.21it/s]
# -> Dataset(num_blocks=10, num_rows=3, schema={})
agg_ds.to_pandas()
# ->
#    A  mean(B)  mean(C)
# 0  0      9.0     13.5
# 1  1      8.0     12.0
# 2  2     10.0     15.0

# Global mean on B column.
ds.mean("B")
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 2851.91it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 319.69it/s]
# -> 9.0

# Global mean on multiple columns.
ds.mean(["B", "C"])
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1730.32it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 231.41it/s]
# -> {'mean(B)': 9.0, 'mean(C)': 13.5}

# Multiple global aggregations on multiple columns.
from ray.data.aggregate import Mean, Std
ds.aggregate(Mean("B"), Std("B", ddof=0), Mean("C"), Std("C", ddof=0))
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1568.73it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 133.51it/s]
# -> {'mean(B)': 9.0, 'std(B)': 5.744562646538029, 'mean(C)': 13.5, 'std(C)': 8.616843969807043}

Combine aggregations with batch mapping to transform datasets using computed statistics. For example, you can efficiently standardize feature columns and impute missing values with calculated column means.

import pandas as pd

# Impute missing values with the column mean.
b_mean = ds.mean("B")
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 4054.03it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 359.22it/s]
# -> 9.0

def impute_b(df: pd.DataFrame):
    df["B"] = df["B"].fillna(b_mean)
    return df

ds = ds.map_batches(impute_b, batch_format="pandas")
# -> MapBatches(impute_b)
#    +- Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: int64, C: int64})

# Standard scaling of all feature columns.
stats = ds.aggregate(Mean("B"), Std("B"), Mean("C"), Std("C"))
# -> MapBatches(impute_b): 100%|██████████████████████████████| 10/10 [00:01<00:00,  7.16it/s]
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1260.99it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 128.77it/s]
# -> {'mean(B)': 9.0, 'std(B)': 6.0553007081949835, 'mean(C)': 13.5, 'std(C)': 9.082951062292475}

def batch_standard_scaler(df: pd.DataFrame):
    def column_standard_scaler(s: pd.Series):
        s_mean = stats[f"mean({s.name})"]
        s_std = stats[f"std({s.name})"]
        return (s - s_mean) / s_std

    cols = df.columns.difference(["A"])
    df.loc[:, cols] = df.loc[:, cols].transform(column_standard_scaler)
    return df

ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")
ds.materialize()
# -> Map Progress: 100%|██████████████████████████████████████| 10/10 [00:00<00:00, 144.79it/s]
# -> Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: double, C: double})

Shuffling data#

Call Dataset.random_shuffle() to perform a global shuffle.

>>> import ray
>>> dataset = ray.data.range(10)
>>> dataset.random_shuffle().take_all()  
[7, 0, 9, 3, 5, 1, 4, 2, 8, 6]

For better performance, you can instead perform a local shuffle when iterating over batches. Read Shuffling Data in the AIR user guide to learn more.
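For example, a minimal sketch of a local shuffle, assuming the local_shuffle_buffer_size argument of iter_batches() is available in your Ray version:

import ray

ds = ray.data.range(1000)

# Shuffle rows within a bounded buffer as batches are produced,
# instead of globally shuffling the whole dataset up front.
for batch in ds.iter_batches(batch_size=100, local_shuffle_buffer_size=250):
    pass  # consume the locally shuffled batch (e.g., feed it to training)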