Creating Datasets¶
A Dataset can be created from:
generated synthetic data,
local and distributed in-memory data, and
local and external storage systems (local disk, cloud storage, HDFS, etc.).
Creation from existing in-memory data is enabled via Datasets’ integrations with familiar single-node data libraries (Pandas, NumPy, Arrow) and distributed data processing frameworks (Dask, Spark, Modin, Mars). Creating datasets from persistent storage is enabled by Datasets’ support for reading many common file formats (Parquet, CSV, JSON, NPY, text, binary).
A Dataset can hold plain Python objects (simple datasets), Arrow records (Arrow datasets), or Pandas records (Pandas datasets). These records are grouped into one or more data blocks, and these blocks can be spread across multiple Ray nodes. Simple datasets are represented by simple blocks (lists of Python objects), Arrow datasets are represented by Arrow blocks (Arrow Tables), and Pandas datasets are represented by Pandas blocks (Pandas DataFrames).
The method used to create a dataset determines the format of its internal block representation. The following details the block representation for each creation method (a short sketch after this list illustrates how this shows up in practice):
Reading tabular files (Parquet, CSV, JSON) and converting directly from Arrow produces Arrow datasets.
Converting from Pandas, Dask, Modin, and Mars produces Pandas datasets.
Reading NumPy files or converting from NumPy ndarrays produces Arrow datasets.
Reading text and raw binary files produces simple datasets.
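A minimal sketch (using creation APIs covered later in this guide) of how the creation method is reflected in a Dataset's reported schema:
import ray
import pandas as pd

# Arrow dataset: tabular file reads and Arrow/NumPy conversions produce Arrow blocks.
ray.data.range_table(8)
# -> Dataset(num_blocks=..., num_rows=8, schema={value: int64})

# Pandas dataset: converting from Pandas (or Dask/Modin/Mars) produces Pandas blocks.
ray.data.from_pandas(pd.DataFrame({"col1": [1, 2, 3]}))
# -> Dataset(num_blocks=1, num_rows=3, schema={col1: int64})

# Simple dataset: plain Python objects produce simple blocks.
ray.data.from_items(["spam", "ham", "eggs"])
# -> Dataset(num_blocks=..., num_rows=3, schema=<class 'str'>)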
The following figure visualizes a Dataset that has three Arrow Table blocks, each block holding 1000 rows:
This guide surveys the many ways to create a Dataset. If none of these meet your needs, please reach out on Discourse or open a feature request on the Ray GitHub repo, and check out our guide for implementing a custom Datasets datasource if you're interested in rolling your own integration!
Generating Synthetic Data¶
Using Datasets with small, generated data is a great way to test out some of Datasets' features. Each of these synthetic data generators will generate data in Ray tasks that execute in parallel and are load-balanced across your Ray cluster; the Dataset will hold a set of futures representing the return values of those tasks, serving as pointers to a collection of distributed data blocks.
Create a Dataset from a range of integers.
import ray

# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(5)
# -> [0, 1, 2, 3, 4]
Create an Arrow (tabular) Dataset from a range of integers, with a single column containing this integer range.
# Create a Dataset of Arrow records.
ds = ray.data.range_table(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema={value: int64})
ds.take(5)
# -> [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}, {'value': 4}]
Create a tensor dataset from a range of integers, packing this integer range into tensors of the provided shape.
# Create a Dataset of tensors.
ds = ray.data.range_tensor(100 * 64 * 64, shape=(64, 64))
# -> Dataset(
# num_blocks=200,
# num_rows=409600,
# schema={value: <ArrowTensorType: shape=(64, 64), dtype=int64>}
# )
ds.take(2)
# -> [array([[0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0],
# ...,
# [0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0],
# [0, 0, 0, ..., 0, 0, 0]]),
# array([[1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# ...,
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1],
# [1, 1, 1, ..., 1, 1, 1]])]
From Local and Distributed In-Memory Data¶
Datasets can be constructed from existing in-memory data. In addition to being able to construct a Dataset from plain Python objects, Datasets also interoperates with popular single-node libraries (Pandas, NumPy, Arrow) as well as distributed frameworks (Dask, Spark, Modin, Mars).
Integration with Single-Node Data Libraries¶
In this section, we demonstrate creating a Dataset from single-node in-memory data.
Create a Dataset from a Pandas DataFrame. This constructs a Dataset backed by a single Pandas DataFrame block.
import pandas as pd
# Create a tabular Dataset from a Pandas DataFrame.
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ds = ray.data.from_pandas(df)
# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: object})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
We can also build a Dataset from more than one Pandas DataFrame, where each DataFrame will become a block in the Dataset.
import pandas as pd
data = list(range(10000))
num_chunks = 10
chunk_size = len(data) // num_chunks
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
dfs = [
    pd.DataFrame({"col1": list(chunk), "col2": list(map(str, chunk))})
    for chunk in chunks
]
# Create a tabular Dataset from multiple Pandas DataFrames.
ds = ray.data.from_pandas(dfs)
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: object})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
Create a Dataset from a NumPy ndarray. This constructs a Dataset backed by a single-column Arrow table block; the outer dimension of the ndarray is treated as the row dimension, and the column will have the name "value".
import numpy as np
# Create a tensor Dataset from a 1D NumPy ndarray.
arr = np.arange(100)
ds = ray.data.from_numpy(arr)
# -> Dataset(
# num_blocks=1,
# num_rows=100,
# schema={value: <ArrowTensorType: shape=(), dtype=int64>},
# )
# Each element is a scalar ndarray.
ds.show(3)
# -> {'value': array(0)}
# -> {'value': array(1)}
# -> {'value': array(2)}
# Create a tensor Dataset from a 3D NumPy ndarray.
arr = np.ones((3, 4, 4))
# The outer dimension is treated as the row dimension.
ds = ray.data.from_numpy(arr)
# -> Dataset(
# num_blocks=1,
# num_rows=3,
# schema={value: <ArrowTensorType: shape=(4, 4), dtype=double>},
# )
ds.show(2)
# -> {'value': array([[1., 1., 1., 1.],
# [1., 1., 1., 1.],
# [1., 1., 1., 1.],
# [1., 1., 1., 1.]])}
# -> {'value': array([[1., 1., 1., 1.],
# [1., 1., 1., 1.],
# [1., 1., 1., 1.],
# [1., 1., 1., 1.]])}
We can also build a Dataset from more than one NumPy ndarray, where each ndarray will become a single-column Arrow table block in the Dataset.
import numpy as np
# Create a tensor Dataset from multiple 3D NumPy ndarrays.
arrs = [np.random.rand(2, 4, 4) for _ in range(4)]
# The outer dimension is treated as the row dimension.
ds = ray.data.from_numpy(arrs)
# -> Dataset(
# num_blocks=4,
# num_rows=8,
# schema={value: <ArrowTensorType: shape=(4, 4), dtype=double>},
# )
ds.show(2)
# -> {'value': array([[0.06587483, 0.67808656, 0.76461924, 0.83428549],
# [0.04932103, 0.25112165, 0.26476714, 0.24599738],
# [0.67624391, 0.58689537, 0.12594709, 0.94663371],
# [0.32435665, 0.97719096, 0.03234169, 0.71563231]])}
# -> {'value': array([[0.98570318, 0.65956399, 0.82168898, 0.09798336],
# [0.22426704, 0.34209978, 0.02605247, 0.48200137],
# [0.17312096, 0.38789983, 0.42663678, 0.92652456],
# [0.80787394, 0.92437162, 0.11185822, 0.3319638 ]])}
Create a Dataset from an Arrow Table. This constructs a Dataset backed by a single Arrow Table block.
import pyarrow as pa
# Create a tabular Dataset from an Arrow Table.
t = pa.table({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ds = ray.data.from_arrow(t)
# -> Dataset(num_blocks=1, num_rows=10000, schema={col1: int64, col2: string})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
We can also build a Dataset from more than one Arrow Table, where each Table will become a block in the Dataset.
import pyarrow as pa
data = list(range(10000))
num_chunks = 10
chunk_size = len(data) // num_chunks
chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
ts = [
    pa.table({"col1": list(chunk), "col2": list(map(str, chunk))})
    for chunk in chunks
]
# Create a tabular Dataset from multiple Arrow Tables.
ds = ray.data.from_arrow(ts)
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
Create a Dataset from a list of Python objects; since each object in this particular list is a dictionary, Datasets will treat this list as a list of tabular records, and will construct an Arrow Dataset.
# Create a Dataset of tabular (Arrow) records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
Integration with Distributed Data Processing Frameworks¶
In addition to working with single-node in-memory data, Datasets can be constructed from distributed (multi-node) in-memory data, interoperating with popular distributed data processing frameworks such as Dask, Spark, Modin, and Mars.
The common paradigm used by these conversions is to send out Ray tasks that convert each Dask/Spark/Modin/Mars data partition to a format that Datasets can understand (if needed), and to use the futures representing the return values of those conversion tasks as the Dataset block futures. If the upstream framework's data partitions are already in a format that Datasets understands (e.g. Arrow or Pandas), Datasets will elide the conversion tasks and instead reinterpret those data partitions directly as its blocks.
Note
These data processing frameworks must be running on Ray in order for these Datasets integrations to work. See how these frameworks can be run on Ray in our data processing integrations docs.
Create a Dataset from a Dask DataFrame. This constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Dask DataFrame.
Note
This conversion should have near-zero overhead: it involves zero data copying and zero data movement. Datasets simply reinterprets the existing Dask DataFrame partitions as Ray Datasets partitions without touching the underlying data.
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ddf = dd.from_pandas(df, npartitions=4)
# Create a tabular Dataset from a Dask DataFrame.
ds = ray.data.from_dask(ddf)
# -> Dataset(num_blocks=4, num_rows=10000, schema={col1: int64, col2: object})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
Create a Dataset from a Spark DataFrame. This constructs a Dataset backed by the distributed Spark DataFrame partitions that underlie the Spark DataFrame. When this conversion happens, Spark-on-Ray (RayDP) will save the Spark DataFrame partitions to Ray's object store in the Arrow format, which Datasets will then interpret as its blocks.
import raydp
spark = raydp.init_spark(app_name="Spark -> Datasets Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
# Create a tabular Dataset from a Spark DataFrame.
ds = ray.data.from_spark(df)
# -> Dataset(num_blocks=10, num_rows=10000, schema={col1: int64, col2: string})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
Create a Dataset from a Modin DataFrame. This constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Modin DataFrame.
Note
This conversion should have near-zero overhead: it involves zero data copying and zero data movement. Datasets simply reinterprets the existing Modin DataFrame partitions as Ray Datasets partitions without touching the underlying data.
import pandas as pd
import modin.pandas as md
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df)
# Create a tabular Dataset from a Modin DataFrame.
ds = ray.data.from_modin(mdf)
# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
Create a Dataset from a Mars DataFrame. This constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Mars DataFrame.
Note
This conversion should have near-zero overhead: it involves zero data copying and zero data movement. Datasets simply reinterprets the existing Mars DataFrame partitions as Ray Datasets partitions without touching the underlying data.
import mars
import mars.dataframe as md
import pandas as pd
cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df, num_partitions=8)
# Create a tabular Dataset from a Mars DataFrame.
ds = ray.data.from_mars(mdf)
# -> Dataset(num_blocks=8, num_rows=10000, schema={col1: int64, col2: object})
ds.show(3)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
Reading Files From Storage¶
Using the ray.data.read_*() APIs, Datasets can be created from files on local disk or remote storage systems such as S3, GCS, Azure Blob Storage, or HDFS. Any filesystem supported by pyarrow can be used to specify file locations, and many common file formats are supported: Parquet, CSV, JSON, NPY, text, binary.
Parallel + Distributed Reading¶
Each of these APIs takes a path or list of paths to files or directories. Any directories provided will be walked in order to obtain concrete file paths, at which point all files will be read in parallel.
Datasets uses a default parallelism of 200, truncated by the number of files being read: parallelism = min(num_files, 200). These parallelism read tasks will be launched in parallel, each reading one or more files and each creating a single block of data. When reading from remote datasources, these parallel read tasks will be spread across the nodes in your Ray cluster, creating the distributed collection of blocks that makes up a distributed Ray Dataset.
This default parallelism can be overridden via the parallelism argument; see the performance guide for tips on how to tune this read parallelism.
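For example, a minimal sketch using this guide's example CSV file (the explicit parallelism value is an arbitrary illustration):
# Default read parallelism: min(num_files, 200) read tasks.
ds = ray.data.read_csv("example://iris.csv")

# Request a specific read parallelism; it is still truncated by the number of
# input files, so a single file yields a single read task.
ds = ray.data.read_csv("example://iris.csv", parallelism=4)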
Deferred Read Task Execution¶
Datasets created via the ray.data.read_*() APIs are semi-lazy: initially, only the first read task will be executed. This avoids blocking Dataset creation on the reading of all data files, enabling inspection functions like ds.schema() and ds.show() to be used right away. Executing further transformations on the Dataset will trigger execution of all read tasks, and execution of all read tasks can be triggered manually using the ds.fully_executed() API.
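A minimal sketch of this semi-lazy behavior, using the example Parquet file from the next section:
# Only the first read task executes when the Dataset is created.
ds = ray.data.read_parquet("example://iris.parquet")

# Metadata inspection works right away, without executing all read tasks.
ds.schema()
ds.show(1)

# Trigger execution of all read tasks explicitly.
ds = ds.fully_executed()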
Supported File Formats¶
Read Parquet files into a tabular Dataset. The Parquet data will be read into Arrow Table blocks. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of Parquet files, with one tabular block created per file. For Parquet in particular, we also support reading partitioned Parquet datasets with partition column values pulled from the file paths (sketched after the example below).
# Create a tabular Dataset by reading a Parquet file.
ds = ray.data.read_parquet("example://iris.parquet")
# -> Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# }
# )
ds.show(2)
# -> {
# 'sepal.length': 5.1,
# 'sepal.width': 3.5,
# 'petal.length': 1.4,
# 'petal.width': 0.2,
# 'variety': 'Setosa',
# }
# -> {
# 'sepal.length': 4.9,
# 'sepal.width': 3.0,
# 'petal.length': 1.4,
# 'petal.width': 0.2,
# 'variety': 'Setosa',
# }
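As a hedged sketch of reading partitioned Parquet data, assume a hypothetical Hive-partitioned layout rooted at s3://my-bucket/iris/ (e.g. variety=Setosa/data0.parquet); reading the root directory recovers the partition column values from the file paths:
# NOTE: hypothetical partitioned layout; not runnable as-is.
# The "variety" partition column is reconstructed from the directory names.
ds = ray.data.read_parquet("s3://my-bucket/iris/")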
Datasets’ Parquet reader also supports projection and filter pushdown, allowing column selection and row filtering to be pushed down to the file scan. For column selection, unselected columns will never be read from the file.
import pyarrow as pa
# Create a tabular Dataset by reading a Parquet file, pushing column selection and row
# filtering down to the file scan.
ds = ray.data.read_parquet(
    "example://iris.parquet",
    columns=["sepal.length", "variety"],
    filter=pa.dataset.field("sepal.length") > 5.0,
).fully_executed()  # Force a full read of the file.
# -> Dataset(num_blocks=1, num_rows=118, schema={sepal.length: double, variety: string})
ds.show(2)
# -> {'sepal.length': 5.1, 'variety': 'Setosa'}
# {'sepal.length': 5.4, 'variety': 'Setosa'}
See the API docs for read_parquet().
Read CSV files into a tabular Dataset. The CSV data will be read into Arrow Table blocks. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of CSV files, with one tabular block created per file.
# Create a tabular Dataset by reading a CSV file.
ds = ray.data.read_csv("example://iris.csv")
# -> Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# }
# )
ds.show(2)
# -> {
# 'sepal.length': 5.1,
# 'sepal.width': 3.5,
# 'petal.length': 1.4,
# 'petal.width': 0.2,
# 'variety': 'Setosa',
# }
# -> {
# 'sepal.length': 4.9,
# 'sepal.width': 3.0,
# 'petal.length': 1.4,
# 'petal.width': 0.2,
# 'variety': 'Setosa',
# }
See the API docs for read_csv().
Read JSON files into a tabular Dataset. The JSON data will be read into Arrow Table blocks. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of JSON files, with one tabular block created per file.
Currently, only newline-delimited JSON (NDJSON) is supported.
# Create a tabular Dataset by reading a JSON file.
ds = ray.data.read_json("example://iris.json")
# -> Dataset(
# num_blocks=1,
# num_rows=150,
# schema={
# sepal.length: double,
# sepal.width: double,
# petal.length: double,
# petal.width: double,
# variety: string,
# }
# )
ds.show(2)
# -> {
# 'sepal.length': 5.1,
# 'sepal.width': 3.5,
# 'petal.length': 1.4,
# 'petal.width': 0.2,
# 'variety': 'Setosa',
# }
# -> {
# 'sepal.length': 4.9,
# 'sepal.width': 3.0,
# 'petal.length': 1.4,
# 'petal.width': 0.2,
# 'variety': 'Setosa',
# }
See the API docs for read_json().
Read NumPy files into a tensor Dataset. The NumPy ndarray data will be read into single-column Arrow Table blocks using our tensor extension type, treating the outermost ndarray dimension as the row dimension. See our tensor data guide for more information on working with tensors in Datasets. Although this simple example demonstrates reading a single file, note that Datasets can also read directories of NumPy files, with one tensor block created per file.
# Create a tensor Dataset by reading a NumPy file.
ds = ray.data.read_numpy("example://mnist_subset.npy")
# -> Dataset(
# num_blocks=1,
# num_rows=3,
# schema={__RAY_TC__: <ArrowTensorType: shape=(28, 28), dtype=uint8>},
# )
ds.show(2)
# [array([[0, ...]]), array([[0, ...]])]
See the API docs for read_numpy().
Read text files into a Dataset. Each line in each text file will be treated as a row in the dataset, resulting in a list-of-strings block being created for each text file.
# Create a Dataset by reading a text file.
ds = ray.data.read_text("example://sms_spam_collection_subset.txt")
# -> Dataset(num_blocks=1, num_rows=10, schema=<class 'str'>)
ds.show(3)
# -> ham Go until jurong point, crazy.. Available only in bugis n great world la e
# buffet... Cine there got amore wat...
# ham Ok lar... Joking wif u oni...
# spam Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA
# to 87121 to receive entry question(std txt rate)T&C's apply
# 08452810075over18's
See the API docs for read_text().
Read binary files into a Dataset. Each binary file will be treated as a single row of opaque bytes. These bytes can be decoded into tensor, tabular, text, or any other kind of data using ds.map() to apply a per-row decoding UDF.
Although this simple example demonstrates reading a single file, note that Datasets can also read directories of binary files, with one bytes block created per file.
from io import BytesIO

import numpy as np
import PIL.Image

# Create a Dataset by reading a binary file.
ds = ray.data.read_binary_files("example://mnist_subset_partitioned/0/1.png")
# -> Dataset(num_blocks=1, num_rows=1, schema=<class 'bytes'>)
ds = ds.map(lambda bytes_: np.asarray(PIL.Image.open(BytesIO(bytes_)).convert("L")))
# -> Dataset(
# num_blocks=1,
# num_rows=1,
# schema={__RAY_TC__: <ArrowTensorType: shape=(28, 28), dtype=uint8>},
# )
See the API docs for read_binary_files().
Reading from Remote Storage¶
All of the file formats mentioned above can be read from remote storage, such as S3, GCS, Azure Blob Storage, and HDFS. These storage systems are supported via Arrow's filesystem APIs natively for S3 and HDFS, and via a wrapper around fsspec for GCS and Azure Blob Storage. All ray.data.read_*() APIs expose a filesystem argument that accepts both Arrow FileSystem instances and fsspec FileSystem instances, allowing you to configure this connection to the remote storage system, such as authn/authz and buffer/block size.
For S3 and HDFS, the underlying FileSystem implementation will be inferred from the URL scheme ("s3://" and "hdfs://"); if the default connection configuration suffices for your workload, you won't need to specify a filesystem argument.
We use Parquet files for the below examples, but all of the aforementioned file formats are supported for each of these storage systems.
The AWS S3 storage system is inferred from the URI scheme (s3://), with required connection configuration such as S3 credentials being pulled from the machine's environment (e.g. the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables).
# Create a tabular Dataset by reading a Parquet file from S3.
ds = ray.data.read_parquet("s3://ursa-labs-taxi-data/2009/01/data.parquet")
# -> Dataset(
# num_blocks=1,
# num_rows=14092413,
# schema={
# vendor_id: string,
# pickup_at: timestamp[us],
# dropoff_at: timestamp[us],
# passenger_count: int8,
# trip_distance: float,
# pickup_longitude: float,
# pickup_latitude: float,
# ...,
# },
# )
ds.show(2)
# -> {
# 'vendor_id': 'VTS',
# 'pickup_at': datetime.datetime(2009, 1, 4, 2, 52),
# 'dropoff_at': datetime.datetime(2009, 1, 4, 3, 2),
# 'passenger_count': 1,
# 'trip_distance': 2.630000114440918,
# 'pickup_longitude': -73.99195861816406,
# 'pickup_latitude': 40.72156524658203,
# ...,
# }
# {
# 'vendor_id': 'VTS',
# 'pickup_at': datetime.datetime(2009, 1, 4, 3, 31),
# 'dropoff_at': datetime.datetime(2009, 1, 4, 3, 38),
# 'passenger_count': 3,
# 'trip_distance': 4.550000190734863,
# 'pickup_longitude': -73.98210144042969,
# 'pickup_latitude': 40.736289978027344,
# ...,
# }
If you need to customize this S3 storage system connection (credentials, region, endpoint override, etc.), you can pass in an S3FileSystem instance to read_parquet().
Note
This example is not runnable as-is; to run it on your own private S3 data, add in a path to your private bucket and specify your S3 credentials.
import pyarrow as pa
# Create a tabular Dataset by reading a Parquet file from a private S3 bucket.
# NOTE: This example is not runnable as-is; add in a path to your private bucket and the
# required S3 credentials!
ds = ray.data.read_parquet(
    "s3://some/private/bucket",
    filesystem=pa.fs.S3FileSystem(
        region="us-west-2",
        access_key="XXXX",
        secret_key="XXXX",
    ),
)
The HDFS storage system is inferred from the URI scheme (hdfs://), with required connection configuration such as the host and the port being derived from the URI.
Note
This example is not runnable as-is; you’ll need to point it at your HDFS cluster/data.
# Create a tabular Dataset by reading a Parquet file from HDFS using HDFS connection
# automatically constructed based on the URI.
# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
# cluster/data.
ds = ray.data.read_parquet("hdfs://<host:port>/path/to/file.parquet")
If you need to customize this HDFS storage system connection (host, port, user, kerb ticket, etc.), you can pass in a pyarrow HadoopFileSystem instance to read_parquet().
Note
This example is not runnable as-is; you’ll need to point it at your HDFS cluster/data.
import pyarrow as pa
# Create a tabular Dataset by reading a Parquet file from HDFS, manually specifying a
# configured HDFS connection via a pyarrow HadoopFileSystem instance.
# NOTE: This example is not runnable as-is; you'll need to point it at your HDFS
# cluster/data.
ds = ray.data.read_parquet(
    "hdfs://path/to/file.parquet",
    filesystem=pa.fs.HadoopFileSystem(host="localhost", port=9000, user="bob"),
)
Data can be read from Google Cloud Storage by providing a configured gcsfs GCSFileSystem, where the appropriate Google Cloud project and credentials can be specified.
Note
This example is not runnable as-is; you’ll need to point it at your GCS bucket and configure your GCP project and credentials.
import gcsfs
# Create a tabular Dataset by reading a Parquet file from GCS, passing the configured
# GCSFileSystem.
# NOTE: This example is not runnable as-is; you'll need to point it at your GCS bucket
# and configure your GCP project and credentials.
ds = ray.data.read_parquet(
    "gs://path/to/file.parquet",
    filesystem=gcsfs.GCSFileSystem(project="my-google-project"),
)
Data can be read from Azure Blob Storage by providing a configured adlfs AzureBlobFileSystem, where the appropriate account name and account key can be specified.
import adlfs
# Create a tabular Dataset by reading a Parquet file from Azure Blob Storage, passing
# the configured AzureBlobFileSystem.
path = (
    "az://nyctlc/yellow/puYear=2009/puMonth=1/"
    "part-00019-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426333-4"
    ".c000.snappy.parquet"
)
ds = ray.data.read_parquet(
    path,
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage"),
)
From Torch and TensorFlow¶
If you already have a Torch dataset available, you can create a Ray Dataset using SimpleTorchDatasource.
Warning
SimpleTorchDatasource doesn't support parallel reads. You should only use this datasource for small datasets like MNIST or CIFAR.
import ray.data
from ray.data.datasource import SimpleTorchDatasource
import torchvision
dataset_factory = lambda: torchvision.datasets.MNIST("data", download=True)
dataset = ray.data.read_datasource(
    SimpleTorchDatasource(), parallelism=1, dataset_factory=dataset_factory
)
dataset.take(1)
# (<PIL.Image.Image image mode=L size=28x28 at 0x1142CCA60>, 5)
If you already have a TensorFlow dataset available, you can create a Ray Dataset using SimpleTensorFlowDatasource.
Warning
SimpleTensorFlowDatasource doesn't support parallel reads. You should only use this datasource for small datasets like MNIST or CIFAR.
import ray.data
from ray.data.datasource import SimpleTensorFlowDatasource
import tensorflow_datasets as tfds
def dataset_factory():
    return tfds.load("cifar10", split=["train"], as_supervised=True)[0]

dataset = ray.data.read_datasource(
    SimpleTensorFlowDatasource(),
    parallelism=1,
    dataset_factory=dataset_factory,
)
features, label = dataset.take(1)[0]
features.shape # TensorShape([32, 32, 3])
label # <tf.Tensor: shape=(), dtype=int64, numpy=7>
From 🤗 (Hugging Face) Datasets¶
You can convert 🤗 Datasets into Ray Datasets by using from_huggingface. This function accesses the underlying Arrow table and converts it into a Ray Dataset directly.
Warning
from_huggingface doesn't support parallel reads. This will not usually be an issue with in-memory 🤗 Datasets, but may fail with large memory-mapped 🤗 Datasets. 🤗 IterableDataset objects are not supported.
import ray.data
from datasets import load_dataset
hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_datasets = ray.data.from_huggingface(hf_datasets)
ray_datasets["train"].take(2)
# [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]