ray.data.grouped_data.GroupedData.map_groups

GroupedData.map_groups(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, compute: str | ComputeStrategy = None, batch_format: str | None = 'default', fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, **ray_remote_args) -> Dataset

Apply the given function to each group of records of this dataset.

map_groups() is very flexible, but it has downsides:
  • It may be slower than more specific methods such as min() or max().

  • It requires that each group fits in memory on a single node.

In general, prefer to use aggregate() instead of map_groups().
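
As a rough illustration of this trade-off, the sketch below computes a per-group maximum two ways: with the built-in max() aggregation and with map_groups(). The helper names max_per_group and compare_with_ray are hypothetical, and the data is made up for the example. It assumes a local Ray installation.

```python
import numpy as np


def max_per_group(group):
    """Hypothetical group function: reduce one group's batch to a single row.

    `group` is a Dict[str, np.ndarray] holding every row of one group.
    """
    return {
        "group": group["group"][:1],  # the key is constant within a group
        "max_value": np.array([group["value"].max()]),
    }


def compare_with_ray():
    """Run both approaches on a tiny dataset (requires Ray)."""
    import ray

    ds = ray.data.from_items([
        {"group": 1, "value": 1},
        {"group": 1, "value": 2},
        {"group": 2, "value": 3},
        {"group": 2, "value": 4},
    ])
    # Usually preferable: a built-in aggregation.
    built_in = ds.groupby("group").max("value")
    # More general: the whole group is handed to a user function.
    custom = ds.groupby("group").map_groups(max_per_group, batch_format="numpy")
    return built_in.take_all(), custom.take_all()
```

Calling compare_with_ray() should yield the same per-group maxima from both datasets, although the output column names differ.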

Warning

Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Ray team.

Examples

>>> # Return a single record per group (batch of multiple records in,
>>> # batch of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get first value per group.
>>> ds = ray.data.from_items([ 
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups( 
...     lambda g: {"result": np.array([g["value"][0]])})
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df) 
>>> grouped = ds.groupby("A") 
>>> grouped.map_groups( 
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... )

Parameters:
  • fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().

  • compute – The compute strategy, either "tasks" (default) to use Ray tasks, ray.data.ActorPoolStrategy(size=n) to use a fixed-size actor pool, or ray.data.ActorPoolStrategy(min_size=m, max_size=n) for an autoscaling actor pool.

  • batch_format – Specify "default" to use the default block format (NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, "numpy" to select Dict[str, numpy.ndarray], or None to return the underlying block exactly as is with no additional formatting.

  • fn_args – Positional arguments to pass to fn after the group batch.

  • fn_kwargs – Keyword arguments to pass to fn.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • num_cpus – The number of CPUs to reserve for each parallel map worker.

  • num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.

  • ray_remote_args – Additional resource requirements to request from Ray for the map tasks (e.g., num_gpus=1 to request GPUs).
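
The sketch below shows one way the fn_constructor_* and fn_* parameters might fit together when fn is a callable class. ScaleValues, its arguments, and run_with_ray are hypothetical names invented for this example. It assumes that constructor arguments are applied once when the class is instantiated, and that fn_args and fn_kwargs are forwarded on every call after the group batch. The concurrency=1 setting is also an assumption, since some Ray versions require concurrency or an actor compute strategy for callable classes.

```python
import numpy as np


class ScaleValues:
    """Hypothetical callable class; the constructor runs once per worker."""

    def __init__(self, factor, offset=0.0):
        # Receives fn_constructor_args / fn_constructor_kwargs.
        self.factor = factor
        self.offset = offset

    def __call__(self, group, column, suffix="_scaled"):
        # Receives the group batch, then fn_args / fn_kwargs.
        out = dict(group)
        out[column + suffix] = group[column] * self.factor + self.offset
        return out


def run_with_ray():
    """Apply ScaleValues to each group (requires Ray)."""
    import ray

    ds = ray.data.from_items([
        {"group": 1, "value": 1},
        {"group": 1, "value": 2},
        {"group": 2, "value": 3},
    ])
    return ds.groupby("group").map_groups(
        ScaleValues,
        batch_format="numpy",
        fn_constructor_args=(10,),
        fn_constructor_kwargs={"offset": 1.0},
        fn_args=("value",),
        fn_kwargs={"suffix": "_x10"},
        concurrency=1,  # assumption: needed for callable classes in some versions
    ).take_all()
```

Keeping expensive setup (for example, loading a model) in the constructor means it is paid once per worker rather than once per group.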

Returns:

A new Dataset. Its batch type is determined by the return type of fn, and its contents are the combined results from all groups.
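
The signature above also accepts a function that returns an iterator of batches. As a minimal sketch of how several outputs per group might be combined into the result, the hypothetical generator split_by_sign below yields up to two batches per group. This assumes a Ray version that supports generator functions in map_groups(); run_with_ray is likewise a made-up helper.

```python
import numpy as np


def split_by_sign(group):
    """Hypothetical generator: yield up to two output batches per group."""
    values = group["value"]
    for label, mask in (("non_negative", values >= 0), ("negative", values < 0)):
        if mask.any():
            yield {
                "group": group["group"][mask],
                "value": values[mask],
                "sign": np.array([label] * int(mask.sum())),
            }


def run_with_ray():
    """Collect the rows produced for all groups (requires Ray)."""
    import ray

    ds = ray.data.from_items([
        {"group": 1, "value": -1},
        {"group": 1, "value": 2},
        {"group": 2, "value": 3},
    ])
    return ds.groupby("group").map_groups(
        split_by_sign, batch_format="numpy"
    ).take_all()
```

Every yielded batch contributes rows to the returned dataset, so a group can produce zero, one, or several output batches.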