Loading Data#

Ray Data loads data from various sources. This guide shows you how to:

Reading files#

Ray Data reads files from local disk or cloud storage in a variety of file formats. To view the full list of supported file formats, see the Input/Output reference.

To read Parquet files, call read_parquet().

import ray

ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

To read raw images, call read_images(). Ray Data represents images as NumPy ndarrays.

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages/")

print(ds.schema())
Column  Type
------  ----
image   numpy.ndarray(shape=(32, 32, 3), dtype=uint8)

To read lines of text, call read_text().

import ray

ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")

print(ds.schema())
Column  Type
------  ----
text    string

To read CSV files, call read_csv().

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

print(ds.schema())
Column             Type
------             ----
sepal length (cm)  double
sepal width (cm)   double
petal length (cm)  double
petal width (cm)   double
target             int64

To read raw binary files, call read_binary_files().

import ray

ds = ray.data.read_binary_files("s3://anonymous@ray-example-data/documents")

print(ds.schema())
Column  Type
------  ----
bytes   binary

To read TFRecords files, call read_tfrecords().

import ray

ds = ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")

print(ds.schema())
Column        Type
------        ----
label         binary
petal.length  float
sepal.width   float
petal.width   float
sepal.length  float

Reading files from local disk#

To read files from local disk, call a function like read_parquet() and specify paths with the local:// schema. Paths can point to files or directories.

To read formats other than Parquet, see the Input/Output reference.

Tip

If your files are accessible on every node, exclude local:// to parallelize the read tasks across the cluster.

import ray

ds = ray.data.read_parquet("local:///tmp/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

Reading files from cloud storage#

To read files in cloud storage, authenticate all nodes with your cloud service provider. Then, call a method like read_parquet() and specify URIs with the appropriate schema. URIs can point to buckets, folders, or objects.

To read formats other than Parquet, see the Input/Output reference.

To read files from Amazon S3, specify URIs with the s3:// scheme.

import ray

ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

Ray Data relies on PyArrow for authentication with Amazon S3. For more on how to configure your credentials to be compatible with PyArrow, see their S3 Filesytem docs.

To read files from Google Cloud Storage, install the Filesystem interface to Google Cloud Storage

pip install gcsfs

Then, create a GCSFileSystem and specify URIs with the gcs:// scheme.

import ray

filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds = ray.data.read_parquet(
    "gcs://anonymous@ray-example-data/iris.parquet",
    filesystem=filesystem
)

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

Ray Data relies on PyArrow for authentication with Google Cloud Storage. For more on how to configure your credentials to be compatible with PyArrow, see their GCS Filesytem docs.

To read files from Azure Blob Storage, install the Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage

pip install adlfs

Then, create a AzureBlobFileSystem and specify URIs with the az:// scheme.

import adlfs
import ray

ds = ray.data.read_parquet(
    "az://ray-example-data/iris.parquet",
    adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
)

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

Ray Data relies on PyArrow for authentication with Azure Blob Storage. For more on how to configure your credentials to be compatible with PyArrow, see their fsspec-compatible filesystems docs.

Reading files from NFS#

To read files from NFS filesystems, call a function like read_parquet() and specify files on the mounted filesystem. Paths can point to files or directories.

To read formats other than Parquet, see the Input/Output reference.

import ray

ds = ray.data.read_parquet("/mnt/cluster_storage/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

Handling compressed files#

To read a compressed file, specify compression in arrow_open_stream_args. You can use any codec supported by Arrow.

import ray

ds = ray.data.read_csv(
    "s3://anonymous@ray-example-data/iris.csv.gz",
    arrow_open_stream_args={"compression": "gzip"},
)

Loading data from other libraries#

Loading data from single-node data libraries#

Ray Data interoperates with libraries like pandas, NumPy, and Arrow.

To create a Dataset from Python objects, call from_items() and pass in a list of Dict. Ray Data treats each Dict as a row.

import ray

ds = ray.data.from_items([
    {"food": "spam", "price": 9.34},
    {"food": "ham", "price": 5.37},
    {"food": "eggs", "price": 0.94}
])

print(ds)
MaterializedDataset(
   num_blocks=3,
   num_rows=3,
   schema={food: string, price: double}
)

You can also create a Dataset from a list of regular Python objects.

import ray

ds = ray.data.from_items([1, 2, 3, 4, 5])

print(ds)
MaterializedDataset(num_blocks=5, num_rows=5, schema={item: int64})

To create a Dataset from a NumPy array, call from_numpy(). Ray Data treats the outer axis as the row dimension.

import numpy as np
import ray

array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)

print(ds)
MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={data: numpy.ndarray(shape=(2, 2), dtype=double)}
)

To create a Dataset from a pandas DataFrame, call from_pandas().

import pandas as pd
import ray

df = pd.DataFrame({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)

print(ds)
MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={food: object, price: float64}
)

To create a Dataset from an Arrow table, call from_arrow().

import pyarrow as pa

table = pa.table({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)

print(ds)
MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={food: string, price: double}
)

Loading data from distributed DataFrame libraries#

Ray Data interoperates with distributed data processing frameworks like Dask, Spark, Modin, and Mars.

Note

The Ray Community provides these operations but may not actively maintain them. If you run into issues, create a GitHub issue here.

To create a Dataset from a Dask DataFrame, call from_dask(). This function constructs a Dataset backed by the distributed Pandas DataFrame partitions that underly the Dask DataFrame.

import dask.dataframe as dd
import pandas as pd
import ray

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ddf = dd.from_pandas(df, npartitions=4)
# Create a Dataset from a Dask DataFrame.
ds = ray.data.from_dask(ddf)

ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

To create a Dataset from a Spark DataFrame, call from_spark(). This function creates a Dataset backed by the distributed Spark DataFrame partitions that underly the Spark DataFrame.

import ray
import raydp

spark = raydp.init_spark(app_name="Spark -> Datasets Example",
                        num_executors=2,
                        executor_cores=2,
                        executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)

ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

To create a Dataset from an Iceberg Table, call read_iceberg(). This function creates a Dataset backed by the distributed files that underlie the Iceberg table.

>>> import ray
>>> from pyiceberg.expressions import EqualTo
>>> ds = ray.data.read_iceberg(
...     table_identifier="db_name.table_name",
...     row_filter=EqualTo("column_name", "literal_value"),
...     catalog_kwargs={"name": "default", "type": "glue"}
... )
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

To create a Dataset from a Modin DataFrame, call from_modin(). This function constructs a Dataset backed by the distributed Pandas DataFrame partitions that underly the Modin DataFrame.

import modin.pandas as md
import pandas as pd
import ray

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df)
# Create a Dataset from a Modin DataFrame.
ds = ray.data.from_modin(mdf)

ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

To create a Dataset from a Mars DataFrame, call from_mars(). This function constructs a Dataset backed by the distributed Pandas DataFrame partitions that underly the Mars DataFrame.

import mars
import mars.dataframe as md
import pandas as pd
import ray

cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df, num_partitions=8)
# Create a tabular Dataset from a Mars DataFrame.
ds = ray.data.from_mars(mdf)

ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

Loading data from ML libraries#

Ray Data interoperates with HuggingFace, PyTorch, and TensorFlow datasets.

To convert a HuggingFace Dataset to a Ray Datasets, call from_huggingface(). This function accesses the underlying Arrow table and converts it to a Dataset directly.

Warning

from_huggingface only supports parallel reads in certain instances, namely for untransformed public HuggingFace Datasets. For those datasets, Ray Data uses hosted parquet files to perform a distributed read; otherwise, Ray Data uses a single node read. This behavior shouldn’t be an issue with in-memory HuggingFace Datasets, but may cause a failure with large memory-mapped HuggingFace Datasets. Additionally, HuggingFace DatasetDict and IterableDatasetDict objects aren’t supported.

import ray.data
from datasets import load_dataset

hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)
[{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]

To convert a PyTorch dataset to a Ray Dataset, call from_torch().

import ray
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor

tds = datasets.CIFAR10(root="data", train=True, download=True, transform=ToTensor())
ds = ray.data.from_torch(tds)

print(ds)
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz
100%|███████████████████████| 170498071/170498071 [00:07<00:00, 23494838.54it/s]
Extracting data/cifar-10-python.tar.gz to data
Dataset(num_rows=50000, schema={item: object})

To convert a TensorFlow dataset to a Ray Dataset, call from_tf().

Warning

from_tf doesn’t support parallel reads. Only use this function with small datasets like MNIST or CIFAR.

import ray
import tensorflow_datasets as tfds

tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)

print(ds)
MaterializedDataset(
   num_blocks=...,
   num_rows=50000,
   schema={
      id: binary,
      image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8),
      label: int64
   }
)

Reading databases#

Ray Data reads from databases like MySQL, PostgreSQL, MongoDB, and BigQuery.

Reading SQL databases#

Call read_sql() to read data from a database that provides a Python DB API2-compliant connector.

To read from MySQL, install MySQL Connector/Python. It’s the first-party MySQL database connector.

pip install mysql-connector-python

Then, define your connection logic and query the database.

import mysql.connector

import ray

def create_connection():
    return mysql.connector.connect(
        user="admin",
        password=...,
        host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        connection_timeout=30,
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from PostgreSQL, install Psycopg 2. It’s the most popular PostgreSQL database connector.

pip install psycopg2-binary

Then, define your connection logic and query the database.

import psycopg2

import ray

def create_connection():
    return psycopg2.connect(
        user="postgres",
        password=...,
        host="example-postgres-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        dbname="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from Snowflake, install the Snowflake Connector for Python.

pip install snowflake-connector-python

Then, define your connection logic and query the database.

import snowflake.connector

import ray

def create_connection():
    return snowflake.connector.connect(
        user=...,
        password=...
        account="ZZKXUVH-IPB52023",
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from Databricks, set the DATABRICKS_TOKEN environment variable to your Databricks warehouse access token.

export DATABRICKS_TOKEN=...

If you’re not running your program on the Databricks runtime, also set the DATABRICKS_HOST environment variable.

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Then, call ray.data.read_databricks_tables() to read from the Databricks SQL warehouse.

import ray

dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

To read from BigQuery, install the Python Client for Google BigQuery and the Python Client for Google BigQueryStorage.

pip install google-cloud-bigquery
pip install google-cloud-bigquery-storage

To read data from BigQuery, call read_bigquery() and specify the project id, dataset, and query (if applicable).

import ray

# Read the entire dataset. Do not specify query.
ds = ray.data.read_bigquery(
    project_id="my_gcloud_project_id",
    dataset="bigquery-public-data.ml_datasets.iris",
)

# Read from a SQL query of the dataset. Do not specify dataset.
ds = ray.data.read_bigquery(
    project_id="my_gcloud_project_id",
    query = "SELECT * FROM `bigquery-public-data.ml_datasets.iris` LIMIT 50",
)

# Write back to BigQuery
ds.write_bigquery(
    project_id="my_gcloud_project_id",
    dataset="destination_dataset.destination_table",
    overwrite_table=True,
)

Reading MongoDB#

To read data from MongoDB, call read_mongo() and specify the source URI, database, and collection. You also need to specify a pipeline to run against the collection.

import ray

# Read a local MongoDB.
ds = ray.data.read_mongo(
    uri="mongodb://localhost:27017",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
)

# Reading a remote MongoDB is the same.
ds = ray.data.read_mongo(
    uri="mongodb://username:[email protected]:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
)

# Write back to MongoDB.
ds.write_mongo(
    MongoDatasource(),
    uri="mongodb://username:[email protected]:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
)

Creating synthetic data#

Synthetic datasets can be useful for testing and benchmarking.

To create a synthetic Dataset from a range of integers, call range(). Ray Data stores the integer range in a single column.

import ray

ds = ray.data.range(10000)

print(ds.schema())
Column  Type
------  ----
id      int64

To create a synthetic Dataset containing arrays, call range_tensor(). Ray Data packs an integer range into ndarrays of the provided shape.

import ray

ds = ray.data.range_tensor(10, shape=(64, 64))

print(ds.schema())
Column  Type
------  ----
data    numpy.ndarray(shape=(64, 64), dtype=int64)

Loading other datasources#

If Ray Data can’t load your data, subclass Datasource. Then, construct an instance of your custom datasource and pass it to read_datasource(). To write results, you might also need to subclass ray.data.Datasink. Then, create an instance of your custom datasink and pass it to write_datasink(). For more details, see Advanced: Read and Write Custom File Types.

# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)

# Write to a custom datasink.
ds.write_datasink(YourCustomDatasink())

Performance considerations#

By default, the number of output blocks from all read tasks is dynamically decided based on input data size and available resources. It should work well in most cases. However, you can also override the default value by setting the override_num_blocks argument. Ray Data decides internally how many read tasks to run concurrently to best utilize the cluster, ranging from 1...override_num_blocks tasks. In other words, the higher the override_num_blocks, the smaller the data blocks in the Dataset and hence more opportunities for parallel execution.

For more information on how to tune the number of output blocks and other suggestions for optimizing read performance, see Optimizing reads.