Dataset API

class ray.data.Dataset(plan: ray.data._internal.plan.ExecutionPlan, epoch: int, lazy: bool, *, defer_execution: bool = False)[source]

A Dataset is a distributed data collection for data loading and processing.

Datasets are implemented as a list of ObjectRef[Block], where each block holds an ordered collection of items and represents a shard of the overall data collection. A block can be either a pyarrow.Table or a Python list, and blocks determine the unit of parallelism.

Datasets can be created in multiple ways: from synthetic data via range_*() APIs, from existing in-memory data via from_*() APIs, or from external storage systems such as local disk, S3, or HDFS via the read_*() APIs. The (potentially processed) Dataset can be saved back to external storage systems via the write_*() APIs.

Examples

>>> import ray
>>> # Create dataset from synthetic data.
>>> ds = ray.data.range(1000)
>>> # Create dataset from in-memory data.
>>> ds = ray.data.from_items(
...     [{"col1": i, "col2": i * 2} for i in range(1000)])
>>> # Create dataset from external storage system.
>>> ds = ray.data.read_parquet("s3://bucket/path") 
>>> # Save dataset back to external storage system.
>>> ds.write_csv("s3://bucket/output") 

Datasets support two kinds of operations: transformations, which take in Datasets and output a new Dataset (e.g. map_batches()); and consumption, which produces values (not a Dataset) as output (e.g. iter_batches()).

Datasets support parallel processing at scale: transformations such as map_batches(); aggregations such as min(), max(), and mean(); grouping via groupby(); and shuffling operations such as sort(), random_shuffle(), and repartition().

Examples

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Transform in parallel with map_batches().
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
Dataset(num_blocks=..., num_rows=1000, schema=<class 'int'>)
>>> # Compute max.
>>> ds.max()
999
>>> # Group the data.
>>> ds.groupby(lambda x: x % 3).count()
Dataset(num_blocks=..., num_rows=3, schema=<class 'tuple'>)
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
Dataset(num_blocks=..., num_rows=1000, schema=<class 'int'>)
>>> # Sort it back in order.
>>> ds.sort()
Dataset(num_blocks=..., num_rows=1000, schema=<class 'int'>)

Since Datasets are just lists of Ray object refs, they can be passed between Ray tasks and actors without incurring a copy. Datasets support conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, MARS), and are also compatible with distributed TensorFlow / PyTorch.
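For illustration, the sketch below passes a Dataset directly to a Ray task; only the block references are sent, not the block data. The consume task here is a hypothetical example, not part of the Dataset API:

>>> import ray
>>> @ray.remote
... def consume(ds) -> int:
...     # Only object references travel to the task; blocks are not copied.
...     return ds.count()
>>> ds = ray.data.range(1000)
>>> ray.get(consume.remote(ds))
1000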

PublicAPI: This API is stable across Ray releases.

Basic Transformations

ray.data.Dataset.map

Apply the given function to each record of this dataset.

ray.data.Dataset.map_batches

Apply the given function to batches of data.

ray.data.Dataset.flat_map

Apply the given function to each record and then flatten results.

ray.data.Dataset.filter

Filter out records that do not satisfy the given predicate.

ray.data.Dataset.add_column

Add the given column to the dataset.

ray.data.Dataset.drop_columns

Drop one or more columns from the dataset.

ray.data.Dataset.random_sample

Randomly samples a fraction of the elements of this dataset.

ray.data.Dataset.limit

Truncate the dataset to the first limit records.

Sorting, Shuffling, Repartitioning

ray.data.Dataset.sort

Sort the dataset by the specified key column or key function.

ray.data.Dataset.random_shuffle

Randomly shuffle the elements of this dataset.

ray.data.Dataset.randomize_block_order

Randomly shuffle the blocks of this dataset.

ray.data.Dataset.repartition

Repartition the dataset into exactly this number of blocks.

Splitting and Merging Datasets

ray.data.Dataset.split

Split the dataset into n disjoint pieces.

ray.data.Dataset.split_at_indices

Split the dataset at the given indices (like np.split).

ray.data.Dataset.split_proportionately

Split the dataset using proportions.

ray.data.Dataset.train_test_split

Split the dataset into train and test subsets.

ray.data.Dataset.union

Combine this dataset with others of the same type.

ray.data.Dataset.zip

Zip this dataset with the elements of another.

Grouped and Global Aggregations

ray.data.Dataset.groupby

Group the dataset by the key function or column name.

ray.data.Dataset.aggregate

Aggregate the entire dataset as one group.

ray.data.Dataset.sum

Compute sum over entire dataset.

ray.data.Dataset.min

Compute minimum over entire dataset.

ray.data.Dataset.max

Compute maximum over entire dataset.

ray.data.Dataset.mean

Compute mean over entire dataset.

ray.data.Dataset.std

Compute standard deviation over entire dataset.

Converting to Pipelines

ray.data.Dataset.repeat

Convert this into a DatasetPipeline by looping over this dataset.

ray.data.Dataset.window

Convert this into a DatasetPipeline by windowing over data blocks.

Consuming Datasets

ray.data.Dataset.show

Print up to the given number of records from the dataset.

ray.data.Dataset.take

Return up to limit records from the dataset.

ray.data.Dataset.take_all

Return all of the records in the dataset.

ray.data.Dataset.iter_rows

Return a local row iterator over the dataset.

ray.data.Dataset.iter_batches

Return a local batched iterator over the dataset.

ray.data.Dataset.iter_torch_batches

Return a local batched iterator of Torch Tensors over the dataset.

ray.data.Dataset.iter_tf_batches

Return a local batched iterator of TensorFlow Tensors over the dataset.

I/O and Conversion

ray.data.Dataset.write_parquet

Write the dataset to parquet.

ray.data.Dataset.write_json

Write the dataset to json.

ray.data.Dataset.write_csv

Write the dataset to csv.

ray.data.Dataset.write_numpy

Write a tensor column of the dataset to npy files.

ray.data.Dataset.write_datasource

Write the dataset to a custom datasource.

ray.data.Dataset.to_torch

Return a Torch IterableDataset over this dataset.

ray.data.Dataset.to_tf

Return a TF Dataset over this dataset.

ray.data.Dataset.to_dask

Convert this dataset into a Dask DataFrame.

ray.data.Dataset.to_mars

Convert this dataset into a MARS dataframe.

ray.data.Dataset.to_modin

Convert this dataset into a Modin dataframe.

ray.data.Dataset.to_spark

Convert this dataset into a Spark dataframe.

ray.data.Dataset.to_pandas

Convert this dataset into a single Pandas DataFrame.

ray.data.Dataset.to_pandas_refs

Convert this dataset into a distributed set of Pandas dataframes.

ray.data.Dataset.to_numpy_refs

Convert this dataset into a distributed set of NumPy ndarrays.

ray.data.Dataset.to_arrow_refs

Convert this dataset into a distributed set of Arrow tables.

ray.data.Dataset.to_random_access_dataset

Convert this Dataset into a distributed RandomAccessDataset (EXPERIMENTAL).

Inspecting Metadata

ray.data.Dataset.count

Count the number of records in the dataset.

ray.data.Dataset.schema

Return the schema of the dataset.

ray.data.Dataset.default_batch_format

Return this dataset's default batch format.

ray.data.Dataset.num_blocks

Return the number of blocks of this dataset.

ray.data.Dataset.size_bytes

Return the in-memory size of the dataset.

ray.data.Dataset.input_files

Return the list of input files for the dataset.

ray.data.Dataset.stats

Returns a string containing execution timing information.

ray.data.Dataset.get_internal_block_refs

Get a list of references to the underlying blocks of this dataset.

Execution

ray.data.Dataset.fully_executed

Force full evaluation of the blocks of this dataset.

ray.data.Dataset.is_fully_executed

Returns whether this Dataset has been fully executed.

ray.data.Dataset.lazy

Enable lazy evaluation.

Serialization

ray.data.Dataset.has_serializable_lineage

Whether this dataset's lineage is able to be serialized for storage and later deserialized, possibly on a different cluster.

ray.data.Dataset.serialize_lineage

Serialize this dataset's lineage, not the actual data or the existing data futures, to bytes that can be stored and later deserialized, possibly on a different cluster.

ray.data.Dataset.deserialize_lineage

Deserialize the provided lineage-serialized Dataset.

Basic Transformations

Dataset.map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], _CallableClassProtocol[T, U]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, **ray_remote_args) Dataset[U][source]

Apply the given function to each record of this dataset.

This is a blocking operation. Note that mapping individual records can be quite slow. Consider using map_batches() for performance.

Examples

>>> import ray
>>> # Transform python objects.
>>> ds = ray.data.range(1000)
>>> ds.map(lambda x: x * 2)
Dataset(num_blocks=..., num_rows=1000, schema=<class 'int'>)
>>> # Transform Arrow records.
>>> ds = ray.data.from_items(
...     [{"value": i} for i in range(1000)])
>>> ds.map(lambda record: {"v2": record["value"] * 2})
Dataset(num_blocks=..., num_rows=1000, schema={v2: int64})
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> init_model = ... 
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_model()
...    def __call__(self, batch):
...        return self.model(batch)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute=ActorPoolStrategy(2, 8) the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> from ray.data._internal.compute import ActorPoolStrategy
>>> ds.map(CachedModel, 
...        compute=ActorPoolStrategy(2, 8),
...        num_gpus=1)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling actor pool. To configure the min or max size of the autoscaling actor pool, provide an ActorPoolStrategy(min, max) instance. If using callable classes for fn, the actor compute strategy must be used.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

Dataset.map_batches(fn: Union[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], _CallableClassProtocol], *, batch_size: Optional[int] = 4096, compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, batch_format: typing_extensions.Literal[default, pandas, pyarrow, numpy] = 'default', fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, **ray_remote_args) Dataset[Any][source]

Apply the given function to batches of data.

Batches are represented as dataframes, ndarrays, or lists. The default batch type is determined by your dataset’s schema. To determine the default batch type, call default_batch_format(). Alternatively, set the batch type with batch_format.

To learn more about writing functions for map_batches(), read writing user-defined functions.

Tip

If you’re using Ray AIR for training or batch inference, consider using BatchMapper. It’s more performant and easier to use.

Examples

>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({
...     "name": ["Luna", "Rory", "Scout"],
...     "age": [4, 14, 9]
... })
>>> ds = ray.data.from_pandas(df)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64})

Call default_batch_format() to determine the default batch type.

>>> ds.default_batch_format()
<class 'pandas.core.frame.DataFrame'>

Tip

Datasets created from tabular data like Arrow tables and Parquet files yield pd.DataFrame batches.

Once you know the batch type, define a function that transforms batches of data. ds.map_batches applies the function in parallel.

>>> def map_fn(batch: pd.DataFrame) -> pd.DataFrame:
...     batch["age_in_dog_years"] = 7 * batch["age"]
...     return batch
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64, age_in_dog_years: int64})

Your fn can return a different type than the input type. To learn more about supported output types, read user-defined function output types.

>>> from typing import List
>>> def map_fn(batch: pd.DataFrame) -> List[int]:
...     return list(batch["age_in_dog_years"])
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema=<class 'int'>)

Actors can improve the performance of some workloads. For example, you can use actors to load a model once per worker instead of once per inference.

To transform batches with actors, pass a callable type to fn and specify an ActorPoolStrategy.

In the example below, CachedModel is called on an autoscaling pool of two to eight actors, each allocated one GPU by Ray.

>>> from ray.data import ActorPoolStrategy
>>> init_large_model = ... 
>>> class CachedModel:
...    def __init__(self):
...        self.model = init_large_model()
...    def __call__(self, item):
...        return self.model(item)
>>> ds.map_batches( 
...     CachedModel, 
...     batch_size=256, 
...     compute=ActorPoolStrategy(2, 8), 
...     num_gpus=1,
... ) 
Parameters
  • fn – The function to apply to each record batch, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • batch_size – The number of rows in each batch, or None to use entire blocks as batches. Blocks can contain different numbers of rows, and the last batch can include fewer than batch_size rows. Defaults to 4096.

  • compute – The compute strategy, either "tasks" (default) to use Ray tasks, or "actors" to use an autoscaling actor pool. If you want to configure the size of the autoscaling actor pool, provide an ActorPoolStrategy instance. If you’re passing callable type to fn, you must pass an ActorPoolStrategy.

  • batch_format – Specify "default" to use the default block format (promotes tables to Pandas and tensors to NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, or "numpy" to select numpy.ndarray for tensor datasets and Dict[str, numpy.ndarray] for tabular datasets. Default is "default".

  • fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Ray task.

  • fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Ray task.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

See also

iter_batches()

Call this function to iterate over batches of data.

default_batch_format()

Call this function to determine the default batch type.
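To pass extra arguments to fn, use fn_args or fn_kwargs as described above. The following is an illustrative sketch; the add_dog_years helper and its factor argument are hypothetical:

>>> import pandas as pd
>>> import ray
>>> def add_dog_years(batch: pd.DataFrame, factor: int) -> pd.DataFrame:
...     batch["age_in_dog_years"] = factor * batch["age"]
...     return batch
>>> ds = ray.data.from_pandas(
...     pd.DataFrame({"name": ["Luna", "Rory"], "age": [4, 14]}))
>>> # `factor` is forwarded to each map task as a top-level argument.
>>> ds = ds.map_batches(add_dog_years, fn_kwargs={"factor": 7})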

Dataset.flat_map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], _CallableClassProtocol[T, U]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, **ray_remote_args) Dataset[U][source]

Apply the given function to each record and then flatten results.

This is a blocking operation. Consider using .map_batches() for better performance (the batch size can be altered in map_batches).

Examples

>>> import ray
>>> ds = ray.data.range(1000)
>>> ds.flat_map(lambda x: [x, x ** 2, x ** 3])
Dataset(num_blocks=..., num_rows=3000, schema=<class 'int'>)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The function to apply to each record, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling actor pool. To configure the min or max size of the autoscaling actor pool, provide an ActorPoolStrategy(min, max) instance. If using callable classes for fn, the actor compute strategy must be used.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

Dataset.filter(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], _CallableClassProtocol[T, U]], *, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, **ray_remote_args) Dataset[T][source]

Filter out records that do not satisfy the given predicate.

This is a blocking operation. Consider using .map_batches() for better performance (you can implement filter by dropping records).

Examples

>>> import ray
>>> ds = ray.data.range(100)
>>> ds.filter(lambda x: x % 2 == 0)
Dataset(num_blocks=..., num_rows=50, schema=<class 'int'>)

Time complexity: O(dataset size / parallelism)

Parameters
  • fn – The predicate to apply to each record, or a class type that can be instantiated to create such a callable. Callable classes are only supported for the actor compute strategy.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or “actors” to use an autoscaling actor pool. To configure the min or max size of the autoscaling actor pool, provide an ActorPoolStrategy(min, max) instance. If using callable classes for fn, the actor compute strategy must be used.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).
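As noted above, a filter can also be implemented by dropping records inside map_batches(), which is usually faster. A minimal sketch:

>>> import ray
>>> ds = ray.data.range_table(100)
>>> # Keep only the even values by dropping rows within each batch.
>>> ds = ds.map_batches(
...     lambda df: df[df["value"] % 2 == 0], batch_format="pandas")
>>> ds.count()
50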

Dataset.add_column(col: str, fn: Callable[[pandas.DataFrame], pandas.Series], *, compute: Optional[str] = None, **ray_remote_args) Dataset[T][source]

Add the given column to the dataset.

This is only supported for datasets convertible to pandas format. A function generating the new column values given the batch in pandas format must be specified.

Examples

>>> import ray
>>> ds = ray.data.range_table(100)
>>> # Add a new column equal to value * 2.
>>> ds = ds.add_column(
...     "new_col", lambda df: df["value"] * 2)
>>> # Overwrite the existing "value" with zeros.
>>> ds = ds.add_column("value", lambda df: 0)

Time complexity: O(dataset size / parallelism)

Parameters
  • col – Name of the column to add. If the name already exists, the column will be overwritten.

  • fn – Map function generating the column values given a batch of records in pandas format.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

Dataset.drop_columns(cols: List[str], *, compute: Optional[str] = None, **ray_remote_args) ray.data.dataset.Dataset[ray.data.block.U][source]

Drop one or more columns from the dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ds = ray.data.range_table(100)
>>> # Add a new column equal to value * 2.
>>> ds = ds.add_column(
...     "new_col", lambda df: df["value"] * 2)
>>> # Drop the existing "value" column.
>>> ds = ds.drop_columns(["value"])

Time complexity: O(dataset size / parallelism)

Parameters
  • cols – Names of the columns to drop. If any name does not exist, an exception will be raised.

  • compute – The compute strategy, either “tasks” (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.

  • ray_remote_args – Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks).

Dataset.random_sample(fraction: float, *, seed: Optional[int] = None) ray.data.dataset.Dataset[ray.data.block.T][source]

Randomly samples a fraction of the elements of this dataset.

Note that the exact number of elements returned is not guaranteed; the number of elements returned is approximately fraction * total_rows.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.random_sample(0.1) 
>>> ds.random_sample(0.2, seed=12345) 
Parameters
  • fraction – The fraction of elements to sample.

  • seed – Seeds the Python pseudo-random number generator.

Returns

Returns a Dataset containing the sampled elements.

Dataset.limit(limit: int) ray.data.dataset.Dataset[ray.data.block.T][source]

Truncate the dataset to the first limit records.

Unlike take(), this will not move any data to the caller’s machine. Instead, it will return a new Dataset pointing to the truncated distributed data.

Examples

>>> import ray
>>> ds = ray.data.range(1000)
>>> ds.limit(100).map(lambda x: x * 2).take()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

Time complexity: O(limit specified)

Parameters

limit – The size of the dataset to truncate to.

Returns

The truncated dataset.

Sorting, Shuffling, Repartitioning

Dataset.sort(key: Union[None, str, Callable[[ray.data.block.T], Any]] = None, descending: bool = False) ray.data.dataset.Dataset[ray.data.block.T][source]

Sort the dataset by the specified key column or key function.

This is a blocking operation.

Examples

>>> import ray
>>> # Sort using the entire record as the key.
>>> ds = ray.data.range(100)
>>> ds.sort()
Dataset(num_blocks=..., num_rows=100, schema=<class 'int'>)
>>> # Sort by a single column in descending order.
>>> ds = ray.data.from_items(
...     [{"value": i} for i in range(1000)])
>>> ds.sort("value", descending=True)
Dataset(num_blocks=..., num_rows=1000, schema={value: int64})
>>> # Sort by a key function.
>>> ds.sort(lambda record: record["value"]) 

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters
  • key

    • For Arrow tables, key must be a single column name.

    • For datasets of Python objects, key can be either a lambda function that returns a comparison key to sort by, or None to sort by the original value.

  • descending – Whether to sort in descending order.

Returns

A new, sorted dataset.

Dataset.random_shuffle(*, seed: Optional[int] = None, num_blocks: Optional[int] = None) ray.data.dataset.Dataset[ray.data.block.T][source]

Randomly shuffle the elements of this dataset.

This is a blocking operation similar to repartition().

Examples

>>> import ray
>>> ds = ray.data.range(100)
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
Dataset(num_blocks=..., num_rows=100, schema=<class 'int'>)
>>> # Shuffle this dataset with a fixed random seed.
>>> ds.random_shuffle(seed=12345)
Dataset(num_blocks=..., num_rows=100, schema=<class 'int'>)

Time complexity: O(dataset size / parallelism)

Parameters
  • seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

  • num_blocks – The number of output blocks after the shuffle, or None to retain the number of blocks.

Returns

The shuffled dataset.

Dataset.randomize_block_order(*, seed: Optional[int] = None) ray.data.dataset.Dataset[ray.data.block.T][source]

Randomly shuffle the blocks of this dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> # Randomize the block order.
>>> ds.randomize_block_order() 
>>> # Randomize the block order with a fixed random seed.
>>> ds.randomize_block_order(seed=12345) 
Parameters

seed – Fix the random seed to use, otherwise one will be chosen based on system randomness.

Returns

The block-shuffled dataset.

Dataset.repartition(num_blocks: int, *, shuffle: bool = False) ray.data.dataset.Dataset[ray.data.block.T][source]

Repartition the dataset into exactly this number of blocks.

This is a blocking operation. After repartitioning, all blocks in the returned dataset will have approximately the same number of rows.

Examples

>>> import ray
>>> ds = ray.data.range(100)
>>> # Set the number of output partitions to write to disk.
>>> ds.repartition(10).write_parquet("/tmp/test")

Time complexity: O(dataset size / parallelism)

Parameters
  • num_blocks – The number of blocks.

  • shuffle – Whether to perform a distributed shuffle during the repartition. When shuffle is enabled, each output block contains a subset of data rows from each input block, which requires all-to-all data movement. When shuffle is disabled, output blocks are created from adjacent input blocks, minimizing data movement.

Returns

The repartitioned dataset.
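The following sketch contrasts the two shuffle modes described above; either way, the result has exactly the requested number of blocks:

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Coalesce adjacent input blocks, minimizing data movement.
>>> ds.repartition(4).num_blocks()
4
>>> # Full shuffle: each output block draws rows from every input block.
>>> ds.repartition(4, shuffle=True).num_blocks()
4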

Splitting and Merging Datasets

Dataset.split(n: int, *, equal: bool = False, locality_hints: Optional[List[Any]] = None) List[ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset into n disjoint pieces.

This returns a list of sub-datasets that can be passed to Ray tasks and actors and used to read the dataset records in parallel.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> workers = ... 
>>> # Split up a dataset to process over `n` worker actors.
>>> shards = ds.split(len(workers), locality_hints=workers) 
>>> for shard, worker in zip(shards, workers): 
...     worker.consume.remote(shard) 

Time complexity: O(1)

See also: Dataset.split_at_indices, Dataset.split_proportionately

Parameters
  • n – Number of child datasets to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – [Experimental] A list of Ray actor handles of size n. The system will try to co-locate the blocks of the i-th dataset with the i-th actor to maximize data locality.

Returns

A list of n disjoint dataset splits.

Dataset.split_at_indices(indices: List[int]) List[ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset at the given indices (like np.split).

Examples

>>> import ray
>>> ds = ray.data.range(10)
>>> d1, d2, d3 = ds.split_at_indices([2, 5])
>>> d1.take()
[0, 1]
>>> d2.take()
[2, 3, 4]
>>> d3.take()
[5, 6, 7, 8, 9]

Time complexity: O(num splits)

See also: Dataset.split, Dataset.split_proportionately

Parameters

indices – List of sorted integers which indicate where the dataset will be split. If an index exceeds the length of the dataset, an empty dataset will be returned.

Returns

The dataset splits.

Dataset.split_proportionately(proportions: List[float]) List[ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset using proportions.

A common use case for this would be splitting the dataset into train and test sets (equivalent to, e.g., scikit-learn’s train_test_split). See also Dataset.train_test_split for a higher level abstraction.

The indices to split at will be calculated in such a way that all splits always contain at least one element. If that is not possible, an exception will be raised.

This is equivalent to calculating the indices manually and calling Dataset.split_at_indices.

Examples

>>> import ray
>>> ds = ray.data.range(10)
>>> d1, d2, d3 = ds.split_proportionately([0.2, 0.5])
>>> d1.take()
[0, 1]
>>> d2.take()
[2, 3, 4, 5, 6]
>>> d3.take()
[7, 8, 9]

Time complexity: O(num splits)

See also: Dataset.split, Dataset.split_at_indices, Dataset.train_test_split

Parameters

proportions – List of proportions to split the dataset according to. Must sum to less than 1, and each proportion must be greater than 0.

Returns

The dataset splits.

Dataset.train_test_split(test_size: Union[int, float], *, shuffle: bool = False, seed: Optional[int] = None) Tuple[ray.data.dataset.Dataset[ray.data.block.T], ray.data.dataset.Dataset[ray.data.block.T]][source]

Split the dataset into train and test subsets.

Examples

>>> import ray
>>> ds = ray.data.range(8)
>>> train, test = ds.train_test_split(test_size=0.25)
>>> train.take()
[0, 1, 2, 3, 4, 5]
>>> test.take()
[6, 7]
Parameters
  • test_size – If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split. If int, represents the absolute number of test samples. The train split will always be the complement of the test split.

  • shuffle – Whether or not to globally shuffle the dataset before splitting. Defaults to False. This may be a very expensive operation with large datasets.

  • seed – Fix the random seed to use for shuffle, otherwise one will be chosen based on system randomness. Ignored if shuffle=False.

Returns

Train and test subsets as two Datasets.
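An integer test_size selects an absolute number of test rows instead of a proportion. A sketch mirroring the example above, with the expected outputs matching the test_size=0.25 case:

>>> import ray
>>> ds = ray.data.range(8)
>>> train, test = ds.train_test_split(test_size=2)
>>> train.take()
[0, 1, 2, 3, 4, 5]
>>> test.take()
[6, 7]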

Dataset.union(*other: List[ray.data.dataset.Dataset[ray.data.block.T]]) ray.data.dataset.Dataset[ray.data.block.T][source]

Combine this dataset with others of the same type.

The order of the blocks in the datasets is preserved, as is the relative ordering between the datasets passed in the argument list.

NOTE: Unioned datasets are not lineage-serializable, i.e. they cannot be used as a tunable hyperparameter in Ray Tune.

Parameters

other – List of datasets to combine with this one. The datasets must have the same schema as this dataset, otherwise the behavior is undefined.

Returns

A new dataset holding the union of their data.
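A minimal sketch of combining two datasets; block order and the order of the arguments are preserved:

>>> import ray
>>> ds1 = ray.data.range(3)
>>> ds2 = ray.data.range(3)
>>> ds1.union(ds2).take()
[0, 1, 2, 0, 1, 2]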

Dataset.zip(other: Dataset[U]) Dataset[T, U][source]

Zip this dataset with the elements of another.

The datasets must have the same number of rows, block types, and block sizes (e.g., one was produced from a .map() of another). For Arrow blocks, the schemas will be concatenated, and any duplicate column names will be disambiguated with _1, _2, etc. suffixes.

NOTE: Zipped datasets are not lineage-serializable, i.e. they cannot be used as a tunable hyperparameter in Ray Tune.

Time complexity: O(dataset size / parallelism)

Parameters

other – The dataset to zip with on the right hand side.

Examples

>>> import ray
>>> ds = ray.data.range(5)
>>> ds.zip(ds).take()
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
Returns

A Dataset with (k, v) pairs (or concatenated Arrow schema) where k comes from the first dataset and v comes from the second.

Grouped and Global Aggregations

Dataset.groupby(key: Union[None, str, Callable[[ray.data.block.T], Any]]) GroupedDataset[T][source]

Group the dataset by the key function or column name.

This is a lazy operation.

Examples

>>> import ray
>>> # Group by a key function and aggregate.
>>> ray.data.range(100).groupby(lambda x: x % 3).count()
Dataset(num_blocks=..., num_rows=3, schema=<class 'tuple'>)
>>> # Group by an Arrow table column and aggregate.
>>> ray.data.from_items([
...     {"A": x % 3, "B": x} for x in range(100)]).groupby(
...     "A").count()
Dataset(num_blocks=..., num_rows=3, schema={A: int64, count(): int64})

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters

key – A key function or Arrow column name. If this is None, the grouping is global.

Returns

A lazy GroupedDataset that can be aggregated later.

Dataset.aggregate(*aggs: ray.data.aggregate.AggregateFn) ray.data.block.U[source]

Aggregate the entire dataset as one group.

This is a blocking operation.

Examples

>>> import ray
>>> from ray.data.aggregate import Max, Mean
>>> ray.data.range(100).aggregate(Max())
(99,)
>>> ray.data.range_table(100).aggregate(
...    Max("value"), Mean("value"))
{'max(value)': 99, 'mean(value)': 49.5}

Time complexity: O(dataset size / parallelism)

Parameters

aggs – Aggregations to do.

Returns

If the input dataset is a simple dataset then the output is a tuple of (agg1, agg2, ...) where each tuple element is the corresponding aggregation result. If the input dataset is an Arrow dataset then the output is an ArrowRow where each column is the corresponding aggregation result. If the dataset is empty, return None.

Dataset.sum(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute sum over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).sum()
4950
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).sum(lambda x: x[1])
328350
>>> ray.data.range_table(100).sum("value")
4950
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).sum(["A", "B"])
{'sum(A)': 4950, 'sum(B)': 328350}
Parameters
  • on

    The data subset on which to compute the sum.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar sum of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise sum of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the sum; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The sum result.

For a simple dataset, the output is:

  • on=None: a scalar representing the sum of all rows,

  • on=callable: a scalar representing the sum of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (sum_1, ..., sum_n) representing the sum of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise sum of all columns,

  • on="col": a scalar representing the sum of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise sum of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.
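An illustrative sketch of the null-handling behavior described above (outputs omitted): with the default ignore_nulls=True the null row is expected to be skipped, while ignore_nulls=False yields None.

>>> import ray
>>> ds = ray.data.from_items([{"A": 1}, {"A": None}, {"A": 3}])
>>> ds.sum("A")
>>> ds.sum("A", ignore_nulls=False)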

Dataset.min(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute minimum over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).min()
0
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).min(lambda x: x[1])
0
>>> ray.data.range_table(100).min("value")
0
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).min(["A", "B"])
{'min(A)': 0, 'min(B)': 0}
Parameters
  • on

    The data subset on which to compute the min.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar min of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise min of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the min; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The min result.

For a simple dataset, the output is:

  • on=None: a scalar representing the min of all rows,

  • on=callable: a scalar representing the min of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (min_1, ..., min_n) representing the min of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise min of all columns,

  • on="col": a scalar representing the min of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise min of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

Dataset.max(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute maximum over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).max()
99
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).max(lambda x: x[1])
9801
>>> ray.data.range_table(100).max("value")
99
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).max(["A", "B"])
{'max(A)': 99, 'max(B)': 9801}
Parameters
  • on

    The data subset on which to compute the max.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar max of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise max of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the max; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The max result.

For a simple dataset, the output is:

  • on=None: a scalar representing the max of all rows,

  • on=callable: a scalar representing the max of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (max_1, ..., max_n) representing the max of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise max of all columns,

  • on="col": a scalar representing the max of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise max of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

Dataset.mean(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ignore_nulls: bool = True) ray.data.block.U[source]

Compute mean over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).mean()
49.5
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).mean(lambda x: x[1])
3283.5
>>> ray.data.range_table(100).mean("value")
49.5
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).mean(["A", "B"])
{'mean(A)': 49.5, 'mean(B)': 3283.5}
Parameters
  • on

    The data subset on which to compute the mean.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar mean of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise mean of all columns.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the mean; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The mean result.

For a simple dataset, the output is:

  • on=None: a scalar representing the mean of all rows,

  • on=callable: a scalar representing the mean of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (mean_1, ..., mean_n) representing the mean of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise mean of all columns,

  • on="col": a scalar representing the mean of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise mean of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

Dataset.std(on: Union[None, str, Callable[[ray.data.block.T], Any], List[Union[None, str, Callable[[ray.data.block.T], Any]]]] = None, ddof: int = 1, ignore_nulls: bool = True) ray.data.block.U[source]

Compute standard deviation over entire dataset.

This is a blocking operation.

Examples

>>> import ray
>>> ray.data.range(100).std()
29.011491975882016
>>> ray.data.from_items([
...     (i, i**2)
...     for i in range(100)]).std(lambda x: x[1])
2968.1748039269296
>>> ray.data.range_table(100).std("value", ddof=0)
28.86607004772212
>>> ray.data.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)]).std(["A", "B"])
{'std(A)': 29.011491975882016, 'std(B)': 2968.1748039269296}

NOTE: This uses Welford’s online method for an accumulator-style computation of the standard deviation. This method was chosen due to its numerical stability and because it can be computed in a single pass. This may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

Parameters
  • on

    The data subset on which to compute the std.

    • For a simple dataset: it can be a callable or a list thereof, and the default is to return a scalar std of all rows.

    • For an Arrow dataset: it can be a column name or a list thereof, and the default is to return an ArrowRow containing the column-wise std of all columns.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

  • ignore_nulls – Whether to ignore null values. If True, null values will be ignored when computing the std; if False, if a null value is encountered, the output will be None. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns

The standard deviation result.

For a simple dataset, the output is:

  • on=None: a scalar representing the std of all rows,

  • on=callable: a scalar representing the std of the outputs of the callable called on each row,

  • on=[callable_1, ..., callable_n]: a tuple of (std_1, ..., std_n) representing the std of the outputs of the corresponding callables called on each row.

For an Arrow dataset, the output is:

  • on=None: an ArrowRow containing the column-wise std of all columns,

  • on="col": a scalar representing the std of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column ArrowRow containing the column-wise std of the provided columns.

If the dataset is empty, all values are null, or any value is null AND ignore_nulls is False, then the output will be None.

Converting to Pipelines

Dataset.repeat(times: Optional[int] = None) DatasetPipeline[T][source]

Convert this into a DatasetPipeline by looping over this dataset.

Transformations prior to the call to repeat() are evaluated once. Transformations done on the returned pipeline are evaluated on each loop of the pipeline over the base dataset.

Note that every repeat of the dataset is considered an “epoch” for the purposes of DatasetPipeline.iter_epochs().

Examples

>>> import ray
>>> # Infinite pipeline of numbers [0, 5)
>>> ray.data.range(5).repeat().take()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, ...]
>>> # Can apply transformations to the pipeline.
>>> ray.data.range(5).repeat().map(lambda x: -x).take()
[0, -1, -2, -3, -4, 0, -1, -2, -3, -4, ...]
>>> # Can shuffle each epoch (dataset) in the pipeline.
>>> ray.data.range(5).repeat().random_shuffle().take() 
[2, 3, 0, 4, 1, 4, 0, 2, 1, 3, ...]
Parameters

times – The number of times to loop over this dataset, or None to repeat indefinitely.

Dataset.window(*, blocks_per_window: Optional[int] = None, bytes_per_window: Optional[int] = None) DatasetPipeline[T][source]

Convert this into a DatasetPipeline by windowing over data blocks.

Transformations prior to the call to window() are evaluated in bulk on the entire dataset. Transformations done on the returned pipeline are evaluated incrementally per window of blocks as data is read from the output of the pipeline.

Windowing execution allows for output to be read sooner without waiting for all transformations to fully execute, and can also improve efficiency if transforms use different resources (e.g., GPUs).

Without windowing:

[preprocessing......]
                      [inference.......]
                                         [write........]
Time ----------------------------------------------------------->

With windowing:

[prep1] [prep2] [prep3]
        [infer1] [infer2] [infer3]
                 [write1] [write2] [write3]
Time ----------------------------------------------------------->

Examples

>>> import ray
>>> # Create an inference pipeline.
>>> ds = ray.data.read_binary_files(dir) 
>>> infer = ... 
>>> pipe = ds.window(blocks_per_window=10).map(infer) 
DatasetPipeline(num_windows=40, num_stages=2)
>>> # The higher the stage parallelism, the shorter the pipeline.
>>> pipe = ds.window(blocks_per_window=20).map(infer) 
DatasetPipeline(num_windows=20, num_stages=2)
>>> # Outputs can be incrementally read from the pipeline.
>>> for item in pipe.iter_rows(): 
...    print(item) 
Parameters
  • blocks_per_window – The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.

  • bytes_per_window – Specify the window size in bytes instead of blocks. This will be treated as an upper bound for the window size, but each window will still include at least one block. This is mutually exclusive with blocks_per_window.
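A sketch of size-based windowing using bytes_per_window as described above; the byte budget shown is arbitrary:

>>> import ray
>>> ds = ray.data.range(100000) 
>>> # Each window is capped at roughly 1 MiB but always contains at least one block.
>>> pipe = ds.window(bytes_per_window=1024 * 1024) 
>>> for row in pipe.iter_rows(): 
...     pass 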

Consuming Datasets

Dataset.show(limit: int = 20) None[source]

Print up to the given number of records from the dataset.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to print.

Dataset.take(limit: int = 20) List[ray.data.block.T][source]

Return up to limit records from the dataset.

This will move up to limit records to the caller’s machine; if limit is very large, this can result in an OutOfMemory crash on the caller.

Time complexity: O(limit specified)

Parameters

limit – The max number of records to return.

Returns

A list of up to limit records from the dataset.

Dataset.take_all(limit: int = 100000) List[ray.data.block.T][source]

Return all of the records in the dataset.

This will move the entire dataset to the caller’s machine; if the dataset is very large, this can result in an OutOfMemory crash on the caller.

Time complexity: O(dataset size)

Parameters

limit – Raise an error if the size exceeds the specified limit.

Returns

A list of all the records in the dataset.

Dataset.iter_rows(*, prefetch_blocks: int = 0) Iterator[Union[ray.data.block.T, ray.data.row.TableRow]][source]

Return a local row iterator over the dataset.

If the dataset is a tabular dataset (Arrow/Pandas blocks), dict-like TableRow mappings are yielded for each row by the iterator. If the dataset is not tabular, the raw row is yielded.

Examples

>>> import ray
>>> for i in ray.data.range(1000000).iter_rows(): 
...     print(i) 

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the entire dataset.

Dataset.iter_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, batch_format: str = 'default', drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]][source]

Return a local batched iterator over the dataset.

Examples

>>> import ray
>>> for batch in ray.data.range(1000000).iter_batches(): 
...     print(batch) 

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The final batch may include fewer than batch_size rows if drop_last is False. Defaults to 256.

  • batch_format – The format in which to return each batch. Specify “default” to use the default block format (promoting tables to Pandas and tensors to NumPy), “pandas” to select pandas.DataFrame, “pyarrow” to select pyarrow.Table, or “numpy” to select numpy.ndarray for tensor datasets and Dict[str, numpy.ndarray] for tabular datasets. Default is “default”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

  • local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained.

  • local_shuffle_seed – The seed to use for the local random shuffle.

Returns

An iterator over record batches.
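A sketch of local shuffling during iteration, using the buffer parameters described above; the buffer and batch sizes here are arbitrary:

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Rows are shuffled within a 250-row in-memory buffer while iterating.
>>> for batch in ds.iter_batches(
...     batch_size=50,
...     local_shuffle_buffer_size=250,
...     local_shuffle_seed=42,
... ):
...     pass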

Dataset.iter_torch_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, dtypes: Optional[Union[torch.dtype, Dict[str, torch.dtype]]] = None, device: Optional[str] = None, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[Union[torch.Tensor, Dict[str, torch.Tensor]]][source]

Return a local batched iterator of Torch Tensors over the dataset.

This iterator will yield single-tensor batches if the underlying dataset consists of a single column; otherwise, it will yield a dictionary of column-tensors. If you need more flexibility in the tensor conversion (e.g. casting dtypes) or the batch format, try using iter_batches directly, which is a lower-level API.

Examples

>>> import ray
>>> for batch in ray.data.range( 
...     12,
... ).iter_torch_batches(batch_size=4):
...     print(batch.shape) 
torch.Size([4, 1])
torch.Size([4, 1])
torch.Size([4, 1])

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The final batch may include fewer than batch_size rows if drop_last is False. Defaults to 256.

  • dtypes – The Torch dtype(s) for the created tensor(s); if None, the dtype will be inferred from the tensor data.

  • device – The device on which the tensor should be placed; if None, the Torch tensor will be constructed on the CPU.

  • drop_last – Whether to drop the last batch if it’s incomplete.

  • local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.

  • local_shuffle_seed – The seed to use for the local random shuffle.

Returns

An iterator over Torch Tensor batches.

Dataset.iter_tf_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, dtypes: Optional[Union[tf.dtypes.DType, Dict[str, tf.dtypes.DType]]] = None, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[Union[tf.Tensor, Dict[str, tf.Tensor]]][source]

Return a local batched iterator of TensorFlow Tensors over the dataset.

This iterator will yield single-tensor batches if the underlying dataset consists of a single column; otherwise, it will yield a dictionary of column-tensors. If you need more flexibility in the tensor conversion (e.g. casting dtypes) or the batch format, try using to_tf, which has a declarative API for tensor casting and batch formatting, or use iter_batches directly, which is a lower-level API.

Examples

>>> import ray
>>> for batch in ray.data.range( 
...     12,
... ).iter_tf_batches(batch_size=4):
...     print(batch.shape) 
(4, 1)
(4, 1)
(4, 1)

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The final batch may include fewer than batch_size rows if drop_last is False. Defaults to 256.

  • dtypes – The TensorFlow dtype(s) for the created tensor(s); if None, the dtype will be inferred from the tensor data.

  • drop_last – Whether to drop the last batch if it’s incomplete.

  • local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.

  • local_shuffle_seed – The seed to use for the local random shuffle.

Returns

An iterator over TensorFlow Tensor batches.

I/O and Conversion

Dataset.write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_parquet_args) None[source]

Write the dataset to parquet.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_parquet("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where Parquet files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_parquet_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. This should be used instead of arrow_parquet_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_parquet_args – Options to pass to pyarrow.parquet.write_table(), which is used to write out each block to a file.
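A sketch of passing a write option through arrow_parquet_args as described above; compression is a standard pyarrow.parquet.write_table() argument:

>>> import ray
>>> ds = ray.data.range(100) 
>>> # Forwarded to pyarrow.parquet.write_table() for each block.
>>> ds.write_parquet("s3://bucket/path", compression="snappy") 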

Dataset.write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **pandas_json_args) None[source]

Write the dataset to json.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {self._uuid}_{block_idx}.json, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_json("s3://bucket/path") 

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where json files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • pandas_json_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from pandas_json_args. This should be used instead of pandas_json_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • pandas_json_args – These args will be passed to pandas.DataFrame.to_json(), which we use under the hood to write out each block of the Dataset. These are dict(orient="records", lines=True) by default.

Dataset.write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function Dataset.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_csv_args) None[source]

Write the dataset to csv.

This is only supported for datasets convertible to Arrow records. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range(100) 
>>> ds.write_csv("s3://bucket/path") 
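
A sketch of passing CSV write options, assuming the keyword arguments are forwarded to pyarrow's CSV writer via arrow_csv_args:

>>> from pyarrow import csv
>>> ds.write_csv(
...     "s3://bucket/path",
...     write_options=csv.WriteOptions(include_header=False))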

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where csv files will be written to.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • arrow_csv_args_fn – Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_csv_args. This should be used instead of arrow_csv_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • arrow_csv_args – Other CSV write options to pass to pyarrow.

Dataset.write_numpy(path: str, *, column: str = '__value__', filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, ray_remote_args: Dict[str, Any] = None) None[source]

Write a tensor column of the dataset to npy files.

This is only supported for datasets convertible to Arrow records that contain a TensorArray column. To control the number of files, use .repartition().

Unless a custom block path provider is given, the format of the output files will be {uuid}_{block_idx}.npy, where uuid is a unique id for the dataset.

Examples

>>> import ray
>>> ds = ray.data.range_tensor(100)
>>> ds.write_numpy("s3://bucket/path") 
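
The tensor column can also be named explicitly; a sketch assuming the default "__value__" column produced by range_tensor():

>>> ds.write_numpy("s3://bucket/path", column="__value__")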

Time complexity: O(dataset size / parallelism)

Parameters
  • path – The path to the destination root directory, where npy files will be written to.

  • column – The name of the table column that contains the tensor to be written. The default is "__value__", the column name that Datasets uses for storing tensors in single-column tables.

  • filesystem – The filesystem implementation to write to.

  • try_create_dir – Try to create all directories in destination path if True. Does nothing if all directories already exist.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream

  • block_path_provider – BlockWritePathProvider implementation to write each dataset block to a custom output path.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

Dataset.write_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.block.T], *, ray_remote_args: Optional[Dict[str, Any]] = None, **write_args) None[source]

Write the dataset to a custom datasource.

Examples

>>> import ray
>>> from ray.data.datasource import Datasource
>>> ds = ray.data.range(100) 
>>> class CustomDatasource(Datasource): 
...     # define custom data source
...     pass 
>>> ds.write_datasource(CustomDatasource(...)) 

Time complexity: O(dataset size / parallelism)

Parameters
  • datasource – The datasource to write to.

  • ray_remote_args – Kwargs passed to ray.remote in the write tasks.

  • write_args – Additional write args to pass to the datasource.

Dataset.to_torch(*, label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[Union[torch.dtype, List[torch.dtype], Dict[str, torch.dtype]]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, unsqueeze_label_tensor: bool = True, unsqueeze_feature_tensors: bool = True) torch.utils.data.IterableDataset[source]

Return a Torch IterableDataset over this dataset.

This is only supported for datasets convertible to Arrow records.

It is recommended to use the returned IterableDataset directly instead of passing it into a torch DataLoader.

Each element in IterableDataset will be a tuple consisting of 2 elements. The first item contains the feature tensor(s), and the second item is the label tensor. Those can take on different forms, depending on the specified arguments.

For the features tensor (N is the batch_size and n, m, k are the number of features per tensor):

  • If feature_columns is a List[str], the features will be a tensor of shape (N, n), with columns corresponding to feature_columns

  • If feature_columns is a List[List[str]], the features will be a list of tensors of shape [(N, m),…,(N, k)], with columns of each tensor corresponding to the elements of feature_columns

  • If feature_columns is a Dict[str, List[str]], the features will be a dict of key-tensor pairs of shape {key1: (N, m),…, keyN: (N, k)}, with columns of each tensor corresponding to the value of feature_columns under the key.

If unsqueeze_label_tensor=True (default), the label tensor will be of shape (N, 1). Otherwise, it will be of shape (N,). If label_column is specified as None, then no column from the Dataset will be treated as the label, and the output label tensor will be None.

Note that you probably want to call .split() on this dataset if there are to be multiple Torch workers consuming the data.
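
Example

A minimal sketch, assuming a small in-memory dataset with a feature column "x" and a label column "y":

>>> import ray
>>> ds = ray.data.from_items([{"x": i, "y": 2 * i} for i in range(8)])
>>> torch_ds = ds.to_torch(label_column="y", batch_size=4)
>>> for features, label in torch_ds:
...     ...  # features: (4, 1) tensor, label: (4, 1) tensor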

Time complexity: O(1)

Parameters
  • label_column – The name of the column used as the label (second element of the output list). Can be None for prediction, in which case the second element of returned tuple will also be None.

  • feature_columns – The names of the columns to use as the features. Can be a list of lists or a dict of string-list pairs for multi-tensor output. If None, then use all columns except the label column as the features.

  • label_column_dtype – The torch dtype to use for the label column. If None, then automatically infer the dtype.

  • feature_column_dtypes – The dtypes to use for the feature tensors. This should match the format of feature_columns, or be a single dtype, in which case it will be applied to all tensors. If None, then automatically infer the dtype.

  • batch_size – How many samples per batch to yield at a time. Defaults to 1.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • drop_last – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

  • local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.

  • local_shuffle_seed – The seed to use for the local random shuffle.

  • unsqueeze_label_tensor – If set to True, the label tensor will be unsqueezed (reshaped to (N, 1)). Otherwise, it will be left as is, that is (N, ). In general, regression loss functions expect an unsqueezed tensor, while classification loss functions expect a squeezed one. Defaults to True.

  • unsqueeze_feature_tensors – If set to True, the features tensors will be unsqueezed (reshaped to (N, 1)) before being concatenated into the final features tensor. Otherwise, they will be left as is, that is (N, ). Defaults to True.

Returns

A torch IterableDataset.

Dataset.to_tf(*, output_signature: Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec], Tuple[Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec]], tf.TypeSpec]], label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, prefetch_blocks: int = 0, batch_size: int = 1, drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) tf.data.Dataset[source]

Return a TF Dataset over this dataset.

The TF Dataset will be created from the generator returned by the iter_batches method. prefetch_blocks and batch_size arguments will be passed to that method.

For the features tensor (N is the batch_size and n1, …, nk are the number of features per tensor):

  • If feature_columns is a List[str], the features will be a tensor of shape (N, n), with columns corresponding to feature_columns

  • If feature_columns is a List[List[str]], the features will be a list of tensors of shape [(N, n1),…,(N, nk)], with columns of each tensor corresponding to the elements of feature_columns

  • If feature_columns is a Dict[str, List[str]], the features will be a dict of key-tensor pairs of shape {key1: (N, n1),…, keyN: (N, nk)}, with columns of each tensor corresponding to the value of feature_columns under the key.

This is only supported for datasets convertible to Arrow records.

Requires all datasets to have the same columns.

It is recommended to call .split() on this dataset if there are to be multiple TensorFlow workers consuming the data.

The elements generated must be compatible with the given output_signature argument (same as in tf.data.Dataset.from_generator).
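
Example

A minimal sketch, assuming a single integer feature column "x" and an integer label column "y"; the TensorSpec shapes shown are assumptions for this toy data:

>>> import ray
>>> import tensorflow as tf
>>> ds = ray.data.from_items([{"x": i, "y": 2 * i} for i in range(8)])
>>> tf_ds = ds.to_tf(
...     label_column="y",
...     output_signature=(
...         tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
...         tf.TensorSpec(shape=(None,), dtype=tf.int64),
...     ),
...     batch_size=4)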

Time complexity: O(1)

Parameters
  • output_signature – If label_column is specified, a two-element tuple containing a TensorflowFeatureTypeSpec and a tf.TypeSpec object corresponding to (features, label). Otherwise, a single TensorflowFeatureTypeSpec corresponding to the features tensor. A TensorflowFeatureTypeSpec is a tf.TypeSpec, List["tf.TypeSpec"], or Dict[str, "tf.TypeSpec"].

  • label_column – The name of the column used as the label (second element of the output tuple). If not specified, output will be just one tensor instead of a tuple.

  • feature_columns – The names of the columns to use as the features. Can be a list of lists or a dict of string-list pairs for multi-tensor output. If None, then use all columns except the label columns as the features.

  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – Record batch size. Defaults to 1.

  • drop_last – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

  • local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.

  • local_shuffle_seed – The seed to use for the local random shuffle.

Returns

A tf.data.Dataset.

Dataset.to_dask(meta: Optional[Union[pandas.DataFrame, pandas.Series, Dict[str, Any], Iterable[Any], Tuple[Any]]] = None) dask.DataFrame[source]

Convert this dataset into a Dask DataFrame.

This is only supported for datasets convertible to Arrow records.

Note that this function will globally set the Dask scheduler to Dask-on-Ray via the Dask config.
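
Example

A minimal sketch (requires the dask package to be installed):

>>> import ray
>>> ds = ray.data.from_items([{"a": i} for i in range(100)])
>>> ddf = ds.to_dask()
>>> ddf["a"].sum().compute()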

Time complexity: O(dataset size / parallelism)

Parameters

meta – An empty pandas DataFrame or Series that matches the dtypes and column names of the Dataset. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. By default, this will be inferred from the underlying Dataset schema, with this argument supplying an optional override.

Returns

A Dask DataFrame created from this dataset.

Dataset.to_mars() mars.DataFrame[source]

Convert this dataset into a MARS dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A MARS dataframe created from this dataset.

Dataset.to_modin() modin.DataFrame[source]

Convert this dataset into a Modin dataframe.

This works by first converting this dataset into a distributed set of Pandas dataframes (using .to_pandas_refs()). Please see caveats there. Then the individual dataframes are used to create the modin DataFrame using modin.distributed.dataframe.pandas.partitions.from_partitions().

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A Modin dataframe created from this dataset.

Dataset.to_spark(spark: pyspark.sql.SparkSession) pyspark.sql.DataFrame[source]

Convert this dataset into a Spark dataframe.

Time complexity: O(dataset size / parallelism)

Returns

A Spark dataframe created from this dataset.

Dataset.to_pandas(limit: int = 100000) pandas.DataFrame[source]

Convert this dataset into a single Pandas DataFrame.

This is only supported for datasets convertible to Arrow or Pandas records. An error is raised if the number of records exceeds the provided limit. Note that you can use .limit() on the dataset beforehand to truncate the dataset manually.
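
Example

A minimal sketch:

>>> import ray
>>> ds = ray.data.from_items([{"a": i, "b": str(i)} for i in range(10)])
>>> df = ds.to_pandas()
>>> len(df)
10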

Time complexity: O(dataset size)

Parameters

limit – The maximum number of records to return. An error will be raised if the limit is exceeded.

Returns

A Pandas DataFrame created from this dataset, containing a limited number of records.

Dataset.to_pandas_refs() List[ray.types.ObjectRef[pandas.DataFrame]][source]

Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().

Time complexity: O(dataset size / parallelism)

Returns

A list of remote Pandas dataframes created from this dataset.

Dataset.to_numpy_refs(*, column: Optional[str] = None) List[ray.types.ObjectRef[numpy.ndarray]][source]

Convert this dataset into a distributed set of NumPy ndarrays.

This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using .to_arrow() or .get_internal_block_refs().
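
Example

A minimal sketch:

>>> import ray
>>> ds = ray.data.range(100)
>>> arr_refs = ds.to_numpy_refs()
>>> arrs = ray.get(arr_refs)  # one numpy.ndarray per block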

Time complexity: O(dataset size / parallelism)

Parameters
  • column – The name of the column to convert to numpy, or None to specify the entire row. If not specified for Arrow or Pandas blocks, each returned future will represent a dict of column ndarrays.

Returns

A list of remote NumPy ndarrays created from this dataset.

Dataset.to_arrow_refs() List[ray.types.ObjectRef[pyarrow.Table]][source]

Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records. This function is zero-copy if the existing data is already in Arrow format. Otherwise, the data will be converted to Arrow format.

Time complexity: O(1) unless conversion is required.

Returns

A list of remote Arrow tables created from this dataset.

Dataset.to_random_access_dataset(key: str, num_workers: Optional[int] = None) ray.data.random_access_dataset.RandomAccessDataset[source]

Convert this Dataset into a distributed RandomAccessDataset (EXPERIMENTAL).

RandomAccessDataset partitions the dataset across the cluster by the given sort key, providing efficient random access to records via binary search. A number of worker actors are created, each of which has zero-copy access to the underlying sorted data blocks of the Dataset.

Note that the key must be unique in the dataset. If there are duplicate keys, an arbitrary value is returned.

This is only supported for Arrow-format datasets.
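
Example

A minimal sketch, using ray.data.range_table(), which produces a table whose "value" column serves as a convenient unique key:

>>> import ray
>>> ds = ray.data.range_table(100)
>>> rad = ds.to_random_access_dataset("value", num_workers=2)
>>> ray.get(rad.get_async(42))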

Parameters
  • key – The key column over which records can be queried.

  • num_workers – The number of actors to use to serve random access queries. By default, this is determined by multiplying the number of Ray nodes in the cluster by four. As a rule of thumb, you can expect each worker to provide ~3000 records / second via get_async(), and ~10000 records / second via multiget().

Inspecting Metadata

Dataset.count() int[source]

Count the number of records in the dataset.

Time complexity: O(dataset size / parallelism), O(1) for parquet

Returns

The number of records in the dataset.

Dataset.schema(fetch_if_missing: bool = False) Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Time complexity: O(1)

Parameters

fetch_if_missing – If True, synchronously fetch the schema if it’s not known. Default is False, where None is returned if the schema is not known.

Returns

The Python type or Arrow schema of the records, or None if the schema is not known and fetch_if_missing is False.

Dataset.default_batch_format() Type[source]

Return this dataset’s default batch format.

The default batch format describes what batches of data look like. To learn more about batch formats, read writing user-defined functions.

Example

If your dataset represents a list of Python objects, then the default batch format is list.

>>> ds = ray.data.range(100)
>>> ds  
Dataset(num_blocks=20, num_rows=100, schema=<class 'int'>)
>>> ds.default_batch_format()
<class 'list'>
>>> next(ds.iter_batches(batch_size=4))
[0, 1, 2, 3]

If your dataset contains a single TensorDtype or ArrowTensorType column named __value__ (as created by ray.data.from_numpy()), then the default batch format is np.ndarray. For more information on tensor datasets, read the tensor support guide.

>>> ds = ray.data.range_tensor(100)
>>> ds  
Dataset(num_blocks=20, num_rows=100, schema={__value__: ArrowTensorType(shape=(1,), dtype=int64)})
>>> ds.default_batch_format()
<class 'numpy.ndarray'>
>>> next(ds.iter_batches(batch_size=4))
array([[0],
       [1],
       [2],
       [3]])

If your dataset represents tabular data and doesn’t only consist of a __value__ tensor column (such as is created by ray.data.from_numpy()), then the default batch format is pd.DataFrame.

>>> import pandas as pd
>>> df = pd.DataFrame({"foo": ["a", "b"], "bar": [0, 1]})
>>> ds = ray.data.from_pandas(df)
>>> ds  
Dataset(num_blocks=1, num_rows=2, schema={foo: object, bar: int64})
>>> ds.default_batch_format()
<class 'pandas.core.frame.DataFrame'>
>>> next(ds.iter_batches(batch_size=4))
  foo  bar
0   a    0
1   b    1

See also

map_batches()

Call this function to transform batches of data.

iter_batches()

Call this function to iterate over batches of data.

Dataset.num_blocks() int[source]

Return the number of blocks of this dataset.

Note that during read and transform operations, the number of blocks may be dynamically adjusted to respect memory limits, which can increase the block count at runtime.

Time complexity: O(1)

Returns

The number of blocks of this dataset.

Dataset.size_bytes() int[source]

Return the in-memory size of the dataset.

Time complexity: O(1)

Returns

The in-memory size of the dataset in bytes, or None if the in-memory size is not known.

Dataset.input_files() List[str][source]

Return the list of input files for the dataset.

Time complexity: O(num input files)

Returns

The list of input files used to create the dataset, or an empty list if the input files are not known.

Dataset.stats() str[source]

Returns a string containing execution timing information.

Dataset.get_internal_block_refs() List[ray.types.ObjectRef[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes]]][source]

Get a list of references to the underlying blocks of this dataset.

This function can be used for zero-copy access to the data. It blocks until the underlying blocks are computed.

Time complexity: O(1)

Returns

A list of references to this dataset’s blocks.

DeveloperAPI: This API may change across minor Ray releases.

Execution

Dataset.fully_executed() ray.data.dataset.Dataset[ray.data.block.T][source]

Force full evaluation of the blocks of this dataset.

This can be used to read all blocks into memory. By default, Datasets doesn’t read blocks from the datasource until the first transform.

Returns

A Dataset with all blocks fully materialized in memory.

Dataset.is_fully_executed() bool[source]

Returns whether this Dataset has been fully executed.

This will return False if this Dataset is lazy and if the output of its final stage hasn’t been computed yet.

Dataset.lazy() ray.data.dataset.Dataset[ray.data.block.T][source]

Enable lazy evaluation.

The returned dataset is a lazy dataset, where all subsequent operations on the dataset won’t be executed until the dataset is consumed (e.g. .take(), .iter_batches(), .to_torch(), .to_tf(), etc.) or execution is manually triggered via .fully_executed().
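
Example

A minimal sketch of the lazy workflow:

>>> import ray
>>> ds = ray.data.range(100).lazy()
>>> ds = ds.map(lambda x: x * 2)  # recorded, not executed yet
>>> ds = ds.fully_executed()  # triggers execution of all stages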

Serialization

Dataset.has_serializable_lineage() bool[source]

Whether this dataset’s lineage is able to be serialized for storage and later deserialized, possibly on a different cluster.

Only datasets that are created from data that we know will still exist at deserialization time, e.g. data external to this Ray cluster such as persistent cloud object stores, support lineage-based serialization. All of the ray.data.read_*() APIs support lineage-based serialization.

Dataset.serialize_lineage() bytes[source]

Serialize this dataset’s lineage, not the actual data or the existing data futures, to bytes that can be stored and later deserialized, possibly on a different cluster.

Note that this will drop all computed data, and that everything will be recomputed from scratch after deserialization.

Use Dataset.deserialize_lineage() to deserialize the serialized bytes returned from this method into a Dataset.

NOTE: Unioned and zipped datasets, produced by Dataset.union() and Dataset.zip(), are not lineage-serializable.
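
Example

A minimal sketch, assuming the dataset was created with a lineage-serializable read_*() API:

>>> import ray
>>> ds = ray.data.read_parquet("s3://bucket/path")
>>> serialized = ds.serialize_lineage()
>>> # store the bytes, possibly ship them to another cluster, then:
>>> ds2 = ray.data.Dataset.deserialize_lineage(serialized)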

Returns

Serialized bytes containing the lineage of this dataset.

DeveloperAPI: This API may change across minor Ray releases.

static Dataset.deserialize_lineage(serialized_ds: bytes) ray.data.dataset.Dataset[source]

Deserialize the provided lineage-serialized Dataset.

This assumes that the provided serialized bytes were serialized using Dataset.serialize_lineage().

Parameters

serialized_ds – The serialized Dataset that we wish to deserialize.

Returns

A deserialized Dataset instance.

DeveloperAPI: This API may change across minor Ray releases.