ray.data.llm.HttpRequestProcessorConfig#

class ray.data.llm.HttpRequestProcessorConfig(*, batch_size: int = 64, resources_per_bundle: Dict[str, float] | None = None, accelerator_type: str | None = None, concurrency: int | Tuple[int, int] = 1, experimental: Dict[str, Any] = None, url: str, headers: Dict[str, Any] | None = None, qps: int | None = None, max_retries: int = 0, base_retry_wait_time_in_s: float = 1, session_factory: Any | None = None)[source]#

The configuration for the HTTP request processor.

Parameters:
  • batch_size – The number of rows per batch sent in each HTTP request.

  • url – The URL to send the HTTP request to.

  • headers – The headers to send with the HTTP request.

  • concurrency – The number of concurrent requests to send. Defaults to 1. If concurrency is a tuple (m, n), an autoscaling actor pool that scales between m and n workers is used (1 <= m <= n).
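For higher throughput, concurrency can be given as a tuple to enable autoscaling, and qps, max_retries, and base_retry_wait_time_in_s bound the request rate and retry behavior. A minimal config sketch; the endpoint URL and bearer token below are placeholders, not real values:

```python
from ray.data.llm import HttpRequestProcessorConfig

# Autoscale between 2 and 8 workers, cap the request rate at
# 10 requests/second, and retry failed requests up to 3 times
# with exponential backoff starting at a 2-second wait.
# The URL and token are placeholders.
config = HttpRequestProcessorConfig(
    url="https://api.example.com/v1/chat/completions",
    headers={"Authorization": "Bearer <token>"},
    concurrency=(2, 8),
    qps=10,
    max_retries=3,
    base_retry_wait_time_in_s=2,
)
```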

Examples

import ray
from ray.data.llm import HttpRequestProcessorConfig, build_llm_processor

# Point the processor at an OpenAI-compatible chat completions endpoint.
config = HttpRequestProcessorConfig(
    url="https://api.openai.com/v1/chat/completions",
    headers={"Authorization": "Bearer sk-..."},
    concurrency=1,
)
processor = build_llm_processor(
    config,
    # Build a JSON request body from each input row.
    preprocess=lambda row: dict(
        payload=dict(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a calculator"},
                {"role": "user", "content": f"{row['id']} ** 3 = ?"},
            ],
            temperature=0.3,
            max_tokens=20,
        ),
    ),
    # Extract the completion text from the HTTP response.
    postprocess=lambda row: dict(
        resp=row["http_response"]["choices"][0]["message"]["content"],
    ),
)

ds = ray.data.range(10)
ds = processor(ds)
for row in ds.take_all():
    print(row)
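Retries are governed by max_retries and base_retry_wait_time_in_s, which the field metadata describes as the base wait time for exponential backoff. A minimal sketch of such a schedule, assuming the common doubling pattern (the processor's exact schedule may add jitter or caps):

```python
def backoff_schedule(max_retries: int, base_wait_s: float) -> list[float]:
    """Wait time before each retry attempt, doubling from the base wait.

    Assumes plain exponential backoff (base * 2**attempt); this is an
    illustrative sketch, not the processor's internal implementation.
    """
    return [base_wait_s * (2 ** attempt) for attempt in range(max_retries)]

# With the default max_retries=0 there are no retries. With
# max_retries=3 and the default 1-second base wait, the waits
# before each retry are 1s, 2s, and 4s.
print(backoff_schedule(3, 1.0))  # [1.0, 2.0, 4.0]
```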

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'validate_assignment': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'accelerator_type': FieldInfo(annotation=Union[str, NoneType], required=False, description='The accelerator type used by the LLM stage in a processor. Default to None, meaning that only the CPU will be used.'), 'base_retry_wait_time_in_s': FieldInfo(annotation=float, required=False, default=1, description='The base wait time for a retry during exponential backoff.'), 'batch_size': FieldInfo(annotation=int, required=False, default=64, description='The batch size.'), 'concurrency': FieldInfo(annotation=Union[int, Tuple[int, int]], required=False, default=1, description='The number of workers for data parallelism. Default to 1. If ``concurrency`` is a ``tuple`` ``(m, n)``, Ray creates an autoscaling actor pool that scales between ``m`` and ``n`` workers (``1 <= m <= n``). If ``concurrency`` is an ``int`` ``n``, Ray uses either a fixed pool of ``n`` workers or an autoscaling pool from ``1`` to ``n`` workers, depending on the processor and stage.'), 'experimental': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='[Experimental] Experimental configurations. Supported keys:\n`max_tasks_in_flight_per_actor`: The maximum number of tasks in flight per actor. Default to 4.'), 'headers': FieldInfo(annotation=Union[Dict[str, Any], NoneType], required=False, description="The query header. Note that we will add 'Content-Type: application/json' to be the header for sure because we only deal with requests body in JSON."), 'max_retries': FieldInfo(annotation=int, required=False, default=0, description='The maximum number of retries per request in the event of failures.'), 'qps': FieldInfo(annotation=Union[int, NoneType], required=False, description='The maximum number of requests per second to avoid rate limit. If None, the request will be sent sequentially.'), 'resources_per_bundle': FieldInfo(annotation=Union[Dict[str, float], NoneType], required=False, description='[DEPRECATED] This parameter is deprecated and will be removed in a future version.', json_schema_extra={'deprecated': True}), 'session_factory': FieldInfo(annotation=Union[Any, NoneType], required=False, description='Optional session factory to be used for initializing a client session. Type: Callable[[], ClientSession]', exclude=True), 'url': FieldInfo(annotation=str, required=True, description='The URL to query.')}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.