ray.data.Dataset.map_batches#

Dataset.map_batches(fn: Union[Callable[[Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]], Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]], Callable[[Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]], Iterator[Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]]], _CallableClassProtocol], *, batch_size: Union[int, None, typing_extensions.Literal[default]] = 'default', compute: Optional[ray.data._internal.compute.ComputeStrategy] = None, batch_format: Optional[str] = 'default', zero_copy_batch: bool = False, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, num_cpus: Optional[float] = None, num_gpus: Optional[float] = None, **ray_remote_args) Dataset[source]#

Apply the given function to batches of data.

This method applies fn in parallel using map tasks, with each task handling a batch of data (typically a Dict[str, np.ndarray] or pd.DataFrame).

To learn more about writing functions for map_batches(), read writing user-defined functions.

Tip

If fn does not mutate its input, set zero_copy_batch=True to elide a batch copy, which can improve performance and decrease memory utilization. fn will then receive zero-copy read-only batches. If fn mutates its input, you will need to ensure that the batch provided to fn is writable by setting zero_copy_batch=False (default). This will create an extra, mutable copy of each batch before handing it to fn.
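
For instance, a transformation that only reads its input can safely request zero-copy batches. A minimal sketch (the dataset, column name, and derived column here are illustrative):

>>> import ray
>>> def double_age(batch):
...     # Only reads batch["age"] and builds a new column, so a
...     # read-only, zero-copy batch is safe here.
...     return {"age_doubled": batch["age"] * 2}
>>> zero_copy_ds = ray.data.from_items([{"age": 4}, {"age": 14}])
>>> zero_copy_ds = zero_copy_ds.map_batches(double_age, zero_copy_batch=True)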

Note

The size of the batches provided to fn may be smaller than the provided batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task. When batch_size is specified, each map task will be sent a single block if the block is equal to or larger than batch_size, and will be sent a bundle of blocks up to (but not exceeding) batch_size if blocks are smaller than batch_size.
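
For example, the following sketch (using an illustrative ten-row dataset) reports how many rows fn actually receives per batch; each batch is capped at the requested batch_size of 4 but may contain fewer rows. The printed counts appear in the worker logs once the dataset is consumed.

>>> def report_batch_size(batch):
...     # Every column has the same length: the number of rows in this batch.
...     print("rows in this batch:", len(batch["item"]))
...     return batch
>>> small_ds = ray.data.from_items(list(range(10)))
>>> rows = small_ds.map_batches(report_batch_size, batch_size=4).take_all()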

Examples

>>> import numpy as np
>>> import ray
>>> ds = ray.data.from_items([
...     {"name": "Luna", "age": 4},
...     {"name": "Rory", "age": 14},
...     {"name": "Scout", "age": 9},
... ])
>>> ds  
MaterializedDataset(
    num_blocks=3,
    num_rows=3,
    schema={name: string, age: int64}
)

In the example below, fn returns the same batch type as its input, but your fn can also return a different batch type (e.g., pd.DataFrame). Read more about Transforming Data.

>>> from typing import Dict
>>> def map_fn(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
...     batch["age_in_dog_years"] = 7 * batch["age"]
...     return batch
>>> ds = ds.map_batches(map_fn)
>>> ds
MapBatches(map_fn)
+- Dataset(num_blocks=3, num_rows=3, schema={name: string, age: int64})
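
For example, here is a sketch of a fn that receives the default Dict[str, np.ndarray] batch but returns a pandas.DataFrame instead (the derived is_senior column is illustrative):

>>> import pandas as pd
>>> def to_dataframe(batch: Dict[str, np.ndarray]) -> pd.DataFrame:
...     df = pd.DataFrame(batch)
...     df["is_senior"] = df["age"] > 10
...     return df
>>> pandas_ds = ds.map_batches(to_dataframe)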

Actors can improve the performance of some workloads. For example, you can use actors to load a model once per worker instead of once per inference.

To transform batches with actors, pass a callable type to fn and specify an ActorPoolStrategy.

In the example below, CachedModel is called on an autoscaling pool of two to eight actors, each allocated one GPU by Ray.

>>> init_large_model = ... 
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_large_model()
...    def __call__(self, item):
...        return self.model(item)
>>> ds.map_batches( 
...     CachedModel, 
...     batch_size=256, 
...     compute=ray.data.ActorPoolStrategy(min_size=2, max_size=8), 
...     num_gpus=1,
... ) 
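
If the callable class needs configuration at construction time, pass it through fn_constructor_kwargs. Below is a sketch along the lines of the example above, here with a fixed pool of two actors (the model_path argument, loader signature, and path are illustrative):

>>> class ConfigurableModel:
...    def __init__(self, model_path):
...        # model_path is supplied via fn_constructor_kwargs below.
...        self.model = init_large_model(model_path)
...    def __call__(self, item):
...        return self.model(item)
>>> ds.map_batches(
...     ConfigurableModel,
...     fn_constructor_kwargs={"model_path": "s3://example-bucket/model"},
...     batch_size=256,
...     compute=ray.data.ActorPoolStrategy(size=2),
...     num_gpus=1,
... )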

fn can also be a generator, yielding multiple batches in a single invocation. This is useful when returning large objects: instead of returning one very large output batch, fn can yield the output in smaller chunks.

>>> def map_fn_with_large_output(batch):
...     for i in range(3):
...         yield {"large_output": np.ones((100, 1000))}
>>> ds = ray.data.from_items([1])
>>> ds = ds.map_batches(map_fn_with_large_output)
>>> ds
MapBatches(map_fn_with_large_output)
+- Dataset(num_blocks=1, num_rows=1, schema={item: int64})
Parameters
  • fn – The function or generator to apply to each record batch, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy. Note fn must be pickle-able.

  • batch_size – The desired number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The actual size of the batch provided to fn may be smaller than batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task. Defaults to 4096 when “default” is used.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, ray.data.ActorPoolStrategy(size=n) to use a fixed-size actor pool, or ray.data.ActorPoolStrategy(min_size=m, max_size=n) for an autoscaling actor pool.

  • batch_format – Specify "default" to use the default block format (NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, "numpy" to select Dict[str, numpy.ndarray], or None to return the underlying block exactly as is with no additional formatting.

  • zero_copy_batch – Whether fn should be provided zero-copy, read-only batches. If this is True and no copy is required for the batch_format conversion, the batch will be a zero-copy, read-only view on data in Ray’s object store, which can decrease memory utilization and improve performance. If this is False, the batch will be writable, which will require an extra copy to guarantee. If fn mutates its input, this will need to be False in order to avoid “assignment destination is read-only” or “buffer source array is read-only” errors. Default is False.

  • fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Ray task.

  • fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Ray task. See the sketch after this parameter list.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • num_cpus – The number of CPUs to reserve for each parallel map worker.

  • num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.

  • ray_remote_args – Additional resource requirements to request from Ray for each map worker.
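
As referenced in the fn_kwargs entry above, here is a minimal sketch of passing an extra keyword argument through to fn (the offset argument is illustrative):

>>> def add_offset(batch, offset):
...     batch["item"] = batch["item"] + offset
...     return batch
>>> offset_ds = ray.data.from_items([1, 2, 3]).map_batches(
...     add_offset, fn_kwargs={"offset": 10}
... )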

See also

iter_batches()

Call this function to iterate over batches of data.

flat_map()

Call this method to create new records from existing ones. Unlike map(), a function passed to flat_map() can return multiple records.

flat_map() isn’t recommended because it’s slow; call map_batches() instead.

map()

Call this method to transform one record at a time.

This method isn’t recommended because it’s slow; call map_batches() instead.