Source code for ray.util.metrics

import logging

from typing import Dict, Any, List, Optional, Tuple, Union

from ray._raylet import (
    Sum as CythonCount,
    Histogram as CythonHistogram,
    Gauge as CythonGauge,
)  # noqa: E402

# Sum is used for CythonCount because it allows incrementing by positive
# values that are different from one.
from ray.util.annotations import DeveloperAPI

logger = logging.getLogger(__name__)


@DeveloperAPI
class Metric:
    """The parent class of custom metrics.

    Ray's custom metrics APIs are rooted from this class and share
    the same public methods.
    """

    def __init__(
        self,
        name: str,
        description: str = "",
        tag_keys: Optional[Tuple[str, ...]] = None,
    ):
        if len(name) == 0:
            raise ValueError("Empty name is not allowed. Please provide a metric name.")
        self._name = name
        self._description = description
        # The default tags key-value pair.
        self._default_tags = {}
        # Keys of tags.
        self._tag_keys = tag_keys or tuple()
        # The Cython metric class. This should be set in the child class.
        self._metric = None

        if not isinstance(self._tag_keys, tuple):
            raise TypeError(
                "tag_keys should be a tuple type, got: " f"{type(self._tag_keys)}"
            )

        for key in self._tag_keys:
            if not isinstance(key, str):
                raise TypeError(f"Tag keys must be str, got {type(key)}.")

    def set_default_tags(self, default_tags: Dict[str, str]):
        """Set default tags of metrics.

        Example:
            >>> from ray.util.metrics import Counter
            >>> # Note that set_default_tags returns the instance itself.
            >>> counter = Counter("name", tag_keys=("a",))
            >>> counter2 = counter.set_default_tags({"a": "b"})
            >>> assert counter is counter2
            >>> # this means you can instantiate it in this way.
            >>> counter = Counter("name", tag_keys=("a",)).set_default_tags({"a": "b"})

        Args:
            default_tags: Default tags that are
                used for every record method.

        Returns:
            Metric: it returns the instance itself.
        """
        for key, val in default_tags.items():
            if key not in self._tag_keys:
                raise ValueError(f"Unrecognized tag key {key}.")
            if not isinstance(val, str):
                raise TypeError(f"Tag values must be str, got {type(val)}.")

        self._default_tags = default_tags
        return self

    def _record(
        self,
        value: Union[int, float],
        tags: Optional[Dict[str, str]] = None,
    ) -> None:
        """Record the metric point of the metric.

        Tags passed in will take precedence over the metric's default tags.

        Args:
            value: The value to be recorded as a metric point.
        """
        assert self._metric is not None

        final_tags = self._get_final_tags(tags)
        self._validate_tags(final_tags)
        self._metric.record(value, tags=final_tags)

    def _get_final_tags(self, tags):
        if not tags:
            return self._default_tags

        for val in tags.values():
            if not isinstance(val, str):
                raise TypeError(f"Tag values must be str, got {type(val)}.")

        return {**self._default_tags, **tags}

    def _validate_tags(self, final_tags):
        missing_tags = []
        for tag_key in self._tag_keys:
            # Prefer passed tags over default tags.
            if tag_key not in final_tags:
                missing_tags.append(tag_key)

        if missing_tags:
            raise ValueError(f"Missing value for tag key(s): {','.join(missing_tags)}.")

    @property
    def info(self) -> Dict[str, Any]:
        """Return the information of this metric.

        Example:
            >>> from ray.util.metrics import Counter
            >>> counter = Counter("name", description="desc")
            >>> print(counter.info)
            {'name': 'name', 'description': 'desc', 'tag_keys': (), 'default_tags': {}}
        """
        return {
            "name": self._name,
            "description": self._description,
            "tag_keys": self._tag_keys,
            "default_tags": self._default_tags,
        }


[docs] @DeveloperAPI class Counter(Metric): """A cumulative metric that is monotonically increasing. This corresponds to Prometheus' counter metric: https://prometheus.io/docs/concepts/metric_types/#counter Before Ray 2.10, this exports a Prometheus gauge metric instead of a counter metric, which is wrong. Since 2.10, this exports both counter (with a suffix "_total") and gauge metrics (for bug compatibility). Use `RAY_EXPORT_COUNTER_AS_GAUGE=0` to disable exporting the gauge metric. Args: name: Name of the metric. description: Description of the metric. tag_keys: Tag keys of the metric. """ def __init__( self, name: str, description: str = "", tag_keys: Optional[Tuple[str, ...]] = None, ): super().__init__(name, description, tag_keys) self._metric = CythonCount(self._name, self._description, self._tag_keys) def __reduce__(self): deserializer = self.__class__ serialized_data = (self._name, self._description, self._tag_keys) return deserializer, serialized_data
[docs] def inc(self, value: Union[int, float] = 1.0, tags: Dict[str, str] = None): """Increment the counter by `value` (defaults to 1). Tags passed in will take precedence over the metric's default tags. Args: value(int, float): Value to increment the counter by (default=1). tags(Dict[str, str]): Tags to set or override for this counter. """ if not isinstance(value, (int, float)): raise TypeError(f"value must be int or float, got {type(value)}.") if value <= 0: raise ValueError(f"value must be >0, got {value}") self._record(value, tags=tags)
[docs] @DeveloperAPI class Histogram(Metric): """Tracks the size and number of events in buckets. Histograms allow you to calculate aggregate quantiles such as 25, 50, 95, 99 percentile latency for an RPC. This corresponds to Prometheus' histogram metric: https://prometheus.io/docs/concepts/metric_types/#histogram Args: name: Name of the metric. description: Description of the metric. boundaries: Boundaries of histogram buckets. tag_keys: Tag keys of the metric. """ def __init__( self, name: str, description: str = "", boundaries: List[float] = None, tag_keys: Optional[Tuple[str, ...]] = None, ): super().__init__(name, description, tag_keys) if boundaries is None or len(boundaries) == 0: raise ValueError( "boundaries argument should be provided when using " "the Histogram class. e.g., " 'Histogram("name", boundaries=[1.0, 2.0])' ) for i, boundary in enumerate(boundaries): if boundary <= 0: raise ValueError( "Invalid `boundaries` argument at index " f"{i}, {boundaries}. Use positive values for the arguments." ) self.boundaries = boundaries self._metric = CythonHistogram( self._name, self._description, self.boundaries, self._tag_keys )
[docs] def observe(self, value: Union[int, float], tags: Dict[str, str] = None): """Observe a given `value` and add it to the appropriate bucket. Tags passed in will take precedence over the metric's default tags. Args: value(int, float): Value to set the gauge to. tags(Dict[str, str]): Tags to set or override for this gauge. """ if not isinstance(value, (int, float)): raise TypeError(f"value must be int or float, got {type(value)}.") self._record(value, tags)
def __reduce__(self): deserializer = Histogram serialized_data = ( self._name, self._description, self.boundaries, self._tag_keys, ) return deserializer, serialized_data @property def info(self): """Return information about histogram metric.""" info = super().info info.update({"boundaries": self.boundaries}) return info
[docs] @DeveloperAPI class Gauge(Metric): """Gauges keep the last recorded value and drop everything before. Unlike counters, gauges can go up or down over time. This corresponds to Prometheus' gauge metric: https://prometheus.io/docs/concepts/metric_types/#gauge Args: name: Name of the metric. description: Description of the metric. tag_keys: Tag keys of the metric. """ def __init__( self, name: str, description: str = "", tag_keys: Optional[Tuple[str, ...]] = None, ): super().__init__(name, description, tag_keys) self._metric = CythonGauge(self._name, self._description, self._tag_keys)
[docs] def set(self, value: Union[int, float], tags: Dict[str, str] = None): """Set the gauge to the given `value`. Tags passed in will take precedence over the metric's default tags. Args: value(int, float): Value to set the gauge to. tags(Dict[str, str]): Tags to set or override for this gauge. """ if not isinstance(value, (int, float)): raise TypeError(f"value must be int or float, got {type(value)}.") self._record(value, tags)
def __reduce__(self): deserializer = Gauge serialized_data = (self._name, self._description, self._tag_keys) return deserializer, serialized_data
__all__ = [ "Counter", "Histogram", "Gauge", ]