ray.data.Dataset.flat_map#

Dataset.flat_map(fn: Callable[[Dict[str, Any]], List[Dict[str, Any]]] | Callable[[Dict[str, Any]], Iterator[List[Dict[str, Any]]]] | _CallableClassProtocol, *, compute: ComputeStrategy | None = None, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset[source]#

Apply the given function to each row and then flatten results.

Use this method if your transformation returns multiple rows for each input row.

You can use either a function or a callable class to perform the transformation. For functions, Ray Data uses stateless Ray tasks. For classes, Ray Data uses stateful Ray actors. For more information, see Stateful Transforms.

Tip

map_batches() can also modify the number of rows. If your transformation is vectorized like most NumPy and pandas operations, it might be faster.

Warning

Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Ray team.

Examples

from typing import Any, Dict, List
import ray

def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [row] * 2

print(
    ray.data.range(3)
    .flat_map(duplicate_row)
    .take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]

Time complexity: O(dataset size / parallelism)

Parameters:
  • fn – The function or generator to apply to each record, or a class type that can be instantiated to create such a callable.

  • compute – This argument is deprecated. Use concurrency argument.

  • fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Ray task.

  • fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Ray task.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class. These arguments are top-level arguments in the underlying Ray actor construction task.

  • num_cpus – The number of CPUs to reserve for each parallel map worker.

  • num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.

  • concurrency – The number of Ray workers to use concurrently. For a fixed-sized worker pool of size n, specify concurrency=n. For an autoscaling worker pool from m to n workers, specify concurrency=(m, n).

  • ray_remote_args_fn – A function that returns a dictionary of remote args passed to each map worker. The purpose of this argument is to generate dynamic arguments for each actor/task, and will be called each time prior to initializing the worker. Args returned from this dict will always override the args in ray_remote_args. Note: this is an advanced, experimental feature.

  • ray_remote_args – Additional resource requirements to request from Ray for each map worker. See ray.remote() for details.

See also

map_batches()

Call this method to transform batches of data.

map()

Call this method to transform one row at time.