Dataset API#
Dataset#
- class ray.data.Dataset(plan: ExecutionPlan, logical_plan: LogicalPlan)[source]#
A Dataset is a distributed data collection for data loading and processing.
Datasets are distributed pipelines that produce ObjectRef[Block] outputs, where each block holds data in Arrow format, representing a shard of the overall data collection. The block also determines the unit of parallelism. For more details, see Ray Data Internals.

Datasets can be created in multiple ways: from synthetic data via the range_*() APIs, from existing in-memory data via the from_*() APIs (this creates a subclass of Dataset called MaterializedDataset), or from external storage systems such as local disk, S3, HDFS, etc. via the read_*() APIs. The (potentially processed) Dataset can be saved back to external storage systems via the write_*() APIs.

Examples

import ray

# Create dataset from synthetic data.
ds = ray.data.range(1000)

# Create dataset from in-memory data.
ds = ray.data.from_items(
    [{"col1": i, "col2": i * 2} for i in range(1000)]
)

# Create dataset from external storage system.
ds = ray.data.read_parquet("s3://bucket/path")

# Save dataset back to external storage system.
ds.write_csv("s3://bucket/output")
Dataset has two kinds of operations: transformations, which take in a Dataset and output a new Dataset (e.g. map_batches()); and consumption, which produces values (not a data stream) as output (e.g. iter_batches()).

Dataset transformations are lazy; execution of the transformations is triggered by downstream consumption.
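For example, here is a minimal sketch of that lazy behavior (the "id" column comes from ray.data.range(), as in the examples below): calling map_batches() only records the transformation, and execution happens when the result is consumed.

import ray

ds = ray.data.range(100)                                          # No execution yet; just a plan.
doubled = ds.map_batches(lambda batch: {"id": batch["id"] * 2})   # Still lazy.

# Consumption triggers execution of the whole pipeline.
print(doubled.take(3))  # Roughly [{'id': 0}, {'id': 2}, {'id': 4}]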
Dataset supports parallel processing at scale: transformations such as map_batches(), aggregations such as min()/max()/mean(), grouping via groupby(), and shuffling operations such as sort(), random_shuffle(), and repartition().

Examples

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Transform batches (Dict[str, np.ndarray]) with map_batches().
>>> ds.map_batches(lambda batch: {"id": batch["id"] * 2})
MapBatches(<lambda>)
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Compute the maximum.
>>> ds.max("id")
999
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
RandomShuffle
+- Dataset(num_rows=1000, schema={id: int64})
>>> # Sort it back in order.
>>> ds.sort("id")
Sort
+- Dataset(num_rows=1000, schema={id: int64})
Both unexecuted and materialized Datasets can be passed between Ray tasks and actors without incurring a copy. Datasets support conversion to and from several more fully featured dataframe libraries (e.g., Spark, Dask, Modin, Mars) and are also compatible with distributed TensorFlow / PyTorch.
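For example, a Dataset can be converted to and from a pandas DataFrame (a minimal sketch; assumes pandas is installed):

import pandas as pd
import ray

df = pd.DataFrame({"col1": range(5), "col2": range(5)})
ds = ray.data.from_pandas(df)   # pandas -> Dataset
df_back = ds.to_pandas()        # Dataset -> pandas (pulls the data into the driver)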
Basic Transformations#
- add_column(): Add the given column to the dataset.
- drop_columns(): Drop one or more columns from the dataset.
- filter(): Filter out rows that don't satisfy the given predicate.
- flat_map(): Apply the given function to each row and then flatten results.
- limit(): Truncate the dataset to the first limit rows.
- map(): Apply the given function to each row of this dataset.
- map_batches(): Apply the given function to batches of data.
- random_sample(): Returns a new Dataset containing a random fraction of the rows.
- rename_columns(): Rename columns in the dataset.
- select_columns(): Select one or more columns from the dataset.
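A minimal sketch chaining several of the transformations above (the column names and predicate are illustrative):

import ray

ds = ray.data.from_items([{"col1": i, "col2": i * 2} for i in range(100)])
ds = ds.add_column("col3", lambda batch: batch["col1"] + batch["col2"])  # batch is a pandas DataFrame by default.
ds = ds.filter(lambda row: row["col1"] % 2 == 0)                         # row is a dict.
ds = ds.rename_columns({"col3": "total"})
ds = ds.select_columns(["col1", "total"])
print(ds.take(2))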
Consuming Data#
- iter_batches(): Return an iterable over batches of data.
- iter_rows(): Return an iterable over the rows in this dataset.
- iter_torch_batches(): Return an iterable over batches of data represented as Torch tensors.
- iterator(): Return a DataIterator over this dataset.
- show(): Print up to the given number of rows from the Dataset.
- take(): Return up to limit rows from the Dataset.
- take_all(): Return all of the rows in this Dataset.
- take_batch(): Return up to batch_size rows from the Dataset in a batch.
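A minimal sketch of the consumption APIs above; consuming the dataset is what triggers execution of any pending transformations:

import ray

ds = ray.data.range(1000)

# Stream over the data in fixed-size batches.
for batch in ds.iter_batches(batch_size=128, batch_format="numpy"):
    ...  # batch is a Dict[str, np.ndarray]

# Pull a small sample into the driver process.
rows = ds.take(5)
ds.show(3)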
Execution#
- materialize(): Execute and materialize this dataset into object store memory.
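materialize() executes the lazy plan once and pins the resulting blocks in object store memory, so repeated consumption doesn't recompute the pipeline. A minimal sketch:

import ray

ds = ray.data.range(1000).map_batches(lambda batch: {"id": batch["id"] + 1})
materialized = ds.materialize()   # Runs the plan; returns a MaterializedDataset.

# These reuse the cached blocks instead of re-running map_batches().
print(materialized.count())
print(materialized.max("id"))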
Grouped and Global aggregations#
- aggregate(): Aggregate values using one or more functions.
- groupby(): Group rows of a Dataset according to a column.
- max(): Return the maximum of one or more columns.
- mean(): Compute the mean of one or more columns.
- min(): Return the minimum of one or more columns.
- std(): Compute the standard deviation of one or more columns.
- sum(): Compute the sum of one or more columns.
- unique(): List the unique elements in a given column.
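A minimal sketch of global and grouped aggregations (the column names are illustrative):

import ray

ds = ray.data.from_items([{"group": i % 3, "value": i} for i in range(100)])

print(ds.max("value"))     # Global aggregation over the whole dataset.
print(ds.mean("value"))
print(ds.unique("group"))

# Grouped aggregation: one output row per group.
per_group = ds.groupby("group").mean("value")
print(per_group.take_all())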
I/O and Conversion#
- to_dask(): Convert this Dataset into a Dask DataFrame.
- to_mars(): Convert this Dataset into a Mars DataFrame.
- to_modin(): Convert this Dataset into a Modin DataFrame.
- to_pandas(): Convert this Dataset into a single pandas DataFrame.
- to_spark(): Convert this Dataset into a Spark DataFrame.
- to_tf(): Return a TensorFlow Dataset over this Dataset.
- write_csv(): Writes the Dataset to CSV files.
- write_images(): Writes the Dataset to images.
- write_json(): Writes the Dataset to JSON and JSONL files.
- write_mongo(): Writes the Dataset to a MongoDB datasource.
- write_numpy(): Writes a column of the Dataset to .npy files.
- write_parquet(): Writes the Dataset to Parquet files.
- write_tfrecords(): Write the Dataset to TFRecord files.
- write_webdataset(): Writes the dataset to WebDataset files.
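A minimal round-trip sketch using the write_*() and conversion APIs above (the local path is illustrative; any filesystem or URI supported by Ray Data works):

import ray

ds = ray.data.range(100)

# Persist the dataset to external storage.
ds.write_parquet("/tmp/ray_example_parquet")

# Read it back and convert to a single in-memory pandas DataFrame.
ds2 = ray.data.read_parquet("/tmp/ray_example_parquet")
df = ds2.to_pandas()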
Inspecting Metadata#
- columns(): Returns the columns of this Dataset.
- count(): Count the number of rows in the dataset.
- input_files(): Return the list of input files for the dataset.
- num_blocks(): Return the number of blocks of this Dataset.
- schema(): Return the schema of the dataset.
- size_bytes(): Return the in-memory size of the dataset.
- stats(): Returns a string containing execution timing information.
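A minimal sketch of the metadata accessors above; note that some of them can trigger execution, and stats() is only fully populated after the dataset has executed:

import ray

ds = ray.data.range(1000)

print(ds.columns())     # ['id']
print(ds.schema())      # Column names and types.
print(ds.count())       # 1000
print(ds.size_bytes())  # Approximate in-memory size.
print(ds.stats())       # Execution timing information.
# ds.input_files() lists source files for datasets read from external storage.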
Sorting, Shuffling and Repartitioning#
- random_shuffle(): Randomly shuffle the rows of this Dataset.
- sort(): Sort the dataset by the specified key column or key function.
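A minimal sketch of shuffling and sorting (the "id" column comes from ray.data.range()):

import ray

ds = ray.data.range(1000)

shuffled = ds.random_shuffle(seed=42)  # Globally shuffle all rows.
ordered = shuffled.sort("id")          # Restore ascending order by the "id" column.
print(ordered.take(3))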
Splitting and Merging datasets#
- split(): Materialize and split the dataset into n disjoint pieces.
- split_at_indices(): Materialize and split the dataset at the given indices (like np.split).
- split_proportionately(): Materialize and split the dataset using proportions.
- streaming_split(): Returns n DataIterators that can be used to read disjoint subsets of the dataset in parallel.
- train_test_split(): Materialize and split the dataset into train and test subsets.
- union(): Concatenate Datasets across rows.
- zip(): Zip the columns of this dataset with the columns of another.
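A minimal sketch of splitting and merging (the split count and test fraction are illustrative):

import ray

ds = ray.data.range(1000)

# Hold out 20% of the rows for evaluation.
train_ds, test_ds = ds.train_test_split(test_size=0.2)

# Split into disjoint shards, e.g. one per training worker.
shards = ds.split(4)

# Merge row-wise and column-wise.
combined = train_ds.union(test_ds)
zipped = ds.zip(ds.map_batches(lambda batch: {"id_x2": batch["id"] * 2}))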
Schema#
- class ray.data.Schema(base_schema: pyarrow.lib.Schema | PandasBlockSchema, *, data_context: DataContext | None = None)[source]#
Dataset schema.
- base_schema#
The underlying Arrow or Pandas schema.
PublicAPI (beta): This API is in beta and may change before becoming stable.
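In practice, a Schema is usually obtained from Dataset.schema(); base_schema then exposes the underlying Arrow (or pandas) schema. A minimal sketch:

import ray

ds = ray.data.range(10)
schema = ds.schema()        # ray.data.Schema

print(schema)               # Column names and types.
print(schema.base_schema)   # Underlying pyarrow.Schema (or PandasBlockSchema).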
Developer API#
- to_arrow_refs(): Converts this Dataset into a distributed set of PyArrow tables.
- to_numpy_refs(): Converts this Dataset into a distributed set of NumPy ndarrays or dictionary of NumPy ndarrays.
- to_pandas_refs(): Converts this Dataset into a distributed set of Pandas dataframes.
- iter_internal_ref_bundles(): Get an iterator over RefBundles belonging to this Dataset.
- block.Block: alias of Union[pyarrow.Table, pandas.DataFrame].
- block.BlockExecStats: Execution stats for this block.
- block.BlockMetadata: Metadata about the block.
- block.BlockAccessor: Provides accessor methods for a specific block.
Deprecated API#
- iter_tf_batches(): Return an iterable over batches of data represented as TensorFlow tensors.
- to_torch(): Return a Torch IterableDataset over this Dataset.