ray.data.aggregate.AggregateFnV2

class ray.data.aggregate.AggregateFnV2(name: str, zero_factory: Callable[[], AccumulatorType], *, on: str | None, ignore_nulls: bool)

Bases: AggregateFn, ABC, Generic[AccumulatorType, AggOutputType]

Provides an interface to implement efficient aggregations to be applied to the dataset.

AggregateFnV2 instances are passed to a Dataset’s .aggregate(...) method to perform distributed aggregations. To create a custom aggregation, you should subclass AggregateFnV2 and implement the aggregate_block and combine methods. The finalize method can also be overridden if the final accumulated state needs further transformation.

Aggregation follows these steps:

  1. Initialization: For each group (if grouping) or for the entire dataset, an initial accumulator is created using zero_factory.

  2. Block Aggregation: The aggregate_block method is applied to each block independently, producing a partial aggregation result for that block.

  3. Combination: The combine method is used to merge these partial results (or an existing accumulated result with a new partial result) into a single, combined accumulator.

  4. Finalization: Optionally, the finalize method transforms the final combined accumulator into the desired output format.
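The four steps can be sketched in plain Python without Ray. `AggSketch`, `SumSketch`, and `run_aggregation` are hypothetical names used only for this illustration. They mirror the method names above but are not Ray's API. A block is modeled here as a list of row dicts, and everything runs sequentially in one process, whereas Ray distributes the per-block work.

```python
from abc import ABC, abstractmethod
from functools import reduce
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar

AccumulatorType = TypeVar("AccumulatorType")
AggOutputType = TypeVar("AggOutputType")

# Assumption for this sketch: a block is a list of row dicts
# (real Ray blocks are Arrow tables or pandas DataFrames).
Block = List[Dict[str, Any]]


class AggSketch(ABC, Generic[AccumulatorType, AggOutputType]):
    """Hypothetical stand-in that mirrors AggregateFnV2's method names."""

    def __init__(self, name: str, zero_factory: Callable[[], AccumulatorType], *, on: Optional[str]):
        self.name = name
        self.zero_factory = zero_factory
        self.on = on

    @abstractmethod
    def aggregate_block(self, block: Block) -> AccumulatorType: ...

    @abstractmethod
    def combine(self, current: AccumulatorType, new: AccumulatorType) -> AccumulatorType: ...

    def finalize(self, accumulator: AccumulatorType) -> AggOutputType:
        # Default: the accumulator already is the output.
        return accumulator  # type: ignore[return-value]


class SumSketch(AggSketch[int, int]):
    def __init__(self, on: str):
        super().__init__(f"sum({on})", lambda: 0, on=on)

    def aggregate_block(self, block: Block) -> int:
        return sum(row[self.on] for row in block)

    def combine(self, current: int, new: int) -> int:
        return current + new


def run_aggregation(agg: AggSketch, blocks: List[Block]) -> Dict[str, Any]:
    acc = agg.zero_factory()                             # 1. initialization
    partials = [agg.aggregate_block(b) for b in blocks]  # 2. block aggregation
    acc = reduce(agg.combine, partials, acc)             # 3. combination
    return {agg.name: agg.finalize(acc)}                 # 4. finalization


blocks = [[{"x": 1}, {"x": 2}], [{"x": 3}], []]
print(run_aggregation(SumSketch("x"), blocks))  # {'sum(x)': 6}
```

In this sketch the fold starts from `zero_factory()`, so the zero value must leave any partial result unchanged under `combine` (here `0 + x == x`); an empty block contributes that same zero.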

Generic Type Parameters:

This class is parameterized by two type variables:

  • AccumulatorType: The type of the intermediate state (accumulator) used during aggregation. This is what aggregate_block returns, what combine takes as inputs and returns, and what finalize receives. For simple aggregations like Sum, this might just be a numeric type. For more complex aggregations like Mean, this could be a composite type like List[Union[int, float]] representing [sum, count].

  • AggOutputType: The type of the final result after finalize is called. This is what gets written to the output dataset. For Sum, this is the same as the accumulator type (a number). For Mean, the accumulator is [sum, count] but the output is a single float (the computed mean).
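To make the AccumulatorType/AggOutputType split concrete, here is a hypothetical standalone `MeanSketch` (not a Ray class) whose accumulator is `[sum, count]` and whose `finalize` collapses it into a single float. A block is modeled as the list of values from the `on` column, and returning `None` for an empty input is a choice made for this sketch only.

```python
from typing import List, Optional, Union

Number = Union[int, float]
# AccumulatorType in this sketch: [running_sum, count]
MeanAccumulator = List[Number]


class MeanSketch:
    """Hypothetical mean aggregation: accumulator [sum, count], output float."""

    def zero(self) -> MeanAccumulator:
        return [0, 0]

    def aggregate_block(self, values: List[Number]) -> MeanAccumulator:
        return [sum(values), len(values)]

    def combine(self, current: MeanAccumulator, new: MeanAccumulator) -> MeanAccumulator:
        # Element-wise addition: sums add, counts add.
        return [current[0] + new[0], current[1] + new[1]]

    def finalize(self, acc: MeanAccumulator) -> Optional[float]:
        # AggOutputType differs from AccumulatorType: two numbers become one float.
        total, count = acc
        return total / count if count else None


agg = MeanSketch()
acc = agg.zero()
for block in ([1, 2, 3], [4], []):
    acc = agg.combine(acc, agg.aggregate_block(block))
print(acc, agg.finalize(acc))  # [10, 4] 2.5
```

Carrying `[sum, count]` instead of a running mean is what lets `combine` merge partial results exactly: averaging the per-block means 2.0 and 4.0 would give 3.0 instead of the correct 2.5.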

Examples of type parameterization in built-in aggregations:

```python
Count(AggregateFnV2[int, int])               # accumulator: int, output: int
Sum(AggregateFnV2[Union[int, float], ...])   # accumulator: number, output: number
Mean(AggregateFnV2[List[...], float])        # accumulator: [sum, count], output: float
Std(AggregateFnV2[List[...], float])         # accumulator: [M2, mean, count], output: float
```

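The Std accumulator `[M2, mean, count]` is the least obvious of these, because two partial states cannot simply be added. Below is one way to fill in the steps for it, written as standalone functions rather than Ray's actual Std class: Welford's online update inside a block and the pairwise merge of Chan et al. for `combine`. The function names and the `ddof` parameter (defaulting to 1, the sample standard deviation) are assumptions of this sketch.

```python
import math
from typing import List, Optional

# Accumulator layout follows the list above: [M2, mean, count],
# where M2 is the sum of squared deviations from the mean.
StdAccumulator = List[float]


def std_zero() -> StdAccumulator:
    return [0.0, 0.0, 0]


def std_aggregate_block(values: List[float]) -> StdAccumulator:
    # Welford's online update over the values of one block.
    m2, mean, count = 0.0, 0.0, 0
    for x in values:
        count += 1
        delta = x - mean
        mean += delta / count
        m2 += delta * (x - mean)
    return [m2, mean, count]


def std_combine(a: StdAccumulator, b: StdAccumulator) -> StdAccumulator:
    # Pairwise merge of two partial states (Chan et al.'s parallel formula).
    m2_a, mean_a, n_a = a
    m2_b, mean_b, n_b = b
    n = n_a + n_b
    if n == 0:
        return std_zero()
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta * delta * n_a * n_b / n
    return [m2, mean, n]


def std_finalize(acc: StdAccumulator, ddof: int = 1) -> Optional[float]:
    m2, _, n = acc
    if n - ddof <= 0:
        return None  # not enough data points for the requested ddof
    return math.sqrt(m2 / (n - ddof))


acc = std_zero()
for block in ([2.0, 4.0, 4.0], [4.0, 5.0], [5.0, 7.0, 9.0]):
    acc = std_combine(acc, std_aggregate_block(block))
print(round(acc[1], 9), round(std_finalize(acc, ddof=0), 9))  # 5.0 2.0
```

The merged result does not depend on how the values were split into blocks or in which order the partial states are combined, which is the property a distributed `combine` needs.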
Parameters:
  • name – The name of the aggregation. This will be used as the column name in the output, e.g., “sum(my_col)”.

  • zero_factory – A callable that returns the initial “zero” value for the accumulator. For example, for a sum this would be lambda: 0; for finding a minimum, lambda: float("inf"); and for finding a maximum, lambda: float("-inf").

  • on – The name of the column to perform the aggregation on. If None, the aggregation is performed over the entire row (e.g., for Count()).

  • ignore_nulls – Whether to ignore null values during aggregation. If True, nulls are skipped. If False, the presence of a null value might result in a null output, depending on the aggregation logic.
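A hypothetical `MinSketch` (not Ray's Min) illustrates both `zero_factory` and `ignore_nulls`. Its zero value `float("inf")` is an identity for `min`, so combining it with any partial result returns that result unchanged. A block is modeled as a list of column values where `None` stands for null. With `ignore_nulls=False` this sketch propagates a null to the final output; as noted above, real aggregations may handle nulls differently.

```python
from typing import List, Optional


class MinSketch:
    """Hypothetical min aggregation showing zero_factory and ignore_nulls."""

    def __init__(self, ignore_nulls: bool = True):
        self.ignore_nulls = ignore_nulls
        # float("inf") is the identity for min: min(inf, x) == x for any number x.
        self.zero_factory = lambda: float("inf")

    def aggregate_block(self, values: List[Optional[float]]) -> Optional[float]:
        acc: float = self.zero_factory()
        for v in values:
            if v is None:
                if self.ignore_nulls:
                    continue     # skip the null value
                return None      # this sketch lets a null poison the block result
            acc = min(acc, v)
        return acc

    def combine(self, current: Optional[float], new: Optional[float]) -> Optional[float]:
        if current is None or new is None:
            return None          # a null partial result stays null
        return min(current, new)


def run(agg: MinSketch, blocks: List[List[Optional[float]]]) -> Optional[float]:
    acc = agg.zero_factory()
    for block in blocks:
        acc = agg.combine(acc, agg.aggregate_block(block))
    return acc


blocks = [[3.0, None, 1.5], [2.0], []]
print(run(MinSketch(ignore_nulls=True), blocks))   # 1.5
print(run(MinSketch(ignore_nulls=False), blocks))  # None
```

With no input at all, `run` returns the zero value itself (`inf`), which is why the choice of `zero_factory` has to match the semantics of `combine`.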

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

Methods

  • aggregate_block – Aggregates data within a single block.

  • combine – Combines a new partial aggregation result with the current accumulator.

  • finalize – Transforms the final accumulated state into the desired output.

  • get_agg_name – Returns the aggregation name (e.g., 'sum', 'mean', 'count').