ray.data.llm.build_llm_processor

ray.data.llm.build_llm_processor(config: ProcessorConfig, preprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None, postprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None) → Processor

Build an LLM processor from the given config.

Parameters:
  • config – The processor config.

  • preprocess – An optional callable (for example, a lambda) that takes a row (dict) as input and returns a preprocessed row (dict). The output row must contain the fields required by the subsequent processing stages. A row may include a sampling_params field, which the engine uses as row-specific sampling parameters. All columns are carried over until the postprocess stage.

  • postprocess – An optional callable (for example, a lambda) that takes a row (dict) as input and returns a postprocessed row (dict). To keep all the original columns, unpack the input row into the returned dict with the **row syntax.

Returns:

The built processor.
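
The row contract described above can be sketched with plain functions and dicts, without starting Ray or loading a model. This is only an illustration: `fake_engine_stage` is a hypothetical stand-in for the engine stage, and its `"<model output>"` value is a placeholder, not real model output. The sketch assumes, as stated in the parameter descriptions, that the original columns are still present when postprocess runs.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def preprocess(row: Row) -> Row:
    """Build the fields the engine stage needs from one input row."""
    return {
        "messages": [
            {"role": "system", "content": "You are a calculator"},
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        "sampling_params": {"temperature": 0.3, "max_tokens": 20},
    }


def postprocess(row: Row) -> Row:
    """Add a `resp` column and keep every column already in the row."""
    return {"resp": row["generated_text"], **row}


def fake_engine_stage(original: Row, prepared: Row) -> Row:
    """Hypothetical stand-in for the engine stage (illustration only).

    Merges the original columns with the preprocessed ones and adds a
    placeholder "generated_text" value instead of calling a model.
    """
    return {**original, **prepared, "generated_text": "<model output>"}


original_row = {"id": 3}
final_row = postprocess(fake_engine_stage(original_row, preprocess(original_row)))
print(sorted(final_row))  # column names present after postprocess
```

Named functions like these can be passed as `preprocess=preprocess` and `postprocess=postprocess` in place of lambdas, which tends to be easier to unit test.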

Example

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model="meta-llama/Meta-Llama-3.1-8B-Instruct",
    engine_kwargs=dict(
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
        max_num_batched_tokens=4096,
    ),
    concurrency=1,
    batch_size=64,
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a calculator"},
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=True,  # Assumption: detokenization is needed for "generated_text" to hold text.
        ),
    ),
    postprocess=lambda row: dict(
        resp=row["generated_text"],
        **row,  # This will return all the original columns in the dataset.
    ),
)

ds = ray.data.range(300)
ds = processor(ds)
for row in ds.take_all():
    print(row)
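
Because sampling_params is read per row, a preprocess callable can vary it from one row to the next. The following is a minimal sketch of that idea using plain dicts. The function name `preprocess_with_row_sampling` and the even/odd rule are hypothetical and exist only for illustration.

```python
from typing import Any, Dict


def preprocess_with_row_sampling(row: Dict[str, Any]) -> Dict[str, Any]:
    """Return engine inputs whose sampling parameters depend on the row."""
    # Hypothetical rule for illustration: rows with an even id use
    # temperature 0.0, rows with an odd id use temperature 0.7.
    temperature = 0.0 if row["id"] % 2 == 0 else 0.7
    return {
        "messages": [{"role": "user", "content": f"{row['id']} ** 3 = ?"}],
        "sampling_params": {"temperature": temperature, "max_tokens": 20},
    }


even = preprocess_with_row_sampling({"id": 2})
odd = preprocess_with_row_sampling({"id": 3})
print(even["sampling_params"], odd["sampling_params"])
```

Such a function could be passed as the preprocess argument of build_llm_processor in the same way as the lambda in the example above.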

PublicAPI (alpha): This API is in alpha and may change before becoming stable.