# Source code for ray.util.metrics

import logging

from typing import Dict, Any, List, Optional, Tuple, Union

from ray._raylet import (
    Sum as CythonCount,
    Histogram as CythonHistogram,
    Gauge as CythonGauge,
)  # noqa: E402

# Sum is used for CythonCount because it allows incrementing by positive
# values that are different from one.
from ray.util.annotations import DeveloperAPI

# Module-level logger; the metric classes in this module use it to emit
# deprecation warnings.
logger = logging.getLogger(__name__)


@DeveloperAPI
class Metric:
    """The parent class of custom metrics.

    Ray's custom metrics APIs are rooted from this class and share
    the same public methods.

    Args:
        name: Name of the metric. Must not be empty.
        description: Description of the metric.
        tag_keys: Tag keys of the metric. Every recorded point needs a value
            for each key, either passed to ``record`` or set as a default tag.

    Raises:
        ValueError: If ``name`` is empty.
        TypeError: If ``tag_keys`` is not a tuple, or one of its keys is not
            a str.
    """

    def __init__(
        self,
        name: str,
        description: str = "",
        tag_keys: Optional[Tuple[str, ...]] = None,
    ):
        if len(name) == 0:
            raise ValueError("Empty name is not allowed. Please provide a metric name.")
        self._name = name
        self._description = description
        # The default tags key-value pair.
        self._default_tags = {}
        # Keys of tags.
        self._tag_keys = tag_keys or tuple()
        # The Cython metric class. This should be set in the child class.
        self._metric = None

        if not isinstance(self._tag_keys, tuple):
            raise TypeError(
                "tag_keys should be a tuple type, got: " f"{type(self._tag_keys)}"
            )

        for key in self._tag_keys:
            if not isinstance(key, str):
                raise TypeError(f"Tag keys must be str, got {type(key)}.")

    def set_default_tags(self, default_tags: Dict[str, str]):
        """Set default tags of metrics.

        Example:
            >>> from ray.util.metrics import Counter
            >>> # Note that set_default_tags returns the instance itself.
            >>> counter = Counter("name", tag_keys=("a",))
            >>> counter2 = counter.set_default_tags({"a": "b"})
            >>> assert counter is counter2
            >>> # this means you can instantiate it in this way.
            >>> counter = Counter("name", tag_keys=("a",)).set_default_tags({"a": "b"})

        Args:
            default_tags: Default tags that are
                used for every record method.

        Returns:
            Metric: it returns the instance itself.

        Raises:
            ValueError: If a key is not one of the metric's tag keys.
            TypeError: If a tag value is not a str.
        """
        for key, val in default_tags.items():
            if key not in self._tag_keys:
                raise ValueError(f"Unrecognized tag key {key}.")
            if not isinstance(val, str):
                raise TypeError(f"Tag values must be str, got {type(val)}.")

        # Keep a private copy: holding the caller's dict would let later
        # mutations of that dict slip past the validation above.
        self._default_tags = dict(default_tags)
        return self

    def record(
        self,
        value: Union[int, float],
        tags: Optional[Dict[str, str]] = None,
        _internal=False,
    ) -> None:
        """Record the metric point of the metric.

        Tags passed in will take precedence over the metric's default tags.

        Args:
            value: The value to be recorded as a metric point.
            tags: Tags for this point. Keys must be a subset of the metric's
                tag keys; keys left out fall back to the default tags.
            _internal: Set by ``inc``/``set``/``observe`` to suppress the
                deprecation warning that direct calls to ``record`` emit.

        Raises:
            TypeError: If a tag value is not a str.
            ValueError: If a tag key has neither a passed nor a default
                value, or if ``tags`` holds keys the metric does not declare.
        """
        assert self._metric is not None

        # Direct calls to record() are deprecated in favor of the
        # metric-specific methods; warn according to the backing metric type.
        if not _internal:
            if isinstance(self._metric, CythonCount):
                logger.warning(
                    "Counter.record() is deprecated in favor of "
                    "Counter.inc() and will be removed in a future "
                    "release. Please use Counter.inc() instead."
                )

            if isinstance(self._metric, CythonGauge):
                logger.warning(
                    "Gauge.record() is deprecated in favor of "
                    "Gauge.set() and will be removed in a future "
                    "release. Please use Gauge.set() instead."
                )

            if isinstance(self._metric, CythonHistogram):
                logger.warning(
                    "Histogram.record() is deprecated in favor of "
                    "Histogram.observe() and will be removed in a "
                    "future release. Please use Histogram.observe() "
                    "instead."
                )

        if tags is not None:
            for val in tags.values():
                if not isinstance(val, str):
                    raise TypeError(f"Tag values must be str, got {type(val)}.")

        final_tags = {}
        # Work on a copy so the caller's dict is never modified; whatever is
        # left in it after the loop is a key this metric does not declare.
        unused_tags = dict(tags) if tags else {}
        for tag_key in self._tag_keys:
            # Prefer passed tags over default tags.
            if tag_key in unused_tags:
                final_tags[tag_key] = unused_tags.pop(tag_key)
            elif tag_key in self._default_tags:
                final_tags[tag_key] = self._default_tags[tag_key]
            else:
                raise ValueError(f"Missing value for tag key {tag_key}.")

        if unused_tags:
            raise ValueError(f"Unrecognized tag keys: {list(unused_tags.keys())}.")

        self._metric.record(value, tags=final_tags)

    @property
    def info(self) -> Dict[str, Any]:
        """Return the information of this metric.

        Example:
            >>> from ray.util.metrics import Counter
            >>> counter = Counter("name", description="desc")
            >>> print(counter.info)
            {'name': 'name', 'description': 'desc', 'tag_keys': (), 'default_tags': {}}
        """
        return {
            "name": self._name,
            "description": self._description,
            "tag_keys": self._tag_keys,
            "default_tags": self._default_tags,
        }


@DeveloperAPI
class Counter(Metric):
    """Cumulative metric whose value only ever goes up.

    This corresponds to Prometheus' counter metric:
    https://prometheus.io/docs/concepts/metric_types/#counter

    Args:
        name: Name of the metric.
        description: Description of the metric.
        tag_keys: Tag keys of the metric.
    """

    def __init__(
        self,
        name: str,
        description: str = "",
        tag_keys: Optional[Tuple[str, ...]] = None,
    ):
        super().__init__(name, description, tag_keys)
        # Back the counter with the Cython binding imported as CythonCount.
        self._metric = CythonCount(self._name, self._description, self._tag_keys)

    def __reduce__(self):
        # Pickle as "call my own class with the constructor arguments"; using
        # the instance's class lets subclasses come back as themselves.
        constructor_args = (self._name, self._description, self._tag_keys)
        return self.__class__, constructor_args

    def inc(self, value: Union[int, float] = 1.0, tags: Dict[str, str] = None):
        """Increment the counter by `value` (defaults to 1).

        Tags passed in will take precedence over the metric's default tags.

        Args:
            value(int, float): Value to increment the counter by (default=1).
            tags(Dict[str, str]): Tags to set or override for this counter.

        Raises:
            TypeError: If `value` is not an int or float.
            ValueError: If `value` is zero or negative.
        """
        # Reject non-numeric input first, then non-positive increments.
        if not isinstance(value, (int, float)):
            raise TypeError(f"value must be int or float, got {type(value)}.")
        if value <= 0:
            raise ValueError(f"value must be >0, got {value}")

        # _internal=True keeps record() from logging its deprecation warning.
        self.record(value, tags=tags, _internal=True)
@DeveloperAPI
class Count(Counter):
    """The count of the number of metric points.

    This corresponds to Prometheus' 'Count' metric.

    This class is DEPRECATED, please use ray.util.metrics.Counter instead.
    It behaves like Counter and logs a rename warning on construction.

    Args:
        name: Name of the metric.
        description: Description of the metric.
        tag_keys: Tag keys of the metric.
    """

    def __init__(
        self,
        name: str,
        description: str = "",
        tag_keys: Optional[Tuple[str, ...]] = None,
    ):
        # Warn about the rename before delegating to Counter.
        rename_notice = (
            "`metrics.Count` has been renamed to `metrics.Counter`. "
            "`metrics.Count` will be removed in a future release."
        )
        logger.warning(rename_notice)
        super().__init__(name, description, tag_keys)
@DeveloperAPI
class Histogram(Metric):
    """Tracks the size and number of events in buckets.

    Histograms allow you to calculate aggregate quantiles
    such as 25, 50, 95, 99 percentile latency for an RPC.

    This corresponds to Prometheus' histogram metric:
    https://prometheus.io/docs/concepts/metric_types/#histogram

    Args:
        name: Name of the metric.
        description: Description of the metric.
        boundaries: Boundaries of histogram buckets. Required, non-empty,
            and every boundary must be positive.
        tag_keys: Tag keys of the metric.

    Raises:
        ValueError: If ``boundaries`` is missing, empty, or contains a
            value that is not positive.
    """

    def __init__(
        self,
        name: str,
        description: str = "",
        boundaries: Optional[List[float]] = None,
        tag_keys: Optional[Tuple[str, ...]] = None,
    ):
        super().__init__(name, description, tag_keys)
        if boundaries is None or len(boundaries) == 0:
            raise ValueError(
                "boundaries argument should be provided when using "
                "the Histogram class. e.g., "
                'Histogram("name", boundaries=[1.0, 2.0])'
            )
        for i, boundary in enumerate(boundaries):
            if boundary <= 0:
                raise ValueError(
                    "Invalid `boundaries` argument at index "
                    f"{i}, {boundaries}. Use positive values for the arguments."
                )
        # Store a private copy so later mutation of the caller's list cannot
        # make `info` / `__reduce__` disagree with the metric built below.
        self.boundaries = list(boundaries)
        self._metric = CythonHistogram(
            self._name, self._description, self.boundaries, self._tag_keys
        )

    def observe(self, value: Union[int, float], tags: Optional[Dict[str, str]] = None):
        """Observe a given `value` and add it to the appropriate bucket.

        Tags passed in will take precedence over the metric's default tags.

        Args:
            value(int, float): Value to record in the histogram.
            tags(Dict[str, str]): Tags to set or override for this histogram.

        Raises:
            TypeError: If `value` is not an int or float.
        """
        if not isinstance(value, (int, float)):
            raise TypeError(f"value must be int or float, got {type(value)}.")

        # _internal=True keeps record() from logging its deprecation warning.
        self.record(value, tags, _internal=True)

    def __reduce__(self):
        # Pickle as a call to Histogram with the constructor arguments.
        deserializer = Histogram
        serialized_data = (
            self._name,
            self._description,
            self.boundaries,
            self._tag_keys,
        )
        return deserializer, serialized_data

    @property
    def info(self):
        """Return information about histogram metric.

        This is the base metric information plus a ``"boundaries"`` entry.
        """
        info = super().info
        info.update({"boundaries": self.boundaries})
        return info
@DeveloperAPI
class Gauge(Metric):
    """Metric that holds only the most recently recorded value.

    Gauges keep the last recorded value and drop everything before.
    Unlike counters, gauges can go up or down over time.

    This corresponds to Prometheus' gauge metric:
    https://prometheus.io/docs/concepts/metric_types/#gauge

    Args:
        name: Name of the metric.
        description: Description of the metric.
        tag_keys: Tag keys of the metric.
    """

    def __init__(
        self,
        name: str,
        description: str = "",
        tag_keys: Optional[Tuple[str, ...]] = None,
    ):
        super().__init__(name, description, tag_keys)
        # Back the gauge with the Cython binding imported as CythonGauge.
        self._metric = CythonGauge(self._name, self._description, self._tag_keys)

    def set(self, value: Union[int, float], tags: Dict[str, str] = None):
        """Set the gauge to the given `value`.

        Tags passed in will take precedence over the metric's default tags.

        Args:
            value(int, float): Value to set the gauge to.
            tags(Dict[str, str]): Tags to set or override for this gauge.

        Raises:
            TypeError: If `value` is not an int or float.
        """
        if not isinstance(value, (int, float)):
            raise TypeError(f"value must be int or float, got {type(value)}.")

        # _internal=True keeps record() from logging its deprecation warning.
        self.record(value, tags, _internal=True)

    def __reduce__(self):
        # Pickle as a call to Gauge with the constructor arguments.
        constructor_args = (self._name, self._description, self._tag_keys)
        return Gauge, constructor_args
# Names exported by `from ray.util.metrics import *`.
__all__ = [
    "Counter",
    "Histogram",
    "Gauge",
]