GroupedDataset API#
- class ray.data.grouped_dataset.GroupedDataset(dataset: ray.data.dataset.Dataset[ray.data.block.T], key: Union[None, str, Callable[[ray.data.block.T], Any]])[source]#
Represents a grouped dataset created by calling Dataset.groupby(). The actual groupby is deferred until an aggregation is applied.
PublicAPI: This API is stable across Ray releases.
- aggregate(*aggs: ray.data.aggregate.AggregateFn) ray.data.dataset.Dataset[ray.data.block.U] [source]#
Implements an accumulator-based aggregation.
This is a blocking operation.
Examples
import ray
from ray.data.aggregate import AggregateFn

ds = ray.data.range(100)
grouped_ds = ds.groupby(lambda x: x % 3)
result = grouped_ds.aggregate(AggregateFn(
    init=lambda k: [],
    accumulate_row=lambda a, r: a + [r],
    merge=lambda a1, a2: a1 + a2,
    finalize=lambda a: a
))
result.show()
(0, [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 93, 96, 99])
(1, [1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 40, 43, 46, 49, 52, 55, 58, 61, 64, 67, 70, 73, 76, 79, 82, 85, 88, 91, 94, 97])
(2, [2, 5, 8, 11, 14, 17, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 50, 53, 56, 59, 62, 65, 68, 71, 74, 77, 80, 83, 86, 89, 92, 95, 98])
- Parameters
aggs – Aggregations to apply.
- Returns
If the input dataset is a simple dataset, then the output is a simple dataset of (k, v_1, ..., v_n) tuples, where k is the groupby key and v_i is the result of the ith given aggregation. If the input dataset is an Arrow dataset, then the output is an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations. If the groupby key is None, then the key part of the return is omitted.
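For instance, several aggregations can be applied at once. The following minimal sketch uses the built-in Count and Mean aggregations (documented under Aggregations below) over a simple dataset, producing one (k, count, mean) tuple per group:

import ray
from ray.data.aggregate import Count, Mean

# Group the ints 0..99 by value mod 3, then compute two aggregations
# at once; each output record is a (k, count, mean) tuple.
ds = ray.data.range(100)
ds.groupby(lambda x: x % 3).aggregate(Count(), Mean()).show()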
- map_groups(fn: Union[type, Callable[[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]], Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'default', **ray_remote_args) Dataset[Any] [source]#
Apply the given function to each group of records of this dataset.
While map_groups() is very flexible, note that it comes with downsides:
- It may be slower than using more specific methods such as min() or max().
- It requires that each group fits in memory on a single node.
In general, prefer to use aggregate() instead of map_groups().
This is a blocking operation.
Examples
>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get median per group. Note that median is not an associative
>>> # function so cannot be computed with aggregate().
>>> ds = ray.data.range(100)
>>> ds.groupby(lambda x: x % 3).map_groups(
...     lambda x: [np.median(x)])
>>> # Get first value per group.
>>> ds = ray.data.from_items([
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups(
...     lambda g: [g["value"][0]])
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df)
>>> grouped = ds.groupby("A")
>>> grouped.map_groups(
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... )
- Parameters
fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches(). A callable-class sketch follows the Returns block below.
compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
batch_format – Specify "default" to use the default block format (promotes Arrow to pandas), "pandas" to select pandas.DataFrame as the batch format, or "pyarrow" to select pyarrow.Table.
ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).
- Returns
The return type is determined by the return type of fn, and the return value is combined from results of all groups.
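As a minimal sketch of the callable-class form of fn paired with an actor-based compute strategy (the GroupMedian class name is illustrative, not part of the API):

import ray
import numpy as np
from ray.data import ActorPoolStrategy

# Hypothetical callable class: instantiated once per actor, then
# invoked on each group's batch, mirroring the function form of fn.
class GroupMedian:
    def __call__(self, group):
        return [np.median(group)]

ds = ray.data.range(100)
# An autoscaling actor pool of 2 to 4 actors, per the compute
# parameter described above.
ds.groupby(lambda x: x % 3).map_groups(
    GroupMedian, compute=ActorPoolStrategy(2, 4))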
- count() ray.data.dataset.Dataset[ray.data.block.U] [source]#
Compute count aggregation.
This is a blocking operation.
Examples
>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).count()
>>> ray.data.from_items([
...     {"A": x % 3, "B": x} for x in range(100)]).groupby(
...     "A").count()
- Returns
A simple dataset of (k, v) pairs or an Arrow dataset of [k, v] columns, where k is the groupby key and v is the number of rows with that key. If the groupby key is None, then the key part of the return is omitted.
- sum(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U] [source]#
Compute grouped sum aggregation.
This is a blocking operation.
Examples
>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).sum()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .sum(lambda x: x[2])
>>> ray.data.range_table(100).groupby("value").sum()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .sum(["B", "C"])
- Parameters
on –
The data subset on which to compute the sum.
For a simple dataset: it can be a callable or a list thereof, and the default is to take a sum of all rows.
For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise sum of all columns.
ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the sum; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True. A brief sketch of this behavior follows the Returns block below.
- Returns
The sum result.
For a simple dataset, the output is:
- on=None: a simple dataset of (k, sum) tuples, where k is the groupby key and sum is the sum of all rows in that group.
- on=[callable_1, ..., callable_n]: a simple dataset of (k, sum_1, ..., sum_n) tuples, where k is the groupby key and sum_i is the sum of the outputs of the ith callable called on each row in that group.
For an Arrow dataset, the output is:
- on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise sum column for each original column in the dataset.
- on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.
If the groupby key is None, then the key part of the return is omitted.
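As a brief sketch of the ignore_nulls semantics described above (the "A" and "B" column names are illustrative):

import ray

# Illustrative table where group 1 contains a null value.
ds = ray.data.from_items([
    {"A": 0, "B": 1},
    {"A": 1, "B": 2},
    {"A": 1, "B": None}])

# Default ignore_nulls=True skips the null: group 1 sums to 2.
ds.groupby("A").sum("B").show()
# ignore_nulls=False propagates it: group 1's sum is null.
ds.groupby("A").sum("B", ignore_nulls=False).show()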
- min(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U] [source]#
Compute grouped min aggregation.
This is a blocking operation.
Examples
>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).min()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .min(lambda x: x[2])
>>> ray.data.range_table(100).groupby("value").min()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .min(["B", "C"])
- Parameters
on –
The data subset on which to compute the min.
For a simple dataset: it can be a callable or a list thereof, and the default is to take a min of all rows.
For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise min of all columns.
ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the min; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.
- Returns
The min result.
For a simple dataset, the output is:
- on=None: a simple dataset of (k, min) tuples, where k is the groupby key and min is the min of all rows in that group.
- on=[callable_1, ..., callable_n]: a simple dataset of (k, min_1, ..., min_n) tuples, where k is the groupby key and min_i is the min of the outputs of the ith callable called on each row in that group.
For an Arrow dataset, the output is:
- on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise min column for each original column in the dataset.
- on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.
If the groupby key is None, then the key part of the return is omitted.
- max(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U] [source]#
Compute grouped max aggregation.
This is a blocking operation.
Examples
>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).max()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .max(lambda x: x[2])
>>> ray.data.range_table(100).groupby("value").max()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .max(["B", "C"])
- Parameters
on –
The data subset on which to compute the max.
For a simple dataset: it can be a callable or a list thereof, and the default is to take a max of all rows.
For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise max of all columns.
ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the max; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.
- Returns
The max result.
For a simple dataset, the output is:
- on=None: a simple dataset of (k, max) tuples, where k is the groupby key and max is the max of all rows in that group.
- on=[callable_1, ..., callable_n]: a simple dataset of (k, max_1, ..., max_n) tuples, where k is the groupby key and max_i is the max of the outputs of the ith callable called on each row in that group.
For an Arrow dataset, the output is:
- on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise max column for each original column in the dataset.
- on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.
If the groupby key is None, then the key part of the return is omitted.
- mean(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U] [source]#
Compute grouped mean aggregation.
This is a blocking operation.
Examples
>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).mean()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .mean(lambda x: x[2])
>>> ray.data.range_table(100).groupby("value").mean()
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .mean(["B", "C"])
- Parameters
on –
The data subset on which to compute the mean.
For a simple dataset: it can be a callable or a list thereof, and the default is to take a mean of all rows.
For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise mean of all columns.
ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the mean; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.
- Returns
The mean result.
For a simple dataset, the output is:
- on=None: a simple dataset of (k, mean) tuples, where k is the groupby key and mean is the mean of all rows in that group.
- on=[callable_1, ..., callable_n]: a simple dataset of (k, mean_1, ..., mean_n) tuples, where k is the groupby key and mean_i is the mean of the outputs of the ith callable called on each row in that group.
For an Arrow dataset, the output is:
- on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise mean column for each original column in the dataset.
- on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.
If the groupby key is None, then the key part of the return is omitted.
- std(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ddof: int = 1, ignore_nulls: bool = True) ray.data.dataset.Dataset[ray.data.block.U] [source]#
Compute grouped standard deviation aggregation.
This is a blocking operation.
Examples
>>> import ray
>>> ray.data.range(100).groupby(lambda x: x % 3).std()
>>> ray.data.from_items([
...     (i % 3, i, i**2)
...     for i in range(100)]) \
...     .groupby(lambda x: x[0] % 3) \
...     .std(lambda x: x[2])
>>> ray.data.range_table(100).groupby("value").std(ddof=0)
>>> ray.data.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]) \
...     .groupby("A") \
...     .std(["B", "C"])
NOTE: This uses Welford’s online method for an accumulator-style computation of the standard deviation. This method was chosen due to its numerical stability and because it is computable in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford’s_online_algorithm
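As an illustrative, self-contained sketch (not Ray's internal code), Welford's method keeps a running (count, mean, m2) state, where m2 is the sum of squared deviations from the current mean; the update, merge, and finalize steps map directly onto the accumulate/merge/finalize structure of an accumulator-based aggregation:

# Minimal sketch of Welford's online algorithm; the names are ours.
def welford_update(state, x):
    """Fold one value into the (count, mean, m2) state."""
    count, mean, m2 = state
    count += 1
    delta = x - mean
    mean += delta / count
    m2 += delta * (x - mean)  # uses the updated mean
    return count, mean, m2

def welford_merge(a, b):
    """Combine two partial states (the parallel variant of Welford)."""
    count = a[0] + b[0]
    delta = b[1] - a[1]
    mean = a[1] + delta * b[0] / count
    m2 = a[2] + b[2] + delta ** 2 * a[0] * b[0] / count
    return count, mean, m2

def welford_finalize(state, ddof=1):
    """Return the standard deviation; the divisor is N - ddof."""
    count, _, m2 = state
    return (m2 / (count - ddof)) ** 0.5

state = (0, 0.0, 0.0)
for x in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    state = welford_update(state, x)
print(welford_finalize(state, ddof=0))  # 2.0 (population std)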
- Parameters
on –
The data subset on which to compute the std.
For a simple dataset: it can be a callable or a list thereof, and the default is to take a std of all rows.
For an Arrow dataset: it can be a column name or a list thereof, and the default is to do a column-wise std of all columns.
ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.
ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the std; if False, the output is null if any null value is encountered. np.nan, None, and pd.NaT are considered null values. Default is True.
- Returns
The standard deviation result.
For a simple dataset, the output is:
- on=None: a simple dataset of (k, std) tuples, where k is the groupby key and std is the std of all rows in that group.
- on=[callable_1, ..., callable_n]: a simple dataset of (k, std_1, ..., std_n) tuples, where k is the groupby key and std_i is the std of the outputs of the ith callable called on each row in that group.
For an Arrow dataset, the output is:
- on=None: an Arrow dataset containing a groupby key column, "k", and a column-wise std column for each original column in the dataset.
- on=["col_1", ..., "col_n"]: an Arrow dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.
If the groupby key is None, then the key part of the return is omitted.
Aggregations#
- class ray.data.aggregate.AggregateFn(init: Callable[[ray.data.block.KeyType], ray.data.block.AggType], merge: Callable[[ray.data.block.AggType, ray.data.block.AggType], ray.data.block.AggType], accumulate_row: Callable[[ray.data.block.AggType, ray.data.block.T], ray.data.block.AggType] = None, accumulate_block: Callable[[ray.data.block.AggType, Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]], ray.data.block.AggType] = None, finalize: Callable[[ray.data.block.AggType], ray.data.block.U] = <function AggregateFn.<lambda>>, name: Optional[str] = None)[source]#
PublicAPI: This API is stable across Ray releases.
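The constructor is documented only by its signature, so the following is a hedged sketch of a custom aggregation built from it (the max-absolute-value aggregation itself is illustrative, not part of the API):

import ray
from ray.data.aggregate import AggregateFn

# Illustrative custom aggregation: the max absolute value per group.
max_abs = AggregateFn(
    init=lambda k: 0,                            # starting state per key
    accumulate_row=lambda a, r: max(a, abs(r)),  # fold one row into the state
    merge=lambda a1, a2: max(a1, a2),            # combine partial states
    finalize=lambda a: a,                        # the state is the result
    name="max_abs")

ray.data.range(100).groupby(lambda x: x % 3).aggregate(max_abs).show()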
- class ray.data.aggregate.Count[source]#
Defines count aggregation.
PublicAPI: This API is stable across Ray releases.
- class ray.data.aggregate.Sum(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]#
Defines sum aggregation.
PublicAPI: This API is stable across Ray releases.
- class ray.data.aggregate.Max(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]#
Defines max aggregation.
PublicAPI: This API is stable across Ray releases.
- class ray.data.aggregate.Mean(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ignore_nulls: bool = True)[source]#
Defines mean aggregation.
PublicAPI: This API is stable across Ray releases.
- class ray.data.aggregate.Std(on: Union[None, str, Callable[[ray.data.block.T], Any]] = None, ddof: int = 1, ignore_nulls: bool = True)[source]#
Defines standard deviation aggregation.
Uses Welford’s online method for an accumulator-style computation of the standard deviation. This method was chosen due to its numerical stability, and it being computable in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford’s_online_algorithm
PublicAPI: This API is stable across Ray releases.
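As a usage sketch, the built-in aggregations above compose in a single aggregate() call (the "A" and "B" column names are illustrative):

import ray
from ray.data.aggregate import Count, Sum, Max, Mean, Std

ds = ray.data.from_items([
    {"A": i % 3, "B": i} for i in range(100)])

# One output row per group: key, count, sum, max, mean, and std of "B".
ds.groupby("A").aggregate(
    Count(), Sum("B"), Max("B"), Mean("B"), Std("B", ddof=0)).show()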