Source code for ray._private.ray_logging.logging_config

from abc import ABC, abstractmethod
from typing import Set

from ray._private.ray_logging import default_impl
from ray._private.ray_logging.formatters import TextFormatter
from ray._private.ray_logging.filters import CoreContextFilter
from ray.util.annotations import PublicAPI

from dataclasses import dataclass

import logging
import time


class LoggingConfigurator(ABC):
    @abstractmethod
    def get_supported_encodings(self) -> Set[str]:
        raise NotImplementedError

    @abstractmethod
    def configure_logging(self, encoding: str, log_level: str):
        raise NotImplementedError


class DefaultLoggingConfigurator(LoggingConfigurator):
    def __init__(self):
        self._encoding_to_formatter = {
            "TEXT": TextFormatter(),
        }

    def get_supported_encodings(self) -> Set[str]:
        return self._encoding_to_formatter.keys()

    def configure_logging(self, encoding: str, log_level: str):
        formatter = self._encoding_to_formatter[encoding]
        core_context_filter = CoreContextFilter()
        handler = logging.StreamHandler()
        handler.setLevel(log_level)
        handler.setFormatter(formatter)
        handler.addFilter(core_context_filter)

        root_logger = logging.getLogger()
        root_logger.setLevel(log_level)
        root_logger.addHandler(handler)

        ray_logger = logging.getLogger("ray")
        ray_logger.setLevel(log_level)
        # Remove all existing handlers added by `ray/__init__.py`.
        for h in ray_logger.handlers[:]:
            ray_logger.removeHandler(h)
        ray_logger.addHandler(handler)
        ray_logger.propagate = False


_logging_configurator: LoggingConfigurator = default_impl.get_logging_configurator()


[docs] @PublicAPI(stability="alpha") @dataclass class LoggingConfig: encoding: str = "TEXT" log_level: str = "INFO" def __post_init__(self): if self.encoding not in _logging_configurator.get_supported_encodings(): raise ValueError( f"Invalid encoding type: {self.encoding}. " "Valid encoding types are: " f"{list(_logging_configurator.get_supported_encodings())}" ) def _configure_logging(self): """Set up the logging configuration for the current process.""" _logging_configurator.configure_logging(self.encoding, self.log_level) def _setup_log_record_factory(self): old_factory = logging.getLogRecordFactory() def record_factory(*args, **kwargs): record = old_factory(*args, **kwargs) # Python logging module starts to use `time.time_ns()` to generate `created` # from Python 3.13 to avoid the precision loss caused by the float type. # Here, we generate the `created` for the LogRecord to support older Python # versions. ct = time.time_ns() record.created = ct / 1e9 from ray._private.ray_logging.constants import LogKey record.__dict__[LogKey.TIMESTAMP_NS.value] = ct return record logging.setLogRecordFactory(record_factory) def _apply(self): """Set up both the LogRecord factory and the logging configuration.""" self._setup_log_record_factory() self._configure_logging()
LoggingConfig.__doc__ = f""" Logging configuration for a Ray job. These configurations are used to set up the root logger of the driver process and all Ray tasks and actor processes that belong to the job. Examples: .. testcode:: import ray import logging ray.init( logging_config=ray.LoggingConfig(encoding="TEXT", log_level="INFO") ) @ray.remote def f(): logger = logging.getLogger(__name__) logger.info("This is a Ray task") ray.get(f.remote()) .. testoutput:: :options: +MOCK 2024-06-03 07:53:50,815 INFO test.py:11 -- This is a Ray task job_id=01000000 worker_id=0dbbbd0f17d5343bbeee8228fa5ff675fe442445a1bc06ec899120a8 node_id=577706f1040ea8ebd76f7cf5a32338d79fe442e01455b9e7110cddfc task_id=c8ef45ccd0112571ffffffffffffffffffffffff01000000 Args: encoding: Encoding type for the logs. The valid values are {list(_logging_configurator.get_supported_encodings())} log_level: Log level for the logs. Defaults to 'INFO'. You can set it to 'DEBUG' to receive more detailed debug logs. """ # noqa: E501