ray.data.grouped_dataset.GroupedDataset.map_groups

GroupedDataset.map_groups(fn: Union[type, Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'default', **ray_remote_args) → Dataset[Any]

Apply the given function to each group of records of this dataset.

While map_groups() is very flexible, it has some downsides:
  • It may be slower than more specific methods such as min() or max().

  • It requires that each group fits in memory on a single node.

In general, prefer aggregate() over map_groups() when the computation can be expressed as an aggregation.
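Some computations cannot be expressed as an aggregation because partial results cannot be merged. The sketch below uses only the Python standard library, not Ray, and the split of one group into two partitions is a hypothetical illustration. It shows that partial sums combine correctly while partial medians do not, which is the kind of case where map_groups() is needed.

```python
from statistics import median

# All values belonging to one group.
values = [1, 2, 3, 4, 100]

# Hypothetical split of that group across two partitions.
part_a, part_b = values[:2], values[2:]

# Sum: merging partial results matches a single pass over the group.
sum_of_parts = sum([sum(part_a), sum(part_b)])
true_sum = sum(values)

# Median: merging partial medians does not match the true median,
# so the whole group has to be processed at once.
median_of_parts = median([median(part_a), median(part_b)])
true_median = median(values)

print(sum_of_parts == true_sum)        # True
print(median_of_parts == true_median)  # False
```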

This is a blocking operation.

Examples

>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get median per group. Note that median is not an associative
>>> # function so cannot be computed with aggregate().
>>> ds = ray.data.range(100) 
>>> ds.groupby(lambda x: x % 3).map_groups( 
...     lambda x: [np.median(x)])
>>> # Get first value per group.
>>> ds = ray.data.from_items([ 
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups( 
...     lambda g: [g["value"][0]])
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df) 
>>> grouped = ds.groupby("A") 
>>> grouped.map_groups( 
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... )

Parameters
  • fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.

  • batch_format – Specify “default” to use the default block format (promotes Arrow to pandas), “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.

  • ray_remote_args – Additional resource requirements to request from Ray (e.g., num_gpus=1 to request GPUs for the map tasks).
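The fn parameter accepts either a plain callable or a class type. As a rough illustration of that distinction, the sketch below is plain Python and does not call Ray. The TopN class and the resolve_fn helper are hypothetical names, and the sketch assumes the class can be constructed without arguments.

```python
class TopN:
    """Hypothetical callable class: keeps the n largest records of a group."""

    def __init__(self, n=2):
        # Setup runs once per instance, not once per group.
        self.n = n

    def __call__(self, group):
        return sorted(group, reverse=True)[: self.n]


def resolve_fn(fn):
    """Hypothetical helper: instantiate fn if it is a class, else use it as is."""
    return fn() if isinstance(fn, type) else fn


# Passing the class itself: it is instantiated, then applied to each group.
from_class = resolve_fn(TopN)
# Passing a plain function: it is applied directly.
from_function = resolve_fn(lambda group: [min(group)])

print(from_class([5, 1, 9, 3]))     # [9, 5]
print(from_function([5, 1, 9, 3]))  # [1]
```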

Returns

A dataset whose record type is determined by the return type of fn. The results from all groups are combined into this single dataset.
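As a conceptual model only, the grouping, per-group call, and combining of results can be sketched in plain Python. This is not Ray's implementation: map_groups_sketch is a hypothetical name, everything runs in a single process, and processing groups in sorted key order is an assumption made to keep the output deterministic.

```python
from collections import defaultdict


def map_groups_sketch(records, key, fn):
    """Group records by key, apply fn to each whole group, combine outputs."""
    groups = defaultdict(list)
    for record in records:
        groups[key(record)].append(record)

    combined = []
    # Assumption: groups are visited in sorted key order.
    for group_key in sorted(groups):
        # fn sees every record of one group and may return zero or more records.
        combined.extend(fn(groups[group_key]))
    return combined


records = list(range(10))

# One record out per group: the largest value in each group.
largest = map_groups_sketch(records, lambda x: x % 3, lambda g: [max(g)])
print(largest)  # [9, 7, 8]

# Zero records out for some groups: drop groups with fewer than 4 records.
big_only = map_groups_sketch(
    records, lambda x: x % 3, lambda g: g if len(g) >= 4 else []
)
print(big_only)  # [0, 3, 6, 9]
```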