Source code for ray.data.datasource.filename_provider
from typing import Any, Dict, Optional
from ray.data.block import Block
from ray.util.annotations import PublicAPI
[docs]
@PublicAPI(stability="alpha")
class FilenameProvider:
    """Chooses the filenames used when a :class:`~ray.data.Dataset` is written.

    Subclass this to control how output files are named.

    Depending on the write method, one file is produced either per row or per
    block. For example, :meth:`ray.data.Dataset.write_images` writes individual
    rows, and :func:`ray.data.Dataset.write_parquet` writes blocks of data. For
    more information about blocks, see
    :ref:`Data internals <datasets_scheduling>`.

    Override :meth:`~FilenameProvider.get_filename_for_row` when each row is
    written to its own file; otherwise override
    :meth:`~FilenameProvider.get_filename_for_block`.

    Example:

        This snippet shows you how to encode labels in written files. For
        example, if `"cat"` is a label, you might write a file named
        `cat_000000_000000_000000.png`.

        .. testcode::

            import ray
            from ray.data.datasource import FilenameProvider

            class ImageFilenameProvider(FilenameProvider):

                def __init__(self, file_format: str):
                    self.file_format = file_format

                def get_filename_for_row(self, row, task_index, block_index, row_index):
                    return (
                        f"{row['label']}_{task_index:06}_{block_index:06}"
                        f"_{row_index:06}.{self.file_format}"
                    )

            ds = ray.data.read_parquet("s3://anonymous@ray-example-data/images.parquet")
            ds.write_images(
                "/tmp/results",
                column="image",
                filename_provider=ImageFilenameProvider("png")
            )
    """  # noqa: E501

    def get_filename_for_block(
        self,
        block: Block,
        task_index: int,
        block_index: int,
    ) -> str:
        """Return the filename for one block of data.

        .. note::
            Filenames must be unique and deterministic for a given task and
            block index.

            A block consists of multiple rows and corresponds to a single
            output file. Each task might produce a different number of blocks.

        Args:
            block: The block that will be written to a file.
            task_index: The index of the write task.
            block_index: The index of the block *within* the write task.

        Returns:
            The filename to use for the block.

        Raises:
            NotImplementedError: Always, in this base class; subclasses that
                write one file per block override this method.
        """
        raise NotImplementedError

    def get_filename_for_row(
        self,
        row: Dict[str, Any],
        task_index: int,
        block_index: int,
        row_index: int,
    ) -> str:
        """Return the filename for one row.

        .. note::
            Filenames must be unique and deterministic for a given task, block,
            and row index.

            A block consists of multiple rows, and each row corresponds to a
            single output file. Each task might produce a different number of
            blocks, and each block might contain a different number of rows.

        .. tip::
            If you require a contiguous row index into the global dataset, use
            :meth:`~ray.data.Dataset.iter_rows`. This method is single-threaded
            and isn't recommended for large datasets.

        Args:
            row: The row that will be written to a file.
            task_index: The index of the write task.
            block_index: The index of the block *within* the write task.
            row_index: The index of the row *within* the block.

        Returns:
            The filename to use for the row.

        Raises:
            NotImplementedError: Always, in this base class; subclasses that
                write one file per row override this method.
        """
        raise NotImplementedError
class _DefaultFilenameProvider(FilenameProvider):
    """Filename provider that names files after their task/block/row indices.

    Names have the shape ``[<dataset_uuid>_]<indices>[.<file_format>]``, where
    ``<indices>`` are the indices joined by underscores, each formatted with
    the ``06`` format spec. The UUID prefix and the extension are only emitted
    when the corresponding constructor argument is not ``None``.
    """

    def __init__(
        self,
        dataset_uuid: Optional[str] = None,
        file_format: Optional[str] = None,
    ):
        """Store the optional filename prefix and extension.

        Args:
            dataset_uuid: If not ``None``, prepended to every filename,
                followed by an underscore.
            file_format: If not ``None``, appended to every filename after a
                dot.
        """
        self._file_format = file_format
        self._dataset_uuid = dataset_uuid

    def get_filename_for_block(
        self,
        block: Block,
        task_index: int,
        block_index: int,
    ) -> str:
        """Return ``<task>_<block>`` wrapped in the configured prefix/extension.

        ``block`` itself is not inspected; only the indices are used.
        """
        return self._generate_filename(f"{task_index:06}_{block_index:06}")

    def get_filename_for_row(
        self,
        row: Dict[str, Any],
        task_index: int,
        block_index: int,
        row_index: int,
    ) -> str:
        """Return ``<task>_<block>_<row>`` wrapped in the configured prefix/extension.

        ``row`` itself is not inspected; only the indices are used.
        """
        return self._generate_filename(
            f"{task_index:06}_{block_index:06}_{row_index:06}"
        )

    def _generate_filename(self, file_id: str) -> str:
        """Surround ``file_id`` with the optional UUID prefix and extension."""
        uuid_prefix = "" if self._dataset_uuid is None else f"{self._dataset_uuid}_"
        extension = "" if self._file_format is None else f".{self._file_format}"
        return uuid_prefix + file_id + extension