GroupedData.map_groups(fn: Union[Callable[[Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]], Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]], Callable[[Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]], Iterator[Union[pyarrow.Table, pandas.DataFrame, Dict[str, numpy.ndarray]]]], _CallableClassProtocol], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: Optional[str] = 'default', **ray_remote_args) -> Dataset

Apply the given function to each group of records of this dataset.

map_groups() is very flexible, but it comes with downsides:
  • It may be slower than more specific methods such as min() or max().

  • It requires that each group fit in memory on a single node.

In general, prefer aggregate() over map_groups().


>>> # Return a single record per group (batch of multiple records in,
>>> # batch of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get first value per group.
>>> ds = ray.data.from_items([ 
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups( 
...     lambda g: {"result": np.array([g["value"][0]])})
>>> # Return multiple records per group (DataFrame in, DataFrame out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df)
>>> grouped = ds.groupby("A")
>>> grouped.map_groups(
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     ),
...     batch_format="pandas",  # the lambda calls DataFrame.apply, so request pandas batches
... )

Parameters:
  • fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().

  • compute – The compute strategy: either "tasks" (default) to use Ray tasks, ray.data.ActorPoolStrategy(size=n) to use a fixed-size actor pool, or ray.data.ActorPoolStrategy(min_size=m, max_size=n) for an autoscaling actor pool.

  • batch_format – Specify "default" to use the default block format (NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, "numpy" to select Dict[str, numpy.ndarray], or None to return the underlying block exactly as is with no additional formatting.

  • ray_remote_args – Additional resource requirements to request from Ray (e.g., num_gpus=1 to request GPUs for the map tasks).
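
To make the fn and batch_format parameters more concrete, here is a minimal sketch of a group function written for NumPy batches (Dict[str, numpy.ndarray] in, Dict[str, numpy.ndarray] out). The function name center_values and the column names "group" and "value" are hypothetical, chosen only for illustration. The function is run on a hand-built batch standing in for one group, so it can be checked without a Ray cluster.

```python
import numpy as np


def center_values(group):
    """Subtract the group's mean from every value in the group.

    `group` is assumed to be a NumPy batch (a dict of column name to
    ndarray) holding all records of a single group.
    """
    values = group["value"]
    return {
        "group": group["group"],
        "centered": values - values.mean(),
    }


# Hand-built batch standing in for the records of one group.
batch = {
    "group": np.array([1, 1, 1]),
    "value": np.array([1.0, 2.0, 6.0]),
}
out = center_values(batch)
print(out["centered"])
```

Assuming a dataset ds with these two columns, the function would be passed as ds.groupby("group").map_groups(center_values, batch_format="numpy").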


Returns: The return type is determined by the return type of fn, and the return value combines the results from all groups.
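
The way per-group results are combined can be sketched in plain Python. This is an emulation of the semantics described above, not Ray's implementation: emulate_map_groups is a hypothetical helper, batches are modeled as lists of row dicts rather than Ray's batch formats, and groups are visited in sorted key order only to make the output deterministic.

```python
from collections import defaultdict


def emulate_map_groups(rows, key, fn):
    """Group rows by `key`, apply `fn` to each group, and concatenate the outputs."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)

    combined = []
    # Sorted iteration is an assumption made here for reproducible output.
    for group_key in sorted(groups):
        # fn may return zero or more records for each group.
        combined.extend(fn(groups[group_key]))
    return combined


rows = [
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
]

# Keep only the first record of each group.
first_per_group = emulate_map_groups(rows, "group", lambda g: g[:1])
print(first_per_group)
```

Because fn may return zero records for a group, the combined result can contain fewer records than the input, or none at all.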