ray.data.read_sql

ray.data.read_sql(sql: str, connection_factory: Callable[[], Any], *, shard_keys: list[str] | None = None, shard_hash_fn: str = 'MD5', parallelism: int = -1, ray_remote_args: Dict[str, Any] | None = None, concurrency: int | None = None, override_num_blocks: int | None = None) -> Dataset

Read from a database that provides a Python DB API2-compliant connector.

Note

Parallel reads are available only for databases that support sharding. This means that the database must support all of the following operations: MOD, ABS, and CONCAT.

You can use shard_hash_fn to specify the hash function to use for sharding. The default is MD5, but other common alternatives include hash, unicode, and SHA.

If the database does not support sharding, the read operation will be executed in a single task.
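
To make the role of MOD, ABS, CONCAT, and the hash function more concrete, here is a minimal pure-Python sketch of hash-based sharding. It is an illustration only and assumes a scheme of "concatenate the shard keys, hash, take the absolute value modulo the number of shards"; the SQL that Ray actually generates may differ. The helper name shard_id and the sample rows are hypothetical.

```python
import hashlib


def shard_id(key_values, num_shards):
    """Map a row's shard-key values to a shard index (illustrative only)."""
    # CONCAT: join the shard-key values into a single string.
    concatenated = "".join(str(value) for value in key_values)
    # Hash: MD5 here, mirroring the default shard_hash_fn name.
    digest = hashlib.md5(concatenated.encode("utf-8")).hexdigest()
    # ABS + MOD: turn the hash into a non-negative index below num_shards.
    return abs(int(digest, 16)) % num_shards


# Hypothetical sample rows: (title, year).
rows = [
    ("Monty Python and the Holy Grail", 1975),
    ("Monty Python Live at the Hollywood Bowl", 1982),
    ("Monty Python's Life of Brian", 1979),
    ("Rocky II", 1979),
]

num_shards = 2
shards = {index: [] for index in range(num_shards)}
for row in rows:
    # Shard on the title column; each row lands in exactly one shard.
    shards[shard_id([row[0]], num_shards)].append(row)
```

Because the assignment depends only on the shard-key values, each shard can be read by a separate task without any row being read twice or skipped.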

Examples

For examples of reading from larger databases like MySQL and PostgreSQL, see Reading from SQL Databases.

import sqlite3

import ray

# Create a simple database
connection = sqlite3.connect("example.db")
connection.execute("CREATE TABLE movie(title, year, score)")
connection.execute(
    """
    INSERT INTO movie VALUES
        ('Monty Python and the Holy Grail', 1975, 8.2),
        ('Monty Python Live at the Hollywood Bowl', 1982, 7.9),
        ('Monty Python''s Life of Brian', 1979, 8.0),
        ('Rocky II', 1979, 7.3)
    """
)
connection.commit()
connection.close()

def create_connection():
    return sqlite3.connect("example.db")

# Get all movies
ds = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies released in 1980 or later
ds = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
ds = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
Parameters:
  • sql – The SQL query to execute.

  • connection_factory – A function that takes no arguments and returns a Python DB API2 Connection object.

  • shard_keys – The keys to shard the data by.

  • shard_hash_fn – The hash function string to use for sharding. Defaults to “MD5”. For other databases, common alternatives include “hash” and “SHA”. This is applied to the shard keys.

  • parallelism – This argument is deprecated. Use the override_num_blocks argument instead.

  • ray_remote_args – Keyword arguments passed to ray.remote() in the read tasks.

  • concurrency – The maximum number of Ray tasks to run concurrently. This doesn’t change the total number of tasks run or the total number of output blocks. By default, concurrency is dynamically decided based on the available resources.

  • override_num_blocks – Override the number of output blocks from all read tasks. This is used for sharding when shard_keys is provided. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.

Returns:

A Dataset containing the queried data.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.