ray.data.aggregate.AggregateFn
class ray.data.aggregate.AggregateFn(init: Callable[[KeyType], AggType], merge: Callable[[AggType, AggType], AggType], name: str, accumulate_row: Callable[[AggType, T], AggType] = None, accumulate_block: Callable[[AggType, pyarrow.Table | pandas.DataFrame], AggType] = None, finalize: Callable[[AggType], U] | None = None)
Note: This class is deprecated. Use AggregateFnV2 instead.
Defines how to perform a custom aggregation in Ray Data.
AggregateFn instances are passed to a Dataset's aggregate(...) method, or to the aggregate(...) method of the grouped dataset returned by groupby(...), to specify the steps required to transform and combine rows sharing the same key. This lets you implement custom aggregators beyond the standard built-in options such as Sum, Min, Max, and Mean.
Parameters:
init – Function that creates an initial aggregator for each group. Receives a key (the group key) and returns the initial accumulator state (commonly 0, an empty list, or an empty dictionary).
merge – Function that merges two accumulators generated by different workers into one accumulator.
name – Display name for the aggregator. It is required and is used as the name of the output column (custom_count in the example below). Useful for debugging.
accumulate_row – Function that processes an individual row. It receives the current accumulator and a row, then returns an updated accumulator. Cannot be used if accumulate_block is provided.
accumulate_block – Function that processes an entire block of rows at once. It receives the current accumulator and a block of rows (a pyarrow.Table or a pandas.DataFrame), then returns an updated accumulator. This allows for vectorized operations. Cannot be used if accumulate_row is provided.
finalize – Function that finishes the aggregation by transforming the final accumulator state into the desired output. For example, if your accumulator is a list of items, you may want to compute a statistic from the list. If not provided, the final accumulator state is returned as-is.
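To make the roles of these callables concrete, here is a minimal pure-Python sketch of the order in which they are applied for a single group when computing a mean: init creates an empty accumulator for each partial result, accumulate_row folds rows into it, merge combines the partial accumulators, and finalize turns the merged state into the output value. The driver function simulate_group and the split of rows into two partitions are hypothetical simplifications for illustration only; Ray Data's actual execution (how rows are partitioned and how many merges happen) is not modeled here and may differ.

```python
from functools import reduce
from typing import Any, Dict, List, Tuple

# Accumulator state for a mean: (running sum, running count).
Acc = Tuple[float, int]


def init(key: Any) -> Acc:
    # Called with the group key; this aggregator does not need it.
    return (0.0, 0)


def accumulate_row(acc: Acc, row: Dict[str, Any]) -> Acc:
    # Fold one row into the running sum and count.
    return (acc[0] + row["value"], acc[1] + 1)


def merge(a: Acc, b: Acc) -> Acc:
    # Combine two partial accumulators, e.g. computed on different blocks.
    return (a[0] + b[0], a[1] + b[1])


def finalize(acc: Acc) -> float:
    # Turn the (sum, count) state into the mean; NaN for an empty group.
    total, count = acc
    return total / count if count else float("nan")


def simulate_group(key: Any, partitions: List[List[Dict[str, Any]]]) -> float:
    # Hypothetical driver (not Ray's engine): accumulate each partition
    # independently, then merge the partial results and finalize once.
    partials = [reduce(accumulate_row, rows, init(key)) for rows in partitions]
    return finalize(reduce(merge, partials))


rows_a = [{"group": "A", "value": 1.0}, {"group": "A", "value": 2.0}]
rows_b = [{"group": "A", "value": 6.0}]
print(simulate_group("A", [rows_a, rows_b]))  # 3.0
```

The same four functions could be passed as init=, accumulate_row=, merge=, and finalize= to AggregateFn together with a name. Keeping the sum and count separate until finalize is what makes the merge step correct: averaging two partial means directly would weight the partitions incorrectly.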
Example
```python
import ray
from ray.data.aggregate import AggregateFn

# A simple aggregator that counts how many rows there are per group
count_agg = AggregateFn(
    init=lambda k: 0,
    accumulate_row=lambda counter, row: counter + 1,
    merge=lambda c1, c2: c1 + c2,
    name="custom_count",
)

ds = ray.data.from_items([{"group": "A"}, {"group": "B"}, {"group": "A"}])
result = ds.groupby("group").aggregate(count_agg).take_all()
# result: [{'group': 'A', 'custom_count': 2}, {'group': 'B', 'custom_count': 1}]
```
Warning
DEPRECATED: This API is deprecated and may be removed in future Ray releases. Use AggregateFnV2 instead.
Methods