ray.data.Dataset.map_batches#

Dataset.map_batches(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, batch_size: int | None | Literal['default'] = 'default', compute: ComputeStrategy | None = None, batch_format: str | None = 'default', zero_copy_batch: bool = False, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset[source]#

Apply the given function to batches of data.

This method is useful for preprocessing data and performing inference. To learn more, see Transforming batches.

You can use either a function or a callable class to perform the transformation. For functions, Ray Data uses stateless Ray tasks. For classes, Ray Data uses stateful Ray actors. For more information, see Stateful Transforms.

Tip

To understand the format of the input to fn, call take_batch() on the dataset to get a batch in the same format as will be passed to fn.

Tip

If fn doesn’t mutate its input, set zero_copy_batch=True to improve performance and decrease memory utilization.

Warning

Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Ray team.

Examples

Call map_batches() to transform your data.

from typing import Dict
import numpy as np
import ray

def add_dog_years(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["age_in_dog_years"] = 7 * batch["age"]
    return batch

ds = (
    ray.data.from_items([
        {"name": "Luna", "age": 4},
        {"name": "Rory", "age": 14},
        {"name": "Scout", "age": 9},
    ])
    .map_batches(add_dog_years)
)
ds.show()
{'name': 'Luna', 'age': 4, 'age_in_dog_years': 28}
{'name': 'Rory', 'age': 14, 'age_in_dog_years': 98}
{'name': 'Scout', 'age': 9, 'age_in_dog_years': 63}

If your function returns large objects, yield outputs in chunks.

from typing import Dict
import ray
import numpy as np

def map_fn_with_large_output(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    for i in range(3):
        yield {"large_output": np.ones((100, 1000))}

ds = (
    ray.data.from_items([1])
    .map_batches(map_fn_with_large_output)
)

If you require stateful transfomation, use Python callable class. Here is an example showing how to use stateful transforms to create model inference workers, without having to reload the model on each call.

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity().cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        # Two workers with one GPU each
        concurrency=2,
        # Batch size is required if you're using GPUs.
        batch_size=4,
        num_gpus=1
    )
)

To learn more, see End-to-end: Offline Batch Inference.

Parameters:
  • fn – The function or generator to apply to a record batch, or a class type that can be instantiated to create such a callable. Note fn must be pickle-able.

  • batch_size – The desired number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The actual size of the batch provided to fn may be smaller than batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task. Default batch_size is 1024 with “default”.

  • compute – This argument is deprecated. Use concurrency argument.

  • batch_format – If "default" or "numpy", batches are Dict[str, numpy.ndarray]. If "pandas", batches are pandas.DataFrame.

  • zero_copy_batch – Whether fn should be provided zero-copy, read-only batches. If this is True and no copy is required for the batch_format conversion, the batch is a zero-copy, read-only view on data in Ray’s object store, which can decrease memory utilization and improve performance. If this is False, the batch is writable, which requires an extra copy to guarantee. If fn mutates its input, this needs to be False in order to avoid “assignment destination is read-only” or “buffer source array is read-only” errors. Default is False.

  • fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Ray task.

  • fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Ray task.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • num_cpus – The number of CPUs to reserve for each parallel map worker.

  • num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.

  • concurrency – The number of Ray workers to use concurrently. For a fixed-sized worker pool of size n, specify concurrency=n. For an autoscaling worker pool from m to n workers, specify concurrency=(m, n).

  • ray_remote_args_fn – A function that returns a dictionary of remote args passed to each map worker. The purpose of this argument is to generate dynamic arguments for each actor/task, and will be called each time prior to initializing the worker. Args returned from this dict will always override the args in ray_remote_args. Note: this is an advanced, experimental feature.

  • ray_remote_args – Additional resource requirements to request from ray for each map worker.

Note

The size of the batches provided to fn might be smaller than the specified batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task.

If batch_size is set and each input block is smaller than the batch_size, Ray Data will bundle up many blocks as the input for one task, until their total size is equal to or greater than the given batch_size. If batch_size is not set, the bundling will not be performed. Each task will receive only one input block.

See also

iter_batches()

Call this function to iterate over batches of data.

take_batch()

Call this function to get a batch of data from the dataset in the same format as will be passed to the fn function of map_batches().

flat_map()

Call this method to create new records from existing ones. Unlike map(), a function passed to flat_map() can return multiple records.

map()

Call this method to transform one record at time.