ray.data.llm.build_llm_processor

ray.data.llm.build_llm_processor(config: ProcessorConfig, preprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None, postprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None, builder_kwargs: Dict[str, Any] | None = None) → Processor

Build an LLM processor using the given config.

Parameters:
  • config – The processor config.

  • preprocess – An optional function (lambda or callable class) that takes a row (dict) as input and returns a preprocessed row (dict). The output row must contain the fields required by the subsequent processing stages. Each row may also contain a sampling_params field, which the engine uses as row-specific sampling parameters (see the row-specific example below). Note that all columns are carried over until the postprocess stage.

  • postprocess – An optional function (lambda or callable class) that takes a row (dict) as input and returns a postprocessed row (dict). To keep all the original columns, use the **row syntax to splat them into the returned dict.

  • builder_kwargs – Optional additional kwargs to pass to the processor builder function. These will be passed through to the registered builder and should match the signature of the specific builder being used. For example, vLLM and SGLang processors support chat_template_kwargs.

Returns:

The built processor.

Examples

Basic usage:

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    engine_kwargs=dict(
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
        max_num_batched_tokens=4096,
    ),
    concurrency=1,
    batch_size=64,
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a calculator"},
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=False,
        ),
    ),
    postprocess=lambda row: dict(
        resp=row["generated_text"],
        **row,  # This will return all the original columns in the dataset.
    ),
)

ds = ray.data.range(300)
ds = processor(ds)
for row in ds.take_all():
    print(row)

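Row-specific sampling parameters (a minimal sketch reusing config from the basic example above; the even/odd temperature split is illustrative only):

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        # Row-specific sampling parameters: greedy decoding for even ids,
        # higher-temperature sampling for odd ids.
        sampling_params=dict(
            temperature=0.0 if row["id"] % 2 == 0 else 0.8,
            max_tokens=20,
        ),
    ),
    postprocess=lambda row: dict(
        resp=row["generated_text"],
        **row,
    ),
)
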
Using builder_kwargs to pass chat_template_kwargs:

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="Qwen/Qwen3-0.6B",
    apply_chat_template=True,
    concurrency=1,
    batch_size=64,
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "user", "content": row["prompt"]},
        ],
        sampling_params=dict(
            temperature=0.6,
            max_tokens=100,
        ),
    ),
    builder_kwargs=dict(
        chat_template_kwargs={"enable_thinking": True},
    ),
)

ds = ray.data.from_items([{"prompt": "What is 2+2?"}])
ds = processor(ds)
for row in ds.take_all():
    print(row)

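Using a callable class instead of a lambda (a minimal sketch; the signature accepts callable classes via _CallableClassProtocol, and an instance such as the illustrative BuildMessages below satisfies the same row-in, row-out contract as the lambda form):

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

class BuildMessages:
    # Illustrative callable class; assumes the same row-in/row-out
    # contract as the lambda preprocess shown above.
    def __call__(self, row: dict) -> dict:
        return dict(
            messages=[{"role": "user", "content": row["prompt"]}],
            sampling_params=dict(temperature=0.3, max_tokens=50),
        )

config = vLLMEngineProcessorConfig(
    model_source="Qwen/Qwen3-0.6B",
    concurrency=1,
    batch_size=64,
)

processor = build_llm_processor(
    config,
    preprocess=BuildMessages(),
)

ds = ray.data.from_items([{"prompt": "What is 2+2?"}])
ds = processor(ds)
for row in ds.take_all():
    print(row)
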
PublicAPI (alpha): This API is in alpha and may change before becoming stable.