ray.data.read_kafka
- ray.data.read_kafka(topics: str | List[str], *, bootstrap_servers: str | List[str], trigger: Literal['once'] = 'once', start_offset: int | datetime | Literal['earliest'] = 'earliest', end_offset: int | datetime | Literal['latest'] = 'latest', kafka_auth_config: KafkaAuthConfig | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, ray_remote_args: Dict[str, Any] | None = None, override_num_blocks: int | None = None, timeout_ms: int = 10000) → Dataset
Read data from Kafka topics.
This function supports bounded reads from Kafka topics, reading messages between a start and an end offset. Only the “once” trigger is currently supported, which performs a single bounded read. Each partition is currently read by a single read task.
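To make the idea of a bounded read concrete, here is a minimal pure-Python sketch that does not use Ray or Kafka. The in-memory `topic` dict and the `bounded_read` helper are hypothetical, and the sketch assumes that the same integer offsets apply to every partition and that the end offset is exclusive, as described for `end_offset`.

```python
from typing import Dict, List, Tuple

# Hypothetical stand-in for a topic: partition ID -> message values,
# where a message's offset is its index in the list.
Partition = List[bytes]


def bounded_read(
    partitions: Dict[int, Partition], start: int, end: int
) -> List[Tuple[int, int, bytes]]:
    """Return (partition, offset, value) for offsets in [start, end)."""
    rows = []
    for partition_id, messages in sorted(partitions.items()):
        # One pass per partition, loosely mirroring one read task per partition.
        stop = min(end, len(messages))  # never read past the available messages
        for offset in range(start, stop):
            rows.append((partition_id, offset, messages[offset]))
    return rows


topic = {0: [b"a", b"b", b"c"], 1: [b"x", b"y"]}
rows = bounded_read(topic, start=1, end=3)
print(rows)
```

The read is bounded because both ends of the range are fixed before reading starts; messages appended afterwards are not picked up.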
Examples
from datetime import datetime

import ray

# Read from a single topic with offset range
ds = ray.data.read_kafka(
    topics="my-topic",
    bootstrap_servers="localhost:9092",
    start_offset=0,
    end_offset=1000,
)

# Read from a topic using datetime range
ds = ray.data.read_kafka(
    topics="my-topic",
    bootstrap_servers="localhost:9092",
    start_offset=datetime(2025, 1, 1),
    end_offset=datetime(2025, 1, 2),
)
- Parameters:
topics – Kafka topic name(s) to read from. Can be a single topic name or a list of topic names.
bootstrap_servers – Kafka broker addresses. Can be a single string or a list of strings.
trigger – Trigger mode for reading. Only “once” is supported, which performs a single bounded read.
start_offset –
Starting position for reading. Can be:
int: Offset number
datetime: Read from the first message at or after this time. Datetimes with no timezone info are treated as UTC.
str: “earliest”
end_offset –
Ending position for reading (exclusive). Can be:
int: Offset number
datetime: Read up to (but not including) the first message at or after this time. Datetimes with no timezone info are treated as UTC.
str: “latest”
kafka_auth_config – Authentication configuration. See KafkaAuthConfig for details.
num_cpus – The number of CPUs to reserve for each parallel read worker.
num_gpus – The number of GPUs to reserve for each parallel read worker.
memory – The heap memory in bytes to reserve for each parallel read worker.
ray_remote_args – kwargs passed to ray.remote() in the read tasks.
override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.
timeout_ms – Timeout in milliseconds for every read task to poll until reaching end_offset (default 10000ms). If the read task does not reach end_offset within the timeout, it will stop polling and return the messages it has read so far.
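The rule that datetimes without timezone info are treated as UTC can be illustrated with a small helper. This is only a sketch of the stated convention, not Ray's implementation; `to_epoch_ms` is a hypothetical name, and the conversion to epoch milliseconds is an assumption based on the millisecond timestamps Kafka messages carry.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a datetime to epoch milliseconds, treating naive values as UTC."""
    if dt.tzinfo is None:
        # No timezone info: interpret the wall-clock value as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


naive = datetime(2025, 1, 1)
aware = datetime(2025, 1, 1, 1, 0, tzinfo=timezone(timedelta(hours=1)))
print(to_epoch_ms(naive), to_epoch_ms(aware))
```

Both values denote the same instant, so passing a timezone-aware datetime is the safer choice when the calling code does not run in UTC.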
- Returns:
A Dataset containing Kafka messages with the following schema:
offset: int64 - Message offset within partition
key: binary - Message key as raw bytes
value: binary - Message value as raw bytes
topic: string - Topic name
partition: int32 - Partition ID
timestamp: int64 - Message timestamp in milliseconds
timestamp_type: int32 - 0=CreateTime, 1=LogAppendTime
headers: map<string, binary> - Message headers (keys as strings, values as bytes)
- Return type:
Dataset
- Raises:
ValueError – If invalid parameters are provided.
ImportError – If kafka-python is not installed.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
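Because keys and values arrive as raw bytes, rows usually need decoding before use. The sketch below shows one way to decode a row shaped like the schema listed under Returns. The `decode_row` helper and the sample row are hypothetical, and it assumes keys are UTF-8 text and values are UTF-8 encoded JSON, which may not hold for your topics.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

TIMESTAMP_TYPES = {0: "CreateTime", 1: "LogAppendTime"}


def decode_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Decode one Kafka row; assumes UTF-8 keys and JSON values."""
    key = row["key"]
    return {
        "topic": row["topic"],
        "partition": row["partition"],
        "offset": row["offset"],
        # Keys are optional in Kafka, so tolerate a missing key.
        "key": key.decode("utf-8") if key is not None else None,
        "value": json.loads(row["value"]),
        # The timestamp column is in milliseconds since the epoch.
        "time": datetime.fromtimestamp(row["timestamp"] / 1000, tz=timezone.utc),
        "timestamp_type": TIMESTAMP_TYPES.get(row["timestamp_type"], "unknown"),
    }


sample = {
    "offset": 7,
    "key": b"user-1",
    "value": b'{"clicks": 3}',
    "topic": "my-topic",
    "partition": 0,
    "timestamp": 1735689600000,
    "timestamp_type": 0,
    "headers": {},
}
decoded = decode_row(sample)
print(decoded)
```

With a dataset returned by `read_kafka`, a function like this could be applied per row, for example with `ds.map(decode_row)`.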