Advanced: Read and Write Custom File Types#

This guide shows you how to extend Ray Data to read and write file types that aren’t natively supported. This is an advanced guide, and you’ll use unstable internal APIs.

Images are already supported with the read_images() and write_images() APIs, but this example shows you how to implement them for illustrative purposes.

Read data from files#

Tip

If you’re not contributing to Ray Data, you don’t need to create a Datasource. Instead, you can call read_binary_files() and decode files with map().

The core abstraction for reading files is FileBasedDatasource. It provides file-specific functionality on top of the Datasource interface.

To subclass FileBasedDatasource, implement the constructor and _read_stream.

Implement the constructor#

Call the superclass constructor and specify the files you want to read. Optionally, specify valid file extensions. Ray Data ignores files with other extensions.

from ray.data.datasource import FileBasedDatasource

class ImageDatasource(FileBasedDatasource):
    def __init__(self, paths: Union[str, List[str]], *, mode: str):
        super().__init__(
            paths,
            file_extensions=["png", "jpg", "jpeg", "bmp", "gif", "tiff"],
        )

        self.mode = mode  # Specify read options in the constructor

Implement _read_stream#

_read_stream is a generator that yields one or more blocks of data from a file.

Blocks are a Data-internal abstraction for a collection of rows. They can be PyArrow tables, pandas DataFrames, or dictionaries of NumPy arrays.

Don’t create a block directly. Instead, add rows of data to a DelegatingBlockBuilder.

    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        import io
        import numpy as np
        from PIL import Image
        from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder

        data = f.readall()
        image = Image.open(io.BytesIO(data))
        image = image.convert(self.mode)

        # Each block contains one row
        builder = DelegatingBlockBuilder()
        array = np.array(image)
        item = {"image": array}
        builder.add(item)
        yield builder.build()

Read your data#

Once you’ve implemented ImageDatasource, call read_datasource() to read images into a Dataset. Ray Data reads your files in parallel.

import ray

ds = ray.data.read_datasource(
    ImageDatasource("s3://anonymous@ray-example-data/batoidea", mode="RGB")
)

Write data to files#

Note

The write interface is under active development and might change in the future. If you have feature requests, open a GitHub Issue.

The core abstractions for writing data to files are RowBasedFileDatasink and BlockBasedFileDatasink. They provide file-specific functionality on top of the Datasink interface.

If you want to write one row per file, subclass RowBasedFileDatasink. Otherwise, subclass BlockBasedFileDatasink.

In this example, you’ll write one image per file, so you’ll subclass RowBasedFileDatasink. To subclass RowBasedFileDatasink, implement the constructor and write_row_to_file().

Implement the constructor#

Call the superclass constructor and specify the folder to write to. Optionally, specify a string representing the file format (for example, "png"). Ray Data uses the file format as the file extension.

from ray.data.datasource import RowBasedFileDatasink

class ImageDatasink(RowBasedFileDatasink):
    def __init__(self, path: str, column: str, file_format: str):
        super().__init__(path, file_format=file_format)

        self.column = column
        self.file_format = file_format  # Specify write options in the constructor

Implement write_row_to_file#

write_row_to_file writes a row of data to a file. Each row is a dictionary that maps column names to values.

    def write_row_to_file(self, row: Dict[str, Any], file: pyarrow.NativeFile):
        import io
        from PIL import Image

        # PIL can't write to a NativeFile, so we have to write to a buffer first.
        image = Image.fromarray(row[self.column])
        buffer = io.BytesIO()
        image.save(buffer, format=self.file_format)
        file.write(buffer.getvalue())

Write your data#

Once you’ve implemented ImageDatasink, call write_datasink() to write images to files. Ray Data writes to multiple files in parallel.

ds.write_datasink(ImageDatasink("/tmp/results", column="image", file_format="png"))