Getting Started#
A Ray Dataset is a distributed data collection. It holds a list of Ray object references pointing to distributed data blocks, and it has APIs for distributed data loading and processing. Each block holds an ordered collection of items, stored as either an Arrow table (when the data is created from or transformed to tabular or tensor data) or a Python list (for non-tabular Python objects).
In this tutorial you will learn how to:

- Create and save a Ray Dataset.
- Transform a Dataset.
- Pass a Dataset to Ray tasks/actors and access the data inside.
Tip
Run pip install "ray[data]" to get started!
Creating and Saving Datasets#
You can create a Dataset from Python objects. These objects can be held inside the Dataset either as plain Python objects (in which case the schema is a Python type) or as Arrow records (in which case the schema is an Arrow schema).
import ray
# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(5)
# -> [0, 1, 2, 3, 4]
ds.schema()
# <class 'int'>
# Create a Dataset from Python objects, which are held as Arrow records.
ds = ray.data.from_items([
    {"sepal.length": 5.1, "sepal.width": 3.5,
     "petal.length": 1.4, "petal.width": 0.2, "variety": "Setosa"},
    {"sepal.length": 4.9, "sepal.width": 3.0,
     "petal.length": 1.4, "petal.width": 0.2, "variety": "Setosa"},
    {"sepal.length": 4.7, "sepal.width": 3.2,
     "petal.length": 1.3, "petal.width": 0.2, "variety": "Setosa"},
])
# Dataset(num_blocks=3, num_rows=3,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
ds.show()
# -> {'sepal.length': 5.1, 'sepal.width': 3.5,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0,
# 'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.7, 'sepal.width': 3.2,
# 'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}
ds.schema()
# -> sepal.length: double
# -> sepal.width: double
# -> petal.length: double
# -> petal.width: double
# -> variety: string
Datasets can also be created from files on local disk or remote datasources such as S3. Any filesystem supported by pyarrow can be used to specify file locations. See more at Creating Datasets.
# Create from CSV.
# Tip: "example://" is a convenient protocol to access the
# python/ray/data/examples/data directory.
ds = ray.data.read_csv("example://iris.csv")
# Dataset(num_blocks=1, num_rows=150,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
# Create from Parquet.
ds = ray.data.read_parquet("example://iris.parquet")
# Dataset(num_blocks=1, num_rows=150,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
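The same readers accept remote paths on any pyarrow-supported filesystem. A minimal sketch of reading from S3 (the bucket path below is a placeholder, and your environment must provide S3 credentials):
# Create from Parquet files stored in S3 (placeholder bucket path).
ds_remote = ray.data.read_parquet("s3://my-bucket/path/to/iris.parquet")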
Once you have a Dataset (potentially after transformation), you can save it to local or remote storage in the desired format using methods such as write_csv(), write_json(), and write_parquet(). See more at Saving Datasets.
# Write to Parquet files in /tmp/iris.
ds.write_parquet("/tmp/iris")
# -> /tmp/iris/data_000000.parquet
# Use repartition to control the number of output files:
ds.repartition(2).write_parquet("/tmp/iris2")
# -> /tmp/iris2/data_000000.parquet
# -> /tmp/iris2/data_000001.parquet
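The other writers follow the same pattern. A minimal sketch of writing the same dataset as CSV and JSON (the output directories below are chosen only for illustration):
# Write to CSV and JSON files (illustrative output paths).
ds.write_csv("/tmp/iris_csv")
ds.write_json("/tmp/iris_json")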
See the Creating Datasets and Saving Datasets guides for more details on how to create and save datasets.
Transforming Datasets#
Once you have a Dataset, you can transform it by applying a user-defined function, which produces another Dataset. Under the hood, the transformation is executed in parallel for performance at scale.
import pandas
# Create 10 blocks for parallelism.
ds = ds.repartition(10)
# Dataset(num_blocks=10, num_rows=150,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]
transformed_ds = ds.map_batches(transform_batch)
# Dataset(num_blocks=10, num_rows=3,
# schema={sepal.length: float64, sepal.width: float64,
# petal.length: float64, petal.width: float64, variety: object})
transformed_ds.show()
# -> {'sepal.length': 5.2, 'sepal.width': 2.7,
# 'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 5.4, 'sepal.width': 3.0,
# 'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
# -> {'sepal.length': 4.9, 'sepal.width': 2.5,
# 'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}
Tip
Datasets also provide the convenience transformation methods map(), flat_map(), and filter(), which are not vectorized (slower than map_batches()), but may be useful for development.
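As an illustration, the batch filter above can be expressed row by row with filter(); a minimal sketch:
# Row-wise equivalent of the batch filter (slower, but simple).
filtered_ds = ds.filter(
    lambda row: row["sepal.length"] < 5.5 and row["petal.length"] > 3.5)
filtered_ds.count()
# -> 3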
These transformations are composable. You can further apply transformations on the output Dataset, forming a chain of transformations to express more complex logic.
By default, transformations are executed using Ray tasks. For transformations that require setup, you may want to use Ray actors instead by specifying compute=ray.data.ActorPoolStrategy(min, max); Ray will then use an autoscaling actor pool of min to max actors to execute your transforms. This caches the stateful setup at actor creation time, which is particularly useful when the setup is expensive.
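A minimal sketch of the actor-based pattern, assuming a callable class whose constructor performs the one-time setup (the class name and its setup below are illustrative, not part of the Ray API):
class BatchFilter:
    def __init__(self):
        # Expensive one-time setup (e.g. loading a model) would go here;
        # with actors it runs once per actor, not once per batch.
        self.min_petal_length = 3.5

    def __call__(self, df: pandas.DataFrame) -> pandas.DataFrame:
        return df[df["petal.length"] > self.min_petal_length]

# Run the callable class on an autoscaling pool of 2 to 4 actors.
transformed_ds = ds.map_batches(
    BatchFilter, compute=ray.data.ActorPoolStrategy(2, 4))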
See the Transforming Datasets guide for an in-depth guide on transforming datasets.
Accessing and Exchanging Datasets#
Datasets can be passed to Ray tasks or actors and accessed with iter_batches() or iter_rows(). This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:
@ray.remote
def consume(data) -> int:
    num_batches = 0
    for batch in data.iter_batches(batch_size=10):
        num_batches += 1
    return num_batches

ray.get(consume.remote(ds))
# -> 15
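iter_rows() works the same way but yields one record at a time. A minimal sketch of counting rows inside a task (the function name is illustrative):
@ray.remote
def consume_rows(data) -> int:
    # Iterate record by record instead of batch by batch.
    num_rows = 0
    for _row in data.iter_rows():
        num_rows += 1
    return num_rows

ray.get(consume_rows.remote(ds))
# -> 150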
Datasets can be split up into disjoint sub-datasets. Locality-aware splitting is supported if you pass in a list of actor handles to the split() function along with the number of desired splits. This is a common pattern, useful for loading and splitting data between distributed training actors:
@ray.remote
class Worker:
    def __init__(self, rank: int):
        pass

    def train(self, shard) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()
workers = [Worker.remote(i) for i in range(4)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]
shards = ds.split(n=4, locality_hints=workers)
# -> [
# Dataset(num_blocks=3, num_rows=45,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string}),
# Dataset(num_blocks=3, num_rows=45,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string}),
# Dataset(num_blocks=2, num_rows=30,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string}),
# Dataset(num_blocks=2, num_rows=30,
# schema={sepal.length: double, sepal.width: double,
# petal.length: double, petal.width: double, variety: string}),
# ]
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
# -> [45, 45, 30, 30]
See the Consuming Datasets guide for an in-depth guide on accessing and exchanging datasets.