Source code for ray.data.namespace_expressions.dt_namespace
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal
import pyarrow.compute as pc
from ray.data.datatype import DataType
from ray.data.expressions import _create_pyarrow_compute_udf
if TYPE_CHECKING:
from ray.data.expressions import Expr, PyArrowComputeUDFExpr
TemporalUnit = Literal[
"year",
"quarter",
"month",
"week",
"day",
"hour",
"minute",
"second",
"millisecond",
"microsecond",
"nanosecond",
]
[docs]
@dataclass
class _DatetimeNamespace:
"""Datetime namespace for operations on datetime-typed expression columns."""
_expr: "Expr"
# extractors
[docs]
def year(self) -> "PyArrowComputeUDFExpr":
"""Extract year component."""
return _create_pyarrow_compute_udf(pc.year, DataType.int32())(self._expr)
[docs]
def month(self) -> "PyArrowComputeUDFExpr":
"""Extract month component."""
return _create_pyarrow_compute_udf(pc.month, DataType.int32())(self._expr)
[docs]
def day(self) -> "PyArrowComputeUDFExpr":
"""Extract day component."""
return _create_pyarrow_compute_udf(pc.day, DataType.int32())(self._expr)
[docs]
def hour(self) -> "PyArrowComputeUDFExpr":
"""Extract hour component."""
return _create_pyarrow_compute_udf(pc.hour, DataType.int32())(self._expr)
[docs]
def minute(self) -> "PyArrowComputeUDFExpr":
"""Extract minute component."""
return _create_pyarrow_compute_udf(pc.minute, DataType.int32())(self._expr)
[docs]
def second(self) -> "PyArrowComputeUDFExpr":
"""Extract second component."""
return _create_pyarrow_compute_udf(pc.second, DataType.int32())(self._expr)
# formatting
[docs]
def strftime(self, fmt: str) -> "PyArrowComputeUDFExpr":
"""Format timestamps with a strftime pattern."""
return _create_pyarrow_compute_udf(pc.strftime, DataType.string())(
self._expr, format=fmt
)
# rounding
[docs]
def ceil(self, unit: TemporalUnit) -> "PyArrowComputeUDFExpr":
"""Ceil timestamps to the next multiple of the given unit."""
return _create_pyarrow_compute_udf(pc.ceil_temporal, self._expr.data_type)(
self._expr, multiple=1, unit=unit
)
[docs]
def floor(self, unit: TemporalUnit) -> "PyArrowComputeUDFExpr":
"""Floor timestamps to the previous multiple of the given unit."""
return _create_pyarrow_compute_udf(pc.floor_temporal, self._expr.data_type)(
self._expr, multiple=1, unit=unit
)
[docs]
def round(self, unit: TemporalUnit) -> "PyArrowComputeUDFExpr":
"""Round timestamps to the nearest multiple of the given unit."""
return _create_pyarrow_compute_udf(pc.round_temporal, self._expr.data_type)(
self._expr, multiple=1, unit=unit
)