ray.data.llm.build_llm_processor
- ray.data.llm.build_llm_processor(config: ProcessorConfig, preprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None, postprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None, builder_kwargs: Dict[str, Any] | None = None) → Processor
Build an LLM processor using the given config.
- Parameters:
  - config – The processor config.
  - preprocess – An optional lambda function that takes a row (dict) as input and returns a preprocessed row (dict). The output row must contain the required fields for the following processing stages. Each row can contain a sampling_params field, which the engine uses for row-specific sampling parameters (see the row-specific sampling example below). Note that all columns are carried over until the postprocess stage.
  - postprocess – An optional lambda function that takes a row (dict) as input and returns a postprocessed row (dict). To keep all the original columns, you can use the **row syntax.
  - builder_kwargs – Optional additional kwargs to pass to the processor builder function. These are passed through to the registered builder and should match the signature of the specific builder being used. For example, vLLM and SGLang processors support chat_template_kwargs.
- Returns:
The built processor.
Examples
Basic usage:
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    engine_kwargs=dict(
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
        max_num_batched_tokens=4096,
    ),
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a calculator"},
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=False,
        ),
    ),
    postprocess=lambda row: dict(
        resp=row["generated_text"],
        **row,  # This will return all the original columns in the dataset.
    ),
)

ds = ray.data.range(300)
ds = processor(ds)

for row in ds.take_all():
    print(row)
Using builder_kwargs to pass chat_template_kwargs:
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="Qwen/Qwen3-0.6B",
    apply_chat_template=True,
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "user", "content": row["prompt"]},
        ],
        sampling_params=dict(
            temperature=0.6,
            max_tokens=100,
        ),
    ),
    builder_kwargs=dict(
        chat_template_kwargs={"enable_thinking": True},
    ),
)

ds = ray.data.from_items([{"prompt": "What is 2+2?"}])
ds = processor(ds)

for row in ds.take_all():
    print(row)
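Row-specific sampling parameters. A minimal sketch: because preprocess computes sampling_params per row, each row can use different engine settings. The length threshold, token budgets, and prompts below are illustrative, not part of the API:

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    concurrency=1,
    batch_size=64,
)

def preprocess(row):
    # Give longer prompts a larger token budget. The 32-character
    # threshold and the 64/256 budgets are arbitrary examples.
    max_tokens = 256 if len(row["prompt"]) > 32 else 64
    return dict(
        messages=[{"role": "user", "content": row["prompt"]}],
        sampling_params=dict(temperature=0.3, max_tokens=max_tokens),
    )

processor = build_llm_processor(config, preprocess=preprocess)

ds = ray.data.from_items(
    [{"prompt": "Hi!"}, {"prompt": "Explain prefix caching in a few sentences."}]
)
ds = processor(ds)

for row in ds.take_all():
    print(row["generated_text"])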
PublicAPI (alpha): This API is in alpha and may change before becoming stable.