Source code for ray.data.expressions

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any

from ray.util.annotations import DeveloperAPI, PublicAPI


@DeveloperAPI(stability="alpha")
class Operation(Enum):
    """Enumeration of supported operations in expressions.

    This enum defines all the binary operations that can be performed
    between expressions, including arithmetic, comparison, and boolean operations.

    Attributes:
        ADD: Addition operation (+)
        SUB: Subtraction operation (-)
        MUL: Multiplication operation (*)
        DIV: Division operation (/)
        GT: Greater than comparison (>)
        LT: Less than comparison (<)
        GE: Greater than or equal comparison (>=)
        LE: Less than or equal comparison (<=)
        EQ: Equality comparison (==)
        AND: Logical AND operation (&)
        OR: Logical OR operation (|)
    """

    ADD = "add"
    SUB = "sub"
    MUL = "mul"
    DIV = "div"
    GT = "gt"
    LT = "lt"
    GE = "ge"
    LE = "le"
    EQ = "eq"
    AND = "and"
    OR = "or"


[docs] @DeveloperAPI(stability="alpha") @dataclass(frozen=True) class Expr(ABC): """Base class for all expression nodes. This is the abstract base class that all expression types inherit from. It provides operator overloads for building complex expressions using standard Python operators. Expressions form a tree structure where each node represents an operation or value. The tree can be evaluated against data batches to compute results. Example: >>> from ray.data.expressions import col, lit >>> # Create an expression tree: (col("x") + 5) * col("y") >>> expr = (col("x") + lit(5)) * col("y") >>> # This creates a BinaryExpr with operation=MUL >>> # left=BinaryExpr(op=ADD, left=ColumnExpr("x"), right=LiteralExpr(5)) >>> # right=ColumnExpr("y") Note: This class should not be instantiated directly. Use the concrete subclasses like ColumnExpr, LiteralExpr, etc. """
[docs] @abstractmethod def structurally_equals(self, other: Any) -> bool: """Compare two expression ASTs for structural equality.""" raise NotImplementedError
def _bin(self, other: Any, op: Operation) -> "Expr": """Create a binary expression with the given operation. Args: other: The right operand expression or literal value op: The operation to perform Returns: A new BinaryExpr representing the operation Note: If other is not an Expr, it will be automatically converted to a LiteralExpr. """ if not isinstance(other, Expr): other = LiteralExpr(other) return BinaryExpr(op, self, other) # arithmetic def __add__(self, other: Any) -> "Expr": """Addition operator (+).""" return self._bin(other, Operation.ADD) def __radd__(self, other: Any) -> "Expr": """Reverse addition operator (for literal + expr).""" return LiteralExpr(other)._bin(self, Operation.ADD) def __sub__(self, other: Any) -> "Expr": """Subtraction operator (-).""" return self._bin(other, Operation.SUB) def __rsub__(self, other: Any) -> "Expr": """Reverse subtraction operator (for literal - expr).""" return LiteralExpr(other)._bin(self, Operation.SUB) def __mul__(self, other: Any) -> "Expr": """Multiplication operator (*).""" return self._bin(other, Operation.MUL) def __rmul__(self, other: Any) -> "Expr": """Reverse multiplication operator (for literal * expr).""" return LiteralExpr(other)._bin(self, Operation.MUL) def __truediv__(self, other: Any) -> "Expr": """Division operator (/).""" return self._bin(other, Operation.DIV) def __rtruediv__(self, other: Any) -> "Expr": """Reverse division operator (for literal / expr).""" return LiteralExpr(other)._bin(self, Operation.DIV) # comparison def __gt__(self, other: Any) -> "Expr": """Greater than operator (>).""" return self._bin(other, Operation.GT) def __lt__(self, other: Any) -> "Expr": """Less than operator (<).""" return self._bin(other, Operation.LT) def __ge__(self, other: Any) -> "Expr": """Greater than or equal operator (>=).""" return self._bin(other, Operation.GE) def __le__(self, other: Any) -> "Expr": """Less than or equal operator (<=).""" return self._bin(other, Operation.LE) def __eq__(self, other: Any) -> "Expr": """Equality operator (==).""" return self._bin(other, Operation.EQ) # boolean def __and__(self, other: Any) -> "Expr": """Logical AND operator (&).""" return self._bin(other, Operation.AND) def __or__(self, other: Any) -> "Expr": """Logical OR operator (|).""" return self._bin(other, Operation.OR)
[docs] @DeveloperAPI(stability="alpha") @dataclass(frozen=True, eq=False) class ColumnExpr(Expr): """Expression that references a column by name. This expression type represents a reference to an existing column in the dataset. When evaluated, it returns the values from the specified column. Args: name: The name of the column to reference Example: >>> from ray.data.expressions import col >>> # Reference the "age" column >>> age_expr = col("age") # Creates ColumnExpr(name="age") """ name: str def structurally_equals(self, other: Any) -> bool: return isinstance(other, ColumnExpr) and self.name == other.name
[docs] @DeveloperAPI(stability="alpha") @dataclass(frozen=True, eq=False) class LiteralExpr(Expr): """Expression that represents a constant scalar value. This expression type represents a literal value that will be broadcast to all rows when evaluated. The value can be any Python object. Args: value: The constant value to represent Example: >>> from ray.data.expressions import lit >>> # Create a literal value >>> five = lit(5) # Creates LiteralExpr(value=5) >>> name = lit("John") # Creates LiteralExpr(value="John") """ value: Any def structurally_equals(self, other: Any) -> bool: return ( isinstance(other, LiteralExpr) and self.value == other.value and type(self.value) is type(other.value) )
[docs] @DeveloperAPI(stability="alpha") @dataclass(frozen=True, eq=False) class BinaryExpr(Expr): """Expression that represents a binary operation between two expressions. This expression type represents an operation with two operands (left and right). The operation is specified by the `op` field, which must be one of the supported operations from the Operation enum. Args: op: The operation to perform (from Operation enum) left: The left operand expression right: The right operand expression Example: >>> from ray.data.expressions import col, lit, Operation >>> # Manually create a binary expression (usually done via operators) >>> expr = BinaryExpr(Operation.ADD, col("x"), lit(5)) >>> # This is equivalent to: col("x") + lit(5) """ op: Operation left: Expr right: Expr def structurally_equals(self, other: Any) -> bool: return ( isinstance(other, BinaryExpr) and self.op is other.op and self.left.structurally_equals(other.left) and self.right.structurally_equals(other.right) )
[docs] @PublicAPI(stability="beta") def col(name: str) -> ColumnExpr: """ Reference an existing column by name. This is the primary way to reference columns in expressions. The returned expression will extract values from the specified column when evaluated. Args: name: The name of the column to reference Returns: A ColumnExpr that references the specified column Example: >>> from ray.data.expressions import col >>> # Reference columns in an expression >>> expr = col("price") * col("quantity") >>> >>> # Use with Dataset.with_columns() >>> import ray >>> ds = ray.data.from_items([{"price": 10, "quantity": 2}]) >>> ds = ds.with_columns({"total": col("price") * col("quantity")}) """ return ColumnExpr(name)
[docs] @PublicAPI(stability="beta") def lit(value: Any) -> LiteralExpr: """ Create a literal expression from a constant value. This creates an expression that represents a constant scalar value. The value will be broadcast to all rows when the expression is evaluated. Args: value: The constant value to represent. Can be any Python object (int, float, str, bool, etc.) Returns: A LiteralExpr containing the specified value Example: >>> from ray.data.expressions import col, lit >>> # Create literals of different types >>> five = lit(5) >>> pi = lit(3.14159) >>> name = lit("Alice") >>> flag = lit(True) >>> >>> # Use in expressions >>> expr = col("age") + lit(1) # Add 1 to age column >>> >>> # Use with Dataset.with_columns() >>> import ray >>> ds = ray.data.from_items([{"age": 25}, {"age": 30}]) >>> ds = ds.with_columns({"age_plus_one": col("age") + lit(1)}) """ return LiteralExpr(value)
# ────────────────────────────────────── # Public API for evaluation # ────────────────────────────────────── # Note: Implementation details are in _expression_evaluator.py # Re-export eval_expr for public use __all__ = [ "Operation", "Expr", "ColumnExpr", "LiteralExpr", "BinaryExpr", "col", "lit", ]