GroupedDataset API

class ray.data.grouped_dataset.GroupedDataset(dataset: ray.data.dataset.Dataset[ray.data.block.T], key: Union[None, str, Callable[[ray.data.block.T], Any]])[source]

Represents a grouped dataset created by calling Dataset.groupby().

The actual groupby is deferred until an aggregation is applied.

PublicAPI: This API is stable across Ray releases.

aggregate(*aggs: ray.data.aggregate.AggregateFn) ray.data.dataset.Dataset[ray.data.block.U][source]

Implements an accumulator-based aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> from ray.data.aggregate import AggregateFn
>>> ds = ray.data.range(100) 
>>> grouped_ds = ds.groupby(lambda x: x % 3) 
>>> grouped_ds.aggregate(AggregateFn( 
...     init=lambda k: [], 
...     accumulate_row=lambda a, r: a + [r], 
...     merge=lambda a1, a2: a1 + a2, 
...     finalize=lambda a: a 
... )) 
Parameters

aggs – Aggregations to do.

Returns

If the input dataset is a simple dataset, the output is a simple dataset of (k, v_1, ..., v_n) tuples, where k is the groupby key and v_i is the result of the ith given aggregation. If the input dataset is an Arrow dataset, the output is an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations. If the groupby key is None, the key part of the return value is omitted.
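The accumulator lifecycle behind aggregate() (init, accumulate per row, merge partial states, finalize) can be sketched in plain Python without Ray; the helper below is illustrative, not Ray's implementation, and splits the input into two "blocks" only to exercise the merge step:

```python
# Plain-Python sketch of the accumulator protocol behind AggregateFn:
# init -> accumulate per row -> merge partial states -> finalize.
# Groups range(10) by key = x % 3 and collects each group into a list,
# mirroring the AggregateFn example above. group_aggregate is illustrative.

def group_aggregate(rows, key_fn, init, accumulate, merge, finalize):
    # Split into two "blocks" (partitions) to exercise the merge step.
    mid = len(rows) // 2
    partial_states = []
    for block in (rows[:mid], rows[mid:]):
        states = {}
        for r in block:
            k = key_fn(r)
            if k not in states:
                states[k] = init(k)
            states[k] = accumulate(states[k], r)
        partial_states.append(states)

    # Merge the per-block states, then finalize each group.
    merged = {}
    for states in partial_states:
        for k, a in states.items():
            merged[k] = merge(merged[k], a) if k in merged else a
    return {k: finalize(a) for k, a in sorted(merged.items())}

result = group_aggregate(
    list(range(10)),
    key_fn=lambda x: x % 3,
    init=lambda k: [],
    accumulate=lambda a, r: a + [r],
    merge=lambda a1, a2: a1 + a2,
    finalize=lambda a: a,
)
print(result)  # {0: [0, 3, 6, 9], 1: [1, 4, 7], 2: [2, 5, 8]}
```

Because merge must combine two partial states into one valid state, an aggregation expressed this way can be computed block-by-block in parallel.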

map_groups(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]], Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'native', **ray_remote_args) Dataset[Any][source]

Apply the given function to each group of records of this dataset.

While map_groups() is very flexible, note that it comes with downsides:
  • It may be slower than using more specific methods such as min() or max().

  • It requires that each group fits in memory on a single node.

In general, prefer to use aggregate() instead of map_groups().

This is a blocking operation.

Examples

>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out). Note that median is not an
>>> # associative function, so it cannot be computed with aggregate().
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> ds = ray.data.range(100) 
>>> ds.groupby(lambda x: x % 3).map_groups( 
...     lambda x: [np.median(x)])
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df) 
>>> grouped = ds.groupby("A") 
>>> grouped.map_groups( 
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... ) 
Parameters
  • fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.

  • batch_format – Specify “native” to use the native block format (promotes Arrow to pandas), “pandas” to select pandas.DataFrame as the batch format, or “pyarrow” to select pyarrow.Table.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

Returns

The return type is determined by the return type of fn; the returned dataset combines the results from all groups.
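The semantics of map_groups() can be sketched in plain Python: gather all records of a group into one batch, apply fn to the whole batch, and concatenate the outputs. Median (as in the example above) needs the entire group at once, which is why it fits map_groups() but not the accumulator-based aggregate(). The helper below is illustrative, not Ray's implementation:

```python
import statistics

# Illustrative sketch of map_groups() semantics: one batch per group in,
# zero or more records per group out, results concatenated across groups.

def map_groups_sketch(rows, key_fn, fn):
    groups = {}
    for r in rows:
        groups.setdefault(key_fn(r), []).append(r)
    out = []
    for k in sorted(groups):
        out.extend(fn(groups[k]))  # fn: whole-group batch in -> list out
    return out

medians = map_groups_sketch(range(100), lambda x: x % 3,
                            lambda g: [statistics.median(g)])
print(medians)  # [49.5, 49, 50] -- one median per group
```

Note that each group's batch is materialized in full before fn runs, which is the source of the single-node memory requirement listed above.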

count() ray.data.dataset.Dataset[ray.data.block.U][source]

Compute count aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).count() 
>>> ray.data.from_items([ 
...     {"A": x % 3, "B": x} for x in range(100)]).groupby( 
...     "A").count() 
Returns

A simple dataset of (k, v) pairs or an Arrow dataset of [k, v] columns, where k is the groupby key and v is the number of rows with that key. If the groupby key is None, the key part of the return value is omitted.

sum(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped sum aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).sum() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .sum(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").sum() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .sum(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the sum.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a sum of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise sum of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the sum; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.

Returns

The sum result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, sum) tuples, where k is the groupby key and sum is the sum of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, sum_1, ..., sum_n) tuples, where k is the groupby key and sum_i is the sum of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise sum column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return value is omitted.
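The ignore_nulls behavior described above can be illustrated with a plain-Python grouped sum; is_null and grouped_sum below are illustrative helpers, not Ray APIs, and treat None and NaN as null:

```python
import math

# Illustrative sketch of ignore_nulls semantics: with ignore_nulls=True,
# nulls are skipped; with ignore_nulls=False, any null in a group makes
# that group's sum null (represented as None here).

def is_null(v):
    return v is None or (isinstance(v, float) and math.isnan(v))

def grouped_sum(pairs, ignore_nulls=True):
    # pairs: iterable of (key, value) tuples.
    sums = {}
    saw_null = set()
    for k, v in pairs:
        if is_null(v):
            saw_null.add(k)
            sums.setdefault(k, 0)
            continue
        sums[k] = sums.get(k, 0) + v
    if not ignore_nulls:
        for k in saw_null:
            sums[k] = None  # null poisons the whole group
    return sums

data = [("a", 1), ("a", None), ("b", 2), ("b", 3)]
print(grouped_sum(data, ignore_nulls=True))   # {'a': 1, 'b': 5}
print(grouped_sum(data, ignore_nulls=False))  # {'a': None, 'b': 5}
```

The same ignore_nulls contract applies to min(), max(), mean(), and std() below.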

min(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped min aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).min() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .min(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").min() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .min(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the min.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a min of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise min of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the min; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.

Returns

The min result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, min) tuples, where k is the groupby key and min is the min of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, min_1, ..., min_n) tuples, where k is the groupby key and min_i is the min of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise min column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return value is omitted.

max(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped max aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).max() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .max(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").max() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .max(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the max.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a max of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise max of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the max; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.

Returns

The max result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, max) tuples, where k is the groupby key and max is the max of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, max_1, ..., max_n) tuples, where k is the groupby key and max_i is the max of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise max column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return value is omitted.

mean(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped mean aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).mean() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .mean(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").mean() 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .mean(["B", "C"]) 
Parameters
  • on

    The data subset on which to compute the mean.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a mean of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise mean of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the mean; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.

Returns

The mean result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, mean) tuples, where k is the groupby key and mean is the mean of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, mean_1, ..., mean_n) tuples, where k is the groupby key and mean_i is the mean of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise mean column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return value is omitted.

std(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ddof: int = 1, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U][source]

Compute grouped standard deviation aggregation.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).std() 
>>> ray.data.from_items([ 
...     (i % 3, i, i**2) 
...     for i in range(100)]) \ 
...     .groupby(lambda x: x[0] % 3) \ 
...     .std(lambda x: x[2]) 
>>> ray.data.range_table(100).groupby("value").std(ddof=0) 
>>> ray.data.from_items([ 
...     {"A": i % 3, "B": i, "C": i**2} 
...     for i in range(100)]) \ 
...     .groupby("A") \ 
...     .std(["B", "C"]) 

NOTE: This uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen for its numerical stability and because it can be computed in a single pass. It may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

Parameters
  • on

    The data subset on which to compute the std.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to take a std of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise std of all columns.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the std; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.

Returns

The standard deviation result.

For a simple dataset, the output is:

  • on=None: a simple dataset of (k, std) tuples, where k is the groupby key and std is the std of all rows in that group.

  • on=[callable_1, ..., callable_n]: a simple dataset of (k, std_1, ..., std_n) tuples, where k is the groupby key and std_i is the std of the outputs of the ith callable called on each row in that group.

For an Arrow dataset, the output is:

  • on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise std column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key part of the return value is omitted.
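Welford's single-pass method, which std() is documented to use, can be sketched in plain Python. The key property is that two partial states can be merged exactly, which is what makes it usable in an accumulator-style, block-parallel aggregation; the function names below are illustrative, not Ray APIs:

```python
import math

# Sketch of Welford's online method for variance/std: each state is
# (count, mean, M2), where M2 is the sum of squared deviations from the
# running mean. States can be accumulated row-by-row and merged pairwise.

def welford_accumulate(state, x):
    count, mean, m2 = state
    count += 1
    delta = x - mean
    mean += delta / count
    m2 += delta * (x - mean)
    return (count, mean, m2)

def welford_merge(a, b):
    # Chan et al.'s parallel combination of two partial states.
    count_a, mean_a, m2_a = a
    count_b, mean_b, m2_b = b
    count = count_a + count_b
    delta = mean_b - mean_a
    mean = mean_a + delta * count_b / count
    m2 = m2_a + m2_b + delta * delta * count_a * count_b / count
    return (count, mean, m2)

def welford_finalize(state, ddof=1):
    count, _, m2 = state
    return math.sqrt(m2 / (count - ddof))

xs = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
# Accumulate two halves separately, then merge, as a block-parallel
# computation would.
s1 = s2 = (0, 0.0, 0.0)
for x in xs[:4]:
    s1 = welford_accumulate(s1, x)
for x in xs[4:]:
    s2 = welford_accumulate(s2, x)
print(welford_finalize(welford_merge(s1, s2), ddof=0))  # 2.0
```

Unlike the naive sum-of-squares formula, M2 never involves subtracting two large nearly-equal quantities, which is the source of the numerical stability noted above.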

Aggregations

class ray.data.aggregate.AggregateFn(init: Callable[[ray.data.block.KeyType], ray.data.block.AggType], merge: Callable[[ray.data.block.AggType, ray.data.block.AggType], ray.data.block.AggType], accumulate_row: Callable[[ray.data.block.AggType, ray.data.block.T], ray.data.block.AggType] = None, accumulate_block: Callable[[ray.data.block.AggType, Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], ray.data.block.AggType] = None, finalize: Callable[[ray.data.block.AggType], ray.data.block.U] = <function AggregateFn.<lambda>>, name: Optional[str] = None)[source]

PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Count[source]

Defines count aggregation.

PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Sum(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines sum aggregation.

PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Max(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines max aggregation.

PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Mean(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines mean aggregation.

PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.Std(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ddof: int = 1, ignore_nulls: bool = True)[source]

Defines standard deviation aggregation.

Uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen for its numerical stability and because it can be computed in a single pass. It may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

PublicAPI: This API is stable across Ray releases.

class ray.data.aggregate.AbsMax(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]

Defines absolute max aggregation.

PublicAPI: This API is stable across Ray releases.