Transforming Data#
Transformations let you process and modify your dataset. You can compose transformations to express a chain of computations.
Note
Transformations are lazy by default. They aren’t executed until you trigger consumption of the data by iterating over the Dataset, saving the Dataset, or inspecting properties of the Dataset.
This guide shows you how to:
Transforming rows#
Tip
If your transformation is vectorized, call map_batches() for
better performance. To learn more, see Transforming batches.
Transforming rows with map#
If your transformation returns exactly one row for each input row, call
map().
import os
from typing import Any, Dict
import ray
def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
row["filename"] = os.path.basename(row["path"])
return row
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
.map(parse_filename)
)
The user defined function passed to map() should be of type
Callable[[Dict[str, Any]], Dict[str, Any]]. In other words, your function should
input and output a dictionary with keys of strings and values of any type. For example:
from typing import Any, Dict
def fn(row: Dict[str, Any]) -> Dict[str, Any]:
# access row data
value = row["col1"]
# add data to row
row["col2"] = ...
# return row
return row
Transforming rows with flat map#
If your transformation returns multiple rows for each input row, call
flat_map().
from typing import Any, Dict, List
import ray
def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
return [row] * 2
print(
ray.data.range(3)
.flat_map(duplicate_row)
.take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]
The user defined function passed to flat_map() should be of type
Callable[[Dict[str, Any]], List[Dict[str, Any]]]. In other words your function should
input a dictionary with keys of strings and values of any type and output a list of
dictionaries that have the same type as the input, for example:
from typing import Any, Dict, List
def fn(row: Dict[str, Any]) -> List[Dict[str, Any]]:
# access row data
value = row["col1"]
# add data to row
row["col2"] = ...
# construct output list
output = [row, row]
# return list of output rows
return output
Transforming batches#
If your transformation can be vectorized using NumPy, PyArrow or Pandas operations, transforming batches is considerably more performant than transforming individual rows.
from typing import Dict
import numpy as np
import ray
def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness)
)
Configuring batch format#
Ray Data represents batches as dicts of NumPy ndarrays, pandas DataFrames or Arrow Tables. By
default, Ray Data represents batches as dicts of NumPy ndarrays. To configure the batch type,
specify batch_format in map_batches(). You can return either
format from your function, but batch_format should match the input of your function.
When applying transformations to batches of rows, Ray Data could represent these batches as either NumPy’s ndarrays,
Pandas DataFrame or PyArrow Table.
- When using
batch_format=numpy, the input to the function will be a dictionary where keys correspond to column names and values to column values represented asndarrays.batch_format=pyarrow, the input to the function will be a PyarrowTable.batch_format=pandas, the input to the function will be a PandasDataFrame.
from typing import Dict
import numpy as np
import ray
def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness, batch_format="numpy")
)
import pandas as pd
import ray
def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
return batch.dropna()
ds = (
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
.map_batches(drop_nas, batch_format="pandas")
)
import pyarrow as pa
import pyarrow.compute as pc
import ray
def drop_nas(batch: pa.Table) -> pa.Table:
return pc.drop_null(batch)
ds = (
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
.map_batches(drop_nas, batch_format="pyarrow")
)
The user defined function can also be a Python generator that yields batches, so the function can also
be of type Callable[DataBatch, Iterator[[DataBatch]], where DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]].
In this case, your function would look like:
from typing import Dict, Iterator
import numpy as np
def fn(batch: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
# yield the same batch multiple times
for _ in range(10):
yield batch
Choosing the right batch format#
When choosing appropriate batch format for your map_batches primary consideration is a trade-off of convenience vs performance:
Batches are a sliding window into the underlying block: the UDF is invoked with a subset of rows of the underlying block that make up the current batch of specified
batch_size. Specifyingbatch_size=Nonemakes batch include all rows of the block in a single batch.Depending on the batch format, such view can either be a zero-copy (when batch format matches the block type of either
pandasorpyarrow) or copying one (when the batch format differs from the block type).
For example, if the underlying block type is Arrow, specifying batch_format="numpy" or batch_format="pandas" might invoke a copy on the underlying data when converting it from the underlying block type.
Ray Data also strives to minimize the amount of data conversions: for example, if your map_batches operation returns Pandas batches, then these batches are combined into blocks without conversion and propagated further as Pandas blocks. Most Ray Data datasources produce Arrow blocks, so using batch format pyarrow can avoid unnecessary data conversions.
If you’d like to use a more ergonomic API for transformations but avoid performance overheads, you can consider using polars inside your map_batches operation with batch_format="pyarrow" as follows:
import pyarrow as pa
def udf(table: pa.Table):
import polars as pl
df = polars.from_pyarrow(table)
df.summary()
return df.to_arrow()
ds.map_batches(udf, batch_format="pyarrow")
Configuring batch size#
Increasing batch_size improves the performance of vectorized transformations as well
as performance of model inference. However, if your batch size is too large, your
program might run into out-of-memory (OOM) errors.
If you encounter an OOM errors, try decreasing your batch_size.
Ordering of rows#
When transforming data, the order of blocks isn’t preserved by default.
If the order of blocks needs to be preserved/deterministic,
you can use sort() method, or set ray.data.ExecutionOptions.preserve_order to True.
Note that setting this flag may negatively impact performance on larger cluster setups where stragglers are more likely.
import ray
ctx = ray.data.DataContext().get_current()
# By default, this is set to False.
ctx.execution_options.preserve_order = True
Stateful Transforms#
If your transform requires expensive setup such as downloading
model weights, use a callable Python class instead of a function to make the transform stateful. When a Python class
is used, the __init__ method is called to perform setup exactly once on each worker.
In contrast, functions are stateless, so any setup must be performed for each data item.
Internally, Ray Data uses tasks to execute functions, and uses actors to execute classes. To learn more about tasks and actors, read the Ray Core Key Concepts.
To transform data with a Python class, complete these steps:
Implement a class. Perform setup in
__init__and transform data in__call__.Call
map_batches(),map(), orflat_map(). Pass a compute strategy with thecomputeargument to control how many workers Ray uses. Each worker transforms a partition of data in parallel. Useray.data.TaskPoolStrategy(size=n)to cap the number of concurrent tasks, orray.data.ActorPoolStrategy(...)to run callable classes on a fixed or autoscaling actor pool.
from typing import Dict
import numpy as np
import torch
import ray
class TorchPredictor:
def __init__(self):
self.model = torch.nn.Identity()
self.model.eval()
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
with torch.inference_mode():
batch["output"] = self.model(inputs).detach().numpy()
return batch
ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(
TorchPredictor,
compute=ray.data.ActorPoolStrategy(size=2),
)
)
from typing import Dict
import numpy as np
import torch
import ray
class TorchPredictor:
def __init__(self):
self.model = torch.nn.Identity().cuda()
self.model.eval()
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
with torch.inference_mode():
batch["output"] = self.model(inputs).detach().cpu().numpy()
return batch
ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(
TorchPredictor,
# Two workers with one GPU each
compute=ray.data.ActorPoolStrategy(size=2),
# Batch size is required if you're using GPUs.
batch_size=4,
num_gpus=1
)
)
Avoiding out-of-memory errors#
If your user defined function uses lots of memory, you might encounter out-of-memory
errors. To avoid these errors, configure the memory parameter. It tells Ray how much
memory your function uses, and prevents Ray from scheduling too many tasks on a node.
def uses_lots_of_memory(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
...
# Tell Ray that the function uses 1 GiB of memory
ds.map_batches(uses_lots_of_memory, memory=1 * 1024 * 1024)
Group-by and transforming groups#
To transform groups, call groupby() to group rows based on provided key column values.
Then, call map_groups() to execute a transformation on each group.
from typing import Dict
import numpy as np
import ray
items = [
{"image": np.zeros((32, 32, 3)), "label": label}
for _ in range(10) for label in range(100)
]
def normalize_images(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
group["image"] = (group["image"] - group["image"].mean()) / group["image"].std()
return group
ds = (
ray.data.from_items(items)
.groupby("label")
.map_groups(normalize_images)
)
import pandas as pd
import ray
def normalize_features(group: pd.DataFrame) -> pd.DataFrame:
target = group.drop("target")
group = (group - group.min()) / group.std()
group["target"] = target
return group
ds = (
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
.groupby("target")
.map_groups(normalize_features)
)