ray.data.grouped_dataset.GroupedDataset.map_groups
- GroupedDataset.map_groups(fn: Union[type, Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'default', **ray_remote_args) → Dataset[Any]
Apply the given function to each group of records of this dataset.
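As a rough mental model of these semantics, the pure-Python sketch below groups records by a key, calls fn once with the full list of records in each group, and concatenates whatever each call returns. It is a single-process illustration only, not Ray's implementation: the helper name map_groups_model is hypothetical, and visiting groups in sorted key order is an assumption made here for deterministic output.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def map_groups_model(
    records: Iterable[T],
    key: Callable[[T], Any],
    fn: Callable[[List[T]], List[U]],
) -> List[U]:
    """Hypothetical single-process model of map_groups (not Ray's code)."""
    # Gather every record of a group into one list; the whole group is
    # held in memory at once, mirroring the per-node memory requirement.
    groups: Dict[Any, List[T]] = defaultdict(list)
    for record in records:
        groups[key(record)].append(record)

    # Call fn once per complete group and concatenate the returned records.
    out: List[U] = []
    for k in sorted(groups):
        out.extend(fn(groups[k]))
    return out


# One record out per group: the maximum of each residue class mod 3.
print(map_groups_model(range(10), lambda x: x % 3, lambda g: [max(g)]))  # [9, 7, 8]
```

Because fn may return any number of records, returning an empty list for a group simply drops that group from the output.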
- While map_groups() is very flexible, it comes with downsides:
It may be slower than more specific methods such as min() and max().
It requires that each group fit in memory on a single node.
In general, prefer aggregate() over map_groups() whenever the computation can be expressed with it, for example associative functions such as sum or max.
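The distinction between associative and non-associative functions is what decides between the two methods. The plain-Python sketch below (not Ray's aggregate() API; the helper names are hypothetical) shows that a mean can be assembled by merging partial (sum, count) states computed on separate chunks of a group, whereas the median of per-chunk medians generally differs from the true median, so computing a median needs the whole group at once, which is the map_groups() case.

```python
import statistics
from typing import List, Tuple


def partial_state(chunk: List[float]) -> Tuple[float, int]:
    # Partial aggregate for one chunk: (sum, count).
    return (sum(chunk), len(chunk))


def merge(a: Tuple[float, int], b: Tuple[float, int]) -> Tuple[float, int]:
    # Merging partial states is associative, so chunks can be combined in any grouping.
    return (a[0] + b[0], a[1] + b[1])


def mean_from_chunks(chunks: List[List[float]]) -> float:
    acc = (0.0, 0)
    for chunk in chunks:
        acc = merge(acc, partial_state(chunk))
    total, count = acc
    return total / count


# One group's values, split across two chunks (e.g., two blocks).
chunks = [[1.0, 2.0, 100.0], [3.0, 4.0]]
whole = [x for chunk in chunks for x in chunk]

# The mean from merged partial states matches the mean over the whole group.
print(mean_from_chunks(chunks), statistics.mean(whole))  # 22.0 22.0

# The median of per-chunk medians does not match the true median.
median_of_medians = statistics.median([statistics.median(c) for c in chunks])
print(median_of_medians, statistics.median(whole))  # 2.75 3.0
```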
Examples
>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get median per group. Note that median is not an associative
>>> # function, so it cannot be computed with aggregate().
>>> ds = ray.data.range(100)
>>> ds.groupby(lambda x: x % 3).map_groups(
...     lambda x: [np.median(x)])
>>> # Get first value per group.
>>> ds = ray.data.from_items([
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups(
...     lambda g: [g["value"][0]])
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df)
>>> grouped = ds.groupby("A")
>>> grouped.map_groups(
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... )
- Parameters
fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().
compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
batch_format – Specify “default” to use the default block format (promotes Arrow to pandas), “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.
ray_remote_args – Additional resource requirements to request from Ray (e.g., num_gpus=1 to request GPUs for the map tasks).
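Because fn receives an entire group as a single batch, one way to check it before running it on a cluster is to call it directly on a pandas DataFrame holding one group, which is the kind of batch it would receive with batch_format="pandas". The sketch below assumes pandas is installed; normalize_group and the class DropSmallGroups are hypothetical names for illustration. The class shows the "class type" form of fn (constructed once, then called per group) and how returning an empty frame yields zero records for a group. How a class type is actually scheduled depends on the compute strategy; this sketch only calls it locally.

```python
import pandas as pd


def normalize_group(g: pd.DataFrame) -> pd.DataFrame:
    # Divide columns "B" and "C" by their sums within this group;
    # the grouping column "A" passes through unchanged.
    out = g.copy()
    for col in ["B", "C"]:
        out[col] = g[col] / g[col].sum()
    return out


class DropSmallGroups:
    """Hypothetical class-type fn: instantiated once, then called per group."""

    def __init__(self, min_size: int = 2):
        self.min_size = min_size

    def __call__(self, g: pd.DataFrame) -> pd.DataFrame:
        # An empty frame with the same columns means zero output records.
        return g if len(g) >= self.min_size else g.iloc[0:0]


df = pd.DataFrame({"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]})
group_a = df[df["A"] == "a"]
group_b = df[df["A"] == "b"]

print(normalize_group(group_a))        # B -> 0.5, 0.5 and C -> 0.4, 0.6
print(len(DropSmallGroups()(group_b)))  # 0
```

Once fn behaves as expected on a single group, it could be passed as, for example, ds.groupby("A").map_groups(normalize_group, batch_format="pandas").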
- Returns
A Dataset that combines the results of fn from all groups; the type of its records is determined by the return type of fn.