ray.data.llm.vLLMEngineProcessorConfig#

class ray.data.llm.vLLMEngineProcessorConfig(*, batch_size: int = 32, resources_per_bundle: Dict[str, float] | None = None, accelerator_type: str | None = None, concurrency: int | Tuple[int, int] = 1, experimental: Dict[str, Any] = None, model_source: str, runtime_env: Dict[str, Any] | None = None, max_pending_requests: int | None = None, max_concurrent_batches: int = 8, apply_chat_template: bool = True, chat_template: str | None = None, tokenize: bool = True, detokenize: bool = True, has_image: bool = False, engine_kwargs: Dict[str, Any] = None, task_type: vLLMTaskType = vLLMTaskType.GENERATE, dynamic_lora_loading_path: str | None = None, placement_group_config: Dict[str, Any] | None = None)[source]#

The configuration for the vLLM engine processor.

Parameters:
  • model_source – The model source to use for the vLLM engine.

  • batch_size – The batch size to send to the vLLM engine. Larger batch sizes are more likely to saturate the compute resources and achieve higher throughput, while smaller batch sizes are more fault-tolerant and can reduce bubbles in the data pipeline. Tune the batch size to balance throughput and fault tolerance for your use case.

  • engine_kwargs – The kwargs to pass to the vLLM engine. Default engine kwargs are pipeline_parallel_size: 1, tensor_parallel_size: 1, max_num_seqs: 128, distributed_executor_backend: “mp”.

  • task_type – The task type to use. If not specified, will use ‘generate’ by default.

  • runtime_env – The runtime environment to use for the vLLM engine. See the Ray runtime environments documentation for more details.

  • max_pending_requests – The maximum number of pending requests. If not specified, will use the default value from the vLLM engine.

  • max_concurrent_batches – The maximum number of concurrent batches in the engine. This overlaps batch processing to hide the tail latency of each batch. The default value may not be optimal when the batch size or the batch processing latency is too small, but it should be good enough for batch sizes >= 64.

  • apply_chat_template – Whether to apply the chat template.

  • chat_template – The chat template to use. This is usually not needed if the model checkpoint already contains the chat template.

  • tokenize – Whether to tokenize the input before passing it to the vLLM engine. If not, vLLM will tokenize the prompt in the engine.

  • detokenize – Whether to detokenize the output.

  • has_image – Whether the input messages have images.

  • accelerator_type – The accelerator type used by the LLM stage in a processor. Defaults to None, meaning that only the CPU will be used.

  • concurrency – The number of workers for data parallelism. Defaults to 1. If concurrency is a tuple (m, n), Ray creates an autoscaling actor pool that scales between m and n workers (1 <= m <= n). If concurrency is an int n, CPU stages use an autoscaling pool of 1 to n workers, while GPU stages use a fixed pool of n workers. See the sketch after this list for the tuple form.
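
A minimal sketch of the scaling-related fields, using the tuple form of concurrency. The accelerator type string and the HF_TOKEN environment variable are illustrative assumptions, not defaults:

from ray.data.llm import vLLMEngineProcessorConfig

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    # Tuple form: autoscaling actor pool that scales between 1 and 4 workers.
    concurrency=(1, 4),
    # Illustrative Ray accelerator type string; pick one that matches your cluster.
    accelerator_type="L4",
    # Standard Ray runtime_env dictionary; HF_TOKEN is only an example variable.
    runtime_env={"env_vars": {"HF_TOKEN": "<your-token>"}},
    batch_size=64,
)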

Examples

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    engine_kwargs=dict(
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
        max_num_batched_tokens=4096,
    ),
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a calculator"},
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=False,
        ),
    ),
    postprocess=lambda row: dict(
        resp=row["generated_text"],
    ),
)

# The processor requires specific input columns, which depend on
# your processor config. You can use the following API to check
# the required input columns:
processor.log_input_column_names()
# Example log:
# The first stage of the processor is ChatTemplateStage.
# Required input columns:
#     messages: A list of messages in OpenAI chat format.

ds = ray.data.range(300)
ds = processor(ds)
for row in ds.take_all():
    print(row)
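
To control how Ray co-schedules the vLLM engine workers, placement_group_config accepts a dictionary with 'bundles' and an optional 'strategy' key (see the field metadata below). A minimal sketch with illustrative bundle sizes, reusing the imports from the example above:

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    placement_group_config={
        # One resource bundle per engine worker; with the "ray" distributed
        # executor backend each bundle may hold at most one GPU.
        "bundles": [{"CPU": 1, "GPU": 1}],
        # Ignored by the default "mp" backend.
        "strategy": "PACK",
    },
)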

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'accelerator_type': FieldInfo(annotation=Union[str, NoneType], required=False, description='The accelerator type used by the LLM stage in a processor. Default to None, meaning that only the CPU will be used.'), 'apply_chat_template': FieldInfo(annotation=bool, required=False, default=True, description='Whether to apply chat template.'), 'batch_size': FieldInfo(annotation=int, required=False, default=32, description='Large batch sizes are likely to saturate the compute resources and could achieve higher throughput. On the other hand, small batch sizes are more fault-tolerant and could reduce bubbles in the data pipeline. You can tune the batch size to balance the throughput and fault-tolerance based on your use case. Defaults to 32.'), 'chat_template': FieldInfo(annotation=Union[str, NoneType], required=False, description='The chat template to use. This is usually not needed if the model checkpoint already contains the chat template.'), 'concurrency': FieldInfo(annotation=Union[int, Tuple[int, int]], required=False, default=1, description='The number of workers for data parallelism. Default to 1. If ``concurrency`` is a ``tuple`` ``(m, n)``, Ray creates an autoscaling actor pool that scales between ``m`` and ``n`` workers (``1 <= m <= n``). If ``concurrency`` is an ``int`` ``n``, Ray uses either a fixed pool of ``n`` workers or an autoscaling pool from ``1`` to ``n`` workers, depending on the processor and stage.'), 'detokenize': FieldInfo(annotation=bool, required=False, default=True, description='Whether to detokenize the output.'), 'dynamic_lora_loading_path': FieldInfo(annotation=Union[str, NoneType], required=False, description="The path to the dynamic LoRA adapter. It is expected to hold subfolders each for a different lora checkpoint. If not specified and LoRA is enabled, then the 'model' in LoRA requests will be interpreted as model ID used by HF transformers."), 'engine_kwargs': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='The kwargs to pass to the vLLM engine. See https://docs.vllm.ai/en/latest/serving/engine_args.html for more details.'), 'experimental': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='[Experimental] Experimental configurations.Supported keys:\n`max_tasks_in_flight_per_actor`: The maximum number of tasks in flight per actor. Default to 4.'), 'has_image': FieldInfo(annotation=bool, required=False, default=False, description='Whether the input messages have images.'), 'max_concurrent_batches': FieldInfo(annotation=int, required=False, default=8, description='The maximum number of concurrent batches in the engine. This is to overlap the batch processing to avoid the tail latency of each batch. The default value may not be optimal when the batch size or the batch processing latency is too small, but it should be good enough for batch size >= 32.'), 'max_pending_requests': FieldInfo(annotation=Union[int, NoneType], required=False, description='The maximum number of pending requests. If not specified, will use the default value from the backend engine.'), 'model_source': FieldInfo(annotation=str, required=True, description='The model source to use for the offline processing.'), 'placement_group_config': FieldInfo(annotation=Union[Dict[str, Any], NoneType], required=False, description="Ray placement group configuration for scheduling vLLM engine workers. Should be a dictionary with 'bundles' (list of resource dicts, e.g., {'CPU': 1, 'GPU': 1}) and an optional 'strategy' key ('PACK', 'STRICT_PACK', 'SPREAD', or 'STRICT_SPREAD'). For ray distributed executor backend, each bundle must specify at most one GPU. For mp backend, the 'strategy' field is ignored."), 'resources_per_bundle': FieldInfo(annotation=Union[Dict[str, float], NoneType], required=False, description='[DEPRECATED] This parameter is deprecated and will be removed in a future version. ', json_schema_extra={'deprecated': True}), 'runtime_env': FieldInfo(annotation=Union[Dict[str, Any], NoneType], required=False, description='The runtime environment to use for the offline processing.'), 'task_type': FieldInfo(annotation=vLLMTaskType, required=False, default=<vLLMTaskType.GENERATE: 'generate'>, description="The task type to use. If not specified, will use 'generate' by default."), 'tokenize': FieldInfo(annotation=bool, required=False, default=True, description='Whether to tokenize the input before passing it to the backend engine. If not, the backend engine will tokenize the prompt.')}#

Metadata about the fields defined on the model, a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.
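
The experimental field documented above accepts a small dictionary of opt-in settings; the only key listed is max_tasks_in_flight_per_actor (default 4). A minimal sketch:

from ray.data.llm import vLLMEngineProcessorConfig

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    # The only documented experimental key; 4 is the stated default.
    experimental={"max_tasks_in_flight_per_actor": 4},
)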