ray.data.Dataset.write_kafka#
- Dataset.write_kafka(topic: str, bootstrap_servers: str, key_field: str | None = None, key_serializer: str = 'string', value_serializer: str = 'json', producer_config: Dict[str, Any] | None = None, *, ray_remote_args: Dict[str, Any] | None = None, concurrency: int | None = None) None[source]#
Convenience method to write a Ray Dataset to Kafka.
Note
This operation will trigger execution of the lazy transformations performed on this dataset.
Examples
import ray

ds = ray.data.range(100)
ds.write_kafka("my-topic", "localhost:9092")
- Parameters:
topic – Name of the Kafka topic to write to.
bootstrap_servers – Comma-separated list of Kafka broker addresses (e.g., "host1:9092,host2:9092").
key_field – Optional field name to use as the message key.
key_serializer – Key serialization format (‘json’, ‘string’, or ‘bytes’).
value_serializer – Value serialization format (‘json’, ‘string’, or ‘bytes’).
producer_config – Additional Kafka producer configuration (confluent-kafka/librdkafka format).
ray_remote_args – Kwargs passed to ray.remote() in the write tasks.
concurrency – The maximum number of Ray tasks to run concurrently. This doesn’t change the total number of tasks run. By default, concurrency is dynamically decided based on the available resources.
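The serializer options map each row to the raw bytes of a Kafka message value. The sketch below illustrates what the three formats plausibly mean; the helper name serialize_value and the UTF-8 encoding choice are assumptions for illustration, not the library’s actual internals:

```python
import json
from typing import Any, Dict, Union


def serialize_value(row: Union[Dict[str, Any], bytes], value_serializer: str) -> bytes:
    # Hypothetical helper sketching the serializer formats; the real
    # write_kafka implementation may differ in details.
    if value_serializer == "json":
        # Encode the whole row as a UTF-8 JSON object.
        return json.dumps(row).encode("utf-8")
    if value_serializer == "string":
        # Stringify the row and encode it as UTF-8.
        return str(row).encode("utf-8")
    if value_serializer == "bytes":
        # Pass raw bytes through unchanged; the row must already be bytes.
        if not isinstance(row, (bytes, bytearray)):
            raise TypeError("'bytes' serializer expects raw bytes")
        return bytes(row)
    raise ValueError(f"Unknown serializer: {value_serializer!r}")


serialize_value({"id": 7}, "json")  # b'{"id": 7}'
```

The same logic would apply to key_serializer on the field selected by key_field.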
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
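Because producer_config is passed through in confluent-kafka/librdkafka format, tuning options use librdkafka’s dotted key names. A sketch of a typical configuration (the specific values here are illustrative, not recommendations):

```python
# Standard librdkafka producer settings, passed through unchanged.
producer_config = {
    "compression.type": "lz4",  # compress batches to reduce broker traffic
    "acks": "all",              # wait for all in-sync replicas to acknowledge
    "linger.ms": 50,            # batch messages for up to 50 ms before sending
}

# ds.write_kafka("my-topic", "localhost:9092", producer_config=producer_config)
```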