ray.data.Dataset.map_batches
- Dataset.map_batches(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | type[_CallableClassProtocol], *, batch_size: int | None | Literal['default'] = None, compute: ComputeStrategy | None = None, batch_format: str | None = 'default', zero_copy_batch: bool = False, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, concurrency: int | Tuple[int, int] | Tuple[int, int, int] | None = None, udf_modifying_row_count: bool = True, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) -> Dataset
Apply the given function to batches of data.
This method is useful for preprocessing data and performing inference. To learn more, see Transforming batches.
You can use either a function or a callable class to perform the transformation. For functions, Ray Data uses stateless Ray tasks. For classes, Ray Data uses stateful Ray actors. For more information, see Stateful Transforms.
Tip
To understand the format of the input to fn, call take_batch() on the dataset to get a batch in the same format as will be passed to fn.

Tip
If fn doesn’t mutate its input, set zero_copy_batch=True to improve performance and decrease memory utilization.

Warning
Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Ray team.

Examples
Call map_batches() to transform your data.

    from typing import Dict
    import numpy as np
    import ray

    def add_dog_years(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        batch["age_in_dog_years"] = 7 * batch["age"]
        return batch

    ds = (
        ray.data.from_items([
            {"name": "Luna", "age": 4},
            {"name": "Rory", "age": 14},
            {"name": "Scout", "age": 9},
        ])
        .map_batches(add_dog_years)
    )
    ds.show()

    {'name': 'Luna', 'age': 4, 'age_in_dog_years': 28}
    {'name': 'Rory', 'age': 14, 'age_in_dog_years': 98}
    {'name': 'Scout', 'age': 9, 'age_in_dog_years': 63}

If your function returns large objects, yield outputs in chunks.
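
    from typing import Dict, Iterator
    import ray
    import numpy as np

    # A generator UDF: each yielded dict becomes a separate output batch,
    # so the return annotation is an Iterator rather than a single Dict.
    def map_fn_with_large_output(batch: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
        for i in range(3):
            yield {"large_output": np.ones((100, 1000))}

    ds = (
        ray.data.from_items([1])
        .map_batches(map_fn_with_large_output)
    )
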
If you require a stateful transformation, use a Python callable class. The following example uses a stateful transform to create model inference workers that don't have to reload the model on each call.

    from typing import Dict
    import numpy as np
    import torch
    import ray

    class TorchPredictor:

        def __init__(self):
            self.model = torch.nn.Identity().cuda()
            self.model.eval()

        def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
            inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
            with torch.inference_mode():
                batch["output"] = self.model(inputs).detach().cpu().numpy()
            return batch

    ds = (
        ray.data.from_numpy(np.ones((32, 100)))
        .map_batches(
            TorchPredictor,
            # Two workers with one GPU each
            compute=ray.data.ActorPoolStrategy(size=2),
            # Batch size is required if you're using GPUs.
            batch_size=4,
            num_gpus=1
        )
    )

To learn more, see End-to-end: Offline Batch Inference.
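The zero_copy_batch tip above depends on whether fn writes into its input arrays. The following is a minimal sketch of that distinction using plain NumPy, without Ray. It assumes that a NumPy array marked read-only is a reasonable stand-in for a zero-copy batch; the function names are hypothetical and chosen only for illustration.

```python
import numpy as np


def add_dog_years_in_place(batch):
    # Writes into an existing array of the batch, so it needs a writable batch
    # (zero_copy_batch=False).
    batch["age"][:] = 7 * batch["age"]
    return batch


def add_dog_years_no_mutation(batch):
    # Builds a new dict and a new array; the input arrays are only read,
    # so this style is compatible with read-only batches.
    return {**batch, "age_in_dog_years": 7 * batch["age"]}


# Assumption: a read-only NumPy array stands in for a zero-copy batch column.
ages = np.array([4, 14, 9])
ages.flags.writeable = False
read_only_batch = {"age": ages}

# The non-mutating UDF works on the read-only batch.
result = add_dog_years_no_mutation(read_only_batch)

# The in-place UDF fails with a "read-only" ValueError.
try:
    add_dog_years_in_place(read_only_batch)
    in_place_error = None
except ValueError as exc:
    in_place_error = str(exc)
```

With a function written like add_dog_years_no_mutation, passing zero_copy_batch=True to map_batches() should be safe; a function like add_dog_years_in_place needs the default zero_copy_batch=False.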
- Parameters:
fn – The function or generator to apply to a record batch, or a class type that can be instantiated to create such a callable. Note: fn must be pickle-able.
batch_size – The desired number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The actual size of the batch provided to fn may be smaller than batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task. Default batch_size is None.
compute – The compute strategy to use for the map operation.
  - If compute is not specified for a function, Ray Data uses ray.data.TaskPoolStrategy() to launch concurrent tasks based on the available resources and number of input blocks.
  - Use ray.data.TaskPoolStrategy(size=n) to launch at most n concurrent Ray tasks.
  - If compute is not specified for a callable class, Ray Data uses ray.data.ActorPoolStrategy(min_size=1, max_size=None) to launch an autoscaling actor pool from 1 to unlimited workers.
  - Use ray.data.ActorPoolStrategy(size=n) to use a fixed size actor pool of n workers.
  - Use ray.data.ActorPoolStrategy(min_size=m, max_size=n) to use an autoscaling actor pool from m to n workers.
  - Use ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial) to use an autoscaling actor pool from m to n workers, with an initial size of initial.
batch_format – If "default" or "numpy", batches are Dict[str, numpy.ndarray]. If "pandas", batches are pandas.DataFrame. If "pyarrow", batches are pyarrow.Table. If batch_format is None, the input block format is used.
zero_copy_batch – Whether fn should be provided zero-copy, read-only batches. If this is True and no copy is required for the batch_format conversion, the batch is a zero-copy, read-only view on data in Ray’s object store, which can decrease memory utilization and improve performance. If this is False, the batch is writable, which requires an extra copy to guarantee. If fn mutates its input, this needs to be False in order to avoid “assignment destination is read-only” or “buffer source array is read-only” errors. Default is False.
fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Ray task.
fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Ray task.
fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
num_cpus – The number of CPUs to reserve for each parallel map worker.
num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.
memory – The heap memory in bytes to reserve for each parallel map worker.
concurrency – This argument is deprecated. Use the compute argument instead.
udf_modifying_row_count – Set to False only if the UDF always emits the same number of records it receives (no drops or duplicates). When set to False, the logical optimizer, in the presence of a limit(limit=k), will only scan k rows prior to executing the UDF, thereby saving on compute resources.
ray_remote_args_fn – A function that returns a dictionary of remote args passed to each map worker. Its purpose is to generate dynamic arguments for each actor or task; it is called each time prior to initializing the worker. Args returned from this dict always override the args in ray_remote_args. Note: this is an advanced, experimental feature.
ray_remote_args – Additional resource requirements to request from Ray for each map worker. See ray.remote() for details.
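As a rough illustration of how fn_args, fn_kwargs, and fn_constructor_kwargs reach the UDF, the sketch below defines one function and one callable class and wires them into map_batches(). The names scale_column, AddOffset, and run_on_ray are hypothetical. The Ray calls are kept inside run_on_ray() and are not executed here, so they are an untested sketch; only the local check at the bottom runs, mirroring the call convention fn(batch, *fn_args, **fn_kwargs) described above.

```python
from typing import Dict

import numpy as np


def scale_column(batch: Dict[str, np.ndarray], factor: float, *, column: str) -> Dict[str, np.ndarray]:
    """Stateless UDF: `factor` is meant to arrive via fn_args, `column` via fn_kwargs."""
    out = dict(batch)  # shallow copy so the input dict is left untouched
    out[f"{column}_scaled"] = batch[column] * factor
    return out


class AddOffset:
    """Stateful UDF: `offset` and `column` are meant to arrive via fn_constructor_kwargs."""

    def __init__(self, offset: int, column: str):
        self.offset = offset
        self.column = column

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        out = dict(batch)
        out[f"{self.column}_shifted"] = batch[self.column] + self.offset
        return out


def run_on_ray():
    """Hypothetical helper: wires the UDFs into map_batches(). Requires Ray."""
    import ray  # imported lazily so the UDFs above can be checked without Ray

    ds = ray.data.from_items([{"age": 4}, {"age": 14}, {"age": 9}])
    ds = ds.map_batches(scale_column, fn_args=(7,), fn_kwargs={"column": "age"})
    ds = ds.map_batches(
        AddOffset,
        fn_constructor_kwargs={"offset": 1, "column": "age"},
        compute=ray.data.ActorPoolStrategy(size=1),
    )
    return ds.take_all()


# Local check of the call convention, without Ray.
batch = {"age": np.array([4, 14, 9])}
scaled = scale_column(batch, 7, column="age")
shifted = AddOffset(offset=1, column="age")(scaled)
```

Keeping configuration such as factor or offset out of the function body means the same UDF can be reused with different settings, and constructor arguments are evaluated once per actor rather than once per batch.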
Note
The size of the batches provided to fn might be smaller than the specified batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task.
If batch_size is set and each input block is smaller than batch_size, Ray Data bundles multiple blocks as the input for one task until their total size is equal to or greater than batch_size. If batch_size is not set, no bundling is performed and each task receives an entire input block as a batch.

See also
iter_batches() – Call this function to iterate over batches of data.
take_batch() – Call this function to get a batch of data from the dataset in the same format as will be passed to the fn function of map_batches().
flat_map() – Call this method to create new records from existing ones. Unlike map(), a function passed to flat_map() can return multiple records.
map() – Call this method to transform one record at a time.
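
The batch sizing rules in the Note above can be made concrete with a small pure-Python model. This is a simplified sketch of the behavior as the Note describes it, not Ray Data's actual implementation; bundle_blocks and batch_sizes are hypothetical helpers that work only on row counts.

```python
from typing import List, Optional


def bundle_blocks(block_rows: List[int], batch_size: Optional[int]) -> List[List[int]]:
    """Group block row counts into per-task bundles.

    With batch_size=None there is no bundling: one block per task.
    Otherwise blocks are accumulated until their total reaches batch_size.
    """
    if batch_size is None:
        return [[rows] for rows in block_rows]
    bundles: List[List[int]] = []
    current: List[int] = []
    for rows in block_rows:
        current.append(rows)
        if sum(current) >= batch_size:
            bundles.append(current)
            current = []
    if current:  # leftover blocks that never reached batch_size
        bundles.append(current)
    return bundles


def batch_sizes(block_rows: List[int], batch_size: Optional[int]) -> List[int]:
    """Row counts of the batches that fn would see under this model."""
    sizes: List[int] = []
    for bundle in bundle_blocks(block_rows, batch_size):
        total = sum(bundle)
        if batch_size is None:
            sizes.append(total)  # the whole block is one batch
            continue
        full, remainder = divmod(total, batch_size)
        sizes.extend([batch_size] * full)
        if remainder:
            sizes.append(remainder)  # batch_size didn't divide the bundle evenly
    return sizes


no_batch_size = batch_sizes([3, 3, 3, 3], None)
small_blocks = batch_sizes([3, 3, 3, 3], 4)
one_large_block = batch_sizes([10], 4)
```

Under this model, four 3-row blocks with batch_size=4 are bundled in pairs, and each 6-row bundle is split into one full batch and one 2-row remainder, which is why fn should not assume every batch has exactly batch_size rows.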