BatchPredictor.predict_pipelined(data: ray.data.dataset.Dataset, *, blocks_per_window: Optional[int] = None, bytes_per_window: Optional[int] = None, feature_columns: Optional[List[str]] = None, keep_columns: Optional[List[str]] = None, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: Optional[int] = None, num_gpus_per_worker: Optional[int] = None, separate_gpu_stage: bool = True, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) -> ray.data.dataset_pipeline.DatasetPipeline

Set up a prediction pipeline for batch scoring.

Unlike predict(), this generates a DatasetPipeline object and does not perform execution. Execution can be triggered by pulling from the pipeline.

This is a convenience wrapper around calling window() on the Dataset prior to passing it to BatchPredictor.predict().

Parameters

  • data – Ray Dataset to run batch prediction on.

  • blocks_per_window – The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.

  • bytes_per_window – Specify the window size in bytes instead of blocks. This will be treated as an upper bound for the window size, but each window will still include at least one block. This is mutually exclusive with blocks_per_window.

  • feature_columns – List of columns in data to use for prediction. Columns not specified will be dropped from data before being passed to the predictor. If None, use all columns.

  • keep_columns – List of columns in data to include in the prediction result. This is useful for calculating final accuracies/metrics on the result dataset. If None, the columns in the output dataset will contain just the prediction results.

  • batch_size – Split dataset into batches of this size for prediction.

  • min_scoring_workers – Minimum number of scoring actors.

  • max_scoring_workers – If set, the maximum number of scoring actors.

  • num_cpus_per_worker – Number of CPUs to allocate per scoring worker.

  • num_gpus_per_worker – Number of GPUs to allocate per scoring worker.

  • separate_gpu_stage – If using GPUs, specifies whether to execute GPU processing in a separate stage (enabled by default). This avoids running expensive preprocessing steps on GPU workers.

  • ray_remote_args – Additional resource requirements to request from Ray.

  • predict_kwargs – Keyword arguments passed to the predictor’s predict() method.


Returns

A DatasetPipeline that generates scoring results.


Examples

import pandas as pd
import ray
from ray.train.batch_predictor import BatchPredictor

# Create a batch predictor that always returns `42` for each input.
batch_pred = BatchPredictor.from_pandas_udf(
    lambda data: pd.DataFrame({"a": [42] * len(data)}))

# Create a dummy dataset.
ds = ray.data.range_tensor(1000, parallelism=4)

# Setup a prediction pipeline.
# Set up a prediction pipeline.
print(batch_pred.predict_pipelined(ds, blocks_per_window=1))
# -> DatasetPipeline(num_windows=4, num_stages=3)