ray.data.grouped_dataset.GroupedDataset.aggregate

GroupedDataset.aggregate(*aggs: ray.data.aggregate.AggregateFn) → ray.data.dataset.Dataset[ray.data.block.U]

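Implements an accumulator-based aggregation: each AggregateFn initializes an accumulator per group, folds rows into it, merges partial accumulators, and finalizes the result into the group's output value.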

This is a blocking operation.
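The four AggregateFn callbacks can be easier to follow with a local model. The following is a simplified, pure-Python stand-in written for illustration only: `simulate_aggregate` is a hypothetical helper, not part of Ray, and splitting the rows into pre-made blocks only approximates how Ray partitions and shuffles data. Each block builds per-key accumulators with init and accumulate_row, the partial accumulators are combined with merge, and finalize turns each merged accumulator into the group's value.

```python
from typing import Any, Callable, Dict, Hashable, List


def simulate_aggregate(
    blocks: List[List[Any]],
    key_fn: Callable[[Any], Hashable],
    init: Callable[[Hashable], Any],
    accumulate_row: Callable[[Any, Any], Any],
    merge: Callable[[Any, Any], Any],
    finalize: Callable[[Any], Any],
) -> List[tuple]:
    """Hypothetical local model of an accumulator-based groupby aggregation."""
    # Phase 1: each block builds its own partial accumulator per group key.
    partials: List[Dict[Hashable, Any]] = []
    for block in blocks:
        accs: Dict[Hashable, Any] = {}
        for row in block:
            k = key_fn(row)
            if k not in accs:
                accs[k] = init(k)  # init receives the group key
            accs[k] = accumulate_row(accs[k], row)
        partials.append(accs)

    # Phase 2: merge the partial accumulators for each key across blocks.
    merged: Dict[Hashable, Any] = {}
    for accs in partials:
        for k, acc in accs.items():
            merged[k] = merge(merged[k], acc) if k in merged else acc

    # Phase 3: finalize each merged accumulator; one (key, value) tuple per group, ordered by key.
    return [(k, finalize(merged[k])) for k in sorted(merged)]


# Split range(100) into four blocks of 25 rows, mimicking a partitioned dataset.
blocks = [list(range(i, i + 25)) for i in range(0, 100, 25)]
result = simulate_aggregate(
    blocks,
    key_fn=lambda x: x % 3,
    init=lambda k: [],
    accumulate_row=lambda a, r: a + [r],
    merge=lambda a1, a2: a1 + a2,
    finalize=lambda a: a,
)
for row in result:
    print(row)
```

Because merge is applied to partials in block order here, each group's list comes out in ascending order; a real distributed run makes no such ordering promise unless the aggregation itself enforces one.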

Examples

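import ray
from ray.data.aggregate import AggregateFn

ds = ray.data.range(100)
# Group the integers 0..99 by their remainder modulo 3.
grouped_ds = ds.groupby(lambda x: x % 3)
# Collect every row of each group into a list.
result = grouped_ds.aggregate(AggregateFn(
    init=lambda k: [],                    # start each group with an empty list
    accumulate_row=lambda a, r: a + [r],  # add one row to the accumulator
    merge=lambda a1, a2: a1 + a2,         # combine partial lists from different blocks
    finalize=lambda a: a                  # return the collected list unchanged
))
result.show()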
(0, [0, 3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 93, 96, 99])
(1, [1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 40, 43, 46, 49, 52, 55, 58, 61, 64, 67, 70, 73, 76, 79, 82, 85, 88, 91, 94, 97])
(2, [2, 5, 8, 11, 14, 17, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 50, 53, 56, 59, 62, 65, 68, 71, 74, 77, 80, 83, 86, 89, 92, 95, 98])
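Keeping merge and finalize separate lets the accumulator carry more state than the final value needs. As an illustration in plain Python (not Ray code), a mean can use a (sum, count) accumulator: partial accumulators from two hypothetical blocks merge exactly, and finalize divides only at the end. The function names mirror the AggregateFn parameters, but here they are standalone functions and the block split is an assumption made for the demonstration.

```python
from typing import List, Tuple

Acc = Tuple[float, int]  # (running sum, row count)


def init(key: int) -> Acc:
    # The key is unused here; it only mirrors the init(k) signature in the example.
    return (0.0, 0)


def accumulate_row(acc: Acc, row: int) -> Acc:
    return (acc[0] + row, acc[1] + 1)


def merge(a1: Acc, a2: Acc) -> Acc:
    # Sums and counts add independently, so merging partials is exact.
    return (a1[0] + a2[0], a1[1] + a2[1])


def finalize(acc: Acc) -> float:
    total, count = acc
    return total / count


def fold(block: List[int]) -> Acc:
    # Build one partial accumulator from a single block of rows.
    acc = init(0)
    for r in block:
        acc = accumulate_row(acc, r)
    return acc


rows = [r for r in range(100) if r % 3 == 0]  # the rows of group 0
left, right = rows[:17], rows[17:]             # two hypothetical blocks

mean_split = finalize(merge(fold(left), fold(right)))
mean_whole = finalize(fold(rows))
print(mean_split, mean_whole)
```

Storing a running mean instead of (sum, count) would make merge inexact whenever blocks differ in size, which is why the count travels with the sum until finalize.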

Parameters

aggs – One or more AggregateFn objects to apply to every group.

Returns

If the input dataset is a simple dataset, the output is a simple dataset of (k, v_1, ..., v_n) tuples, where k is the groupby key and v_i is the result of the i-th given aggregation. If the input dataset is an Arrow dataset, the output is an Arrow dataset with n + 1 columns: the first column is the groupby key, and columns 2 through n + 1 hold the results of the aggregations. If the groupby key is None, the key is omitted from the output.
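The row layout described above for simple datasets can be modeled in a few lines. In this hypothetical sketch, `shape_output` is not a Ray function, each aggregation is simplified to a plain Python function over a group's rows rather than a full AggregateFn, and a dict key of None stands in for a groupby key of None (one global group).

```python
from typing import Any, Callable, Dict, Hashable, List, Optional, Sequence


def shape_output(
    groups: Dict[Optional[Hashable], List[Any]],
    aggs: Sequence[Callable[[List[Any]], Any]],
) -> List[tuple]:
    """Hypothetical model of the (k, v_1, ..., v_n) output rows."""
    rows = []
    for k, members in groups.items():
        # v_i is the result of the i-th aggregation on this group's rows.
        values = tuple(agg(members) for agg in aggs)
        # With no groupby key, the key is dropped from each output row.
        rows.append(values if k is None else (k, *values))
    return rows


data = list(range(10))
by_parity = {
    0: [x for x in data if x % 2 == 0],
    1: [x for x in data if x % 2 == 1],
}
grouped = shape_output(by_parity, [len, sum, max])
global_only = shape_output({None: data}, [len, sum, max])
print(grouped)
print(global_only)
```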