Source code for ray.data.namespace_expressions.dt_namespace
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal
import pyarrow
import pyarrow.compute as pc
from ray.data.datatype import DataType
from ray.data.expressions import pyarrow_udf
if TYPE_CHECKING:
from ray.data.expressions import Expr, UDFExpr
TemporalUnit = Literal[
"year",
"quarter",
"month",
"week",
"day",
"hour",
"minute",
"second",
"millisecond",
"microsecond",
"nanosecond",
]
[docs]
@dataclass
class _DatetimeNamespace:
"""Datetime namespace for operations on datetime-typed expression columns."""
_expr: "Expr"
def _unary_temporal_int(
self, func: Callable[[pyarrow.Array], pyarrow.Array]
) -> "UDFExpr":
"""Helper for year/month/… that return int32."""
@pyarrow_udf(return_dtype=DataType.int32())
def _udf(arr: pyarrow.Array) -> pyarrow.Array:
return func(arr)
return _udf(self._expr)
# extractors
[docs]
def year(self) -> "UDFExpr":
"""Extract year component."""
return self._unary_temporal_int(pc.year)
[docs]
def month(self) -> "UDFExpr":
"""Extract month component."""
return self._unary_temporal_int(pc.month)
[docs]
def day(self) -> "UDFExpr":
"""Extract day component."""
return self._unary_temporal_int(pc.day)
[docs]
def hour(self) -> "UDFExpr":
"""Extract hour component."""
return self._unary_temporal_int(pc.hour)
[docs]
def minute(self) -> "UDFExpr":
"""Extract minute component."""
return self._unary_temporal_int(pc.minute)
[docs]
def second(self) -> "UDFExpr":
"""Extract second component."""
return self._unary_temporal_int(pc.second)
# formatting
[docs]
def strftime(self, fmt: str) -> "UDFExpr":
"""Format timestamps with a strftime pattern."""
@pyarrow_udf(return_dtype=DataType.string())
def _format(arr: pyarrow.Array) -> pyarrow.Array:
return pc.strftime(arr, format=fmt)
return _format(self._expr)
# rounding
[docs]
def ceil(self, unit: TemporalUnit) -> "UDFExpr":
"""Ceil timestamps to the next multiple of the given unit."""
return_dtype = self._expr.data_type
@pyarrow_udf(return_dtype=return_dtype)
def _ceil(arr: pyarrow.Array) -> pyarrow.Array:
return pc.ceil_temporal(arr, multiple=1, unit=unit)
return _ceil(self._expr)
[docs]
def floor(self, unit: TemporalUnit) -> "UDFExpr":
"""Floor timestamps to the previous multiple of the given unit."""
return_dtype = self._expr.data_type
@pyarrow_udf(return_dtype=return_dtype)
def _floor(arr: pyarrow.Array) -> pyarrow.Array:
return pc.floor_temporal(arr, multiple=1, unit=unit)
return _floor(self._expr)
[docs]
def round(self, unit: TemporalUnit) -> "UDFExpr":
"""Round timestamps to the nearest multiple of the given unit."""
return_dtype = self._expr.data_type
@pyarrow_udf(return_dtype=return_dtype)
def _round(arr: pyarrow.Array) -> pyarrow.Array:
return pc.round_temporal(arr, multiple=1, unit=unit)
return _round(self._expr)