ray.data.read_kafka
- ray.data.read_kafka(topics: str | List[str], *, bootstrap_servers: str | List[str], trigger: Literal['once'] = 'once', start_offset: int | datetime | Literal['earliest'] = 'earliest', end_offset: int | datetime | Literal['latest'] = 'latest', kafka_auth_config: KafkaAuthConfig | None = None, consumer_config: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, ray_remote_args: Dict[str, Any] | None = None, override_num_blocks: int | None = None, timeout_ms: int | None = None) → Dataset
Read data from Kafka topics.
This function supports bounded reads from Kafka topics, reading messages between a start and an end offset. Only the “once” trigger is supported for now, which performs a single bounded read. Currently, one read task is created for each partition.
Examples
import ray

# Read from a single topic with an offset range
ds = ray.data.read_kafka(
    topics="my-topic",
    bootstrap_servers="localhost:9092",
    start_offset=0,
    end_offset=1000,
)

# Read from a topic using a datetime range
from datetime import datetime

ds = ray.data.read_kafka(
    topics="my-topic",
    bootstrap_servers="localhost:9092",
    start_offset=datetime(2025, 1, 1),
    end_offset=datetime(2025, 1, 2),
)
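Authentication options can be passed through via consumer_config (see the parameter description below). The sketch below shows the general shape: the option names are standard librdkafka/Confluent client keys, and the broker address and credentials are placeholders.

```python
# Hypothetical SASL/SSL consumer configuration. The keys are standard
# librdkafka/Confluent client options; the credentials are placeholders.
consumer_config = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "my-user",
    "sasl.password": "my-secret",
}

# Passed through to the underlying client (requires a reachable broker):
# ds = ray.data.read_kafka(
#     topics="my-topic",
#     bootstrap_servers="broker-1:9092",
#     consumer_config=consumer_config,
# )
```

Note that bootstrap.servers must not appear in this dict; it is always derived from the bootstrap_servers argument.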
- Parameters:
topics – Kafka topic name(s) to read from. Can be a single topic name or a list of topic names.
bootstrap_servers – Kafka broker addresses. Can be a single string or a list of strings.
trigger – Trigger mode for reading. Only “once” is supported, which performs a single bounded read.
start_offset –
Starting position for reading. Can be:
int: Offset number
datetime: Read from the first message at or after this time. Datetimes with no timezone info are treated as UTC.
str: “earliest”, meaning the earliest available offset
end_offset –
Ending position for reading (exclusive). Can be:
int: Offset number
datetime: Read up to (but not including) the first message at or after this time. Datetimes with no timezone info are treated as UTC.
str: “latest”, meaning the latest available offset
kafka_auth_config – Authentication configuration (kafka-python style). Deprecated; prefer consumer_config with Confluent keys. Mutually exclusive with consumer_config.
consumer_config – Confluent/librdkafka consumer configuration dict to pass through directly to the underlying client. These options override defaults and any values mapped from kafka_auth_config. The bootstrap.servers option is derived from bootstrap_servers and cannot be overridden here. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration for more details.
num_cpus – The number of CPUs to reserve for each parallel read worker.
num_gpus – The number of GPUs to reserve for each parallel read worker.
memory – The heap memory in bytes to reserve for each parallel read worker.
ray_remote_args – kwargs passed to ray.remote() in the read tasks.
override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.
timeout_ms – Optional timeout in milliseconds for each read task. If None (the default), no task-level timeout is applied and each read task polls until it reaches end_offset. If set, a read task stops polling after the timeout and returns the messages it has read so far.
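The datetime semantics above can be made concrete: naive datetimes are interpreted as UTC, so a start_offset of datetime(2025, 1, 1) and one of datetime(2025, 1, 1, tzinfo=timezone.utc) select the same position. A minimal sketch of that documented behavior (illustrating the semantics, not the internal implementation):

```python
from datetime import datetime, timezone

def to_epoch_ms(dt: datetime) -> int:
    # Naive datetimes are treated as UTC, matching the documented
    # start_offset / end_offset semantics.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

# A naive datetime and its explicit-UTC counterpart map to the
# same epoch-millisecond position.
naive = to_epoch_ms(datetime(2025, 1, 1))
aware = to_epoch_ms(datetime(2025, 1, 1, tzinfo=timezone.utc))
```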
- Returns:
A Dataset containing Kafka messages with the following schema:
offset: int64 - Message offset within partition
key: binary - Message key as raw bytes
value: binary - Message value as raw bytes
topic: string - Topic name
partition: int32 - Partition ID
timestamp: int64 - Message timestamp in milliseconds
timestamp_type: int32 - 0=CreateTime, 1=LogAppendTime
headers: map<string, binary> - Message headers (keys as strings, values as bytes)
- Return type:
Dataset
- Raises:
ValueError – If invalid parameters are provided.
ImportError – If confluent-kafka is not installed.
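Because key and value arrive as raw bytes in the schema above, downstream decoding is typically applied per row. A minimal sketch assuming UTF-8-encoded keys and UTF-8 JSON values (the decode_message helper is illustrative, not part of the API):

```python
import json

def decode_message(row: dict) -> dict:
    # Decode raw key/value bytes from the Kafka message schema.
    # Assumes UTF-8 keys and UTF-8 JSON values; adapt to your serialization.
    return {
        "key": row["key"].decode("utf-8") if row["key"] is not None else None,
        "payload": json.loads(row["value"]),
        "topic": row["topic"],
        "offset": row["offset"],
    }

# With a Dataset `ds` from read_kafka, this would be applied as:
# ds = ds.map(decode_message)

# Standalone check against a row shaped like the documented schema:
sample = {
    "key": b"user-1",
    "value": b'{"clicks": 3}',
    "topic": "my-topic",
    "offset": 42,
}
decoded = decode_message(sample)
```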
PublicAPI (alpha): This API is in alpha and may change before becoming stable.