ray.data.read_kafka

ray.data.read_kafka(topics: str | List[str], *, bootstrap_servers: str | List[str], trigger: Literal['once'] = 'once', start_offset: int | Literal['earliest'] = 'earliest', end_offset: int | Literal['latest'] = 'latest', kafka_auth_config: KafkaAuthConfig | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, ray_remote_args: Dict[str, Any] | None = None, override_num_blocks: int | None = None, timeout_ms: int = 10000) -> Dataset

Read data from Kafka topics.

This function performs bounded reads from Kafka topics: it reads the messages between a start offset and an end offset. Only the “once” trigger is currently supported, which performs a single bounded read. Each partition is currently read by exactly one read task.

Examples

import ray

# Read from a single topic with offset range
ds = ray.data.read_kafka(
    topics="my-topic",
    bootstrap_servers="localhost:9092",
    start_offset=0,
    end_offset=1000,
)
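The call also accepts lists for topics and bootstrap_servers. The following is a minimal sketch of assembling and checking the arguments before the read. build_read_kafka_kwargs is a hypothetical helper, not part of Ray, and the topic and broker names are placeholders. Its checks mirror the constraints documented on this page and may differ from the validation Ray itself performs.

```python
from typing import Any, Dict, List, Union


def build_read_kafka_kwargs(
    topics: Union[str, List[str]],
    bootstrap_servers: Union[str, List[str]],
    start_offset: Union[int, str] = "earliest",
    end_offset: Union[int, str] = "latest",
    timeout_ms: int = 10000,
) -> Dict[str, Any]:
    """Hypothetical helper: check arguments against the documented
    constraints and return keyword arguments for ray.data.read_kafka."""
    if isinstance(start_offset, str) and start_offset != "earliest":
        raise ValueError("start_offset must be an int or 'earliest'")
    if isinstance(end_offset, str) and end_offset != "latest":
        raise ValueError("end_offset must be an int or 'latest'")
    # Assumption of this sketch: an empty or inverted range is a caller error.
    if isinstance(start_offset, int) and isinstance(end_offset, int):
        if start_offset > end_offset:
            raise ValueError("start_offset must not exceed end_offset")
    if timeout_ms <= 0:
        raise ValueError("timeout_ms must be positive")
    return {
        "topics": topics,
        "bootstrap_servers": bootstrap_servers,
        "start_offset": start_offset,
        "end_offset": end_offset,
        "timeout_ms": timeout_ms,
    }


# Placeholder topic and broker names; replace them with real ones.
kwargs = build_read_kafka_kwargs(
    topics=["orders", "payments"],
    bootstrap_servers=["broker-1:9092", "broker-2:9092"],
    start_offset=0,
    end_offset=1000,
    timeout_ms=30000,
)
```

With reachable brokers, the result can be passed on as ray.data.read_kafka(**kwargs).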
Parameters:
  • topics – Kafka topic name(s) to read from. Can be a single topic name or a list of topic names.

  • bootstrap_servers – Kafka broker addresses. Can be a single string or a list of strings.

  • trigger – Trigger mode for reading. Only “once” is supported, which performs a single bounded read.

  • start_offset – Starting position for reading. Can be:
      - int: Offset number
      - str: “earliest”

  • end_offset – Ending position for reading (exclusive). Can be:
      - int: Offset number
      - str: “latest”

  • kafka_auth_config – Authentication configuration. See KafkaAuthConfig for details.

  • num_cpus – The number of CPUs to reserve for each parallel read worker.

  • num_gpus – The number of GPUs to reserve for each parallel read worker.

  • memory – The heap memory in bytes to reserve for each parallel read worker.

  • ray_remote_args – kwargs passed to ray.remote() in the read tasks.

  • override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.

  • timeout_ms – Timeout in milliseconds for every read task to poll until reaching end_offset (default 10000ms). If the read task does not reach end_offset within the timeout, it will stop polling and return the messages it has read so far.
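The interaction between end_offset (exclusive) and timeout_ms can be illustrated with a small simulation. This is only a sketch of the documented behavior, not Ray's implementation: poll_until, fetch_next, and the injectable clock are hypothetical names introduced here, and no Kafka client is involved.

```python
import time
from typing import Callable, List, Optional


def poll_until(
    fetch_next: Callable[[], Optional[int]],
    end_offset: int,
    timeout_ms: int,
    clock: Callable[[], float] = time.monotonic,
) -> List[int]:
    """Collect offsets until end_offset (exclusive) is reached
    or timeout_ms has elapsed, whichever comes first."""
    deadline = clock() + timeout_ms / 1000.0
    collected: List[int] = []
    while clock() < deadline:
        offset = fetch_next()  # None models a poll that returned nothing
        if offset is None:
            continue
        if offset >= end_offset:
            break  # end_offset itself is excluded
        collected.append(offset)
        if offset + 1 >= end_offset:
            break  # the last requested message has been read
    return collected


# Simulated partition holding only offsets 0..4, while 10 were requested.
available = iter(range(5))
fake_now = [0.0]


def fake_clock() -> float:
    fake_now[0] += 0.001  # every call advances simulated time by 1 ms
    return fake_now[0]


# The read gives up at the timeout and keeps what it has read so far.
partial = poll_until(
    lambda: next(available, None),
    end_offset=10,
    timeout_ms=50,
    clock=fake_clock,
)
print(partial)
```

In this simulation the partial result is not an error. A caller that needs the full range would have to compare the offsets it received against the requested end_offset.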

Returns:

A Dataset containing Kafka messages with the following schema:

  • offset: int64 - Message offset within partition

  • key: binary - Message key as raw bytes

  • value: binary - Message value as raw bytes

  • topic: string - Topic name

  • partition: int32 - Partition ID

  • timestamp: int64 - Message timestamp in milliseconds

  • timestamp_type: int32 - 0=CreateTime, 1=LogAppendTime

  • headers: map<string, binary> - Message headers (keys as strings, values as bytes)

Return type:

Dataset
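Because key and value arrive as raw bytes, a decoding step usually follows the read. The sketch below assumes UTF-8 keys and JSON-encoded values, which is a property of the producer and not something this API guarantees. decode_row is a hypothetical helper, and the sample row is hand-written to match the schema on this page.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def decode_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Decode one Kafka row, assuming UTF-8 keys and JSON values."""
    key = row["key"]
    value = row["value"]
    # The timestamp column is in milliseconds since the Unix epoch.
    event_time = datetime.fromtimestamp(row["timestamp"] / 1000, tz=timezone.utc)
    return {
        "topic": row["topic"],
        "partition": row["partition"],
        "offset": row["offset"],
        "key": key.decode("utf-8") if key is not None else None,
        "payload": json.loads(value) if value is not None else None,
        "event_time": event_time.isoformat(),
    }


# Hand-written sample row; headers are omitted for brevity.
sample = {
    "offset": 42,
    "key": b"user-1",
    "value": b'{"amount": 12.5}',
    "topic": "my-topic",
    "partition": 0,
    "timestamp": 1700000000000,
    "timestamp_type": 0,
}
decoded = decode_row(sample)
print(decoded)
```

On a real dataset, a function like this would typically be applied per row with ds.map(decode_row), assuming binary columns surface as Python bytes in each row.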

Raises:

PublicAPI (alpha): This API is in alpha and may change before becoming stable.