Creating Datasets#

Ray Datasets can be created from:

  • generated synthetic data,

  • local and distributed in-memory data, and

  • local and external storage systems (local disk, cloud storage, HDFS, etc.).

This guide surveys the many ways to create a Dataset. If none of these meet your needs, please reach out to us on Discourse or open a feature request on the Ray GitHub repo, and check out our guide for implementing a custom Datasets datasource if you’re interested in rolling your own integration!

Generating Synthetic Data#

Create a Dataset from a range of integers.

# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

ds.take(5)
# -> [0, 1, 2, 3, 4]

Create an Arrow (tabular) Dataset from a range of integers, with a single column containing this integer range.

# Create a Dataset of Arrow records.
ds = ray.data.range_table(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema={value: int64})

ds.take(5)
# -> [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}, {'value': 4}]

Create a tensor Dataset from a range of integers, packing this integer range into tensors of the provided shape.

# Create a Dataset of tensors.
ds = ray.data.range_tensor(100 * 64 * 64, shape=(64, 64))
# -> Dataset(
#       num_blocks=200,
#       num_rows=409600,
#       schema={value: <ArrowTensorType: shape=(64, 64), dtype=int64>}
#    )

ds.take(2)
# -> [array([[0, 0, 0, ..., 0, 0, 0],
#         [0, 0, 0, ..., 0, 0, 0],
#         [0, 0, 0, ..., 0, 0, 0],
#         ...,
#         [0, 0, 0, ..., 0, 0, 0],
#         [0, 0, 0, ..., 0, 0, 0],
#         [0, 0, 0, ..., 0, 0, 0]]),
#  array([[1, 1, 1, ..., 1, 1, 1],
#         [1, 1, 1, ..., 1, 1, 1],
#         [1, 1, 1, ..., 1, 1, 1],
#         ...,
#         [1, 1, 1, ..., 1, 1, 1],
#         [1, 1, 1, ..., 1, 1, 1],
#         [1, 1, 1, ..., 1, 1, 1]])]

Reading Files From Storage#

Using the ray.data.read_*() APIs, Datasets can be created from files on local disk or on remote storage systems such as S3, GCS, Azure Blob Storage, or HDFS. Any filesystem supported by pyarrow can be used to specify file locations, and many common file formats are supported: Parquet, CSV, JSON, NPY, text, and binary.

Each of these APIs takes a path or list of paths to files or directories. Any provided directories are walked to obtain concrete file paths, and all discovered files are then read in parallel.
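
For example, the same read API accepts a single file, a whole directory, or an explicit list of paths. The paths below are hypothetical placeholders:

# NOTE: Hypothetical paths; substitute your own files or directories.
# Read a single file.
ds = ray.data.read_parquet("/path/to/file.parquet")
# Read a directory; it is walked to find the concrete file paths.
ds = ray.data.read_parquet("/path/to/parquet_dir/")
# Read an explicit list of paths.
ds = ray.data.read_parquet(
    ["/path/to/file1.parquet", "/path/to/file2.parquet"]
)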

Supported File Formats#

Read Parquet files into a tabular Dataset. The Parquet data will be read into Arrow Table blocks. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of Parquet files. We also support reading partitioned Parquet datasets with partition column values pulled from the file paths.

# Create a tabular Dataset by reading a Parquet file.
ds = ray.data.read_parquet("example://iris.parquet")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        }
#    )

ds.show(2)
# -> {
#     'sepal.length': 5.1,
#     'sepal.width': 3.5,
#     'petal.length': 1.4,
#     'petal.width': 0.2,
#     'variety': 'Setosa',
# }
# -> {
#     'sepal.length': 4.9,
#     'sepal.width': 3.0,
#     'petal.length': 1.4,
#     'petal.width': 0.2,
#     'variety': 'Setosa',
# }
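
As a rough sketch of the partitioned case mentioned above (the bucket path is a hypothetical placeholder, assuming a Hive-style year=/month= directory layout), partition column values are parsed out of the directory names and surfaced as columns:

# NOTE: Hypothetical path; assumes a Hive-partitioned layout such as
# s3://some-bucket/taxi/year=2019/month=01/data.parquet.
ds = ray.data.read_parquet("s3://some-bucket/taxi/")
# The resulting schema includes the partition columns ("year", "month") in
# addition to the columns stored in the Parquet files themselves.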

Datasets’ Parquet reader also supports projection and filter pushdown, allowing column selection and row filtering to be pushed down to the file scan. For column selection, unselected columns will never be read from the file.

import pyarrow as pa

# Create a tabular Dataset by reading a Parquet file, pushing column selection and row
# filtering down to the file scan.
ds = ray.data.read_parquet(
    "example://iris.parquet",
    columns=["sepal.length", "variety"],
    filter=pa.dataset.field("sepal.length") > 5.0,
).materialize()  # Force a full read of the file.
# -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string})

ds.show(2)
# -> {'sepal.length': 5.1, 'variety': 'Setosa'}
#    {'sepal.length': 5.4, 'variety': 'Setosa'}

See the API docs for read_parquet().

Read CSV files into a tabular Dataset. The CSV data will be read into Arrow Table blocks. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of CSV files, with one tabular block created per file.

# Create a tabular Dataset by reading a CSV file.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        }
#    )

ds.show(2)
# -> {
#     'sepal.length': 5.1,
#     'sepal.width': 3.5,
#     'petal.length': 1.4,
#     'petal.width': 0.2,
#     'variety': 'Setosa',
# }
# -> {
#     'sepal.length': 4.9,
#     'sepal.width': 3.0,
#     'petal.length': 1.4,
#     'petal.width': 0.2,
#     'variety': 'Setosa',
# }

See the API docs for read_csv().

Read JSON files into a tabular Dataset. The JSON data will be read into Arrow Table blocks. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of JSON files, with one tabular block created per file.

Currently, only newline-delimited JSON (NDJSON) is supported.

# Create a tabular Dataset by reading a JSON file.
ds = ray.data.read_json("example://iris.json")
# -> Dataset(
#        num_blocks=1,
#        num_rows=150,
#        schema={
#            sepal.length: double,
#            sepal.width: double,
#            petal.length: double,
#            petal.width: double,
#            variety: string,
#        }
#    )

ds.show(2)
# -> {
#     'sepal.length': 5.1,
#     'sepal.width': 3.5,
#     'petal.length': 1.4,
#     'petal.width': 0.2,
#     'variety': 'Setosa',
# }
# -> {
#     'sepal.length': 4.9,
#     'sepal.width': 3.0,
#     'petal.length': 1.4,
#     'petal.width': 0.2,
#     'variety': 'Setosa',
# }

See the API docs for read_json().

Read NumPy files into a tensor Dataset. The NumPy ndarray data will be read into single-column Arrow Table blocks using our tensor extension type, treating the outermost ndarray dimension as the row dimension. See our tensor data guide for more information on working with tensors in Datasets. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of NumPy files, with one tensor block created per file.

# Create a tensor Dataset by reading a NumPy file.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(
#       num_blocks=1,
#       num_rows=3,
#       schema={__RAY_TC__: <ArrowTensorType: shape=(28, 28), dtype=uint8>},
#   )

ds.show(2)
# [array([[0, ...]]), array([[0, ...]])]

See the API docs for read_numpy().

Read text files into a Dataset. Each line in each text file will be treated as a row in the dataset, resulting in a list-of-strings block being created for each text file.

# Create a Dataset of text lines by reading a text file.
ds = ray.data.read_text("example://sms_spam_collection_subset.txt")
# -> Dataset(num_blocks=1, num_rows=10, schema=<class 'str'>)

ds.show(3)
# -> ham     Go until jurong point, crazy.. Available only in bugis n great world la e
#            buffet... Cine there got amore wat...
#    ham     Ok lar... Joking wif u oni...
#    spam    Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA
#            to 87121 to receive entry question(std txt rate)T&C's apply
#            08452810075over18's

See the API docs for read_text().

Call read_images() to read images into a Dataset.

This function stores image data in single-column Arrow Table blocks using the tensor extension type. For more information on working with tensors in Datasets, read the tensor data guide.

ds = ray.data.read_images("example://image-datasets/simple")
# -> Dataset(num_blocks=3, num_rows=3, 
#            schema={image: ArrowTensorType(shape=(32, 32, 3), dtype=uint8)})

ds.take(1)
# -> [array([[[ 88,  70,  68],
#            [103,  88,  85],
#            [112,  96,  97],
#            ...,
#            [168, 151,  81],
#            [167, 149,  83],
#            [166, 148,  82]]], dtype=uint8)]

Read binary files into a Dataset. Each binary file will be treated as a single row of opaque bytes. These bytes can be decoded into tensor, tabular, text, or any other kind of data by applying a decoding user-defined function with map() or map_batches().

Although this simple example demonstrates reading a single file, note that Datasets can also read directories of binary files, with one bytes block created per file.

from io import BytesIO

import numpy as np
import PIL.Image

# Create a Dataset of opaque bytes by reading a binary file.
ds = ray.data.read_binary_files("example://mnist_subset_partitioned/0/1.png")
# -> Dataset(num_blocks=1, num_rows=1, schema=<class 'bytes'>)

ds = ds.map(lambda bytes_: np.asarray(PIL.Image.open(BytesIO(bytes_)).convert("L")))
# -> Dataset(
#        num_blocks=1,
#        num_rows=1,
#        schema={__RAY_TC__: <ArrowTensorType: shape=(28, 28), dtype=uint8>},
#    )

See the API docs for read_binary_files().

Call read_tfrecords() to read TFRecord files into a tabular Dataset.

Warning

Only tf.train.Example records are supported.

# Create a tabular Dataset by reading a TFRecord file.
ds = ray.data.read_tfrecords("example://iris.tfrecords")
# Dataset(
#     num_blocks=1,
#     num_rows=150,
#     schema={
#         sepal.length: float64,
#         sepal.width: float64,
#         petal.length: float64,
#         petal.width: float64,
#         label: object,
#     },
# )
ds.show(1)
# {
#     "sepal.length": 5.099999904632568,
#     "sepal.width": 3.5,
#     "petal.length": 1.399999976158142,
#     "petal.width": 0.20000000298023224,
#     "label": b"Setosa",
# }

Reading from Remote Storage#

All of the file formats mentioned above can be read from remote storage, such as S3, GCS, Azure Blob Storage, and HDFS. S3 and HDFS are supported natively via Arrow’s filesystem APIs, while GCS and Azure Blob Storage are supported via fsspec wrappers. All ray.data.read_*() APIs expose a filesystem argument that accepts both Arrow FileSystem instances and fsspec FileSystem instances, allowing you to configure the connection to the remote storage system, such as authn/authz and buffer/block sizes.

For S3 and HDFS, the underlying FileSystem implementation will be inferred from the URI scheme ("s3://" and "hdfs://"); if the default connection configuration suffices for your workload, you won’t need to specify a filesystem argument.

We use Parquet files for the below examples, but all of the aforementioned file formats are supported for each of these storage systems.
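
For instance, the other ray.data.read_*() APIs accept the same remote URIs and the same filesystem argument; the paths below are hypothetical placeholders:

# NOTE: Hypothetical paths; any of the ray.data.read_*() APIs work the same way.
ds = ray.data.read_csv("s3://some-bucket/path/to/file.csv")
ds = ray.data.read_json("hdfs://<host:port>/path/to/file.json")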

The AWS S3 storage system is inferred from the URI scheme (s3://), with required connection configuration such as S3 credentials being pulled from the machine’s environment (e.g. the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables).

# Create a tabular Dataset by reading a Parquet file from S3.
ds = ray.data.read_parquet("s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/01/data.parquet")
# -> Dataset(
#        num_blocks=1,
#        num_rows=7667792,
#        schema={
#            vendor_id: string,
#            pickup_at: timestamp[us],
#            dropoff_at: timestamp[us],
#            passenger_count: int8,
#            trip_distance: float,
#            rate_code_id: string,
#            store_and_fwd_flag: string,
#            ...,
#        },
#    )

ds.show(2)
# -> {
#        'vendor_id': '1',
#        'pickup_at': datetime.datetime(2019, 1, 1, 0, 46, 40),
#        'dropoff_at': datetime.datetime(2019, 1, 1, 0, 53, 20),
#        'passenger_count': 1,
#        'trip_distance': 1.5,
#        'rate_code_id': '1',
#        'store_and_fwd_flag': 'N', 
#        ...,
#    }
#    {
#        'vendor_id': '1',
#        'pickup_at': datetime.datetime(2019, 1, 1, 0, 59, 47),
#        'dropoff_at': datetime.datetime(2019, 1, 1, 1, 18, 59),
#        'passenger_count': 1,
#        'trip_distance': 2.5999999046325684,
#        'rate_code_id': '1',
#        'store_and_fwd_flag': 'N', 
#        ...,
#    }

If you need to customize the S3 connection (credentials, region, endpoint override, etc.), you can pass an S3FileSystem instance to read_parquet().

import pyarrow as pa

# Create a tabular Dataset by reading a Parquet file from a private S3 bucket.
# NOTE: This example is not runnable as-is; add in a path to your private bucket and the
# required S3 credentials!
ds = ray.data.read_parquet(
    "s3://some/private/bucket",
    filesystem=pa.fs.S3FileSystem(
        region="us-west-2",
        access_key="XXXX",
        secret_key="XXXX",
    ),
)

The HDFS storage system is inferred from the URI scheme (hdfs://), with required connection configuration such as the host and the port being derived from the URI.

Note

This example is not runnable as-is; you’ll need to point it at your HDFS cluster/data.

# Create a tabular Dataset by reading a Parquet file from HDFS using HDFS connection
# automatically constructed based on the URI.
# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
# cluster/data.
ds = ray.data.read_parquet("hdfs://<host:port>/path/to/file.parquet")

If you need to customize the HDFS connection (host, port, user, Kerberos ticket, etc.), you can pass a pyarrow HadoopFileSystem instance to read_parquet().

import pyarrow as pa

# Create a tabular Dataset by reading a Parquet file from HDFS, manually specifying a
# configured HDFS connection via a pyarrow HadoopFileSystem instance.
# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
# cluster/data.
ds = ray.data.read_parquet(
    "hdfs://path/to/file.parquet",
    filesystem=pa.fs.HadoopFileSystem(host="localhost", port=9000, user="bob"),
)

Data can be read from Google Cloud Storage by providing a configured gcsfs GCSFileSystem, where the appropriate Google Cloud project and credentials can be specified.

Note

This example is not runnable as-is; you’ll need to point it at your GCS bucket and configure your GCP project and credentials.

import gcsfs

# Create a tabular Dataset by reading a Parquet file from GCS, passing the configured
# GCSFileSystem.
# NOTE: This example is not runnable as-is; you need to point it at your GCS bucket
# and configure your GCP project and credentials.
path = "gs://path/to/file.parquet"
filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds = ray.data.read_parquet(path, filesystem=filesystem)

Tip

To verify that your GCP project and credentials are set up, validate that the GCS filesystem has permissions to read the input path.

print(filesystem.ls(path))
# ['path/to/file.parquet']
print(filesystem.open(path))
# <File-like object GCSFileSystem, path/to/file.parquet>

For more examples, see the GCSFS Documentation.

Data can be read from Azure Blob Storage by providing a configured adlfs AzureBlobFileSystem, where the appropriate account name and account key can be specified.

import adlfs

# Create a tabular Dataset by reading a Parquet file from Azure Blob Storage, passing
# the configured AzureBlobFileSystem.
path = (
    "az://nyctlc/yellow/puYear=2009/puMonth=1/"
    "part-00019-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426333-4"
    ".c000.snappy.parquet"
)
ds = ray.data.read_parquet(
    path,
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
)

Reading from Local Storage#

Ray Datasets most often reads from remote storage systems as described above, but in some cases you may want to read from local storage. There are three ways to read from a local filesystem (a combined example follows the list):

  • Providing a local filesystem path: For example, in ray.data.read_csv("my_file.csv"), the given path will be resolved as a local filesystem path.

Note

If the file exists only on the local node and you run this read operation in a distributed cluster, it will fail because remote nodes cannot access the file.

  • Using the local:// URI scheme: Similarly, this resolves to the local filesystem, e.g. ray.data.read_csv("local://my_file.csv") will read the same file as the approach above. The difference is that this scheme ensures all read tasks run on the local node, so it’s safe to use in a distributed cluster.

  • Using the example:// URI scheme: Paths with this scheme resolve to the ray/data/examples/data directory bundled with the Ray package. This scheme is intended only for tests and documentation examples.
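
A minimal combined sketch of these three approaches, assuming a hypothetical local file named my_file.csv (the example:// path ships with Ray):

# 1) Plain local path: resolved against the local filesystem of whichever node
#    runs the read tasks, so the file must exist on every node of a cluster.
ds = ray.data.read_csv("my_file.csv")

# 2) local:// scheme: reads the same file, but all read tasks run on the local
#    node, which is safe in a distributed cluster.
ds = ray.data.read_csv("local://my_file.csv")

# 3) example:// scheme: resolves to the ray/data/examples/data directory bundled
#    with the Ray package; intended for tests and documentation examples.
ds = ray.data.read_csv("example://iris.csv")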

Reading Compressed Files#

Ray Datasets supports reading compressed files via the arrow_open_stream_args argument. Any codec supported by Arrow (bz2, brotli, gzip, lz4, or zstd) is compatible with Ray Datasets. For example:

# Read a gzip-compressed CSV file from S3.
ds = ray.data.read_csv(
    "s3://anonymous@air-example-data/gzip_compressed.csv",
    arrow_open_stream_args={"compression": "gzip"},
)

From In-Memory Data#

Datasets can be constructed from existing in-memory data. In addition to plain Python objects, Datasets interoperates with popular single-node libraries (Pandas, NumPy, Arrow) as well as distributed frameworks (Dask, Spark, Modin, Mars).

From Single-Node Data Libraries#

In this section, we demonstrate creating a Dataset from single-node in-memory data.

Create a Dataset from a Pandas DataFrame. This constructs a Dataset backed by a single Pandas DataFrame block.

import pandas as pd

# Create a tabular Dataset from a Pandas DataFrame.
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ds = ray.data.from_pandas(df)
# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: object})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

We can also build a Dataset from more than one Pandas DataFrame, where each DataFrame becomes a block in the Dataset.

import pandas as pd

data = list(range(10000))
num_chunks = 10
chunk_size = len(data) // num_chunks
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
dfs = [
    pd.DataFrame({"col1": list(chunk), "col2": list(map(str, chunk))})
    for chunk in chunks
]
# Create a tabular Dataset from multiple Pandas DataFrames.
ds = ray.data.from_pandas(dfs)
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: object})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

Create a Dataset from a NumPy ndarray. This constructs a Dataset backed by a single-column Arrow table block; the outer dimension of the ndarray is treated as the row dimension, and the values are stored in a single tensor column (shown as value in the schema below).

import numpy as np

# Create a tensor Dataset from a 3D NumPy ndarray.
arr = np.ones((3, 4, 4))
# The outer dimension is treated as the row dimension.
ds = ray.data.from_numpy(arr)
# -> Dataset(
#        num_blocks=1,
#        num_rows=3,
#        schema={value: <ArrowTensorType: shape=(4, 4), dtype=double>},
#    )

ds.show(2)
# -> {'value': array([[1., 1., 1., 1.],
#        [1., 1., 1., 1.],
#        [1., 1., 1., 1.],
#        [1., 1., 1., 1.]])}
# -> {'value': array([[1., 1., 1., 1.],
#        [1., 1., 1., 1.],
#        [1., 1., 1., 1.],
#        [1., 1., 1., 1.]])}

We can also build a Dataset from more than one NumPy ndarray, where each ndarray becomes a single-column Arrow table block in the Dataset.

import numpy as np

# Create a tensor Dataset from multiple 3D NumPy ndarrays.
arrs = [np.random.rand(2, 4, 4) for _ in range(4)]
# The outer dimension is treated as the row dimension.
ds = ray.data.from_numpy(arrs)
# -> Dataset(
#        num_blocks=4,
#        num_rows=8,
#        schema={value: <ArrowTensorType: shape=(4, 4), dtype=double>},
#    )

ds.show(2)
# -> {'value': array([[0.06587483, 0.67808656, 0.76461924, 0.83428549],
#        [0.04932103, 0.25112165, 0.26476714, 0.24599738],
#        [0.67624391, 0.58689537, 0.12594709, 0.94663371],
#        [0.32435665, 0.97719096, 0.03234169, 0.71563231]])}
# -> {'value': array([[0.98570318, 0.65956399, 0.82168898, 0.09798336],
#        [0.22426704, 0.34209978, 0.02605247, 0.48200137],
#        [0.17312096, 0.38789983, 0.42663678, 0.92652456],
#        [0.80787394, 0.92437162, 0.11185822, 0.3319638 ]])}

Create a Dataset from an Arrow Table. This constructs a Dataset backed by a single Arrow Table block.

import pyarrow as pa

# Create a tabular Dataset from an Arrow Table.
t = pa.table({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ds = ray.data.from_arrow(t)
# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: string})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

We can also build a Dataset from more than one Arrow Table, where each Table becomes a block in the Dataset.

import pyarrow as pa

data = list(range(10000))
num_chunks = 10
chunk_size = len(data) // num_chunks
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
ts = [
    pa.table({"col1": list(chunk), "col2": list(map(str, chunk))})
    for chunk in chunks
]
# Create a tabular Dataset from multiple Arrow Tables.
ds = ray.data.from_arrow(ts)
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

Create a Dataset from a list of Python objects; since each object in this particular list is a dictionary, Datasets will treat this list as a list of tabular records, and will construct an Arrow Dataset.

# Create a Dataset of tabular (Arrow) records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

From Distributed Data Processing Frameworks#

In addition to working with single-node in-memory data, Datasets can be constructed from distributed (multi-node) in-memory data, interoperating with popular distributed data processing frameworks such as Dask, Spark, Modin, and Mars.

These conversions work by running Ray tasks converting each Dask/Spark/Modin/Mars data partition to a block format supported by Datasets (copying data if needed), and using the futures representing the return value of those conversion tasks as the Dataset block futures.

Note

These data processing frameworks must be running on Ray in order for these Datasets integrations to work. See how these frameworks can be run on Ray in our data processing integrations docs.

Create a Dataset from a Dask DataFrame. This constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Dask DataFrame.

This conversion has near-zero overhead, since Datasets simply reinterprets existing Dask-in-Ray partition objects as Dataset blocks.

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ddf = dd.from_pandas(df, npartitions=4)
# Create a tabular Dataset from a Dask DataFrame.
ds = ray.data.from_dask(ddf)
# -> Dataset(num_blocks=4, num_rows=10000, schema={col1: int64, col2: object})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

Create a Dataset from a Spark DataFrame. This constructs a Dataset backed by the distributed Spark DataFrame partitions that underlie the Spark DataFrame. When this conversion happens, Spark-on-Ray (RayDP) will save the Spark DataFrame partitions to Ray’s object store in the Arrow format, which Datasets will then interpret as its blocks.

import raydp

spark = raydp.init_spark(app_name="Spark -> Datasets Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
# Create a tabular Dataset from a Spark DataFrame.
ds = ray.data.from_spark(df)
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

Create a Dataset from a Modin DataFrame. This constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Modin DataFrame.

This conversion has near-zero overhead, since Datasets simply reinterprets existing Modin partition objects as Dataset blocks.

import modin.pandas as md
import pandas as pd

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df)
# Create a tabular Dataset from a Modin DataFrame.
ds = ray.data.from_modin(mdf)
# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

Create a Dataset from a Mars DataFrame. This constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Mars DataFrame.

This conversion has near-zero overhead, since Datasets simply reinterprets existing Mars partition objects as Dataset blocks.

import mars
import mars.dataframe as md
import pandas as pd

cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df, num_partitions=8)
# Create a tabular Dataset from a Mars DataFrame.
ds = ray.data.from_mars(mdf)
# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object})

ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}

From Torch and TensorFlow#

If you already have a Torch dataset available, you can create a Ray Dataset using from_torch.

Warning

from_torch doesn’t support parallel reads. You should only use this datasource for small datasets like MNIST or CIFAR.

import ray
import torchvision

dataset = torchvision.datasets.MNIST("data", download=True)
dataset = ray.data.from_torch(dataset)
dataset.take(1)
# (<PIL.Image.Image image mode=L size=28x28 at 0x1142CCA60>, 5)

If you already have a TensorFlow dataset available, you can create a Ray Dataset using from_tf.

Warning

from_tf doesn’t support parallel reads. You should only use this function with small datasets like MNIST or CIFAR.

import ray
import tensorflow_datasets as tfds

dataset, _ = tfds.load("cifar10", split=["train", "test"])
dataset = ray.data.from_tf(dataset)

dataset
# -> Dataset(num_blocks=200, num_rows=50000, schema={id: binary, image: ArrowTensorType(shape=(32, 32, 3), dtype=uint8), label: int64})

From 🤗 (Hugging Face) Datasets#

You can convert 🤗 Datasets into Ray Datasets by using from_huggingface. This function accesses the underlying Arrow table and converts it into a Ray Dataset directly.

Warning

from_huggingface doesn’t support parallel reads. This will not usually be an issue with in-memory 🤗 Datasets, but may fail with large memory-mapped 🤗 Datasets. 🤗 IterableDataset objects are not supported.

import ray.data
from datasets import load_dataset

hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_datasets = ray.data.from_huggingface(hf_datasets)
ray_datasets["train"].take(2)
# [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]

From MongoDB#

A Dataset can also be created from MongoDB with read_mongo. This works similarly to reading from external filesystems, except that you specify the MongoDB source by its uri, database, and collection, along with a pipeline to run against the collection. The execution results are then used to create a Dataset.

Note

This example is not runnable as-is; you’ll need to point it at your MongoDB instance.

import ray

# Read a local MongoDB.
ds = ray.data.read_mongo(
    uri="mongodb://localhost:27017",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": {"sort_col": 1}}],
)

# Reading a remote MongoDB is the same.
ds = ray.data.read_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": {"sort_col": 1}}],
)

# Write back to MongoDB.
ds.write_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
)

Reading From SQL Databases#

Call read_sql() to read data from a database that provides a Python DB API2-compliant connector.

To read from MySQL, install MySQL Connector/Python. It’s the first-party MySQL database connector.

pip install mysql-connector-python

Then, define your connection logic and query the database.

import mysql.connector

import ray

def create_connection():
    return mysql.connector.connect(
        user="admin",
        password=...,
        host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        connection_timeout=30,
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from PostgreSQL, install Psycopg 2. It’s the most popular PostgreSQL database connector.

pip install psycopg2-binary

Then, define your connection logic and query the database.

import psycopg2

import ray

def create_connection():
    return psycopg2.connect(
        user="postgres",
        password=...,
        host="example-postgres-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        dbname="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from Snowflake, install the Snowflake Connector for Python.

pip install snowflake-connector-python

Then, define your connection logic and query the database.

import snowflake.connector

import ray

def create_connection():
    return snowflake.connector.connect(
        user=...,
        password=...,
        account="ZZKXUVH-IPB52023",
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from Databricks, install the Databricks SQL Connector for Python.

pip install databricks-sql-connector

Then, define your connection logic and read from the Databricks SQL warehouse.

from databricks import sql

import ray

def create_connection():
    return sql.connect(
        server_hostname="dbc-1016e3a4-d292.cloud.databricks.com",
        http_path="/sql/1.0/warehouses/a918da1fc0b7fed0",
        access_token=...,
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from BigQuery, install the Python Client for Google BigQuery. This package includes a DB API2-compliant database connector.

pip install google-cloud-bigquery

Then, define your connection logic and query the dataset.

from google.cloud import bigquery
from google.cloud.bigquery import dbapi

import ray

def create_connection():
    client = bigquery.Client(...)
    return dbapi.Connection(client)

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

Custom Datasources#

Datasets can read and write in parallel to custom datasources defined in Python. Once you have implemented YourCustomDatasource, you can use it like any other source in Ray Data:

# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)

# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)

For more details, check out the guide for implementing a custom Datasets datasource.

Performance Considerations#

Read Parallelism#

Datasets automatically selects the read parallelism according to the following procedure:

  1. The number of available CPUs is estimated. If running in a placement group, this is the number of CPUs in the cluster scaled by the fraction of the cluster’s resources reserved by the placement group; otherwise, it is the total number of CPUs in the cluster.

  2. The parallelism is set to the estimated number of CPUs multiplied by 2. If the parallelism is less than 8, it is set to 8.

  3. The in-memory data size is estimated. If the parallelism would create in-memory blocks that are larger on average than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size.

  4. The parallelism is truncated to min(num_files, parallelism).
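
As an illustrative sketch only, not the actual Ray Data implementation, the procedure above can be written as follows; the 512MiB target block size and the minimum parallelism of 8 are taken from the description above:

def estimate_read_parallelism(num_cpus, num_files, in_memory_size_bytes):
    # Sketch of the heuristic described above.
    target_block_size = 512 * 1024 * 1024

    # Steps 1-2: twice the estimated CPU count, with a floor of 8.
    parallelism = max(num_cpus * 2, 8)

    # Step 3: increase parallelism until the average block size is no larger
    # than the target block size.
    while in_memory_size_bytes / parallelism > target_block_size:
        parallelism += 1

    # Step 4: never launch more read tasks than there are input files.
    return min(num_files, parallelism)

# For example, with 16 CPUs, 64 input files, and ~100GiB of data:
print(estimate_read_parallelism(16, 64, 100 * 1024**3))  # -> 64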

To perform the read, parallelism parallel read tasks will be launched, each reading one or more files and each creating a single block of data. When reading from remote datasources, these parallel read tasks will be spread across the nodes in your Ray cluster, creating the distributed collection of blocks that makes up a distributed Ray Dataset.

[Figure: dataset-read.svg — parallel read tasks producing the distributed blocks of a Dataset]

This default parallelism can be overridden via the parallelism argument; see the performance guide for tips on how to tune this read parallelism.
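
For example, to request a specific number of read tasks (the bucket path is a hypothetical placeholder):

# Ask for ~1000 read tasks/blocks instead of the automatically chosen value.
# The effective parallelism is still capped at the number of input files.
ds = ray.data.read_parquet("s3://some-bucket/large-dataset/", parallelism=1000)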

Deferred Read Task Execution#

Datasets created via the ray.data.read_*() APIs are lazy: no read tasks are executed until a downstream consumption operation triggers execution. Metadata inspection functions like ds.schema() and ds.show() will trigger execution of only one or some tasks, instead of all tasks. This allows metadata to be inspected right away. Execution of all read tasks can be triggered manually using the ds.materialize() API.
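
A minimal sketch of this behavior, using a bundled example file:

# No read tasks run yet; this only creates the lazy Dataset and fetches a small
# amount of metadata.
ds = ray.data.read_csv("example://iris.csv")

# Inspecting the schema or the first few rows triggers at most a few read tasks.
print(ds.schema())
ds.show(2)

# Force execution of all read tasks and materialize the full Dataset in memory.
ds = ds.materialize()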