ray.data.read_kafka

ray.data.read_kafka(topics: str | List[str], *, bootstrap_servers: str | List[str], trigger: Literal['once'] = 'once', start_offset: int | datetime | Literal['earliest'] = 'earliest', end_offset: int | datetime | Literal['latest'] = 'latest', kafka_auth_config: KafkaAuthConfig | None = None, consumer_config: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, ray_remote_args: Dict[str, Any] | None = None, override_num_blocks: int | None = None, timeout_ms: int | None = None) -> Dataset

Read data from Kafka topics.

This function supports bounded reads from Kafka topics, reading messages between a start and end offset. Only the “once” trigger is currently supported; it performs a single bounded read. Each partition is currently read by a single read task.

Examples

import ray

# Read from a single topic with offset range
ds = ray.data.read_kafka(
    topics="my-topic",
    bootstrap_servers="localhost:9092",
    start_offset=0,
    end_offset=1000,
)

# Read from a topic using datetime range
from datetime import datetime
ds = ray.data.read_kafka(
    topics="my-topic",
    bootstrap_servers="localhost:9092",
    start_offset=datetime(2025, 1, 1),
    end_offset=datetime(2025, 1, 2),
)
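Offsets given as naive datetimes are interpreted as UTC. To anchor the read range at a local-time boundary instead, pass a timezone-aware datetime. A short standard-library sketch of the difference (no broker required; the UTC+9 zone is just an illustrative assumption):

```python
from datetime import datetime, timedelta, timezone

# A naive datetime is treated as UTC by read_kafka.
naive_start = datetime(2025, 1, 1)

# An aware datetime pins the same wall-clock time to an explicit zone
# (here: midnight 2025-01-01 at UTC+9).
tz = timezone(timedelta(hours=9))
aware_start = datetime(2025, 1, 1, tzinfo=tz)

# The aware boundary corresponds to an earlier UTC instant:
print(aware_start.astimezone(timezone.utc).isoformat())  # 2024-12-31T15:00:00+00:00
```

Either form can be passed as start_offset or end_offset; only the instant they denote differs.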
Parameters:
  • topics – Kafka topic name(s) to read from. Can be a single topic name or a list of topic names.

  • bootstrap_servers – Kafka broker addresses. Can be a single string or a list of strings.

  • trigger – Trigger mode for reading. Only “once” is supported, which performs a single bounded read.

  • start_offset

    Starting position for reading. Can be:

    • int: Offset number

    • datetime: Read from the first message at or after this time. Datetimes with no timezone info are treated as UTC.

    • str: “earliest”

  • end_offset

    Ending position for reading (exclusive). Can be:

    • int: Offset number

    • datetime: Read up to (but not including) the first message at or after this time. Datetimes with no timezone info are treated as UTC.

    • str: “latest”

  • kafka_auth_config – Authentication configuration (kafka-python style). Deprecated; prefer consumer_config with Confluent keys. Mutually exclusive with consumer_config.

  • consumer_config – Confluent/librdkafka consumer configuration dict to pass through directly to the underlying client. These options override defaults and any mapped values from kafka_auth_config. The bootstrap.servers option is derived from bootstrap_servers and cannot be overridden here. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration for more details.

  • num_cpus – The number of CPUs to reserve for each parallel read worker.

  • num_gpus – The number of GPUs to reserve for each parallel read worker.

  • memory – The heap memory in bytes to reserve for each parallel read worker.

  • ray_remote_args – kwargs passed to ray.remote() in the read tasks.

  • override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.

  • timeout_ms – Optional timeout in milliseconds for each read task’s poll loop. If None (default), no task-level timeout is applied and each read task polls until it reaches end_offset. If set, a read task stops polling once the timeout elapses and returns the messages it has read so far.
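As a sketch of how consumer_config might look for a SASL_SSL-secured cluster: the keys below are standard librdkafka consumer settings, but the values are placeholders and the exact settings depend on your cluster; the read_kafka call is shown commented out since it needs a live broker.

```python
# Hypothetical librdkafka consumer settings for a SASL_SSL cluster.
# Note: "bootstrap.servers" must NOT appear here -- it is derived from
# the bootstrap_servers argument and cannot be overridden.
consumer_config = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "my-user",      # placeholder credentials
    "sasl.password": "my-password",  # placeholder credentials
}

# import ray
# ds = ray.data.read_kafka(
#     topics="my-topic",
#     bootstrap_servers="broker-1:9093",
#     consumer_config=consumer_config,
# )
```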

Returns:

  A Dataset containing Kafka messages with the following schema:

  • offset: int64 - Message offset within partition

  • key: binary - Message key as raw bytes

  • value: binary - Message value as raw bytes

  • topic: string - Topic name

  • partition: int32 - Partition ID

  • timestamp: int64 - Message timestamp in milliseconds

  • timestamp_type: int32 - 0=CreateTime, 1=LogAppendTime

  • headers: map<string, binary> - Message headers (keys as strings, values as bytes)

Return type:

  Dataset
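Because key and value arrive as raw bytes, a common follow-up step is decoding them into usable values. A sketch of a row-level decoder that could be applied with Dataset.map; it assumes UTF-8 keys and JSON-encoded values, which depends entirely on how the producer serialized the messages:

```python
import json

def decode_message(row: dict) -> dict:
    # key/value are raw bytes per the schema above; decoding choices
    # (UTF-8 keys, JSON values) are assumptions about the producer.
    row["key"] = row["key"].decode("utf-8") if row["key"] is not None else None
    row["value"] = json.loads(row["value"])
    return row

# Applied to the dataset, e.g.: ds.map(decode_message)
sample = {
    "offset": 0,
    "key": b"user-1",
    "value": b'{"clicks": 3}',
    "topic": "my-topic",
    "partition": 0,
}
print(decode_message(sample)["value"])  # {'clicks': 3}
```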

Raises:
  • ValueError – If invalid parameters are provided.

  • ImportError – If confluent-kafka is not installed.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.