ray.data.aggregate.AggregateFn#

class ray.data.aggregate.AggregateFn(init: Callable[[KeyType], AggType], merge: Callable[[AggType, AggType], AggType], name: str, accumulate_row: Callable[[AggType, T], AggType] = None, accumulate_block: Callable[[AggType, pyarrow.Table | pandas.DataFrame], AggType] = None, finalize: Callable[[AggType], U] | None = None)[source]#

NOTE: THIS IS DEPRECATED, PLEASE USE AggregateFnV2 INSTEAD

Defines how to perform a custom aggregation in Ray Data.

AggregateFn instances are passed to a Dataset’s .aggregate(...) method to specify the steps required to transform and combine rows sharing the same key. This enables implementing custom aggregators beyond the standard built-in options like Sum, Min, Max, Mean, etc.

Parameters:
  • init – Function that creates an initial aggregator for each group. Receives a key (the group key) and returns the initial accumulator state (commonly 0, an empty list, or an empty dictionary).

  • merge – Function that merges two accumulators generated by different workers into one accumulator.

  • name – An optional display name for the aggregator. Useful for debugging.

  • accumulate_row – Function that processes an individual row. It receives the current accumulator and a row, then returns an updated accumulator. Cannot be used if accumulate_block is provided.

  • accumulate_block – Function that processes an entire block of rows at once. It receives the current accumulator and a block of rows, then returns an updated accumulator. This allows for vectorized operations. Cannot be used if accumulate_row is provided.

  • finalize – Function that finishes the aggregation by transforming the final accumulator state into the desired output. For example, if your accumulator is a list of items, you may want to compute a statistic from the list. If not provided, the final accumulator state is returned as-is.

Example

import ray
from ray.data.aggregate import AggregateFn

# A simple aggregator that counts how many rows there are per group
count_agg = AggregateFn(
    init=lambda k: 0,
    accumulate_row=lambda counter, row: counter + 1,
    merge=lambda c1, c2: c1 + c2,
    name="custom_count"
)
ds = ray.data.from_items([{"group": "A"}, {"group": "B"}, {"group": "A"}])
result = ds.groupby("group").aggregate(count_agg).take_all()
# result: [{'group': 'A', 'custom_count': 2}, {'group': 'B', 'custom_count': 1}]

Warning

DEPRECATED: This API is deprecated and may be removed in future Ray releases. AggregateFn is deprecated, please use AggregateFnV2

Methods