ray.data.grouped_data.GroupedData.map_groups
- GroupedData.map_groups(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, compute: str | ComputeStrategy = None, batch_format: str | None = 'default', fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, **ray_remote_args) -> Dataset
Apply the given function to each group of records of this dataset.
While map_groups() is very flexible, note that it comes with downsides:
- It may be slower than using more specific methods such as min() or max().
- It requires that each group fits in memory on a single node.
In general, prefer to use aggregate() instead of map_groups().
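As a rough illustration of the trade-off above, the sketch below computes a per-group maximum in two ways: with a custom function passed to map_groups(), and with the built-in max() aggregation. The names `group_max`, `compare_with_ray`, and the output column `max_value` are illustrative assumptions, not part of Ray. The Ray calls are wrapped in a function so that the per-group logic can be read and checked on its own.

```python
import numpy as np


def group_max(group):
    """Per-group function for map_groups(); receives all rows of one group.

    Assumes the NumPy batch format, i.e. a Dict[str, np.ndarray].
    """
    return {
        "group": group["group"][:1],  # the group's key, as a length-1 array
        "max_value": np.array([group["value"].max()]),
    }


def compare_with_ray():
    """Hypothetical driver; requires Ray to be installed."""
    import ray

    ds = ray.data.from_items([
        {"group": 1, "value": 1},
        {"group": 1, "value": 2},
        {"group": 2, "value": 3},
        {"group": 2, "value": 4},
    ])
    # Flexible, but each whole group must fit in memory on one node.
    via_map_groups = ds.groupby("group").map_groups(
        group_max, batch_format="numpy"
    ).take_all()
    # Built-in aggregation; usually preferable for simple reductions.
    via_max = ds.groupby("group").max("value").take_all()
    return via_map_groups, via_max
```

For a simple reduction like this one, the built-in aggregation is likely the better fit; map_groups() earns its cost when the per-group logic cannot be expressed as an aggregation.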
Warning
Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Ray team.
Examples
>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get first value per group.
>>> ds = ray.data.from_items([
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups(
...     lambda g: {"result": np.array([g["value"][0]])})

>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df)
>>> grouped = ds.groupby("A")
>>> grouped.map_groups(
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... )
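When the per-group function relies on pandas operations, it may help to request the pandas batch format explicitly rather than depend on the default. The following is a minimal sketch of the "dataframe in, dataframe out" pattern with a named function; `normalize_group` and `run_with_ray` are hypothetical helper names, and the Ray calls are kept inside a function so the group logic can be checked without a cluster.

```python
import pandas as pd


def normalize_group(group: pd.DataFrame) -> pd.DataFrame:
    """Scale columns B and C so that each sums to 1 within the group."""
    out = group.copy()  # avoid mutating the batch passed in
    for col in ["B", "C"]:
        out[col] = out[col] / out[col].sum()
    return out


def run_with_ray():
    """Hypothetical driver; requires Ray to be installed."""
    import ray

    df = pd.DataFrame({"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]})
    ds = ray.data.from_pandas(df)
    # batch_format="pandas" asks for each group as a pandas.DataFrame.
    return ds.groupby("A").map_groups(
        normalize_group, batch_format="pandas"
    ).to_pandas()
```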
- Parameters:
fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().
compute – The compute strategy, either "tasks" (default) to use Ray tasks, ray.data.ActorPoolStrategy(size=n) to use a fixed-size actor pool, or ray.data.ActorPoolStrategy(min_size=m, max_size=n) for an autoscaling actor pool.
batch_format – Specify "default" to use the default block format (NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, or "numpy" to select Dict[str, numpy.ndarray], or None to return the underlying block exactly as is with no additional formatting.
fn_args – Arguments to fn.
fn_kwargs – Keyword arguments to fn.
fn_constructor_args – Positional arguments to pass to fn's constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
fn_constructor_kwargs – Keyword arguments to pass to fn's constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.
num_cpus – The number of CPUs to reserve for each parallel map worker.
num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.
ray_remote_args – Additional resource requirements to request from Ray (e.g., num_gpus=1 to request GPUs for the map tasks).
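To show how several of these parameters fit together, here is a minimal sketch that passes a callable class as fn, supplies its constructor arguments through fn_constructor_kwargs, passes a per-call keyword through fn_kwargs, and runs on a fixed-size actor pool. The class `AddOffset`, its `offset` and `scale` arguments, and the `run_with_ray` driver are hypothetical and exist only for illustration.

```python
import numpy as np


class AddOffset:
    """Callable class: constructed once per worker, then called once per group."""

    def __init__(self, offset):
        # Supplied through fn_constructor_kwargs.
        self.offset = offset

    def __call__(self, group, scale=1):
        # `group` holds all rows of one group as Dict[str, np.ndarray];
        # `scale` is supplied through fn_kwargs.
        return {
            "group": group["group"],
            "value": group["value"] * scale + self.offset,
        }


def run_with_ray():
    """Hypothetical driver; requires Ray to be installed."""
    import ray

    ds = ray.data.from_items([
        {"group": 1, "value": 1},
        {"group": 1, "value": 2},
        {"group": 2, "value": 3},
    ])
    return ds.groupby("group").map_groups(
        AddOffset,
        compute=ray.data.ActorPoolStrategy(size=2),  # fixed-size actor pool
        batch_format="numpy",
        fn_constructor_kwargs={"offset": 100},
        fn_kwargs={"scale": 10},
    ).take_all()
```

A callable class is mainly useful when construction is expensive (for example, loading a model), since the setup cost is paid once per actor rather than once per group.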
- Returns:
The return type is determined by the return type of fn, and the return value is combined from results of all groups.
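The signature above also accepts a function that returns an iterator of batches, so a single group may contribute several output batches to the combined result. The sketch below shows one such generator; `split_group`, its `chunk_size` argument, and the `run_with_ray` driver are hypothetical names used only for illustration.

```python
import numpy as np


def split_group(group, chunk_size=2):
    """Yield one group's rows back in chunks of at most `chunk_size` rows.

    Assumes the NumPy batch format, i.e. a Dict[str, np.ndarray].
    """
    n_rows = len(group["value"])
    for start in range(0, n_rows, chunk_size):
        yield {
            name: column[start:start + chunk_size]
            for name, column in group.items()
        }


def run_with_ray():
    """Hypothetical driver; requires Ray to be installed."""
    import ray

    ds = ray.data.from_items(
        [{"group": 1, "value": v} for v in (1, 2, 3)]
        + [{"group": 2, "value": 4}]
    )
    # Every batch yielded for every group ends up in the returned Dataset.
    return ds.groupby("group").map_groups(
        split_group, batch_format="numpy"
    ).take_all()
```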