ray.data.llm.vLLMEngineProcessorConfig

class ray.data.llm.vLLMEngineProcessorConfig(*, batch_size: int = 32, resources_per_bundle: Dict[str, float] | None = None, accelerator_type: str | None = None, concurrency: int | Tuple[int, int] = 1, experimental: Dict[str, Any] = <factory>, model_source: str, runtime_env: Dict[str, Any] | None = None, max_pending_requests: int | None = None, max_concurrent_batches: int = 8, max_tasks_in_flight_per_actor: int | None = None, should_continue_on_error: bool = False, apply_chat_template: bool = True, chat_template: str | None = None, tokenize: bool = True, detokenize: bool = True, chat_template_stage: Any = True, tokenize_stage: Any = True, detokenize_stage: Any = True, prepare_multimodal_stage: Any = False, engine_kwargs: Dict[str, Any] = <factory>, task_type: Literal['generate', 'embed', 'classify', 'score'] = 'generate', log_engine_metrics: bool = True, dynamic_lora_loading_path: str | None = None, placement_group_config: Dict[str, Any] | None = None)

The configuration for the vLLM engine processor.

Parameters:
  • model_source – The model source to use for the vLLM engine.

  • batch_size – The size of each batch sent to the vLLM engine. Larger batches are more likely to saturate compute resources and can yield higher throughput; smaller batches are more fault-tolerant and can reduce idle gaps (bubbles) in the data pipeline. Tune the batch size to balance throughput against fault tolerance for your use case.

  • engine_kwargs – The kwargs to pass to the vLLM engine. Defaults to pipeline_parallel_size: 1 and tensor_parallel_size: 1. Ray Data LLM sets distributed_executor_backend to "uni" when tp*pp == 1 and "ray" otherwise. vLLM’s max_num_seqs default is resolved by vLLM and is GPU-dependent (e.g., 256 on A100/A10G, 1024 on H100/MI300x).

  • task_type – The task type to use: 'generate', 'embed', 'classify', or 'score'. Defaults to 'generate'.

  • runtime_env – The runtime environment to use for the vLLM engine. See the Ray runtime environments documentation for more details.

  • max_pending_requests – The maximum number of pending requests. If unset, defaults to ceil(1.1 * max_num_seqs * pipeline_parallel_size) using vLLM’s resolved engine config.

  • max_concurrent_batches – The maximum number of concurrent batches in the engine. Overlapping batch processing reduces per-batch tail latency. Sets the engine actor’s Ray Core max_concurrency. The default is tuned for batch sizes >= 32; consider increasing it for smaller batch sizes or short per-batch latencies.

  • max_tasks_in_flight_per_actor – Max tasks Ray Data submits concurrently to each engine actor. Passed through to ray.data.ActorPoolStrategy. If unset, Ray Data uses ray.data.DataContext.max_tasks_in_flight_per_actor if set globally. Otherwise, it defaults to 2 * max_concurrent_batches; the factor can be overridden via the RAY_DATA_ACTOR_DEFAULT_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR env var.

  • should_continue_on_error – If True, continue processing when inference fails for a row instead of raising an exception. Failed rows will have a non-empty __inference_error__ column containing the error message; the other output columns are populated with type-appropriate defaults (empty string/list, None, 0, or -1). Error rows bypass postprocess. If False (default), any inference error will raise an exception.

  • log_engine_metrics – If True (default), export vLLM engine metrics (prefix cache hit rate, TTFT, TPOT, KV cache utilization, etc.) via Ray’s Prometheus endpoint.

  • dynamic_lora_loading_path – Path holding dynamic LoRA adapter checkpoints (one per subfolder). If unset and LoRA is used, the model in a LoRA request is interpreted as a HF model ID.

  • placement_group_config – Optional placement group config for scheduling vLLM engine workers. Accepts bundle_per_worker (auto-replicated by tp*pp) or bundles (full list of resource dicts), plus an optional strategy (PACK/STRICT_PACK/SPREAD/STRICT_SPREAD).

  • chat_template_stage – Chat templating stage config (bool | dict | ChatTemplateStageConfig). Defaults to True. Use nested config for per-stage control over batch_size, concurrency, runtime_env, num_cpus, memory, and model_source. Legacy apply_chat_template and chat_template fields are deprecated but still supported.

  • tokenize_stage – Tokenizer stage config (bool | dict | TokenizerStageConfig). Defaults to True. Use nested config for per-stage control over batch_size, concurrency, runtime_env, num_cpus, memory, and model_source. Legacy tokenize field is deprecated but still supported.

  • detokenize_stage – Detokenizer stage config (bool | dict | DetokenizeStageConfig). Defaults to True. Use nested config for per-stage control over batch_size, concurrency, runtime_env, num_cpus, memory, and model_source. Legacy detokenize field is deprecated but still supported.

  • prepare_multimodal_stage – Multimodal preprocessing stage config (bool | dict | PrepareMultimodalStageConfig). Defaults to False. Use nested config for per-stage control over batch_size, concurrency, runtime_env, num_cpus, memory, model_config_kwargs, chat_template_content_format, and apply_sys_msg_formatting.

  • accelerator_type – The accelerator type required for the vLLM engine workers (e.g., “H100”, “A100”).

  • concurrency – The number of workers for data parallelism. Defaults to 1. If concurrency is a tuple (m, n), Ray creates an autoscaling actor pool that scales between m and n workers (1 <= m <= n). If concurrency is an int n, both CPU and GPU stages use an autoscaling pool from (1, n). Stage-specific concurrency can be set via nested stage configs.
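
Several of the defaults above are derived from other settings. The following is a minimal sketch of that arithmetic in plain Python, based only on the formulas stated in the parameter descriptions. The helper names are hypothetical and are not part of the Ray Data LLM API; the actual resolution happens inside Ray Data LLM and vLLM and may differ in detail.

```python
import math
from typing import Dict, List


def default_max_pending_requests(max_num_seqs: int, pipeline_parallel_size: int = 1) -> int:
    # Hypothetical helper: ceil(1.1 * max_num_seqs * pipeline_parallel_size),
    # as described for max_pending_requests when it is left unset.
    return math.ceil(1.1 * max_num_seqs * pipeline_parallel_size)


def default_max_tasks_in_flight(max_concurrent_batches: int = 8, factor: int = 2) -> int:
    # Hypothetical helper: factor * max_concurrent_batches, used only when neither
    # the config nor the global DataContext sets max_tasks_in_flight_per_actor.
    return factor * max_concurrent_batches


def default_executor_backend(tensor_parallel_size: int = 1, pipeline_parallel_size: int = 1) -> str:
    # Hypothetical helper: "uni" for a single-worker engine (tp * pp == 1), "ray" otherwise.
    return "uni" if tensor_parallel_size * pipeline_parallel_size == 1 else "ray"


def replicate_bundle(
    bundle_per_worker: Dict[str, float],
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
) -> List[Dict[str, float]]:
    # Hypothetical helper: one copy of bundle_per_worker per engine worker (tp * pp),
    # illustrating the "auto-replicated by tp*pp" wording for placement_group_config.
    return [dict(bundle_per_worker) for _ in range(tensor_parallel_size * pipeline_parallel_size)]


print(default_max_pending_requests(256))       # 282 (max_num_seqs=256, pp=1)
print(default_max_pending_requests(1024))      # 1127 (max_num_seqs=1024, pp=1)
print(default_max_tasks_in_flight())           # 16 with the default max_concurrent_batches=8
print(default_executor_backend(2, 1))          # ray
print(replicate_bundle({"GPU": 1}, 2, 2))      # four identical bundles
```

Working through the numbers this way can help when deciding whether to set max_pending_requests or max_tasks_in_flight_per_actor explicitly rather than rely on the derived values.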

Examples

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_processor

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    engine_kwargs=dict(
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
        max_num_batched_tokens=4096,
    ),
    concurrency=1,
    batch_size=64,
)
processor = build_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a calculator"},
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=False,
        ),
    ),
    postprocess=lambda row: dict(
        resp=row["generated_text"],
    ),
)

# The processor requires specific input columns, which depend on
# your processor config. You can use the following API to check
# the required input columns:
processor.log_input_column_names()
# Example log:
# The first stage of the processor is ChatTemplateStage.
# Required input columns:
#     messages: A list of messages in OpenAI chat format.

ds = ray.data.range(300)
ds = processor(ds)
for row in ds.take_all():
    print(row)
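
When should_continue_on_error is True, failed rows carry a non-empty __inference_error__ column and bypass postprocess, so downstream code usually needs to separate them from successful rows. The sketch below shows one way to do that on rows that have already been materialized as dictionaries (for example, the result of ds.take_all()). The helper name and the sample rows are hypothetical, and the sketch assumes that successful rows either lack the column or hold an empty value in it.

```python
from typing import Any, Dict, Iterable, List, Tuple

ERROR_COLUMN = "__inference_error__"


def split_by_inference_error(
    rows: Iterable[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Partition rows into (succeeded, failed) based on the error column."""
    succeeded: List[Dict[str, Any]] = []
    failed: List[Dict[str, Any]] = []
    for row in rows:
        # Assumption: a missing, None, or empty error value means the row succeeded.
        if row.get(ERROR_COLUMN):
            failed.append(row)
        else:
            succeeded.append(row)
    return succeeded, failed


# Hypothetical sample rows standing in for processor output.
sample_rows = [
    {"resp": "27"},
    {"generated_text": "", ERROR_COLUMN: "hypothetical error message"},
]
ok_rows, failed_rows = split_by_inference_error(sample_rows)
print(len(ok_rows), len(failed_rows))
```

Because error rows skip postprocess, they keep the default-filled engine output columns rather than the columns produced by the postprocess function, which is why the failed sample row above has generated_text instead of resp.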

PublicAPI (beta): This API is in beta and may change before becoming stable.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.