ray.data.llm.SGLangEngineProcessorConfig#
- class ray.data.llm.SGLangEngineProcessorConfig(*, batch_size: int = 32, resources_per_bundle: ~typing.Dict[str, float] | None = None, accelerator_type: str | None = None, concurrency: int | ~typing.Tuple[int, int] = 1, experimental: ~typing.Dict[str, ~typing.Any] = <factory>, model_source: str, runtime_env: ~typing.Dict[str, ~typing.Any] | None = None, max_pending_requests: int | None = None, max_concurrent_batches: int = 8, max_tasks_in_flight_per_actor: int | None = None, should_continue_on_error: bool = False, apply_chat_template: bool = True, chat_template: str | None = None, tokenize: bool = True, detokenize: bool = True, chat_template_stage: ~typing.Any = True, tokenize_stage: ~typing.Any = True, detokenize_stage: ~typing.Any = True, prepare_multimodal_stage: ~typing.Any = False, engine_kwargs: ~typing.Dict[str, ~typing.Any] = <factory>, task_type: ~typing.Literal['generate'] = 'generate')[source]#
The configuration for the SGLang engine processor.
- Parameters:
model_source – The model source to use for the SGLang engine.
batch_size – The batch size to send to the SGLang engine. Large batch sizes are likely to saturate the compute resources and could achieve higher throughput. On the other hand, small batch sizes are more fault-tolerant and could reduce bubbles in the data pipeline. You can tune the batch size to balance the throughput and fault-tolerance based on your use case.
engine_kwargs – The kwargs to pass to the SGLang engine. Default engine kwargs are tp_size: 1, dp_size: 1, skip_tokenizer_init: True.
task_type – The task type to use. If not specified, will use ‘generate’ by default.
runtime_env – The runtime environment to use for the SGLang engine. See this doc for more details.
max_pending_requests – The maximum number of pending requests. If not specified, will use the default value from the SGLang engine.
max_concurrent_batches – The maximum number of concurrent batches in the engine. This is to overlap the batch processing to avoid the tail latency of each batch. The default value may not be optimal when the batch size or the batch processing latency is too small, but it should be good enough for batch size >= 64. Sets the engine actor’s Ray Core
max_concurrency.max_tasks_in_flight_per_actor – Max tasks Ray Data submits concurrently to each engine actor. Passed through to
ray.data.ActorPoolStrategy. If unset, Ray Data usesray.data.DataContext.max_tasks_in_flight_per_actorif set globally. Otherwise, it defaults to2 * max_concurrent_batches; the factor can be overridden via theRAY_DATA_ACTOR_DEFAULT_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTORenv var.chat_template_stage – Chat templating stage config (bool | dict | ChatTemplateStageConfig). Defaults to True. Use nested config for per-stage control over batch_size, concurrency, runtime_env, num_cpus, and memory. Legacy
apply_chat_templateandchat_templatefields are deprecated but still supported.tokenize_stage – Tokenizer stage config (bool | dict | TokenizerStageConfig). Defaults to True. Use nested config for per-stage control over batch_size, concurrency, runtime_env, num_cpus, memory, and model_source. Legacy
tokenizefield is deprecated but still supported.detokenize_stage – Detokenizer stage config (bool | dict | DetokenizeStageConfig). Defaults to True. Use nested config for per-stage control over batch_size, concurrency, runtime_env, num_cpus, memory, and model_source. Legacy
detokenizefield is deprecated but still supported.accelerator_type – The accelerator type used by the LLM stage in a processor. Default to None, meaning that only the CPU will be used.
concurrency – The number of workers for data parallelism. Default to 1. If
concurrencyis a tuple(m, n), Ray creates an autoscaling actor pool that scales betweenmandnworkers (1 <= m <= n). Ifconcurrencyis anintn, both CPU and GPU stages use an autoscaling pool from(1, n). Stage-specific concurrency can be set via nested stage configs.
Examples
import ray from ray.data.llm import SGLangEngineProcessorConfig, build_processor config = SGLangEngineProcessorConfig( model_source="meta-llama/Meta-Llama-3.1-8B-Instruct", engine_kwargs=dict( dtype="half", ), concurrency=1, batch_size=64, ) processor = build_processor( config, preprocess=lambda row: dict( messages=[ {"role": "system", "content": "You are a calculator"}, {"role": "user", "content": f"{row['id']} ** 3 = ?"}, ], sampling_params=dict( temperature=0.3, max_new_tokens=20, ), ), postprocess=lambda row: dict( resp=row["generated_text"], ), ) ds = ray.data.range(300) ds = processor(ds) for row in ds.take_all(): print(row)
PublicAPI (beta): This API is in beta and may change before becoming stable.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#
Configuration for the model, should be a dictionary conforming to [
ConfigDict][pydantic.config.ConfigDict].