ray.data.Dataset.map_batches
- Dataset.map_batches(fn: Union[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], _CallableClassProtocol], *, batch_size: Optional[Union[int, typing_extensions.Literal['default']]] = 'default', compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, batch_format: typing_extensions.Literal['default', 'pandas', 'pyarrow', 'numpy'] = 'default', prefetch_batches: int = 0, zero_copy_batch: bool = False, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, **ray_remote_args) -> Dataset[Any]
Apply the given function to batches of data.
This applies the given fn in parallel with map tasks, with each task handling a block or a bundle of blocks of the dataset. Batches are executed serially at the Ray level (at a lower level, the processing of a batch is usually vectorized).

Batches are represented as dataframes, ndarrays, or lists. The default batch type is determined by your dataset's schema. To determine the default batch type, call default_batch_format(). Alternatively, set the batch type with batch_format.

To learn more about writing functions for map_batches(), read writing user-defined functions.
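For example, a minimal sketch of overriding the batch type with batch_format (the column name x and the doubling lambda are illustrative only):

>>> import pandas as pd
>>> import ray
>>> ds = ray.data.from_pandas(pd.DataFrame({"x": [1, 2, 3]}))
>>> # Receive each batch as Dict[str, np.ndarray] instead of a pd.DataFrame.
>>> ds = ds.map_batches(lambda batch: {"x": batch["x"] * 2}, batch_format="numpy")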
Tip

If you're using Ray AIR for training or batch inference, consider using BatchMapper. It's more performant and easier to use.
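A minimal sketch of wrapping a batch function in a BatchMapper (the multiply-by-seven function is illustrative, and the constructor arguments may vary across Ray versions):

>>> import pandas as pd
>>> import ray
>>> from ray.data.preprocessors import BatchMapper
>>> ds = ray.data.from_pandas(pd.DataFrame({"age": [4, 14, 9]}))
>>> preprocessor = BatchMapper(lambda df: df * 7, batch_format="pandas")
>>> ds = preprocessor.transform(ds)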
Tip

For some standard operations like imputing, encoding, or normalization, you may find it more convenient to use a Preprocessor directly.
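Continuing the sketch above, a standard preprocessor such as MinMaxScaler can replace a hand-written normalization UDF (the column name age is illustrative):

>>> from ray.data.preprocessors import MinMaxScaler
>>> scaler = MinMaxScaler(columns=["age"])
>>> ds = scaler.fit_transform(ds)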
Tip

If you have a small number of big blocks, parallelism may be limited. Consider increasing the number of blocks via repartition() before applying map_batches(), as in the sketch below.
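A minimal sketch of this pattern (the identity lambda stands in for your batch function):

>>> import ray
>>> ds = ray.data.range(100000)  # imagine this arrived as a handful of large blocks
>>> ds = ds.repartition(100)                  # split into 100 blocks to raise parallelism
>>> ds = ds.map_batches(lambda batch: batch)  # map tasks now spread across 100 blocks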
Tip

If fn does not mutate its input, set zero_copy_batch=True to elide a batch copy, which can improve performance and decrease memory utilization. fn will then receive zero-copy, read-only batches. If fn mutates its input, ensure that the batch provided to fn is writable by setting zero_copy_batch=False (the default). This creates an extra, mutable copy of each batch before handing it to fn.
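For example, a minimal sketch of a read-only UDF opting into zero-copy batches (the add-one lambda is illustrative only):

>>> import ray
>>> ds = ray.data.range(8)
>>> # The lambda returns a new array and never writes into `batch`, so a
>>> # zero-copy, read-only view is safe here.
>>> ds = ds.map_batches(lambda batch: batch + 1, batch_format="numpy", zero_copy_batch=True)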
Note

The size of the batches provided to fn may be smaller than the given batch_size if batch_size doesn't evenly divide the block(s) sent to a given map task. When batch_size is specified, each map task is sent a single block if the block is equal to or larger than batch_size, and a bundle of blocks up to (but not exceeding) batch_size if the blocks are smaller than batch_size.

Examples
>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({
...     "name": ["Luna", "Rory", "Scout"],
...     "age": [4, 14, 9]
... })
>>> ds = ray.data.from_pandas(df)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64})
Call default_batch_format() to determine the default batch type.

>>> ds.default_batch_format()
<class 'pandas.core.frame.DataFrame'>
Tip

Datasets created from tabular data like Arrow tables and Parquet files yield pd.DataFrame batches.

Once you know the batch type, define a function that transforms batches of data.
ds.map_batches applies the function in parallel.

>>> def map_fn(batch: pd.DataFrame) -> pd.DataFrame:
...     batch["age_in_dog_years"] = 7 * batch["age"]
...     return batch
>>> ds = ds.map_batches(map_fn)
>>> ds
MapBatches(map_fn)
+- Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64})
Your fn can return a different type than the input type. To learn more about supported output types, read user-defined function output types.

>>> from typing import List
>>> def map_fn(batch: pd.DataFrame) -> List[int]:
...     return list(batch["age_in_dog_years"])
>>> ds = ds.map_batches(map_fn)
>>> ds
MapBatches(map_fn)
+- MapBatches(map_fn)
   +- Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64})
Actors can improve the performance of some workloads. For example, you can use actors to load a model once per worker instead of once per inference.
To transform batches with actors, pass a callable type to fn and specify an ActorPoolStrategy. In the example below, CachedModel is called on an autoscaling pool of two to eight actors, each allocated one GPU by Ray.

>>> from ray.data import ActorPoolStrategy
>>> init_large_model = ...
>>> class CachedModel:
...     def __init__(self):
...         self.model = init_large_model()
...     def __call__(self, item):
...         return self.model(item)
>>> ds.map_batches(
...     CachedModel,
...     batch_size=256,
...     compute=ActorPoolStrategy(2, 8),
...     num_gpus=1,
... )
- Parameters
  - fn – The function to apply to each record batch, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy. Note: fn must be pickle-able.
  - batch_size – The desired number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The actual size of the batch provided to fn may be smaller than batch_size if batch_size doesn't evenly divide the block(s) sent to a given map task. Defaults to 4096 with batch_format="default".
  - compute – The compute strategy, either "tasks" (default) to use Ray tasks, or "actors" to use an autoscaling actor pool. If you want to configure the size of the autoscaling actor pool, provide an ActorPoolStrategy instance. If you're passing a callable type to fn, you must pass an ActorPoolStrategy or "actors".
  - batch_format – Specify "default" to use the default block format (promotes tables to pandas and tensors to NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, or "numpy" to select numpy.ndarray for tensor datasets and Dict[str, numpy.ndarray] for tabular datasets. Defaults to "default".
  - prefetch_batches – The number of batches to fetch ahead of the current batch to process. If set to greater than 0, a separate thread fetches the specified number of formatted batches from blocks. This improves performance for non-CPU-bound UDFs, allowing batch fetching and formatting to be overlapped with UDF execution. Defaults to 0 (no prefetching). Increasing the number of batches to prefetch can result in higher throughput, at the expense of requiring more heap memory to buffer the batches.
  - zero_copy_batch – Whether fn should be provided zero-copy, read-only batches. If this is True and no copy is required for the batch_format conversion, the batch is a zero-copy, read-only view on data in Ray's object store, which can decrease memory utilization and improve performance. If this is False, the batch is writable, which requires an extra copy to guarantee. If fn mutates its input, this must be False to avoid "assignment destination is read-only" or "buffer source array is read-only" errors. Defaults to False. See the batch format docs for details on which format conversions always require a copy.
  - fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Ray task.
  - fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Ray task.
  - fn_constructor_args – Positional arguments to pass to fn's constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
  - fn_constructor_kwargs – Keyword arguments to pass to fn's constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
  - ray_remote_args – Additional resource requirements to request from Ray (e.g., num_gpus=1 to request GPUs for the map tasks).
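For instance, a minimal sketch of threading arguments through a callable class (the Multiplier class and the factor and offset names are illustrative only):

>>> import ray
>>> from ray.data import ActorPoolStrategy
>>> class Multiplier:
...     def __init__(self, factor):
...         self.factor = factor  # received via fn_constructor_kwargs
...     def __call__(self, batch, offset):
...         return batch * self.factor + offset  # `offset` arrives via fn_kwargs
>>> ds = ray.data.range(8).map_batches(
...     Multiplier,
...     batch_format="numpy",
...     compute=ActorPoolStrategy(1, 2),
...     fn_constructor_kwargs={"factor": 2},
...     fn_kwargs={"offset": 1},
... )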
See also

iter_batches()
    Call this function to iterate over batches of data.

default_batch_format()
    Call this function to determine the default batch type.

flat_map()
    Call this method to create new records from existing ones. Unlike map(), a function passed to flat_map() can return multiple records. flat_map() isn't recommended because it's slow; call map_batches() instead.

map()
    Call this method to transform one record at a time. This method isn't recommended because it's slow; call map_batches() instead.