ray.data.grouped_data.GroupedData.map_groups
- GroupedData.map_groups(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, compute: str | ComputeStrategy = None, batch_format: str | None = 'default', fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, concurrency: int | Tuple[int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) -> Dataset
Apply the given function to each group of records of this dataset.
While map_groups() is very flexible, it comes with downsides:
- It may be slower than more specific methods such as min() and max().
- It requires that each group fit in memory on a single node.
In general, prefer aggregate() over map_groups().
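To illustrate the memory point, here is a plain-Python sketch (not Ray's implementation; both function names are hypothetical) that computes a per-group maximum two ways: by first materializing every value of a group, as map_groups() must, and by keeping only one running value per group, the style of computation that a specific aggregation such as max() permits.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def max_via_materialized_groups(rows: Iterable[dict], key: str, col: str) -> Dict[object, int]:
    """map_groups-style: collect every value of a group, then apply the function."""
    groups: Dict[object, List[int]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[col])  # the whole group is held in memory
    return {k: max(vals) for k, vals in groups.items()}


def max_via_running_accumulator(rows: Iterable[dict], key: str, col: str) -> Dict[object, int]:
    """Aggregation-style: keep a single running value per group."""
    acc: Dict[object, int] = {}
    for row in rows:
        k, v = row[key], row[col]
        acc[k] = v if k not in acc else max(acc[k], v)  # constant state per group
    return acc


rows = [
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
]
print(max_via_materialized_groups(rows, "group", "value"))  # {1: 2, 2: 4}
print(max_via_running_accumulator(rows, "group", "value"))  # {1: 2, 2: 4}
```

Both produce the same answer, but only the first needs an entire group in memory at once, which is why a very large group can be a problem for map_groups().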
Warning
Specifying both num_cpus and num_gpus for map tasks is experimental and may result in scheduling or stability issues. Please report any issues to the Ray team.

Examples
>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get first value per group.
>>> ds = ray.data.from_items([
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups(
...     lambda g: {"result": np.array([g["value"][0]])})
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df)
>>> grouped = ds.groupby("A")
>>> # batch_format="pandas" makes each group arrive as a pandas.DataFrame,
>>> # which the DataFrame.apply() call below requires.
>>> grouped.map_groups(
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     ),
...     batch_format="pandas",
... )
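The examples above do not show their output. The following plain-Python sketch reproduces what each fn computes, without Ray. The helper group_columns is hypothetical: it only mimics a column-oriented batch (one list per column) for each group, and the groups are visited in sorted key order here purely for a deterministic result, not as a statement about the row order Ray returns.

```python
from collections import defaultdict
from typing import Dict, List


def group_columns(rows: List[dict], key: str) -> Dict[object, Dict[str, list]]:
    """Split rows into per-group column batches (a dict of lists per group)."""
    groups: Dict[object, Dict[str, list]] = defaultdict(lambda: defaultdict(list))
    for row in rows:
        for col, val in row.items():
            groups[row[key]][col].append(val)
    return groups


def by_key(groups: Dict[object, Dict[str, list]]):
    """Yield (key, batch) pairs in sorted key order."""
    return sorted(groups.items(), key=lambda kv: kv[0])


# Example 1: keep the first "value" of each group.
rows1 = [{"group": 1, "value": 1}, {"group": 1, "value": 2},
         {"group": 2, "value": 3}, {"group": 2, "value": 4}]
first = [g["value"][0] for _, g in by_key(group_columns(rows1, "group"))]
print(first)  # [1, 3]

# Example 2: divide columns B and C by their per-group sums; A is unchanged.
rows2 = [{"A": "a", "B": 1, "C": 4}, {"A": "a", "B": 1, "C": 6},
         {"A": "b", "B": 3, "C": 5}]
normalized = []
for _, g in by_key(group_columns(rows2, "A")):
    sums = {c: sum(g[c]) for c in ("B", "C")}
    for i in range(len(g["A"])):
        normalized.append({"A": g["A"][i],
                           "B": g["B"][i] / sums["B"],
                           "C": g["C"][i] / sums["C"]})
print(normalized)
```

For group "a", B sums to 2 and C to 10, so its rows become B = 0.5, 0.5 and C = 0.4, 0.6; group "b" has a single row, so both of its values become 1.0.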
- Parameters:
fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().
compute – The compute strategy: either "tasks" (default) to use Ray tasks, ray.data.ActorPoolStrategy(size=n) to use a fixed-size actor pool, or ray.data.ActorPoolStrategy(min_size=m, max_size=n) for an autoscaling actor pool.
batch_format – Specify "default" to use the default block format (NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, "numpy" to select Dict[str, numpy.ndarray], or None to return the underlying block exactly as is, with no additional formatting.
fn_args – Positional arguments to pass to fn.
fn_kwargs – Keyword arguments to pass to fn.
fn_constructor_args – Positional arguments to pass to fn's constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
fn_constructor_kwargs – Keyword arguments to pass to fn's constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
num_cpus – The number of CPUs to reserve for each parallel map worker.
num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.
memory – The heap memory in bytes to reserve for each parallel map worker.
ray_remote_args_fn – A function that returns a dictionary of remote args passed to each map worker. Use it to generate dynamic arguments for each actor or task; it is called each time before a worker is initialized. Args returned by this function always override the args in ray_remote_args. Note: this is an advanced, experimental feature.
ray_remote_args – Additional resource requirements to request from Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See ray.remote() for details.
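When fn is a callable class, fn_constructor_args/fn_constructor_kwargs go to its constructor, while fn_args/fn_kwargs go to each call. The plain-Python sketch below emulates that split locally; ScaleValues and run_like_an_actor are hypothetical names, and the assumption (consistent with the constructor arguments belonging to the actor construction task) is that one instance is built per actor and then reused for every group that actor processes.

```python
from typing import Any, Dict, List


class ScaleValues:
    """Hypothetical callable class: setup in __init__, per-group work in __call__."""

    instances = 0  # counts constructions, to show setup happens once

    def __init__(self, factor: float) -> None:
        # Stand-in for fn_constructor_args/fn_constructor_kwargs: used once per instance.
        type(self).instances += 1
        self.factor = factor

    def __call__(self, batch: Dict[str, List[Any]], offset: float = 0.0) -> Dict[str, List[Any]]:
        # Stand-in for fn_args/fn_kwargs: forwarded on every call, i.e. once per group.
        return {**batch, "value": [v * self.factor + offset for v in batch["value"]]}


def run_like_an_actor(cls, groups, fn_constructor_kwargs, fn_kwargs):
    """Local stand-in for one actor: construct once, then call once per group."""
    instance = cls(**fn_constructor_kwargs)
    return [instance(group, **fn_kwargs) for group in groups]


groups = [{"group": [1, 1], "value": [1, 2]}, {"group": [2, 2], "value": [3, 4]}]
out = run_like_an_actor(ScaleValues, groups, {"factor": 10}, {"offset": 0.5})
print(out)
# [{'group': [1, 1], 'value': [10.5, 20.5]}, {'group': [2, 2], 'value': [30.5, 40.5]}]
```

With Ray Data, the analogous (untested here) call passes the class itself rather than an instance, for example ds.groupby("group").map_groups(ScaleValues, compute=ray.data.ActorPoolStrategy(size=2), fn_constructor_kwargs={"factor": 10}, fn_kwargs={"offset": 0.5}); under the default batch_format each column would arrive as a NumPy array instead of a list.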
- Returns:
A Dataset that combines the results of fn across all groups. The records it contains, and their format, are determined by what fn returns for each group.
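Because fn may return zero or more records per group, the combined result need not have one row per input row or per group. The plain-Python sketch below illustrates this with hypothetical helpers (combine_group_outputs and keep_if_large); it concatenates column batches in memory and is only an analogy for how Ray Data assembles the output Dataset, not its actual mechanism.

```python
from typing import Dict, Iterable, List


def combine_group_outputs(outputs: Iterable[Dict[str, List]]) -> Dict[str, List]:
    """Concatenate per-group column batches into one batch; empty outputs add no rows."""
    combined: Dict[str, List] = {}
    for batch in outputs:
        for col, values in batch.items():
            combined.setdefault(col, []).extend(values)
    return combined


def keep_if_large(group: Dict[str, List]) -> Dict[str, List]:
    """Hypothetical fn: return the whole group if its total exceeds 5, else zero records."""
    if sum(group["value"]) > 5:
        return group
    return {"group": [], "value": []}


groups = [{"group": [1, 1], "value": [1, 2]}, {"group": [2, 2], "value": [3, 4]}]
result = combine_group_outputs(keep_if_large(g) for g in groups)
print(result)  # {'group': [2, 2], 'value': [3, 4]}
```

Group 1 sums to 3 and contributes nothing, while group 2 sums to 7 and is kept whole, so the combined result has two rows.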
See also
GroupedData.aggregate()
Use this method for common aggregation use cases.