Source code for ray.data.read_api

import itertools
import logging
from typing import List, Any, Dict, Union, Optional, Tuple, Callable, \
    TypeVar, TYPE_CHECKING

import numpy as np
if TYPE_CHECKING:
    import pyarrow
    import pandas
    import dask
    import mars
    import modin
    import pyspark

import ray
from ray.types import ObjectRef
from ray.util.annotations import PublicAPI, DeveloperAPI
from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.data.context import DatasetContext
from ray.data.dataset import Dataset
from ray.data.datasource import Datasource, RangeDatasource, \
    JSONDatasource, CSVDatasource, ParquetDatasource, BinaryDatasource, \
    NumpyDatasource, ReadTask
from ray.data.impl.arrow_block import ArrowRow, \
    DelegatingArrowBlockBuilder
from ray.data.impl.block_list import BlockList
from ray.data.impl.lazy_block_list import LazyBlockList, BlockPartition, \
    BlockPartitionMetadata
from ray.data.impl.remote_fn import cached_remote_fn
from ray.data.impl.util import _get_spread_resources_iter

T = TypeVar("T")

logger = logging.getLogger(__name__)


[docs]@PublicAPI(stability="beta") def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]: """Create a dataset from a list of local Python objects. Examples: >>> ray.data.from_items([1, 2, 3, 4, 5]) Args: items: List of local Python objects. parallelism: The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items. Returns: Dataset holding the items. """ block_size = max(1, len(items) // parallelism) blocks: List[ObjectRef[Block]] = [] metadata: List[BlockMetadata] = [] i = 0 while i < len(items): builder = DelegatingArrowBlockBuilder() for item in items[i:i + block_size]: builder.add(item) block = builder.build() blocks.append(ray.put(block)) metadata.append( BlockAccessor.for_block(block).get_metadata(input_files=None)) i += block_size return Dataset(BlockList(blocks, metadata), 0)
[docs]@PublicAPI(stability="beta") def range(n: int, *, parallelism: int = 200) -> Dataset[int]: """Create a dataset from a range of integers [0..n). Examples: >>> ray.data.range(10000).map(lambda x: x * 2).show() Args: n: The upper bound of the range of integers. parallelism: The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items. Returns: Dataset holding the integers. """ return read_datasource( RangeDatasource(), parallelism=parallelism, n=n, block_format="list")
[docs]@PublicAPI(stability="beta") def range_arrow(n: int, *, parallelism: int = 200) -> Dataset[ArrowRow]: """Create an Arrow dataset from a range of integers [0..n). Examples: >>> ds = ray.data.range_arrow(1000) >>> ds.map(lambda r: {"v2": r["value"] * 2}).show() This is similar to range(), but uses Arrow tables to hold the integers in Arrow records. The dataset elements take the form {"value": N}. Args: n: The upper bound of the range of integer records. parallelism: The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items. Returns: Dataset holding the integers as Arrow records. """ return read_datasource( RangeDatasource(), parallelism=parallelism, n=n, block_format="arrow")
[docs]@PublicAPI(stability="beta") def range_tensor(n: int, *, shape: Tuple = (1, ), parallelism: int = 200) -> Dataset[ArrowRow]: """Create a Tensor dataset from a range of integers [0..n). Examples: >>> ds = ray.data.range_tensor(1000, shape=(3, 10)) >>> ds.map_batches(lambda arr: arr * 2, batch_format="pandas").show() This is similar to range_arrow(), but uses the ArrowTensorArray extension type. The dataset elements take the form {"value": array(N, shape=shape)}. Args: n: The upper bound of the range of integer records. shape: The shape of each record. parallelism: The amount of parallelism to use for the dataset. Parallelism may be limited by the number of items. Returns: Dataset holding the integers as Arrow tensor records. """ return read_datasource( RangeDatasource(), parallelism=parallelism, n=n, block_format="tensor", tensor_shape=tuple(shape))
[docs]@PublicAPI(stability="beta") def read_datasource(datasource: Datasource[T], *, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, _spread_resource_prefix: Optional[str] = None, **read_args) -> Dataset[T]: """Read a dataset from a custom data source. Args: datasource: The datasource to read data from. parallelism: The requested parallelism of the read. Parallelism may be limited by the available partitioning of the datasource. read_args: Additional kwargs to pass to the datasource impl. ray_remote_args: kwargs passed to ray.remote in the read tasks. Returns: Dataset holding the data read from the datasource. """ read_tasks = datasource.prepare_read(parallelism, **read_args) context = DatasetContext.get_current() def remote_read(task: ReadTask) -> Block: DatasetContext._set_current(context) return task() if ray_remote_args is None: ray_remote_args = {} # Increase the read parallelism by default to maximize IO throughput. This # is particularly important when reading from e.g., remote storage. if "num_cpus" not in ray_remote_args: # Note that the too many workers warning triggers at 4x subscription, # so we go at 0.5 to avoid the warning message. ray_remote_args["num_cpus"] = 0.5 remote_read = cached_remote_fn(remote_read) if _spread_resource_prefix is not None: # Use given spread resource prefix for round-robin resource-based # scheduling. nodes = ray.nodes() resource_iter = _get_spread_resources_iter( nodes, _spread_resource_prefix, ray_remote_args) else: # If no spread resource prefix given, yield an empty dictionary. resource_iter = itertools.repeat({}) calls: List[Callable[[], ObjectRef[BlockPartition]]] = [] metadata: List[BlockPartitionMetadata] = [] for task in read_tasks: calls.append( lambda task=task, resources=next(resource_iter): remote_read.options( **ray_remote_args, resources=resources).remote(task)) metadata.append(task.get_metadata()) block_list = LazyBlockList(calls, metadata) # Get the schema from the first block synchronously. if metadata and metadata[0].schema is None: block_list.ensure_schema_for_first_block() return Dataset(block_list, 0)
[docs]@PublicAPI(stability="beta") def read_parquet(paths: Union[str, List[str]], *, filesystem: Optional["pyarrow.fs.FileSystem"] = None, columns: Optional[List[str]] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, _tensor_column_schema: Optional[Dict[str, Tuple[ np.dtype, Tuple[int, ...]]]] = None, **arrow_parquet_args) -> Dataset[ArrowRow]: """Create an Arrow dataset from parquet files. Examples: >>> # Read a directory of files in remote storage. >>> ray.data.read_parquet("s3://bucket/path") >>> # Read multiple local files. >>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) Args: paths: A single file path or a list of file paths (or directories). filesystem: The filesystem implementation to read from. columns: A list of column names to read. parallelism: The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset. ray_remote_args: kwargs passed to ray.remote in the read tasks. _tensor_column_schema: A dict of column name --> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw NumPy array format in C-contiguous order (e.g. via `arr.tobytes()`). arrow_parquet_args: Other parquet read options to pass to pyarrow. Returns: Dataset holding Arrow records read from the specified paths. """ if _tensor_column_schema is not None: existing_block_udf = arrow_parquet_args.pop("_block_udf", None) def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table": from ray.data.extensions import ArrowTensorArray for tensor_col_name, (dtype, shape) in _tensor_column_schema.items(): # NOTE(Clark): We use NumPy to consolidate these potentially # non-contiguous buffers, and to do buffer bookkeeping in # general. np_col = np.array([ np.ndarray(shape, buffer=buf.as_buffer(), dtype=dtype) for buf in block.column(tensor_col_name) ]) block = block.set_column( block._ensure_integer_index(tensor_col_name), tensor_col_name, ArrowTensorArray.from_numpy(np_col)) if existing_block_udf is not None: # Apply UDF after casting the tensor columns. block = existing_block_udf(block) return block arrow_parquet_args["_block_udf"] = _block_udf return read_datasource( ParquetDatasource(), parallelism=parallelism, paths=paths, filesystem=filesystem, columns=columns, ray_remote_args=ray_remote_args, **arrow_parquet_args)
[docs]@PublicAPI(stability="beta") def read_json(paths: Union[str, List[str]], *, filesystem: Optional["pyarrow.fs.FileSystem"] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **arrow_json_args) -> Dataset[ArrowRow]: """Create an Arrow dataset from json files. Examples: >>> # Read a directory of files in remote storage. >>> ray.data.read_json("s3://bucket/path") >>> # Read multiple local files. >>> ray.data.read_json(["/path/to/file1", "/path/to/file2"]) >>> # Read multiple directories. >>> ray.data.read_json(["s3://bucket/path1", "s3://bucket/path2"]) Args: paths: A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories. filesystem: The filesystem implementation to read from. parallelism: The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset. ray_remote_args: kwargs passed to ray.remote in the read tasks. arrow_open_stream_args: kwargs passed to pyarrow.fs.FileSystem.open_input_stream arrow_json_args: Other json read options to pass to pyarrow. Returns: Dataset holding Arrow records read from the specified paths. """ return read_datasource( JSONDatasource(), parallelism=parallelism, paths=paths, filesystem=filesystem, ray_remote_args=ray_remote_args, open_stream_args=arrow_open_stream_args, **arrow_json_args)
[docs]@PublicAPI(stability="beta") def read_csv(paths: Union[str, List[str]], *, filesystem: Optional["pyarrow.fs.FileSystem"] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **arrow_csv_args) -> Dataset[ArrowRow]: """Create an Arrow dataset from csv files. Examples: >>> # Read a directory of files in remote storage. >>> ray.data.read_csv("s3://bucket/path") >>> # Read multiple local files. >>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"]) >>> # Read multiple directories. >>> ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"]) Args: paths: A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories. filesystem: The filesystem implementation to read from. parallelism: The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset. ray_remote_args: kwargs passed to ray.remote in the read tasks. arrow_open_stream_args: kwargs passed to pyarrow.fs.FileSystem.open_input_stream arrow_csv_args: Other csv read options to pass to pyarrow. Returns: Dataset holding Arrow records read from the specified paths. """ return read_datasource( CSVDatasource(), parallelism=parallelism, paths=paths, filesystem=filesystem, ray_remote_args=ray_remote_args, open_stream_args=arrow_open_stream_args, **arrow_csv_args)
[docs]@PublicAPI(stability="beta") def read_text( paths: Union[str, List[str]], *, encoding: str = "utf-8", filesystem: Optional["pyarrow.fs.FileSystem"] = None, parallelism: int = 200, arrow_open_stream_args: Optional[Dict[str, Any]] = None, ) -> Dataset[str]: """Create a dataset from lines stored in text files. Examples: >>> # Read a directory of files in remote storage. >>> ray.data.read_text("s3://bucket/path") >>> # Read multiple local files. >>> ray.data.read_text(["/path/to/file1", "/path/to/file2"]) Args: paths: A single file path or a list of file paths (or directories). encoding: The encoding of the files (e.g., "utf-8" or "ascii"). filesystem: The filesystem implementation to read from. parallelism: The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset. arrow_open_stream_args: kwargs passed to pyarrow.fs.FileSystem.open_input_stream Returns: Dataset holding lines of text read from the specified paths. """ return read_binary_files( paths, filesystem=filesystem, parallelism=parallelism, arrow_open_stream_args=arrow_open_stream_args).flat_map( lambda x: x.decode(encoding).split("\n"))
[docs]@PublicAPI(stability="beta") def read_numpy(paths: Union[str, List[str]], *, filesystem: Optional["pyarrow.fs.FileSystem"] = None, parallelism: int = 200, arrow_open_stream_args: Optional[Dict[str, Any]] = None, **numpy_load_args) -> Dataset[ArrowRow]: """Create an Arrow dataset from csv files. Examples: >>> # Read a directory of files in remote storage. >>> ray.data.read_numpy("s3://bucket/path") >>> # Read multiple local files. >>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"]) >>> # Read multiple directories. >>> ray.data.read_numpy(["s3://bucket/path1", "s3://bucket/path2"]) Args: paths: A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories. filesystem: The filesystem implementation to read from. parallelism: The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset. arrow_open_stream_args: kwargs passed to pyarrow.fs.FileSystem.open_input_stream numpy_load_args: Other options to pass to np.load. Returns: Dataset holding Tensor records read from the specified paths. """ return read_datasource( NumpyDatasource(), parallelism=parallelism, paths=paths, filesystem=filesystem, open_stream_args=arrow_open_stream_args, **numpy_load_args)
[docs]@PublicAPI(stability="beta") def read_binary_files( paths: Union[str, List[str]], *, include_paths: bool = False, filesystem: Optional["pyarrow.fs.FileSystem"] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Optional[Dict[str, Any]] = None, ) -> Dataset[Union[Tuple[str, bytes], bytes]]: """Create a dataset from binary files of arbitrary contents. Examples: >>> # Read a directory of files in remote storage. >>> ray.data.read_binary_files("s3://bucket/path") >>> # Read multiple local files. >>> ray.data.read_binary_files(["/path/to/file1", "/path/to/file2"]) Args: paths: A single file path or a list of file paths (or directories). include_paths: Whether to include the full path of the file in the dataset records. When specified, the dataset records will be a tuple of the file path and the file contents. filesystem: The filesystem implementation to read from. ray_remote_args: kwargs passed to ray.remote in the read tasks. parallelism: The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset. arrow_open_stream_args: kwargs passed to pyarrow.fs.FileSystem.open_input_stream Returns: Dataset holding Arrow records read from the specified paths. """ return read_datasource( BinaryDatasource(), parallelism=parallelism, paths=paths, include_paths=include_paths, filesystem=filesystem, ray_remote_args=ray_remote_args, open_stream_args=arrow_open_stream_args, schema=bytes)
[docs]@PublicAPI(stability="beta") def from_dask(df: "dask.DataFrame") -> Dataset[ArrowRow]: """Create a dataset from a Dask DataFrame. Args: df: A Dask DataFrame. Returns: Dataset holding Arrow records read from the DataFrame. """ import dask from ray.util.dask import ray_dask_get partitions = df.to_delayed() persisted_partitions = dask.persist(*partitions, scheduler=ray_dask_get) return from_pandas_refs( [next(iter(part.dask.values())) for part in persisted_partitions])
[docs]@PublicAPI(stability="beta") def from_mars(df: "mars.DataFrame") -> Dataset[ArrowRow]: """Create a dataset from a MARS dataframe. Args: df: A MARS dataframe, which must be executed by MARS-on-Ray. Returns: Dataset holding Arrow records read from the dataframe. """ raise NotImplementedError # P1
[docs]@PublicAPI(stability="beta") def from_modin(df: "modin.DataFrame") -> Dataset[ArrowRow]: """Create a dataset from a Modin dataframe. Args: df: A Modin dataframe, which must be using the Ray backend. Returns: Dataset holding Arrow records read from the dataframe. """ from modin.distributed.dataframe.pandas.partitions import unwrap_partitions parts = unwrap_partitions(df, axis=0) return from_pandas_refs(parts)
[docs]@PublicAPI(stability="beta") def from_pandas(dfs: Union["pandas.DataFrame", List["pandas.DataFrame"]] ) -> Dataset[ArrowRow]: """Create a dataset from a list of Pandas dataframes. Args: dfs: A Pandas dataframe or a list of Pandas dataframes. Returns: Dataset holding Arrow records read from the dataframes. """ import pandas as pd if isinstance(dfs, pd.DataFrame): dfs = [dfs] return from_pandas_refs([ray.put(df) for df in dfs])
@DeveloperAPI
def from_pandas_refs(dfs: Union[ObjectRef["pandas.DataFrame"], List[ObjectRef[
        "pandas.DataFrame"]]]) -> Dataset[ArrowRow]:
    """Create a dataset from a list of Ray object references to Pandas
    dataframes.

    Args:
        dfs: A Ray object reference to a pandas dataframe, or a list of Ray
            object references to pandas dataframes.

    Returns:
        Dataset holding Arrow records read from the dataframes.
    """
    if isinstance(dfs, ray.ObjectRef):
        dfs = [dfs]

    df_to_block = cached_remote_fn(_df_to_block, num_returns=2)

    res = [df_to_block.remote(df) for df in dfs]
    blocks, metadata = zip(*res)
    return Dataset(BlockList(blocks, ray.get(list(metadata))), 0)

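# Usage sketch: passing ObjectRefs is useful when the dataframes are already
# in the object store (e.g. produced by another Ray task), so they are not
# serialized again on the driver.
#
#   >>> import pandas as pd
#   >>> ref = ray.put(pd.DataFrame({"a": [1, 2, 3]}))
#   >>> ds = ray.data.from_pandas_refs([ref])
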
def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]:
    """Create a dataset from a set of NumPy ndarrays.

    Args:
        ndarrays: A list of Ray object references to NumPy ndarrays.

    Returns:
        Dataset holding the given ndarrays.
    """
    ndarray_to_block = cached_remote_fn(_ndarray_to_block, num_returns=2)

    res = [ndarray_to_block.remote(ndarray) for ndarray in ndarrays]
    blocks, metadata = zip(*res)
    return Dataset(BlockList(blocks, ray.get(list(metadata))), 0)

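# Usage sketch: each ndarray reference becomes one block holding a single
# "value" tensor column (see _ndarray_to_block at the bottom of this module).
#
#   >>> arr_ref = ray.put(np.zeros((8, 2)))
#   >>> ds = ray.data.from_numpy([arr_ref])
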
[docs]@PublicAPI(stability="beta") def from_arrow(tables: Union["pyarrow.Table", bytes, List[Union[ "pyarrow.Table", bytes]]]) -> Dataset[ArrowRow]: """Create a dataset from a list of Arrow tables. Args: tables: An Arrow table, or a list of Arrow tables, or its streaming format in bytes. Returns: Dataset holding Arrow records from the tables. """ import pyarrow as pa if isinstance(tables, (pa.Table, bytes)): tables = [tables] return from_arrow_refs([ray.put(t) for t in tables])
@DeveloperAPI
def from_arrow_refs(
        tables: Union[ObjectRef[Union["pyarrow.Table", bytes]], List[ObjectRef[
            Union["pyarrow.Table", bytes]]]]) -> Dataset[ArrowRow]:
    """Create a dataset from a set of Arrow tables.

    Args:
        tables: A Ray object reference to an Arrow table, or a list of Ray
            object references to Arrow tables, or references to serialized
            tables in the Arrow streaming format (as bytes).

    Returns:
        Dataset holding Arrow records from the tables.
    """
    if isinstance(tables, ray.ObjectRef):
        tables = [tables]

    get_metadata = cached_remote_fn(_get_metadata)
    metadata = [get_metadata.remote(t) for t in tables]
    return Dataset(BlockList(tables, ray.get(metadata)), 0)

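# Usage sketch: the given references are used directly as dataset blocks, so
# no data copy is made; only per-block metadata is computed remotely.
#
#   >>> import pyarrow as pa
#   >>> ref = ray.put(pa.table({"a": [1, 2, 3]}))
#   >>> ds = ray.data.from_arrow_refs([ref])
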
[docs]@PublicAPI(stability="beta") def from_spark(df: "pyspark.sql.DataFrame", *, parallelism: Optional[int] = None) -> Dataset[ArrowRow]: """Create a dataset from a Spark dataframe. Args: spark: A SparkSession, which must be created by RayDP (Spark-on-Ray). df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray). parallelism: The amount of parallelism to use for the dataset. If not provided, it will be equal to the number of partitions of the original Spark dataframe. Returns: Dataset holding Arrow records read from the dataframe. """ import raydp return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)
def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]:
    import pyarrow as pa
    block = pa.table(df)
    return (block,
            BlockAccessor.for_block(block).get_metadata(input_files=None))


def _ndarray_to_block(ndarray: np.ndarray) -> Block[np.ndarray]:
    import pyarrow as pa
    from ray.data.extensions import TensorArray
    table = pa.Table.from_pydict({"value": TensorArray(ndarray)})
    return (table,
            BlockAccessor.for_block(table).get_metadata(input_files=None))


def _get_metadata(table: "pyarrow.Table") -> BlockMetadata:
    return BlockAccessor.for_block(table).get_metadata(input_files=None)