Transforming Data#

Transformations let you process and modify your dataset. You can compose transformations to express a chain of computations.

Note

Transformations are lazy by default. They aren’t executed until you trigger consumption of the data by iterating over the Dataset, saving the Dataset, or inspecting properties of the Dataset.

This guide shows you how to transform rows, transform batches, apply stateful transforms, and group rows and transform groups.

Transforming rows#

Tip

If your transformation is vectorized, call map_batches() for better performance. To learn more, see Transforming batches.

Transforming rows with map#

If your transformation returns exactly one row for each input row, call map().

import os
from typing import Any, Dict
import ray

def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    row["filename"] = os.path.basename(row["path"])
    return row

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
    .map(parse_filename)
)

The user-defined function you pass to map() should be of type Callable[[Dict[str, Any]], Dict[str, Any]]. In other words, your function should take a dictionary with string keys and values of any type, and return a dictionary of the same kind. For example:

from typing import Any, Dict

def fn(row: Dict[str, Any]) -> Dict[str, Any]:
    # access row data
    value = row["col1"]

    # add data to row
    row["col2"] = ...

    # return row
    return row

Transforming rows with flat map#

If your transformation returns multiple rows for each input row, call flat_map().

from typing import Any, Dict, List
import ray

def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [row] * 2

print(
    ray.data.range(3)
    .flat_map(duplicate_row)
    .take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]

The user-defined function you pass to flat_map() should be of type Callable[[Dict[str, Any]], List[Dict[str, Any]]]. In other words, your function should take a dictionary with string keys and values of any type, and return a list of dictionaries of the same kind. For example:

from typing import Any, Dict, List

def fn(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    # access row data
    value = row["col1"]

    # add data to row
    row["col2"] = ...

    # construct output list
    output = [row, row]

    # return list of output rows
    return output
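Because the function returns a list, the number of output rows can differ from one input row to the next, and an empty list produces no output rows for that input. The following is a minimal sketch of that behavior, not an official recipe. The `text` column and the helper names `split_into_words` and `build_word_dataset` are assumptions made up for illustration.

```python
from typing import Any, Dict, List


def split_into_words(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Hypothetical UDF: emit one output row per whitespace-separated word.
    # An empty string splits into an empty list, so that row yields no output.
    return [{"word": word} for word in row["text"].split()]


def build_word_dataset():
    # Ray is imported lazily so the UDF above can be tried out on its own.
    import ray

    rows = [{"text": "hello ray data"}, {"text": ""}, {"text": "flat map"}]
    return ray.data.from_items(rows).flat_map(split_into_words)
```

Calling `build_word_dataset().take_all()` triggers execution. The UDF itself is plain Python, so you can check it against a single dictionary before running it on a Dataset.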

Transforming batches#

If your transformation is vectorized like most NumPy or pandas operations, transforming batches is more performant than transforming rows.

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness)
)

Configuring batch format#

Ray Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. By default, Ray Data represents batches as dicts of NumPy ndarrays. To configure the batch type, specify batch_format in map_batches(). You can return either format from your function, but batch_format should match the input of your function.

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness, batch_format="numpy")
)

import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
    return batch.dropna()

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .map_batches(drop_nas, batch_format="pandas")
)
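As noted above, batch_format controls the input type of your function, while the return value can use either format. The sketch below receives a pandas DataFrame and returns a dict of NumPy ndarrays. The function names `to_numpy_columns` and `build_dataset` are hypothetical, chosen for this example only.

```python
from typing import Dict

import numpy as np
import pandas as pd


def to_numpy_columns(batch: pd.DataFrame) -> Dict[str, np.ndarray]:
    # The input is a DataFrame because the caller sets batch_format="pandas".
    cleaned = batch.dropna()
    # Return a dict of NumPy ndarrays instead of a DataFrame.
    return {name: cleaned[name].to_numpy() for name in cleaned.columns}


def build_dataset():
    # Ray is imported lazily so the UDF above can be tried out on its own.
    import ray

    return (
        ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
        .map_batches(to_numpy_columns, batch_format="pandas")
    )
```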

The user-defined function you pass to map_batches() is more flexible than the ones you pass to map() and flat_map(). Because you can represent batches in multiple ways (see Configuring batch format), the function should be of type Callable[[DataBatch], DataBatch], where DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]]. In other words, your function should take a batch of data as input and return a batch of data, each represented as a pandas DataFrame or a dictionary with string keys and NumPy ndarray values. For example, your function might look like:

import pandas as pd

def fn(batch: pd.DataFrame) -> pd.DataFrame:
    # modify batch
    batch = ...

    # return batch
    return batch

The user-defined function can also be a Python generator that yields batches, so the function can also be of type Callable[[DataBatch], Iterator[DataBatch]], where DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]]. In this case, your function would look like:

from typing import Dict, Iterator
import numpy as np

def fn(batch: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
    # yield the same batch multiple times
    for _ in range(10):
        yield batch
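A generator is mainly useful when the output is larger than the input, because each yielded batch can stay small. The sketch below is one possible pattern under stated assumptions: the `data` column matches what ray.data.from_numpy() produces, while `CHUNK_ROWS`, `expand_in_chunks`, and `build_dataset` are hypothetical names introduced for illustration.

```python
from typing import Dict, Iterator

import numpy as np

CHUNK_ROWS = 2  # hypothetical chunk size, chosen only for this example


def expand_in_chunks(batch: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
    data = batch["data"]
    # Walk the input in slices so each yielded batch holds at most
    # CHUNK_ROWS * 3 rows, instead of building one large output batch.
    for start in range(0, len(data), CHUNK_ROWS):
        chunk = data[start:start + CHUNK_ROWS]
        # Repeat every row of the slice three times.
        yield {"data": np.repeat(chunk, 3, axis=0)}


def build_dataset():
    # Ray is imported lazily so the UDF above can be tried out on its own.
    import ray

    return ray.data.from_numpy(np.arange(10)).map_batches(expand_in_chunks)
```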

Configuring batch size#

Increasing batch_size improves the performance of vectorized transformations like NumPy functions and model inference. However, if your batch size is too large, your program might run out of memory. If you encounter an out-of-memory error, decrease your batch_size.
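As a rough illustration, you set the batch size with the batch_size argument of map_batches(). The `scale` function, the `build_dataset` helper, and the default value of 16 are assumptions for this sketch. A suitable value depends on your row size and available memory.

```python
from typing import Dict

import numpy as np


def scale(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    # A vectorized operation: one NumPy call handles every row in the batch.
    batch["data"] = batch["data"] * 2.0
    return batch


def build_dataset(batch_size: int = 16):
    # Ray is imported lazily so the UDF above can be tried out on its own.
    import ray

    return (
        ray.data.from_numpy(np.ones((64, 100)))
        # Each call to scale() receives up to batch_size rows.
        # Lower this value if you run out of memory.
        .map_batches(scale, batch_size=batch_size)
    )
```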

Stateful Transforms#

If your transform requires expensive setup such as downloading model weights, use a callable Python class instead of a function to make the transform stateful. When you use a Python class, Ray Data calls its __init__ method exactly once on each worker to perform setup. In contrast, functions are stateless, so any setup runs again for each data item.

Internally, Ray Data uses tasks to execute functions, and uses actors to execute classes. To learn more about tasks and actors, read the Ray Core Key Concepts.

To transform data with a Python class, complete these steps:

  1. Implement a class. Perform setup in __init__ and transform data in __call__.

  2. Call map_batches(), map(), or flat_map(). Pass the number of concurrent workers to use with the concurrency argument. Each worker transforms a partition of data in parallel. Fixing the number of concurrent workers gives the most predictable performance, but you can also pass a tuple of (min, max) to allow Ray Data to automatically scale the number of concurrent workers.

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(TorchPredictor, concurrency=2)
)

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity().cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        # Two workers with one GPU each
        concurrency=2,
        # Batch size is required if you're using GPUs.
        batch_size=4,
        num_gpus=1
    )
)
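The (min, max) form of the concurrency argument mentioned in the steps above can be sketched as follows. The `AddOffset` class and the bounds of 1 and 4 workers are assumptions picked for illustration, and the constant offset is a stand-in for real setup work such as loading a model.

```python
from typing import Dict

import numpy as np


class AddOffset:
    def __init__(self):
        # Stand-in for expensive setup. Runs once per worker, not once per batch.
        self.offset = 10.0

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        batch["output"] = batch["data"] + self.offset
        return batch


def build_dataset():
    # Ray is imported lazily so the class above can be tried out on its own.
    import ray

    return (
        ray.data.from_numpy(np.ones((32, 100)))
        # Let Ray Data scale between 1 and 4 concurrent workers.
        .map_batches(AddOffset, concurrency=(1, 4))
    )
```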

Groupby and transforming groups#

To transform groups, call groupby() to group rows. Then, call map_groups() to transform the groups.

from typing import Dict
import numpy as np
import ray

items = [
    {"image": np.zeros((32, 32, 3)), "label": label}
    for _ in range(10) for label in range(100)
]

def normalize_images(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    group["image"] = (group["image"] - group["image"].mean()) / group["image"].std()
    return group

ds = (
    ray.data.from_items(items)
    .groupby("label")
    .map_groups(normalize_images)
)

import pandas as pd
import ray

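def normalize_features(group: pd.DataFrame) -> pd.DataFrame:
    # Set the grouping column aside so it isn't normalized with the features.
    target = group["target"]
    features = group.drop(columns="target")
    features = (features - features.min()) / features.std()
    features["target"] = target
    return features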

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .groupby("target")
    .map_groups(normalize_features)
)
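The function you pass to map_groups() doesn't have to return one row per input row. As a sketch under assumptions, the example below reduces each group to a single summary row. The `label` and `value` columns and the names `summarize_group` and `build_dataset` are hypothetical, and batch_format="numpy" is passed explicitly so the input type doesn't depend on the default.

```python
from typing import Dict

import numpy as np


def summarize_group(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    # Every row in a group shares the same label, so keep only the first one.
    return {
        "label": group["label"][:1],
        "count": np.array([len(group["label"])]),
        "mean_value": np.array([group["value"].mean()]),
    }


def build_dataset():
    # Ray is imported lazily so the UDF above can be tried out on its own.
    import ray

    items = [{"label": i % 3, "value": float(i)} for i in range(9)]
    return (
        ray.data.from_items(items)
        .groupby("label")
        .map_groups(summarize_group, batch_format="numpy")
    )
```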