Dataset API#

Dataset#

class ray.data.Dataset(plan: ExecutionPlan, logical_plan: LogicalPlan)[source]#

A Dataset is a distributed data collection for data loading and processing.

Datasets are distributed pipelines that produce ObjectRef[Block] outputs, where each block holds data in Arrow format, representing a shard of the overall data collection. The block also determines the unit of parallelism. For more details, see Ray Data Internals.

Datasets can be created in multiple ways: from synthetic data via range_*() APIs, from existing memory data via from_*() APIs (this creates a subclass of Dataset called MaterializedDataset), or from external storage systems such as local disk, S3, HDFS etc. via the read_*() APIs. The (potentially processed) Dataset can be saved back to external storage systems via the write_*() APIs.

Examples

import ray
# Create dataset from synthetic data.
ds = ray.data.range(1000)
# Create dataset from in-memory data.
ds = ray.data.from_items(
    [{"col1": i, "col2": i * 2} for i in range(1000)]
)
# Create dataset from external storage system.
ds = ray.data.read_parquet("s3://bucket/path")
# Save dataset back to external storage system.
ds.write_csv("s3://bucket/output")

Dataset has two kinds of operations: transformation, which takes in Dataset and outputs a new Dataset (e.g. map_batches()); and consumption, which produces values (not a data stream) as output (e.g. iter_batches()).

Dataset transformations are lazy, with execution of the transformations being triggered by downstream consumption.

Dataset supports parallel processing at scale: transformations such as map_batches(), aggregations such as min()/max()/mean(), grouping via groupby(), shuffling operations such as sort(), random_shuffle(), and repartition().

Examples

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Transform batches (Dict[str, np.ndarray]) with map_batches().
>>> ds.map_batches(lambda batch: {"id": batch["id"] * 2})  
MapBatches(<lambda>)
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Compute the maximum.
>>> ds.max("id")
999
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()  
RandomShuffle
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Sort it back in order.
>>> ds.sort("id")  
Sort
+- Dataset(num_rows=1000, schema={id: int64})

Both unexecuted and materialized Datasets can be passed between Ray tasks and actors without incurring a copy. Dataset supports conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, MARS), and are also compatible with distributed TensorFlow / PyTorch.

Basic Transformations#

Dataset.add_column

Add the given column to the dataset.

Dataset.drop_columns

Drop one or more columns from the dataset.

Dataset.filter

Filter out rows that don't satisfy the given predicate.

Dataset.flat_map

Apply the given function to each row and then flatten results.

Dataset.limit

Truncate the dataset to the first limit rows.

Dataset.map

Apply the given function to each row of this dataset.

Dataset.map_batches

Apply the given function to batches of data.

Dataset.random_sample

Returns a new Dataset containing a random fraction of the rows.

Dataset.select_columns

Select one or more columns from the dataset.

Consuming Data#

Dataset.iter_batches

Return an iterable over batches of data.

Dataset.iter_rows

Return an iterable over the rows in this dataset.

Dataset.iter_tf_batches

Return an iterable over batches of data represented as TensorFlow tensors.

Dataset.iter_torch_batches

Return an iterable over batches of data represented as Torch tensors.

Dataset.iterator

Return a DataIterator over this dataset.

Dataset.show

Print up to the given number of rows from the Dataset.

Dataset.take

Return up to limit rows from the Dataset.

Dataset.take_all

Return all of the rows in this Dataset.

Dataset.take_batch

Return up to batch_size rows from the Dataset in a batch.

Execution#

Dataset.materialize

Execute and materialize this dataset into object store memory.

Grouped and Global aggregations#

Dataset.aggregate

Aggregate values using one or more functions.

Dataset.groupby

Group rows of a Dataset according to a column.

Dataset.max

Return the maximum of one or more columns.

Dataset.mean

Compute the mean of one or more columns.

Dataset.min

Return the minimum of one or more columns.

Dataset.std

Compute the standard deviation of one or more columns.

Dataset.sum

Compute the sum of one or more columns.

Dataset.unique

List the unique elements in a given column.

I/O and Conversion#

Dataset.to_dask

Convert this Dataset into a Dask DataFrame.

Dataset.to_mars

Convert this Dataset into a Mars DataFrame.

Dataset.to_modin

Convert this Dataset into a Modin DataFrame.

Dataset.to_pandas

Convert this Dataset to a single pandas DataFrame.

Dataset.to_spark

Convert this Dataset into a Spark DataFrame.

Dataset.to_tf

Return a TensorFlow Dataset over this Dataset.

Dataset.to_torch

Return a Torch IterableDataset over this Dataset.

Dataset.write_csv

Writes the Dataset to CSV files.

Dataset.write_images

Writes the Dataset to images.

Dataset.write_json

Writes the Dataset to JSON and JSONL files.

Dataset.write_mongo

Writes the Dataset to a MongoDB database.

Dataset.write_numpy

Writes a column of the Dataset to .npy files.

Dataset.write_parquet

Writes the Dataset to parquet files under the provided path.

Dataset.write_tfrecords

Write the Dataset to TFRecord files.

Dataset.write_webdataset

Writes the dataset to WebDataset files.

Inspecting Metadata#

Dataset.columns

Returns the columns of this Dataset.

Dataset.count

Count the number of rows in the dataset.

Dataset.input_files

Return the list of input files for the dataset.

Dataset.num_blocks

Return the number of blocks of this Dataset.

Dataset.schema

Return the schema of the dataset.

Dataset.size_bytes

Return the in-memory size of the dataset.

Dataset.stats

Returns a string containing execution timing information.

Sorting, Shuffling and Repartitioning#

Dataset.random_shuffle

Randomly shuffle the rows of this Dataset.

Dataset.randomize_block_order

Randomly shuffle the blocks of this Dataset.

Dataset.repartition

Repartition the Dataset into exactly this number of blocks.

Dataset.sort

Sort the dataset by the specified key column or key function.

Splitting and Merging datasets#

Dataset.split

Materialize and split the dataset into n disjoint pieces.

Dataset.split_at_indices

Materialize and split the dataset at the given indices (like np.split).

Dataset.split_proportionately

Materialize and split the dataset using proportions.

Dataset.streaming_split

Returns n DataIterators that can be used to read disjoint subsets of the dataset in parallel.

Dataset.train_test_split

Materialize and split the dataset into train and test subsets.

Dataset.union

Concatenate Datasets across rows.

Dataset.zip

Zip the columns of this dataset with the columns of another.

Schema#

class ray.data.Schema(base_schema: pyarrow.lib.Schema | PandasBlockSchema)[source]#

Dataset schema.

base_schema#

The underlying Arrow or Pandas schema.

PublicAPI (beta): This API is in beta and may change before becoming stable.

Developer API#

Dataset.to_pandas_refs

Converts this Dataset into a distributed set of Pandas dataframes.

Dataset.to_numpy_refs

Converts this Dataset into a distributed set of NumPy ndarrays or dictionary of NumPy ndarrays.

Dataset.to_arrow_refs

Convert this Dataset into a distributed set of PyArrow tables.

Dataset.iter_internal_ref_bundles

Get an iterator over RefBundles belonging to this Dataset.

block.Block

alias of Union[pyarrow.Table, pandas.DataFrame]

block.BlockExecStats

Execution stats for this block.

block.BlockMetadata

Metadata about the block.

block.BlockAccessor

Provides accessor methods for a specific block.