BatchPredictor.predict(data: Union[ray.data.dataset.Dataset, ray.data.dataset_pipeline.DatasetPipeline], *, feature_columns: Optional[List[str]] = None, keep_columns: Optional[List[str]] = None, batch_size: int = 4096, min_scoring_workers: int = 1, max_scoring_workers: Optional[int] = None, num_cpus_per_worker: Optional[int] = None, num_gpus_per_worker: Optional[int] = None, separate_gpu_stage: bool = True, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs) → Union[ray.data.dataset.Dataset, ray.data.dataset_pipeline.DatasetPipeline]

Run batch scoring on a Dataset.

Parameters:

  • data – Ray dataset or pipeline to run batch prediction on.

  • feature_columns – List of columns in the preprocessed dataset to use for prediction. Columns not specified will be dropped from data before being passed to the predictor. If None, use all columns in the preprocessed dataset.

  • keep_columns – List of columns in the preprocessed dataset to include in the prediction result. This is useful for calculating final accuracies/metrics on the result dataset. If None, the columns in the output dataset will contain just the prediction results.

  • batch_size – Split dataset into batches of this size for prediction.

  • min_scoring_workers – Minimum number of scoring actors.

  • max_scoring_workers – If set, the maximum number of scoring actors.

  • num_cpus_per_worker – Number of CPUs to allocate per scoring worker.

  • num_gpus_per_worker – Number of GPUs to allocate per scoring worker.

  • separate_gpu_stage – If using GPUs, specifies whether to execute GPU processing in a separate stage (enabled by default). This avoids running expensive preprocessing steps on GPU workers.

  • ray_remote_args – Additional resource requirements to request from Ray.

  • predict_kwargs – Keyword arguments passed to the predictor’s predict() method.
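The interaction of feature_columns and keep_columns can be illustrated with plain pandas. This is only a sketch of the documented column-selection semantics, not Ray's actual implementation; the select_for_predict helper below is hypothetical:

```python
import pandas as pd

def select_for_predict(df, predict_fn, feature_columns=None, keep_columns=None):
    # Columns not listed in feature_columns are dropped before prediction;
    # None means "use all columns".
    features = df[feature_columns] if feature_columns is not None else df
    result = predict_fn(features)
    # keep_columns are carried over from the input into the prediction result,
    # e.g. labels needed for computing metrics afterwards.
    if keep_columns is not None:
        for col in keep_columns:
            result[col] = df[col].values
    return result

df = pd.DataFrame({"feature_1": [1, 2, 3], "label": [1, 2, 3]})
out = select_for_predict(
    df,
    lambda data: pd.DataFrame({"preds": data["feature_1"]}),
    feature_columns=["feature_1"],
    keep_columns=["label"],
)
print(list(out.columns))
```

Without keep_columns, only the predictor's output columns would appear in the result.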


Returns:
  Dataset containing scoring results.

Examples:

import pandas as pd
import ray
from ray.train.batch_predictor import BatchPredictor

def calculate_accuracy(df):
    return pd.DataFrame({"correct": df["preds"] == df["label"]})

# Create a batch predictor that returns identity as the predictions.
batch_pred = BatchPredictor.from_pandas_udf(
    lambda data: pd.DataFrame({"preds": data["feature_1"]}))

# Create a dummy dataset.
ds = ray.data.from_pandas(pd.DataFrame({
    "feature_1": [1, 2, 3], "label": [1, 2, 3]}))

# Execute batch prediction using this predictor.
predictions = batch_pred.predict(ds,
    feature_columns=["feature_1"], keep_columns=["label"])

# Print predictions and calculate the final accuracy.
print(predictions)
correct = predictions.map_batches(calculate_accuracy)
print(f"Final accuracy: {correct.sum(on='correct') / correct.count()}")
Dataset(num_blocks=1, num_rows=3, schema={preds: int64, label: int64})
Final accuracy: 1.0
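The effect of batch_size can also be sketched in pure pandas: each scoring call sees at most batch_size rows, and the per-batch results are concatenated. This is a conceptual illustration only (the predict_in_batches helper is hypothetical), not Ray's actual distributed scheduling:

```python
import pandas as pd

def predict_in_batches(df, predict_fn, batch_size=4096):
    # Split the frame into consecutive chunks of at most batch_size rows,
    # score each chunk independently, and concatenate the results.
    parts = [
        predict_fn(df.iloc[start:start + batch_size])
        for start in range(0, len(df), batch_size)
    ]
    return pd.concat(parts, ignore_index=True)

df = pd.DataFrame({"feature_1": range(10)})
preds = predict_in_batches(
    df,
    lambda data: pd.DataFrame({"preds": data["feature_1"]}),
    batch_size=4,
)
print(len(preds))  # all 10 rows are scored, in 3 batches of 4, 4, and 2
```

In the real API the batches are additionally distributed across between min_scoring_workers and max_scoring_workers scoring actors.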