ray.data.llm.build_llm_processor
- ray.data.llm.build_llm_processor(config: ProcessorConfig, preprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None, postprocess: Callable[[T], U] | Callable[[T], Iterator[U]] | _CallableClassProtocol | None = None) → Processor
Build an LLM processor using the given config.
- Parameters:
  - config – The processor config.
  - preprocess – An optional lambda function that takes a row (dict) as input and returns a preprocessed row (dict). The output row must contain the fields required by the following processing stages. Each row may contain a sampling_params field, which the engine uses for row-specific sampling parameters. Note that all columns are carried over until the postprocess stage.
  - postprocess – An optional lambda function that takes a row (dict) as input and returns a postprocessed row (dict). To keep all the original columns, use the **row syntax to return them.
- Returns:
  The built processor.
Example
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model="meta-llama/Meta-Llama-3.1-8B-Instruct",
    engine_kwargs=dict(
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
        max_num_batched_tokens=4096,
    ),
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a calculator"},
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=False,
        ),
    ),
    postprocess=lambda row: dict(
        resp=row["generated_text"],
        **row,  # This will return all the original columns in the dataset.
    ),
)

ds = ray.data.range(300)
ds = processor(ds)
for row in ds.take_all():
    print(row)
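Per the signature, preprocess and postprocess also accept a callable class (the _CallableClassProtocol union member) in place of a lambda. The following is a minimal sketch of that pattern, assuming it works like other Ray Data callable-class transforms (such as map_batches): you pass the class itself, and it is instantiated once per worker, making __init__ the place for expensive one-time setup. The MessageBuilder name is hypothetical, and the class-vs-instance convention is an assumption, not confirmed by this page.

class MessageBuilder:
    """Hypothetical callable-class preprocess. Assumption: __init__ runs
    once per worker, so one-time setup (e.g., loading a prompt template)
    belongs here rather than in the per-row call."""

    def __init__(self):
        self.system_prompt = "You are a calculator"

    def __call__(self, row: dict) -> dict:
        # Per-row transformation: build the fields the next stage needs.
        return dict(
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": f"{row['id']} ** 3 = ?"},
            ],
            sampling_params=dict(temperature=0.3, max_tokens=20),
        )

processor = build_llm_processor(
    config,  # same vLLMEngineProcessorConfig as above
    preprocess=MessageBuilder,  # assumption: pass the class, as in map_batches
    postprocess=lambda row: dict(resp=row["generated_text"], **row),
)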
PublicAPI (alpha): This API is in alpha and may change before becoming stable.