ray.data.Dataset#

class ray.data.Dataset(plan: ray.data._internal.plan.ExecutionPlan, epoch: int, lazy: bool = True, logical_plan: Optional[ray.data._internal.logical.interfaces.logical_plan.LogicalPlan] = None)[source]#

Bases: object

A Dataset is a distributed data collection for data loading and processing.

Datasets are distributed pipelines that produce ObjectRef[Block] outputs. Each block holds a shard of the overall data collection in Arrow format, and blocks are also the unit of parallelism. For more details, see Ray Data Internals.
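For example, you can control the block count (and therefore the degree of parallelism) explicitly with the methods listed below. A minimal sketch, where the choice of 10 blocks is purely illustrative:

import ray

ds = ray.data.range(1000)
# Repartition into exactly 10 blocks; each block can be processed in parallel.
ds = ds.repartition(10)
# Materialize the dataset to inspect the resulting block count.
print(ds.materialize().num_blocks())  # 10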

Datasets can be created in multiple ways: from synthetic data via the range_*() APIs, from existing in-memory data via the from_*() APIs (these create a MaterializedDataset, a subclass of Dataset), or from external storage systems such as local disk, S3, or HDFS via the read_*() APIs. The (potentially processed) Dataset can be saved back to external storage via the write_*() APIs.

Examples

import ray
# Create dataset from synthetic data.
ds = ray.data.range(1000)
# Create dataset from in-memory data.
ds = ray.data.from_items(
    [{"col1": i, "col2": i * 2} for i in range(1000)]
)
# Create dataset from external storage system.
ds = ray.data.read_parquet("s3://bucket/path")
# Save dataset back to external storage system.
ds.write_csv("s3://bucket/output")

Dataset has two kinds of operations: transformations, which take in a Dataset and output a new Dataset (e.g. map_batches()); and consumption operations, which produce values (not a data stream) as output (e.g. iter_batches()).

Dataset transformations are lazy; their execution is triggered by downstream consumption.
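For instance, calling a transformation only records it in the plan; nothing runs until a consumption operation is invoked. A minimal sketch (the printed rows are illustrative):

import ray

ds = ray.data.range(1000)
# Lazy: this records the transformation but does not execute it yet.
ds = ds.map_batches(lambda batch: {"id": batch["id"] + 1})
# Consumption (e.g. take(), iter_batches(), write_*()) triggers execution.
print(ds.take(3))  # e.g. [{'id': 1}, {'id': 2}, {'id': 3}]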

Dataset supports parallel processing at scale: transformations such as map_batches(), aggregations such as min()/max()/mean(), grouping via groupby(), shuffling operations such as sort(), random_shuffle(), and repartition().

Examples

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Transform batches (Dict[str, np.ndarray]) with map_batches().
>>> ds.map_batches(lambda batch: {"id": batch["id"] * 2})  
MapBatches(<lambda>)
+- Dataset(num_blocks=..., num_rows=1000, schema={id: int64})
>>> # Compute the maximum.
>>> ds.max("id")
999
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()  
RandomShuffle
+- Dataset(num_blocks=..., num_rows=1000, schema={id: int64})
>>> # Sort it back in order.
>>> ds.sort("id")  
Sort
+- Dataset(num_blocks=..., num_rows=1000, schema={id: int64})

Both unexecuted and materialized Datasets can be passed between Ray tasks and actors without incurring a copy. Dataset supports conversion to and from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, Mars) and is also compatible with distributed TensorFlow / PyTorch.
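As a small interoperability sketch, a Dataset can be handed to a Ray task without copying its blocks, and a small Dataset can be converted to a single pandas DataFrame for local inspection (the summarize task below is purely illustrative):

import ray

@ray.remote
def summarize(ds: ray.data.Dataset) -> int:
    # The Dataset object is passed by reference; its blocks are not copied.
    return ds.count()

ds = ray.data.range(1000)
print(ray.get(summarize.remote(ds)))  # 1000

# Convert a small Dataset into a single in-memory pandas DataFrame.
df = ds.to_pandas()
print(df.shape)  # (1000, 1)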

PublicAPI: This API is stable across Ray releases.

Methods

__init__(plan, epoch[, lazy, logical_plan])

Construct a Dataset (internal API).

add_column(col, fn, *[, compute])

Add the given column to the dataset.

aggregate(*aggs)

Aggregate values using one or more functions.

columns([fetch_if_missing])

Return the columns of this Dataset.

count()

Count the number of records in the dataset.

dataset_format()

default_batch_format()

deserialize_lineage(serialized_ds)

Deserialize the provided lineage-serialized Dataset.

drop_columns(cols, *[, compute])

Drop one or more columns from the dataset.

filter(fn, *[, compute])

Filter out rows that don't satisfy the given predicate.

flat_map(fn, *[, compute, ...])

Apply the given function to each row and then flatten results.

fully_executed()

get_internal_block_refs()

Get a list of references to the underlying blocks of this dataset.

groupby(key)

Group rows of a Dataset according to a column.

has_serializable_lineage()

Whether this dataset's lineage can be serialized for storage and later deserialized, possibly on a different cluster.

input_files()

Return the list of input files for the dataset.

is_fully_executed()

iter_batches(*[, prefetch_batches, ...])

Return an iterator over batches of data.

iter_rows(*[, prefetch_blocks])

Return an iterator over the rows in this dataset.

iter_tf_batches(*[, prefetch_batches, ...])

Return an iterator over batches of data represented as TensorFlow tensors.

iter_torch_batches(*[, prefetch_batches, ...])

Return an iterator over batches of data represented as Torch tensors.

iterator()

Return a DataIterator over this dataset.

lazy()

Enable lazy evaluation.

limit(limit)

Truncate the dataset to the first limit rows.

map(fn, *[, compute, fn_constructor_args, ...])

Apply the given function to each row of this dataset.

map_batches(fn, *[, batch_size, compute, ...])

Apply the given function to batches of data.

materialize()

Execute and materialize this dataset into object store memory.

max([on, ignore_nulls])

Return the maximum of one or more columns.

mean([on, ignore_nulls])

Compute the mean of one or more columns.

min([on, ignore_nulls])

Return the minimum of one or more columns.

num_blocks()

Return the number of blocks of this dataset.

random_sample(fraction, *[, seed])

Return a new Dataset containing a random fraction of the rows.

random_shuffle(*[, seed, num_blocks])

Randomly shuffle the rows of this Dataset.

randomize_block_order(*[, seed])

Randomly shuffle the blocks of this Dataset.

repartition(num_blocks, *[, shuffle])

Repartition the Dataset into exactly this number of blocks.

repeat([times])

Convert this into a DatasetPipeline by looping over this dataset.

schema([fetch_if_missing])

Return the schema of the dataset.

select_columns(cols, *[, compute])

Select one or more columns from the dataset.

serialize_lineage()

Serialize this dataset's lineage, not the actual data or the existing data futures, to bytes that can be stored and later deserialized, possibly on a different cluster.

show([limit])

Print up to the given number of rows from the Dataset.

size_bytes()

Return the in-memory size of the dataset.

sort([key, descending])

Sort the dataset by the specified key column or key function.

split(n, *[, equal, locality_hints])

Materialize and split the dataset into n disjoint pieces.

split_at_indices(indices)

Materialize and split the dataset at the given indices (like np.split).

split_proportionately(proportions)

Materialize and split the dataset using proportions.

stats()

Return a string containing execution timing information.

std([on, ddof, ignore_nulls])

Compute the standard deviation of one or more columns.

streaming_split(n, *[, equal, locality_hints])

Return n DataIterators that can be used to read disjoint subsets of the dataset in parallel.

sum([on, ignore_nulls])

Compute the sum of one or more columns.

take([limit])

Return up to limit rows from the Dataset.

take_all([limit])

Return all of the rows in this Dataset.

take_batch([batch_size, batch_format])

Return up to batch_size rows from the Dataset in a batch.

to_arrow_refs()

Convert this Dataset into a distributed set of PyArrow tables.

to_dask([meta, verify_meta])

Convert this Dataset into a Dask DataFrame.

to_mars()

Convert this Dataset into a Mars DataFrame.

to_modin()

Convert this Dataset into a Modin DataFrame.

to_numpy_refs(*[, column])

Convert this Dataset into a distributed set of NumPy ndarrays or a dictionary of NumPy ndarrays.

to_pandas([limit])

Convert this Dataset to a single pandas DataFrame.

to_pandas_refs()

Convert this Dataset into a distributed set of pandas DataFrames.

to_random_access_dataset(key[, num_workers])

Convert this dataset into a distributed RandomAccessDataset (EXPERIMENTAL).

to_spark(spark)

Convert this Dataset into a Spark DataFrame.

to_tf(feature_columns, label_columns, *[, ...])

Return a TensorFlow Dataset over this Dataset.

to_torch(*[, label_column, feature_columns, ...])

Return a Torch IterableDataset over this Dataset.

train_test_split(test_size, *[, shuffle, seed])

Materialize and split the dataset into train and test subsets.

union(*other)

Materialize and concatenate Datasets across rows.

unique(column)

List the unique elements in a given column.

window(*[, blocks_per_window, bytes_per_window])

Convert this into a DatasetPipeline by windowing over data blocks.

write_csv(path, *[, filesystem, ...])

Write the Dataset to CSV files.

write_datasource(datasource, *[, ...])

Write the Dataset to a custom Datasource.

write_images(path, column[, file_format, ...])

Write the Dataset to images.

write_json(path, *[, filesystem, ...])

Write the Dataset to JSON and JSONL files.

write_mongo(uri, database, collection[, ...])

Write the Dataset to a MongoDB database.

write_numpy(path, *, column[, filesystem, ...])

Write a column of the Dataset to .npy files.

write_parquet(path, *[, filesystem, ...])

Write the Dataset to Parquet files under the provided path.

write_sql(sql, connection_factory[, ...])

Write to a database that provides a Python DB API2-compliant connector.

write_tfrecords(path, *[, tf_schema, ...])

Write the Dataset to TFRecord files.

write_webdataset(path, *[, filesystem, ...])

Write the Dataset to WebDataset files.

zip(other)

Materialize and zip the columns of this dataset with the columns of another.

Attributes

context

Return the DataContext used to create this Dataset.