Ray Event Exporter Infrastructure#
This document is based on Ray version 2.52.1.
Ray’s event exporting infrastructure collects events from C++ components (GCS, workers) and Python components, buffers and merges them, and exports them to external HTTP services. This document explains how events flow through the system from creation to final export.
Architecture Overview#
Ray’s event system uses a multi-stage pipeline:
C++ Components: GCS and worker processes create events implementing RayEventInterface. Raylet does not emit any Ray events, but there are no technical limitations preventing it from doing so.
Event Buffering: Events are buffered in a bounded circular buffer
Event Merging: Events with the same entity ID and type are merged before export
gRPC Export: Events are exported via gRPC to the aggregator agent
Python Aggregation: The AggregatorAgent receives and buffers events
HTTP Publishing: Events are filtered, converted to JSON, and published to external HTTP services
The following diagram shows the high-level flow:
C++ Components (GCS, workers)
↓ (Create events via RayEventInterface)
RayEventRecorder (C++)
↓ (Buffer & merge events)
↓ (gRPC export via EventAggregatorClient)
AggregatorAgent (Python)
↓ (Add to MultiConsumerEventBuffer)
RayEventPublisher
↓ (Filter & convert to JSON)
↓ (HTTP POST)
External HTTP Service
Event Types and Structure#
Ray events are structured using protobuf messages with a base RayEvent message that contains event-specific nested messages.
Event Types#
Events are categorized by type, defined in the EventType enum in events_base_event.proto:
TASK_DEFINITION_EVENT: Task definition information
TASK_LIFECYCLE_EVENT: Task state transitions (this covers both normal tasks and actor tasks)
ACTOR_TASK_DEFINITION_EVENT: Actor task definition
ACTOR_DEFINITION_EVENT: Actor definition
ACTOR_LIFECYCLE_EVENT: Actor state transitions
DRIVER_JOB_DEFINITION_EVENT: Driver job definition
DRIVER_JOB_LIFECYCLE_EVENT: Driver job state transitions
NODE_DEFINITION_EVENT: Node definition
NODE_LIFECYCLE_EVENT: Node state transitions
TASK_PROFILE_EVENT: Task profiling data
Event Structure#
The base RayEvent message contains:
event_id: Unique identifier for the event
source_type: Component that generated the event
event_type: Type of event (from EventType enum)
timestamp: When the event was created
severity: Event severity level (TRACE, DEBUG, INFO, WARNING, ERROR, FATAL)
message: Optional string message
session_name: Ray session identifier
Nested event messages: One of the event-specific messages (e.g., task_definition_event, actor_lifecycle_event)
Entity ID Concept#
The entity ID is a unique identifier for the entity associated with an event. It’s used for two purposes:
Association: Links execution events with definition events (e.g., task lifecycle events with task definition events)
Merging: Groups events with the same entity ID and type for merging before export
For example:
- Task events use task_id + task_attempt as the entity ID
- Actor events use actor_id as the entity ID
- Driver job events use job_id as the entity ID
Event Recording and Buffering (C++ Side)#
C++ components record events through the RayEventRecorder class, which provides thread-safe event buffering and export.
RayEventRecorder#
The RayEventRecorder is a thread-safe event recorder that:
Maintains a bounded circular buffer for events
Merges events with the same entity ID and type before export
Periodically exports events via gRPC to the aggregator agent using EventAggregatorClient
Tracks dropped events when the buffer is full
Adding Events#
Events are added to the recorder via the AddEvents() method, which accepts a vector of RayEventInterface pointers. The method:
Checks if event recording is enabled (via the enable_ray_event config)
Calculates if adding events would exceed the buffer size
Drops old events if necessary and records metrics for dropped events
Adds new events to the circular buffer
Buffer Management#
The recorder uses a boost::circular_buffer to store events. When the buffer is full:
Oldest events are dropped to make room for new ones
Dropped events are tracked via the dropped_events_counter metric
The metric includes the source component name for tracking
The default buffer size is 10,000 events, but it can be configured via the RAY_ray_event_recorder_max_queued_events environment variable
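The drop-oldest behavior can be illustrated with a minimal Python sketch. This is not the recorder's C++ code: `BoundedEventBuffer`, `add_events`, and `dropped_count` are hypothetical names, and `collections.deque` with `maxlen` stands in for `boost::circular_buffer`.

```python
from collections import deque


class BoundedEventBuffer:
    """Toy stand-in for the recorder's circular buffer (hypothetical names)."""

    def __init__(self, max_size: int) -> None:
        self._events = deque(maxlen=max_size)
        self.dropped_count = 0  # plays the role of the dropped-events metric

    def add_events(self, events: list) -> None:
        for event in events:
            # A full deque with maxlen evicts its oldest item on append,
            # so count the eviction just before it happens.
            if len(self._events) == self._events.maxlen:
                self.dropped_count += 1
            self._events.append(event)

    def snapshot(self) -> list:
        return list(self._events)


buffer = BoundedEventBuffer(max_size=3)
buffer.add_events(["e1", "e2", "e3", "e4", "e5"])
print(buffer.snapshot())     # ['e3', 'e4', 'e5']
print(buffer.dropped_count)  # 2
```

With a capacity of three, adding five events evicts the two oldest ones and counts both as dropped.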
Event Export from C++ (gRPC)#
Events are exported from C++ components to the aggregator agent using gRPC. The export process is initiated by calling StartExportingEvents().
StartExportingEvents#
This method:
Checks if event recording is enabled
Verifies it hasn’t been called before (should only be called once)
Sets up a PeriodicalRunner to periodically call ExportEvents()
Uses the configured export interval (ray_events_report_interval_ms)
ExportEvents Process#
The ExportEvents() method performs the following steps:
Check Buffer: Returns early if the buffer is empty
Group Events: Groups events by entity ID and type using a hash map
Merge Events: Events with the same key are merged using the Merge() method
Serialize: Each merged event is serialized to a RayEvent protobuf via Serialize()
Send via gRPC: Events are sent to the aggregator agent via EventAggregatorClient::AddEvents()
Clear Buffer: The buffer is cleared after successful export
Event Merging Logic#
Event merging is an optimization that reduces data size by combining related events. Events with the same entity ID and type are merged:
Definition Events: Typically don’t change when merged (e.g., actor definition)
Lifecycle Events: State transitions are appended to form a time series (e.g., task state transitions: started → running → completed)
The merging maintains the order of events while combining them into a single event with all state transitions.
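The grouping and merging steps can be sketched in Python as follows. This is an illustration under assumptions, not the C++ implementation: `LifecycleEvent`, `merge_events`, and the state names are hypothetical, and a plain dictionary keyed by (entity ID, event type) plays the role of the hash map.

```python
from dataclasses import dataclass, field


@dataclass
class LifecycleEvent:
    """Hypothetical lifecycle event carrying a list of state transitions."""

    entity_id: str
    event_type: str
    transitions: list = field(default_factory=list)

    def merge(self, other: "LifecycleEvent") -> None:
        # Appending keeps the transitions in their original order.
        self.transitions.extend(other.transitions)


def merge_events(events: list) -> list:
    merged = {}  # (entity_id, event_type) -> merged event, in first-seen order
    for event in events:
        key = (event.entity_id, event.event_type)
        if key in merged:
            merged[key].merge(event)
        else:
            # Copy so the caller's events are not mutated by later merges.
            merged[key] = LifecycleEvent(
                event.entity_id, event.event_type, list(event.transitions)
            )
    return list(merged.values())


events = [
    LifecycleEvent("task-1:0", "TASK_LIFECYCLE_EVENT", ["PENDING"]),
    LifecycleEvent("actor-7", "ACTOR_LIFECYCLE_EVENT", ["ALIVE"]),
    LifecycleEvent("task-1:0", "TASK_LIFECYCLE_EVENT", ["RUNNING"]),
    LifecycleEvent("task-1:0", "TASK_LIFECYCLE_EVENT", ["FINISHED"]),
]
result = merge_events(events)
print(len(result))            # 2
print(result[0].transitions)  # ['PENDING', 'RUNNING', 'FINISHED']
```

Four input events collapse into two exported events, and the task event carries the full ordered series of transitions.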
Error Handling#
If the gRPC export fails:
An error is logged
The process continues (doesn’t crash)
The next export interval will attempt to send events again
Events remain in the buffer until successfully exported (or the buffer is full and old events are dropped)
Event Reception and Buffering (Python Side)#
The AggregatorAgent receives events from C++ components via a gRPC service and buffers them for publishing.
AggregatorAgent#
The AggregatorAgent is a dashboard agent module that:
Implements EventAggregatorServiceServicer for gRPC event reception
Maintains a MultiConsumerEventBuffer for event storage
Manages RayEventPublisher instances for publishing to external HTTP endpoints
Tracks metrics for received events and for buffer and publisher operations
AddEvents gRPC Handler#
The AddEvents() method is the gRPC handler that receives events:
Checks if event processing is enabled
Iterates through events in the request
Records metrics for each received event
Adds each event to the MultiConsumerEventBuffer via add_event()
Handles errors if adding events fails
MultiConsumerEventBuffer#
The MultiConsumerEventBuffer is an asyncio-friendly buffer that:
Supports Multiple Consumers: Each consumer has an independent cursor index. RayEventPublisher and other consumers share this same buffer.
Tracks Evictions: When the buffer is full, oldest events are dropped and tracked per consumer
Bounded Buffer: Uses deque with maxlen to limit buffer size
Asyncio-Safe: Uses asyncio.Lock and asyncio.Condition for synchronization
Key operations:
add_event(): Adds an event to the buffer, dropping oldest if full
wait_for_batch(): Waits for a batch of events up to max_batch_size, with timeout. The timeout only applies when there is at least one event in the buffer. If the buffer is empty, wait_for_batch() can block indefinitely.
register_consumer(): Registers a new consumer with a unique name
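The following is a simplified sketch of a buffer with these properties, written only to illustrate per-consumer cursors, eviction tracking, and the two-phase wait. `TinyMultiConsumerBuffer` and its internals are hypothetical, and the actual MultiConsumerEventBuffer may differ in signatures and details.

```python
import asyncio
from collections import deque


class TinyMultiConsumerBuffer:
    """Hypothetical, simplified multi-consumer buffer (not Ray's class)."""

    def __init__(self, max_size: int, max_batch_size: int) -> None:
        self._events = deque(maxlen=max_size)
        self._max_batch_size = max_batch_size
        self._cursors = {}  # consumer name -> index of next unread event
        self._evicted = {}  # consumer name -> events dropped before being read
        self._cond = asyncio.Condition()

    async def register_consumer(self, name: str) -> None:
        async with self._cond:
            if name in self._cursors:
                raise ValueError(f"consumer {name!r} is already registered")
            self._cursors[name] = 0
            self._evicted[name] = 0

    async def add_event(self, event) -> None:
        async with self._cond:
            if len(self._events) == self._events.maxlen:
                # The oldest event is about to be evicted. Consumers that have
                # not read it yet lose it; the others shift their cursor left.
                for name, cursor in self._cursors.items():
                    if cursor == 0:
                        self._evicted[name] += 1
                    else:
                        self._cursors[name] = cursor - 1
            self._events.append(event)
            self._cond.notify_all()

    def _unread(self, name: str) -> int:
        return len(self._events) - self._cursors[name]

    async def wait_for_batch(self, name: str, timeout: float) -> list:
        async with self._cond:
            # Phase 1: no timeout; blocks until at least one unread event exists.
            await self._cond.wait_for(lambda: self._unread(name) > 0)
            # Phase 2: wait up to `timeout` for the batch to fill up.
            try:
                await asyncio.wait_for(
                    self._cond.wait_for(
                        lambda: self._unread(name) >= self._max_batch_size
                    ),
                    timeout,
                )
            except asyncio.TimeoutError:
                pass  # return whatever has arrived so far
            start = self._cursors[name]
            batch = list(self._events)[start:start + self._max_batch_size]
            self._cursors[name] = start + len(batch)
            return batch

    def evicted_count(self, name: str) -> int:
        return self._evicted[name]


async def demo():
    buf = TinyMultiConsumerBuffer(max_size=2, max_batch_size=10)
    await buf.register_consumer("publisher")
    for event in ("e1", "e2", "e3"):
        await buf.add_event(event)
    batch = await buf.wait_for_batch("publisher", timeout=0.01)
    return batch, buf.evicted_count("publisher")


batch, evicted = asyncio.run(demo())
print(batch)    # ['e2', 'e3']
print(evicted)  # 1
```

Because the first phase has no timeout, a consumer calling `wait_for_batch()` on an empty buffer waits until some producer adds an event, which mirrors the blocking behavior described above.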
Event Filtering#
The agent checks if events can be exposed to external services via _can_expose_event(). Only events whose type is in the EXPOSABLE_EVENT_TYPES set are allowed to be published externally.
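As a rough illustration, an allow-list check of this kind might look like the sketch below. Events are modeled as plain dictionaries and event types as strings, which is a simplification; `can_expose_event` and the contents of the set are hypothetical stand-ins rather than the agent's own code.

```python
# Hypothetical allow-list; event types are modeled as strings for simplicity.
EXPOSABLE_EVENT_TYPES = {"TASK_DEFINITION_EVENT", "ACTOR_LIFECYCLE_EVENT"}


def can_expose_event(event: dict) -> bool:
    """Return True only if the event's type is in the allow-list."""
    return event.get("event_type") in EXPOSABLE_EVENT_TYPES


events = [
    {"event_id": "1", "event_type": "TASK_DEFINITION_EVENT"},
    {"event_id": "2", "event_type": "TASK_PROFILE_EVENT"},
    {"event_id": "3", "event_type": "ACTOR_LIFECYCLE_EVENT"},
]
exposable = [event for event in events if can_expose_event(event)]
print([event["event_id"] for event in exposable])  # ['1', '3']
```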
Event Publishing to HTTP#
Events are published to external HTTP services by the RayEventPublisher, which reads from the event buffer and sends HTTP POST requests.
RayEventPublisher#
The RayEventPublisher runs a worker loop that:
Registers as a consumer of the MultiConsumerEventBuffer
Continuously waits for batches of events via wait_for_batch()
Publishes batches using the configured PublisherClientInterface
Handles retries with exponential backoff on failures
Records metrics for publish success, failures, and latency
The publisher runs in an async context and uses asyncio for non-blocking operations.
AsyncHttpPublisherClient#
The AsyncHttpPublisherClient handles HTTP publishing:
Event Filtering: Filters events using events_filter_fn (typically _can_expose_event)
JSON Conversion: Converts protobuf events to JSON dictionaries
- Uses message_to_json() from protobuf
- Optionally preserves proto field names or converts to camelCase
- Runs in ThreadPoolExecutor to avoid blocking the event loop
HTTP POST: Sends filtered events as JSON to the configured endpoint
Error Handling: Catches exceptions and returns failure status
Session Management: Uses aiohttp.ClientSession for HTTP requests
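The filter-then-convert part of this flow can be sketched with the standard library alone. The sketch makes several assumptions: events are flat dictionaries rather than protobuf messages, `to_camel_case`, `events_to_json`, and `build_payload` are hypothetical helpers, and the HTTP POST itself is left out.

```python
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor


def to_camel_case(name: str) -> str:
    """Convert a snake_case field name to camelCase."""
    head, *tail = name.split("_")
    return head + "".join(part.capitalize() for part in tail)


def events_to_json(events: list, preserve_proto_field_name: bool) -> str:
    # Only top-level keys are renamed; nested messages are out of scope here.
    if not preserve_proto_field_name:
        events = [
            {to_camel_case(key): value for key, value in event.items()}
            for event in events
        ]
    return json.dumps(events)


async def build_payload(events, events_filter_fn, preserve_proto_field_name=False):
    filtered = [event for event in events if events_filter_fn(event)]
    loop = asyncio.get_running_loop()
    # Run the conversion off the event loop thread so that large batches
    # do not stall other coroutines.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return await loop.run_in_executor(
            executor, events_to_json, filtered, preserve_proto_field_name
        )


events = [
    {"event_type": "TASK_DEFINITION_EVENT", "session_name": "s1"},
    {"event_type": "TASK_PROFILE_EVENT", "session_name": "s1"},
]
payload = asyncio.run(
    build_payload(events, lambda e: e["event_type"] != "TASK_PROFILE_EVENT")
)
print(payload)
```

The resulting string is what a client would send as the body of the HTTP POST request.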
Batch Publishing#
Events are published in batches:
Batch size is limited by max_batch_size (default: 10,000 events)
Batches are created by wait_for_batch(), which waits up to a timeout for events
Larger batches reduce HTTP request overhead but increase latency
Retry Logic#
The publisher implements retry logic with exponential backoff:
Retries failed publishes up to max_retries times (default: infinite)
Uses exponential backoff with jitter between retries
If max retries are exhausted, the events are dropped and a metric for dropped events is recorded
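A retry loop of this shape could look roughly like the following sketch. The function name, the delay parameters, and the choice of full jitter are assumptions made for illustration; the publisher's actual backoff constants and jitter strategy are not specified in this document.

```python
import asyncio
import random


async def publish_with_retries(
    publish, batch, max_retries=None, base_delay=0.01, max_delay=1.0
) -> bool:
    """Return True once published, or False if the batch is dropped.

    max_retries=None means retry forever (hypothetical parameter shape).
    """
    attempt = 0
    while True:
        try:
            await publish(batch)
            return True
        except Exception:
            if max_retries is not None and attempt >= max_retries:
                return False  # caller records the dropped-events metric
            # Exponential backoff, capped, with full jitter.
            delay = min(max_delay, base_delay * (2 ** attempt))
            await asyncio.sleep(random.uniform(0, delay))
            attempt += 1


calls = {"count": 0}


async def flaky_publish(batch):
    # Simulated endpoint that fails twice and then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated failure")


ok = asyncio.run(publish_with_retries(flaky_publish, ["e1"], max_retries=5))
print(ok, calls["count"])  # True 3
```

Jitter spreads retries from many agents over time so that they do not hit a recovering endpoint simultaneously.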
Configuration#
HTTP publishing is configured via environment variables:
RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR: HTTP endpoint URL (e.g., http://localhost:8080/events)
RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES: Comma-separated list of event types to expose
RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SERVICE: Enable/disable flag (default: True)
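To make the shape of these settings concrete, here is a small sketch that reads the three variables into a dictionary. The variable names come from the list above; `load_export_config`, the returned keys, and the boolean parsing rule are assumptions and may not match how Ray itself parses them.

```python
import os

ADDR_VAR = "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR"
TYPES_VAR = "RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES"
ENABLED_VAR = "RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SERVICE"


def load_export_config(env=None) -> dict:
    """Hypothetical helper that parses the export settings from a mapping."""
    env = os.environ if env is None else env
    raw_types = env.get(TYPES_VAR, "")
    return {
        "export_addr": env.get(ADDR_VAR, ""),
        # Split the comma-separated list and ignore empty entries.
        "exposable_event_types": {
            name.strip() for name in raw_types.split(",") if name.strip()
        },
        # Assumed parsing rule: "1" or "true" (any case) enables publishing.
        "publish_enabled": env.get(ENABLED_VAR, "True").strip().lower()
        in ("1", "true"),
    }


config = load_export_config(
    {
        ADDR_VAR: "http://localhost:8080/events",
        TYPES_VAR: "TASK_DEFINITION_EVENT, TASK_LIFECYCLE_EVENT",
    }
)
print(config["export_addr"])                    # http://localhost:8080/events
print(sorted(config["exposable_event_types"]))  # ['TASK_DEFINITION_EVENT', 'TASK_LIFECYCLE_EVENT']
print(config["publish_enabled"])                # True
```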
Creating New Event Types#
To create a new event type, follow these steps:
Step 1: Define Protobuf Message#
Create a new .proto file in src/ray/protobuf/public/ following the naming convention events_<name>_event.proto. For example, see events_task_definition_event.proto.
Define your event-specific message with the fields you need:
syntax = "proto3";
package ray.rpc.events;
message MyNewEvent {
  // Define your event-specific fields here
  string entity_id = 1;
  // ... other fields
}
Step 2: Add to Base Event#
Update events_base_event.proto:
Add import for your new proto file
Add new EventType enum value (e.g., MY_NEW_EVENT = 11)
Add new field to RayEvent message (e.g., MyNewEvent my_new_event = 18)
Step 3: Implement RayEventInterface#
Create a C++ class that implements RayEventInterface. The easiest approach is to extend RayEvent<T> template class, as shown in ray_actor_definition_event.h.
You need to implement:
GetEntityId(): Return a unique identifier for the entity (e.g., task ID + attempt, actor ID)
MergeData(): Implement merging logic for events with the same entity ID
- Definition events typically don’t change when merged
- Lifecycle events append state transitions
SerializeData(): Convert the event data to a RayEvent protobuf
GetEventType(): Return the EventType enum value for this event
See ray_actor_definition_event.cc for a complete example.
Step 4: Update Exposable Event Types (if needed)#
If your event should be exposed to external HTTP services, add it to DEFAULT_EXPOSABLE_EVENT_TYPES in aggregator_agent.py. Alternatively, users can configure it via the RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES environment variable.
Step 5: Update RayEventRecorder to publish your new event type#
Use RayEventRecorder::AddEvents() to add events of your new type to the buffer.
Step 6: Update AggregatorAgent to publish your new event type#
Update AggregatorAgent to publish your new event type.