Transforming Datasets#
Datasets transformations take in datasets and produce new datasets. For example, map is a transformation that applies a user-defined function to each dataset record and returns a new dataset as the result. Datasets transformations can be composed to express a chain of computations.
Tip
If you’re performing common ML transformations like normalization and label
encoding, create a Preprocessor
instead. To learn
more, read Using Preprocessors.
Transformations#
There are two main types of transformations:
- One-to-one: each input block will contribute to only one output block, such as ds.map_batches().
- All-to-all: input blocks can contribute to multiple output blocks, such as ds.random_shuffle().
Here is a table listing some common transformations supported by Ray Datasets.
| Transformation | Type | Description |
|---|---|---|
| ds.map_batches() | One-to-one | Apply a given function to batches of records of this dataset. |
| ds.add_column() | One-to-one | Apply a given function to batches of records to create a new column. |
| ds.drop_columns() | One-to-one | Drop the given columns from the dataset. |
| ds.split() | One-to-one | Split the dataset into N disjoint pieces. |
| ds.repartition(shuffle=False) | One-to-one | Repartition the dataset into N blocks, without shuffling the data. |
| ds.repartition(shuffle=True) | All-to-all | Repartition the dataset into N blocks, shuffling the data during repartition. |
| ds.random_shuffle() | All-to-all | Randomly shuffle the elements of this dataset. |
| ds.sort() | All-to-all | Sort the dataset by a sortkey. |
| ds.groupby() | All-to-all | Group the dataset by a groupkey. |
Tip
Datasets also provides the convenience transformation methods ds.map(), ds.flat_map(), and ds.filter(), which are not vectorized (slower than ds.map_batches()), but may be useful for development.
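For example, here is a minimal sketch of these per-row convenience APIs on a simple integer dataset; the outputs shown in the comments are illustrative:
import ray

ds = ray.data.range(5)

# Per-row (non-vectorized) transformations.
ds.map(lambda x: x * 2).take()          # -> [0, 2, 4, 6, 8]
ds.flat_map(lambda x: [x, x]).take()    # -> [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
ds.filter(lambda x: x % 2 == 0).take()  # -> [0, 2, 4]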
The following example uses several of the transformation APIs above to process the Iris dataset.
import ray
import pandas
# Create a dataset from file with Iris data.
# Tip: "example://" is a convenient protocol to access the
# python/ray/data/examples/data directory.
ds = ray.data.read_csv("example://iris.csv")
# Dataset(num_blocks=1, num_rows=150,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
ds.show(3)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.7, 'sepal.width': 3.2,
# 'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}
# Repartition the dataset to 5 blocks.
ds = ds.repartition(5)
# -> Repartition
# +- Dataset(num_blocks=1, num_rows=150,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]
# Map processing the dataset.
ds.map_batches(transform_batch).show()
# -> {'sepal.length': 5.2, 'sepal.width': 2.7,
# 'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 5.4, 'sepal.width': 3.0,
# 'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
# -> {'sepal.length': 4.9, 'sepal.width': 2.5,
# 'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}
# Split the dataset into 2 datasets
ds.split(2)
# -> [Dataset(num_blocks=3, num_rows=90,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string}),
# Dataset(num_blocks=2, num_rows=60,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string})]
# Sort the dataset by sepal.length.
ds = ds.sort("sepal.length")
ds.show(3)
# -> {'sepal.length': 4.3, 'sepal.width': 3.0,
# 'petal.length': 1.1, 'petal.width': 0.1, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 2.9,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.4, 'sepal.width': 3.0,
# 'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}
# Shuffle the dataset.
ds = ds.random_shuffle()
ds.show(3)
# -> {'sepal.length': 6.7, 'sepal.width': 3.1,
# 'petal.length': 4.4, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 6.7, 'sepal.width': 3.3,
# 'petal.length': 5.7, 'petal.width': 2.1, 'variety': 'Virginica'}
# -> {'sepal.length': 4.5, 'sepal.width': 2.3,
# 'petal.length': 1.3, 'petal.width': 0.3, 'variety': 'Setosa'}
# Group by the variety.
ds.groupby("variety").count().show()
# -> {'variety': 'Setosa', 'count()': 50}
# -> {'variety': 'Versicolor', 'count()': 50}
# -> {'variety': 'Virginica', 'count()': 50}
Writing User-defined Functions (UDFs)#
User-defined functions (UDFs) are routines that apply on one row (e.g.
.map()
) or a batch of rows (e.g.
.map_batches()
) of a dataset. UDFs let you
express your customized business logic in transformations. Here we will focus on
.map_batches()
as it’s the primary mapping
API in Datasets.
Here are the basics that you need to know about UDFs:
- A UDF can be either a function, a generator, or, if using the actor compute strategy, a callable class.
- Select the UDF input batch format using the batch_format argument.
- The UDF output type determines the Dataset schema of the transformation result.
Types of UDFs#
There are three types of UDFs that you can use with Ray Datasets: Function UDFs, Callable Class UDFs, and Generator UDFs.
The most basic UDFs are functions that take a batch or row as input and return a batch or row as output. See UDF Input Batch Format for the supported batch formats.
import ray
import pandas as pd
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
print(ds.default_batch_format())
# <class 'pandas.core.frame.DataFrame'>
# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df_batch: pd.DataFrame) -> pd.DataFrame:
# Filter rows.
df_batch = df_batch[df_batch["variety"] == "Versicolor"]
# Add derived column.
# Notice here that `df["sepal.length"].max()` is only the max value of the column
# within a given batch (instead of globally)!!
df_batch.loc[:, "normalized.sepal.length"] = df_batch["sepal.length"] / df_batch["sepal.length"].max()
# Drop column.
df_batch = df_batch.drop(columns=["sepal.length"])
return df_batch
ds.map_batches(pandas_transform).show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
With the actor compute strategy, you can write per-row and per-batch UDFs as
callable classes, i.e., classes that implement the __call__ magic method. The
class constructor can be used for stateful setup, and it is invoked only once
per worker actor.
Callable classes are useful if you need to load expensive state (such as a model) for the UDF. By using an actor class, you only need to load the state once in the beginning, rather than for each batch.
Note
These transformation APIs take the uninstantiated callable class as an argument, not an instance of the class.
import ray
import pandas as pd
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# UDF as a callable class on Pandas DataFrame batches.
class ModelUDF:
def __init__(self):
self.model = lambda df: df["sepal.length"] > 6.5
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
# Filter rows.
df = df[df["variety"] == "Versicolor"]
# Apply model.
df["output"] = self.model(df)
return df
ds.map_batches(ModelUDF, compute="actors").show(2)
# -> {'sepal.length': 7.0, 'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'output': True}
# -> {'sepal.length': 6.4, 'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'output': False}
UDFs can also be written as Python generators, yielding multiple outputs for a batch or row instead of a single item. Generator UDFs are useful when returning large objects: instead of returning one very large output batch, fn can yield the output batch in chunks to avoid excessive heap memory usage.
Warning
When applying a generator UDF on individual rows, make sure to use the .flat_map()
API and not the .map()
API.
import ray
import pandas as pd
from typing import Iterator
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# UDF to repeat the dataframe 100 times, in chunks of 20.
def repeat_dataframe(df: pd.DataFrame) -> Iterator[pd.DataFrame]:
for _ in range(5):
yield pd.concat([df]*20)
ds.map_batches(repeat_dataframe).show(2)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0, 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
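The warning above applies to per-row generator UDFs: pair them with .flat_map(). Here is a minimal sketch; the duplicate_row helper and the outputs shown are illustrative:
import ray

ds = ray.data.range(3)

# Per-row generator UDF that yields each input row twice.
def duplicate_row(row: int):
    for _ in range(2):
        yield row

# .flat_map() flattens the multiple yielded outputs into the result dataset.
ds.flat_map(duplicate_row).take()
# -> [0, 0, 1, 1, 2, 2]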
UDF Input Batch Format#
Choose the batch format of the data given to UDFs
by setting the batch_format
option of .map_batches()
.
Here is an overview of the available batch formats:
The “default” batch format presents data as follows for each Dataset type:
Tabular Datasets: Each batch will be a pandas.DataFrame. This may incur a conversion cost if the underlying Dataset block is not zero-copy convertible from an Arrow table.
import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
print(ds.default_batch_format())
# <class 'pandas.core.frame.DataFrame'>

# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df_batch: pd.DataFrame) -> pd.DataFrame:
    # Filter rows.
    df_batch = df_batch[df_batch["variety"] == "Versicolor"]
    # Add derived column.
    # Notice here that `df["sepal.length"].max()` is only the max value of the column
    # within a given batch (instead of globally)!!
    df_batch.loc[:, "normalized.sepal.length"] = df_batch["sepal.length"] / df_batch["sepal.length"].max()
    # Drop column.
    df_batch = df_batch.drop(columns=["sepal.length"])
    return df_batch

ds.map_batches(pandas_transform).show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
Tensor Datasets (single-column): Each batch will be a single numpy.ndarray containing the single tensor column for this batch.
import ray
import numpy as np

# Load dataset.
ds = ray.data.range_tensor(1000, shape=(2, 2))
print(ds.default_batch_format())
# <class 'numpy.ndarray'>

# UDF as a function on NumPy ndarray batches.
def tensor_transform(arr: np.ndarray) -> np.ndarray:
    # Notice here that the ndarray is of shape (batch_size, 2, 2).
    # Multiply each element in the ndarray by a factor of 2.
    return arr * 2

ds.map_batches(tensor_transform).show(2)
# [array([[0, 0],
#         [0, 0]]),
#  array([[2, 2],
#         [2, 2]])]
Simple Datasets: Each batch will be a Python list.
import ray

# Load dataset.
ds = ray.data.range(1000)
print(ds.default_batch_format())
# <class 'list'>

# UDF as a function on Python list batches.
def list_transform(list) -> list:
    # Notice here that the list is of length batch_size.
    # Multiply each element in the list by a factor of 2.
    return [x * 2 for x in list]

ds.map_batches(list_transform).show(2)
# 0
# 2
The "pandas"
batch format presents batches in
pandas.DataFrame
format. If converting a simple dataset to Pandas DataFrame batches, a single-column
dataframe with the column "__value__"
will be created.
import ray
import pandas as pd
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
# Filter rows.
df = df[df["variety"] == "Versicolor"]
# Add derived column.
df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
# Drop column.
df = df.drop(columns=["sepal.length"])
return df
ds.map_batches(pandas_transform).show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
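As a minimal sketch of the simple-dataset case described above (the add_one UDF is illustrative), each batch arrives as a single-column DataFrame whose column is named "__value__":
import ray
import pandas as pd

# A simple (non-tabular) dataset of Python ints.
ds = ray.data.range(4)

def add_one(df: pd.DataFrame) -> pd.DataFrame:
    # The simple dataset arrives as a single-column DataFrame whose
    # column is named "__value__".
    df["__value__"] = df["__value__"] + 1
    return df

ds.map_batches(add_one, batch_format="pandas").show(2)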
The "pyarrow"
batch format presents batches in
pyarrow.Table
format. If converting a simple dataset to Arrow Table batches, a single-column table
with the column "__value__"
will be created.
import ray
import pyarrow as pa
import pyarrow.compute as pac
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# UDF as a function on Arrow Table batches.
def pyarrow_transform(batch: pa.Table) -> pa.Table:
batch = batch.filter(pac.equal(batch["variety"], "Versicolor"))
batch = batch.append_column(
"normalized.sepal.length",
pac.divide(batch["sepal.length"], pac.max(batch["sepal.length"])),
)
return batch.drop(["sepal.length"])
ds.map_batches(pyarrow_transform, batch_format="pyarrow").show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
The "numpy"
batch format presents batches in
numpy.ndarray
format as follows:
Tabular Datasets: Each batch will be a dictionary of NumPy ndarrays (
Dict[str, np.ndarray]
), with each key-value pair representing a column in the table.Tensor Datasets (single-column): Each batch will be a single numpy.ndarray containing the single tensor column for this batch.
Simple Datasets: Each batch will be a single NumPy ndarray, where Datasets will attempt to convert each list-batch to an ndarray.
import ray
import numpy as np
# Load dataset.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# UDF as a function on NumPy ndarray batches.
def normalize(arr: np.ndarray) -> np.ndarray:
# Normalizes each image to [0, 1] range.
mins = arr.min((1, 2))[:, np.newaxis, np.newaxis]
maxes = arr.max((1, 2))[:, np.newaxis, np.newaxis]
range_ = maxes - mins
idx = np.where(range_ == 0)
mins[idx] = 0
range_[idx] = 1
return (arr - mins) / range_
ds = ds.map_batches(normalize, batch_format="numpy")
# -> MapBatches(normalize)
# +- Dataset(num_blocks=1, num_rows=3,
# schema={__value__: <ArrowTensorType: shape=(28, 28), dtype=double>})
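For tabular datasets, here is a minimal sketch of the dictionary-of-ndarrays batch format described above; the double_petal_width UDF is illustrative:
import ray
import numpy as np
from typing import Dict

ds = ray.data.read_csv("example://iris.csv")

# Each batch is a dict mapping column names to NumPy ndarrays.
def double_petal_width(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    return {"petal.width.doubled": batch["petal.width"] * 2}

ds.map_batches(double_petal_width, batch_format="numpy").show(2)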
Converting between the underlying Datasets data representations (Arrow, Pandas, and
Python lists) and the requested batch format ("default"
, "pandas"
,
"pyarrow"
, "numpy"
) may incur data copies; which conversions cause data copying
is given in the below table:
| Dataset Format x Batch Format | "default" | "pandas" | "pyarrow" | "numpy" (tabular) | "numpy" (tensor) |
|---|---|---|---|---|---|
| Pandas | Zero-copy | Zero-copy | Copy* | Copy* | Zero-copy |
| Arrow | Copy* | Copy* | Zero-copy* | Zero-copy | Zero-copy |
| Simple | Copy | Copy | Copy | Copy | Copy |
Note
* No copies occur when converting between Arrow, Pandas, and NumPy formats for columns represented in our tensor extension type (unless data is boolean). Copies always occur when converting boolean data from/to Arrow to/from Pandas/NumPy, since Arrow bitpacks boolean data while Pandas/NumPy does not.
Tip
Prefer using vectorized operations on the pandas.DataFrame
,
pyarrow.Table
, and numpy.ndarray
types for better performance. For
example, suppose you want to compute the sum of a column in pandas.DataFrame
:
instead of iterating over each row of a batch and summing up values of that column,
use df_batch["col_foo"].sum()
.
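For instance, a small sketch of the difference (col_foo is the placeholder column name used in the tip):
import pandas as pd

df_batch = pd.DataFrame({"col_foo": range(1000)})

# Slow: Python-level iteration over rows.
total = 0
for _, row in df_batch.iterrows():
    total += row["col_foo"]

# Fast: a single vectorized reduction over the column.
total = df_batch["col_foo"].sum()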
Tip
If the UDF for ds.map_batches()
does not
mutate its input, we can prevent an unnecessary data batch copy by specifying
zero_copy_batch=True
, which will provide the UDF with zero-copy, read-only
batches. See the ds.map_batches()
docstring for
more information.
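A minimal sketch of this; the summarize UDF is illustrative and only reads its input batch, so a zero-copy, read-only batch is safe:
import ray
import pandas as pd

ds = ray.data.read_csv("example://iris.csv")

# The UDF builds a new DataFrame instead of mutating the input batch.
def summarize(df: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"mean.sepal.length": [df["sepal.length"].mean()]})

ds.map_batches(summarize, zero_copy_batch=True).show(1)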
Batch UDF Output Types#
The following output types are allowed for batch UDFs (e.g.,
ds.map_batches()
). The following describes
how they are interpreted to create the transformation result:
Returning pd.DataFrame
creates a Tabular dataset as the transformation result:
import ray
import pandas as pd
from typing import List
# Load dataset.
ds = ray.data.from_items(["test", "string", "teststring"])
# -> Dataset(num_blocks=1, num_rows=3, schema=<class 'str'>)
# Convert to Pandas.
def convert_to_pandas(text: List[str]) -> pd.DataFrame:
return pd.DataFrame({"text": text}, dtype="string")
ds = ds.map_batches(convert_to_pandas)
# -> MapBatches(convert_to_pandas)
# +- Dataset(num_blocks=3, num_rows=3, schema=<class 'str'>)
ds.show(2)
# -> {'text': 'test'}
# -> {'text': 'string'}
print(ds)
# -> Dataset(num_blocks=3, num_rows=3, schema={text: string})
Returning pa.Table
creates a Tabular dataset as the transformation result:
import ray
import pyarrow as pa
from typing import List
# Load dataset.
ds = ray.data.from_items(["test", "string", "teststring"])
# -> Dataset(num_blocks=1, num_rows=3, schema=<class 'str'>)
# Convert to Arrow.
def convert_to_arrow(text: List[str]) -> pa.Table:
return pa.table({"text": text})
ds = ds.map_batches(convert_to_arrow)
# -> MapBatches(convert_to_arrow)
# +- Dataset(num_blocks=1, num_rows=3, schema=<class 'str'>)
ds.show(2)
# -> {'text': 'test'}
# -> {'text': 'string'}
print(ds)
# -> Dataset(num_blocks=3, num_rows=3, schema={text: string})
Returning np.ndarray
creates a single-column Tensor dataset as the transformation result:
import ray
import pandas as pd
import numpy as np
from typing import Dict
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
# Convert to NumPy.
def convert_to_numpy(df: pd.DataFrame) -> np.ndarray:
return df[["sepal.length", "sepal.width"]].to_numpy()
ds = ds.map_batches(convert_to_numpy)
# -> MapBatches(convert_to_numpy)
# +- Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
ds.show(2)
# -> [5.1 3.5]
# [4.9 3. ]
Returning Dict[str, np.ndarray]
creates a multi-column Tensor dataset as the transformation result.
If a column tensor is 1-dimensional, then the native Arrow 1D list type is used; if a column tensor has 2 or more dimensions, then the Dataset tensor extension type is used to embed these n-dimensional tensors in the Arrow table.
import ray
import pandas as pd
import numpy as np
from typing import Dict
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
# Convert to dict of NumPy ndarrays.
def convert_to_numpy(df: pd.DataFrame) -> Dict[str, np.ndarray]:
return {
"sepal_len_and_width": df[["sepal.length", "sepal.width"]].to_numpy(),
"petal_len": df["petal.length"].to_numpy(),
"petal_width": df["petal.width"].to_numpy(),
}
ds = ds.map_batches(convert_to_numpy)
# -> MapBatches(convert_to_numpy)
# +- Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
ds.show(2)
# -> {'sepal_len_and_width': array([5.1, 3.5]), 'petal_len': 1.4, 'petal_width': 0.2}
# -> {'sepal_len_and_width': array([4.9, 3. ]), 'petal_len': 1.4, 'petal_width': 0.2}
Returning list
creates a simple Python object dataset as the transformation result:
import ray
import pandas as pd
from typing import List
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
# Convert to list of dicts.
def convert_to_list(df: pd.DataFrame) -> List[dict]:
return df.to_dict("records")
ds = ds.map_batches(convert_to_list)
# -> MapBatches(convert_to_list)
# +- Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
ds.show(2)
# -> {'sepal.length': 5.1, 'sepal.width': 3.5, 'petal.length': 1.4, 'petal.width': 0.2,
# 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0, 'petal.length': 1.4, 'petal.width': 0.2,
# 'variety': 'Setosa'}
Row UDF Output Types#
The following output types are allowed for per-row UDFs (e.g.,
ds.map()
):
Returning a dict
of Arrow-compatible data types creates a Tabular dataset
as the transformation result. If any dict values are not Arrow-compatible, then
a simple Python object dataset will be created:
import ray
import pandas as pd
from typing import Dict
# Load dataset.
ds = ray.data.range(10)
# -> Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)
# Convert row to dict.
def row_to_dict(row: int) -> Dict[str, int]:
return {"foo": row}
ds = ds.map(row_to_dict)
# -> Map
# +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)
ds.show(2)
# -> {'foo': 0}
# -> {'foo': 1}
Returning np.ndarray
creates a single-column Tensor dataset as the transformation result:
import ray
import numpy as np
from typing import Dict
# Load dataset.
ds = ray.data.range(10)
# -> Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)
# Convert row to NumPy ndarray.
def row_to_numpy(row: int) -> np.ndarray:
return np.full(shape=(2, 2), fill_value=row)
ds = ds.map(row_to_numpy)
# -> Map
# +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)
ds.show(2)
# -> [[0 0]
# [0 0]]
# [[1 1]
# [1 1]]
Other return row types will create a simple Python object dataset as the transformation result:
import ray
from ray.data.row import TableRow
from typing import List
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
# Convert row to simple (opaque) row.
def map_row(row: TableRow) -> tuple:
return tuple(row.items())
ds = ds.map(map_row)
# -> Map
# +- Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
ds.show(2)
# -> (('sepal.length', 5.1), ('sepal.width', 3.5), ('petal.length', 1.4),
# ('petal.width', 0.2), ('variety', 'Setosa'))
# -> (('sepal.length', 4.9), ('sepal.width', 3.0), ('petal.length', 1.4),
# ('petal.width', 0.2), ('variety', 'Setosa'))
Configuring Batch Size#
ds.map_batches()
is the canonical parallel
transformation API for Datasets: it launches parallel tasks over the underlying Datasets
blocks and maps UDFs over data batches within those tasks, allowing the UDF to
implement vectorized operations on batches. An important parameter to
set is batch_size
, which controls the size of the batches provided to the UDF.
import ray
import pandas as pd
# Load dataset.
ds = ray.data.read_csv("example://iris.csv")
# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
# Filter rows.
df = df[df["variety"] == "Versicolor"]
# Add derived column.
df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
# Drop column.
df = df.drop(columns=["sepal.length"])
return df
# Have each batch that pandas_transform receives contain 10 rows.
ds = ds.map_batches(pandas_transform, batch_size=10)
# -> MapBatches(pandas_transform)
# +- Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# },
# )
ds.show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
Increasing batch_size can result in faster execution by better leveraging vectorized
operations and hardware, reducing batch slicing and concatenation overhead, and better
saturating CPUs/GPUs overall. However, it also increases memory utilization, which can
lead to out-of-memory (OOM) failures. If you encounter OOMs, decreasing your batch_size
may help.
Note
The default batch_size
of 4096
may be too large for datasets with large rows
(e.g. tables with many columns or a collection of large images).
If you specify a batch_size
that’s larger than your Dataset
blocks, Datasets
will bundle multiple blocks together for a single task in order to better satisfy
batch_size
. If batch_size
is a lot larger than your Dataset
blocks (e.g. if
your dataset was created with too large of a parallelism
and/or the batch_size
is set to too large of a value for your dataset), the number of parallel tasks
may be less than expected.
If your Dataset
blocks are smaller than your batch_size
and you want to increase
ds.map_batches()
parallelism, decrease your
batch_size
to prevent this block bundling. If you think that your Dataset
blocks
are too small, try decreasing parallelism
during the read to create larger blocks.
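A rough sketch of this interplay; the block counts and batch sizes below are hypothetical:
import ray

# 1000 rows spread across 100 small blocks of 10 rows each.
ds = ray.data.range(1000).repartition(100)

# batch_size=500 is much larger than each block, so Datasets bundles many
# blocks into a single task and fewer tasks run in parallel.
ds.map_batches(lambda batch: batch, batch_size=500)

# A smaller batch_size avoids bundling and restores parallelism.
ds.map_batches(lambda batch: batch, batch_size=10)

# Alternatively, create larger blocks up front by lowering `parallelism`
# at dataset creation time.
ds_larger_blocks = ray.data.range(1000, parallelism=10)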
Note
The size of the batches provided to the UDF may be smaller than the provided
batch_size
if batch_size
doesn’t evenly divide the block(s) sent to a given
task.
Note
Block bundling (processing multiple blocks in a single task) will not occur if
batch_size
is not set; instead, each task will receive a single block. If a block
is smaller than the default batch_size
(4096), then the batch provided to the UDF
in that task will be the same size as the block, and will therefore be smaller than the
default batch_size.
Compute Strategy#
Datasets transformations are executed by either Ray tasks
or Ray actors across a Ray cluster. By default, Ray tasks are
used (with compute="tasks"
). For transformations that require expensive setup,
it’s preferrable to use Ray actors, which are stateful and allow setup to be reused
for efficiency. You can specify compute=ray.data.ActorPoolStrategy(min, max)
and
Ray will use an autoscaling actor pool of min
to max
actors to execute your
transforms. For a fixed-size actor pool, specify ActorPoolStrategy(n, n)
.
The following is an example of using the Ray tasks and Ray actors compute strategies for batch inference:
import ray
import pandas
import numpy
from ray.data import ActorPoolStrategy
# Dummy model to predict Iris variety.
def predict_iris(df: pandas.DataFrame) -> pandas.DataFrame:
conditions = [
(df["sepal.length"] < 5.0),
(df["sepal.length"] >= 5.0) & (df["sepal.length"] < 6.0),
(df["sepal.length"] >= 6.0)
]
values = ["Setosa", "Versicolor", "Virginica"]
return pandas.DataFrame({"predicted_variety": numpy.select(conditions, values)})
class IrisInferModel:
def __init__(self):
self._model = predict_iris
def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
return self._model(batch)
ds = ray.data.read_csv("example://iris.csv").repartition(10)
# Batch inference processing with Ray tasks (the default compute strategy).
predicted = ds.map_batches(predict_iris)
# Batch inference processing with Ray actors. Autoscale the actors between 3 and 10.
predicted = ds.map_batches(
IrisInferModel, compute=ActorPoolStrategy(3, 10), batch_size=10)
Group-bys and aggregations#
Unlike mapping operations, groupbys and aggregations are global. Grouped aggregations are executed lazily. Global aggregations are executed eagerly and block until the aggregation has been computed.
import ray
import pandas as pd

ds: ray.data.Dataset = ray.data.from_items([
{"A": x % 3, "B": 2 * x, "C": 3 * x}
for x in range(10)])
# Group by the A column and calculate the per-group mean for B and C columns.
agg_ds: ray.data.Dataset = ds.groupby("A").mean(["B", "C"]).materialize()
# -> Sort Sample: 100%|███████████████████████████████████████| 10/10 [00:01<00:00, 9.04it/s]
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 23.66it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 10/10 [00:00<00:00, 937.21it/s]
# -> Dataset(num_blocks=10, num_rows=3, schema={})
agg_ds.to_pandas()
# ->
# A mean(B) mean(C)
# 0 0 9.0 13.5
# 1 1 8.0 12.0
# 2 2 10.0 15.0
# Global mean on B column.
ds.mean("B")
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 2851.91it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 319.69it/s]
# -> 9.0
# Global mean on multiple columns.
ds.mean(["B", "C"])
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1730.32it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 231.41it/s]
# -> {'mean(B)': 9.0, 'mean(C)': 13.5}
# Multiple global aggregations on multiple columns.
from ray.data.aggregate import Mean, Std
ds.aggregate(Mean("B"), Std("B", ddof=0), Mean("C"), Std("C", ddof=0))
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1568.73it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 133.51it/s]
# -> {'mean(A)': 0.9, 'std(A)': 0.8306623862918076, 'mean(B)': 9.0, 'std(B)': 5.744562646538029}
Combine aggregations with batch mapping to transform datasets using computed statistics. For example, you can efficiently standardize feature columns and impute missing values with calculated column means.
# Impute missing values with the column mean.
b_mean = ds.mean("B")
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 4054.03it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 359.22it/s]
# -> 9.0
def impute_b(df: pd.DataFrame):
df["B"].fillna(b_mean)
return df
ds = ds.map_batches(impute_b, batch_format="pandas")
# -> MapBatches(impute_b)
# +- Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: int64, C: int64})
# Standard scaling of all feature columns.
stats = ds.aggregate(Mean("B"), Std("B"), Mean("C"), Std("C"))
# -> MapBatches(impute_b): 100%|██████████████████████████████| 10/10 [00:01<00:00, 7.16it/s]
# -> GroupBy Map: 100%|███████████████████████████████████████| 10/10 [00:00<00:00, 1260.99it/s]
# -> GroupBy Reduce: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 128.77it/s]
# -> {'mean(B)': 9.0, 'std(B)': 6.0553007081949835, 'mean(C)': 13.5, 'std(C)': 9.082951062292475}
def batch_standard_scaler(df: pd.DataFrame):
def column_standard_scaler(s: pd.Series):
s_mean = stats[f"mean({s.name})"]
s_std = stats[f"std({s.name})"]
return (s - s_mean) / s_std
cols = df.columns.difference(["A"])
df.loc[:, cols] = df.loc[:, cols].transform(column_standard_scaler)
return df
ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")
ds.materialize()
# -> Map Progress: 100%|██████████████████████████████████████| 10/10 [00:00<00:00, 144.79it/s]
# -> Dataset(num_blocks=10, num_rows=10, schema={A: int64, B: double, C: double})
Shuffling data#
Call Dataset.random_shuffle()
to
perform a global shuffle.
>>> import ray
>>> dataset = ray.data.range(10)
>>> dataset.random_shuffle().take_all()
[7, 0, 9, 3, 5, 1, 4, 2, 8, 6]
For better performance, you can instead perform a local shuffle. Read Shuffling Data in the AIR user guide to learn more.
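A minimal sketch of a local shuffle, assuming the local_shuffle_buffer_size argument of ds.iter_batches() is available in your Ray version:
import ray

ds = ray.data.range(10)

# Rows are shuffled within a bounded per-iteration buffer rather than
# across the entire dataset, trading shuffle quality for performance.
for batch in ds.iter_batches(batch_size=5, local_shuffle_buffer_size=10):
    print(batch)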