Python SDK API Reference#

For an overview with examples, see Ray Jobs.

For the CLI reference, see Ray Job Submission CLI Reference.

JobSubmissionClient#

ray.job_submission.JobSubmissionClient

A local client for submitting and interacting with jobs on a remote cluster.

ray.job_submission.JobSubmissionClient.submit_job

Submit and execute a job asynchronously.

ray.job_submission.JobSubmissionClient.stop_job

Request a job to exit asynchronously.

ray.job_submission.JobSubmissionClient.get_job_status

Get the most recent status of a job.

ray.job_submission.JobSubmissionClient.get_job_info

Get the latest status and other information associated with a job.

ray.job_submission.JobSubmissionClient.list_jobs

List all jobs along with their status and other information.

ray.job_submission.JobSubmissionClient.get_job_logs

Get all logs produced by a job.

ray.job_submission.JobSubmissionClient.tail_job_logs

Get an iterator that follows the logs of a job.

class ray.job_submission.JobSubmissionClient(address: Optional[str] = None, create_cluster_if_needed: bool = False, cookies: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, Any]] = None)[source]#

A local client for submitting and interacting with jobs on a remote cluster.

Submits requests over HTTP to the job server on the cluster using the REST API.

Parameters
  • address – Either (1) the address of the Ray cluster, or (2) the HTTP address of the dashboard server on the head node, e.g. “http://<head-node-ip>:8265”. In case (1) it must be specified as an address that can be passed to ray.init(), e.g. a Ray Client address (ray://<head_node_host>:10001), or “auto”, or “localhost:<port>”. If unspecified, will try to connect to a running local Ray cluster. This argument is always overridden by the RAY_ADDRESS environment variable.

  • create_cluster_if_needed – Indicates whether the cluster at the specified address needs to already be running. Ray doesn't start a cluster before interacting with jobs, but third-party job managers may do so.

  • cookies – Cookies to use when sending requests to the HTTP job server.

  • metadata – Arbitrary metadata to store along with all jobs. New metadata specified per job will be merged with the global metadata provided here via a simple dict update.

  • headers – Headers to use when sending requests to the HTTP job server, used for cases like authentication to a remote cluster.
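The metadata merge described above can be pictured as a plain dict update. The sketch below assumes the per-job dict is applied on top of the client-wide one, so per-job keys win on conflict. The helper `merge_metadata` and the sample keys are hypothetical and are not part of Ray.

```python
from typing import Dict, Optional


def merge_metadata(
    global_metadata: Optional[Dict[str, str]],
    job_metadata: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Illustrate 'simple dict update' merging (hypothetical helper, not Ray code)."""
    merged = dict(global_metadata or {})  # copy so the client-wide dict is untouched
    merged.update(job_metadata or {})     # assumption: per-job keys override global ones
    return merged


client_metadata = {"team": "ml-infra", "env": "staging"}
per_job_metadata = {"env": "prod", "owner": "alice"}
merged = merge_metadata(client_metadata, per_job_metadata)
print(merged)
```

Because the update is shallow, a nested value under a shared key would be replaced as a whole rather than merged.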

submit_job(*, entrypoint: str, job_id: Optional[str] = None, runtime_env: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, str]] = None, submission_id: Optional[str] = None, entrypoint_num_cpus: Optional[Union[int, float]] = None, entrypoint_num_gpus: Optional[Union[int, float]] = None, entrypoint_resources: Optional[Dict[str, float]] = None) str[source]#

Submit and execute a job asynchronously.

When a job is submitted, it runs once to completion or failure. Retries or different runs with different parameters should be handled by the submitter. Jobs are bound to the lifetime of a Ray cluster, so if the cluster goes down, all running jobs on that cluster will be terminated.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") 
>>> client.submit_job( 
...     entrypoint="python script.py",
...     runtime_env={
...         "working_dir": "./",
...         "pip": ["requests==2.26.0"]
...     }
... )  
'raysubmit_4LamXRuQpYdSMg7J'
Parameters
  • entrypoint – The shell command to run for this job.

  • submission_id – A unique ID for this job.

  • runtime_env – The runtime environment to install and run this job in.

  • metadata – Arbitrary data to store along with this job.

  • job_id – DEPRECATED. This has been renamed to submission_id.

  • entrypoint_num_cpus – The quantity of CPU cores to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.

  • entrypoint_num_gpus – The quantity of GPUs to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it. Defaults to 0.

  • entrypoint_resources – The quantity of custom resources to reserve for the execution of the entrypoint command, separately from any tasks or actors launched by it.

Returns

The submission ID of the submitted job. If not specified, this is a randomly generated unique ID.

Raises

RuntimeError – If the request to the job server fails, or if the specified submission_id has already been used by a job on this cluster.

PublicAPI: This API is stable across Ray releases.
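Since a submitted job runs only once and retries are left to the submitter, one possible retry wrapper is sketched below. The function `run_with_retries` and both callables are hypothetical. With a real client, `submit` could wrap `client.submit_job(...)` and `wait_for_status` could poll `client.get_job_status(...)` until the status is terminal. Fakes are used here so the sketch runs without a cluster.

```python
from typing import Callable, Tuple


def run_with_retries(
    submit: Callable[[], str],
    wait_for_status: Callable[[str], str],
    max_attempts: int = 3,
) -> Tuple[str, str]:
    """Resubmit while the job ends in FAILED; return the last (submission_id, status)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    submission_id, status = "", ""
    for _ in range(max_attempts):
        submission_id = submit()                 # each attempt is a fresh submission
        status = wait_for_status(submission_id)  # blocks until a terminal status
        if status != "FAILED":                   # SUCCEEDED or STOPPED: do not retry
            break
    return submission_id, status


# Fakes standing in for a real client: the first attempt fails, the second succeeds.
fake_ids = iter(["raysubmit_a", "raysubmit_b", "raysubmit_c"])
fake_outcomes = {"raysubmit_a": "FAILED", "raysubmit_b": "SUCCEEDED"}
final_id, final_status = run_with_retries(
    submit=lambda: next(fake_ids),
    wait_for_status=lambda sid: fake_outcomes[sid],
)
print(final_id, final_status)
```

Treating STOPPED as non-retryable is a design choice of this sketch, on the assumption that a user-initiated stop should not be undone automatically.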

stop_job(job_id: str) bool[source]#

Request a job to exit asynchronously.

Attempts to terminate the process first, then kills it after a timeout.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") 
>>> sub_id = client.submit_job(entrypoint="sleep 10") 
>>> client.stop_job(sub_id) 
True
Parameters

job_id – The job ID or submission ID for the job to be stopped.

Returns

True if the job was running, otherwise False.

Raises

RuntimeError – If the job does not exist or if the request to the job server fails.

PublicAPI: This API is stable across Ray releases.

delete_job(job_id: str) bool[source]#

Delete a job in a terminal state and all of its associated data.

If the job is not already in a terminal state, raises an error. This does not delete the job logs from disk. Submitting a job with the same submission ID as a previously deleted job is not supported and may lead to unexpected behavior.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient() 
>>> job_id = client.submit_job(entrypoint="echo hello") 
>>> client.delete_job(job_id) 
True
Parameters

job_id – The submission ID of the job to be deleted.

Returns

True if the job was deleted, otherwise False.

Raises

RuntimeError – If the job does not exist, if the request to the job server fails, or if the job is not in a terminal state.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
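Because delete_job raises an error for a job that has not reached a terminal state, a caller may prefer to check the status first. The guard below is a sketch: `delete_if_terminal` is a hypothetical helper, and `get_status` and `delete` stand in for `client.get_job_status` and `client.delete_job`. Statuses are compared as plain strings.

```python
from typing import Callable, List

# Terminal statuses as listed for JobStatus.is_terminal().
TERMINAL_STATUSES = frozenset({"STOPPED", "SUCCEEDED", "FAILED"})


def delete_if_terminal(
    job_id: str,
    get_status: Callable[[str], str],
    delete: Callable[[str], bool],
) -> bool:
    """Delete the job only if its status is terminal; return True if deleted."""
    if get_status(job_id) not in TERMINAL_STATUSES:
        return False  # still pending or running: leave it alone
    return bool(delete(job_id))


# Fakes so the sketch runs without a cluster.
fake_statuses = {"raysubmit_done": "SUCCEEDED", "raysubmit_busy": "RUNNING"}
deleted: List[str] = []


def fake_delete(job_id: str) -> bool:
    deleted.append(job_id)
    return True


results = {
    job_id: delete_if_terminal(job_id, fake_statuses.get, fake_delete)
    for job_id in fake_statuses
}
print(results)
```

The status can still change between the check and the delete call, so real code would still need to handle RuntimeError.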

get_job_info(job_id: str) ray.dashboard.modules.job.pydantic_models.JobDetails[source]#

Get the latest status and other information associated with a job.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") 
>>> submission_id = client.submit_job(entrypoint="sleep 1") 
>>> client.get_job_info(submission_id) 
JobInfo(status='SUCCEEDED', message='Job finished successfully.',
error_type=None, start_time=1647388711, end_time=1647388712,
metadata={}, runtime_env={})
Parameters

job_id – The job ID or submission ID of the job whose information is being requested.

Returns

The JobDetails for the job.

Raises

RuntimeError – If the job does not exist or if the request to the job server fails.

PublicAPI: This API is stable across Ray releases.

list_jobs() List[ray.dashboard.modules.job.pydantic_models.JobDetails][source]#

List all jobs along with their status and other information.

Lists all jobs that have ever run on the cluster, including jobs that are currently running and jobs that are no longer running.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") 
>>> client.submit_job(entrypoint="echo hello") 
>>> client.submit_job(entrypoint="sleep 2") 
>>> client.list_jobs() 
[JobDetails(status='SUCCEEDED',
job_id='03000000', type='submission',
submission_id='raysubmit_4LamXRuQpYdSMg7J',
message='Job finished successfully.', error_type=None,
start_time=1647388711, end_time=1647388712, metadata={}, runtime_env={}),
JobDetails(status='RUNNING',
job_id='04000000', type='submission',
submission_id='raysubmit_1dxCeNvG1fCMVNHG',
message='Job is currently running.', error_type=None,
start_time=1647454832, end_time=None, metadata={}, runtime_env={})]
Returns

A list of JobDetails objects, one for each job.

Raises

RuntimeError – If the request to the job server fails.

PublicAPI: This API is stable across Ray releases.
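A common follow-up to list_jobs is filtering the returned records. The sketch below uses `SimpleNamespace` objects as stand-ins for JobDetails, carrying only the `status` and `submission_id` fields. The helper `running_submission_ids` is hypothetical, and statuses are compared as plain strings.

```python
from types import SimpleNamespace
from typing import Iterable, List


def running_submission_ids(jobs: Iterable) -> List[str]:
    """Return submission IDs of jobs whose status is RUNNING."""
    return [
        job.submission_id
        for job in jobs
        # submission_id is optional, so skip records that do not carry one
        if job.status == "RUNNING" and job.submission_id is not None
    ]


# Stand-ins shaped like the records in the example output above.
fake_jobs = [
    SimpleNamespace(status="SUCCEEDED", submission_id="raysubmit_4LamXRuQpYdSMg7J"),
    SimpleNamespace(status="RUNNING", submission_id="raysubmit_1dxCeNvG1fCMVNHG"),
    SimpleNamespace(status="RUNNING", submission_id=None),
]
still_running = running_submission_ids(fake_jobs)
print(still_running)
```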

get_job_status(job_id: str) ray.dashboard.modules.job.common.JobStatus[source]#

Get the most recent status of a job.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") 
>>> sub_id = client.submit_job(entrypoint="echo hello") 
>>> client.get_job_status(sub_id) 
'SUCCEEDED'
Parameters

job_id – The job ID or submission ID of the job whose status is being requested.

Returns

The JobStatus of the job.

Raises

RuntimeError – If the job does not exist or if the request to the job server fails.

PublicAPI: This API is stable across Ray releases.
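Since get_job_status returns only the current status, waiting for completion usually means polling. The loop below is a sketch: `wait_until_terminal`, its parameters, and its defaults are hypothetical. With a real client, `get_status` could be `lambda: client.get_job_status(sub_id)`. A fake status source is used here so the code runs without a cluster.

```python
import time
from typing import Callable

# Terminal statuses as listed for JobStatus.is_terminal().
TERMINAL_STATUSES = frozenset({"STOPPED", "SUCCEEDED", "FAILED"})


def wait_until_terminal(
    get_status: Callable[[], str],
    timeout_s: float = 60.0,
    poll_interval_s: float = 1.0,
) -> str:
    """Poll get_status until it reports a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout_s} s")
        time.sleep(poll_interval_s)


# Fake status source: the job goes PENDING -> RUNNING -> SUCCEEDED.
fake_statuses = iter(["PENDING", "RUNNING", "RUNNING", "SUCCEEDED"])
final_status = wait_until_terminal(
    lambda: next(fake_statuses), timeout_s=5.0, poll_interval_s=0.0
)
print(final_status)
```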

get_job_logs(job_id: str) str[source]#

Get all logs produced by a job.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") 
>>> sub_id = client.submit_job(entrypoint="echo hello") 
>>> client.get_job_logs(sub_id) 
'hello\n'
Parameters

job_id – The job ID or submission ID of the job whose logs are being requested.

Returns

A string containing the full logs of the job.

Raises

RuntimeError – If the job does not exist or if the request to the job server fails.

PublicAPI: This API is stable across Ray releases.

async tail_job_logs(job_id: str) Iterator[str][source]#

Get an iterator that follows the logs of a job.

Example

>>> from ray.job_submission import JobSubmissionClient
>>> client = JobSubmissionClient("http://127.0.0.1:8265") 
>>> submission_id = client.submit_job( 
...     entrypoint="echo hi && sleep 5 && echo hi2")
>>> async for lines in client.tail_job_logs(submission_id): 
...     print(lines, end="") 
hi
hi2
Parameters

job_id – The job ID or submission ID of the job whose logs are being requested.

Returns

The iterator.

Raises

RuntimeError – If the job does not exist or if the request to the job server fails.

PublicAPI: This API is stable across Ray releases.
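An `async for` loop only works inside a coroutine, so a plain script needs an event loop to drive it. The sketch below shows one way to do that with `asyncio.run`. The async generator `fake_tail_job_logs` is a stand-in for `client.tail_job_logs(...)`, and `collect_logs` is a hypothetical helper, so the code runs without a cluster.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_tail_job_logs(job_id: str) -> AsyncIterator[str]:
    """Stand-in for client.tail_job_logs(job_id): yields log chunks over time."""
    for chunk in ("hi\n", "hi2\n"):
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield chunk


async def collect_logs(log_stream: AsyncIterator[str]) -> str:
    """Consume an async log iterator and return everything it produced."""
    chunks: List[str] = []
    async for chunk in log_stream:
        chunks.append(chunk)
    return "".join(chunks)


logs = asyncio.run(collect_logs(fake_tail_job_logs("raysubmit_example")))
print(logs, end="")
```

Inside an environment that already runs an event loop, such as a notebook, `await collect_logs(...)` would replace the `asyncio.run` call.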

JobStatus#

class ray.job_submission.JobStatus(value)[source]#

An enumeration for describing the status of a job.

PublicAPI: This API is stable across Ray releases.

PENDING = 'PENDING'#

The job has not started yet, likely waiting for the runtime_env to be set up.

RUNNING = 'RUNNING'#

The job is currently running.

STOPPED = 'STOPPED'#

The job was intentionally stopped by the user.

SUCCEEDED = 'SUCCEEDED'#

The job finished successfully.

FAILED = 'FAILED'#

The job failed.

is_terminal() bool[source]#

Return whether or not this status is terminal.

A terminal status is one that cannot transition to any other status. The terminal statuses are “STOPPED”, “SUCCEEDED”, and “FAILED”.

Returns

True if this status is terminal, otherwise False.
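The terminal and non-terminal split can be illustrated with a small stand-in enumeration. `SketchJobStatus` below is hypothetical and only mirrors the values and the is_terminal rule described here. It is not Ray's class and makes no claim about how Ray implements it.

```python
from enum import Enum


class SketchJobStatus(str, Enum):
    """Stand-in mirroring the documented JobStatus values (not Ray's class)."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"

    def is_terminal(self) -> bool:
        # Terminal statuses cannot transition to any other status.
        return self in {
            SketchJobStatus.STOPPED,
            SketchJobStatus.SUCCEEDED,
            SketchJobStatus.FAILED,
        }


terminal_values = [s.value for s in SketchJobStatus if s.is_terminal()]
print(terminal_values)
```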

JobInfo#

class ray.job_submission.JobInfo(status: ray.dashboard.modules.job.common.JobStatus, entrypoint: str, message: Optional[str] = None, error_type: Optional[str] = None, start_time: Optional[int] = None, end_time: Optional[int] = None, metadata: Optional[Dict[str, str]] = None, runtime_env: Optional[Dict[str, Any]] = None, entrypoint_num_cpus: Optional[Union[int, float]] = None, entrypoint_num_gpus: Optional[Union[int, float]] = None, entrypoint_resources: Optional[Dict[str, float]] = None, driver_agent_http_address: Optional[str] = None, driver_node_id: Optional[str] = None)[source]#

A class for recording information associated with a job and its execution.

PublicAPI: This API is stable across Ray releases.

status: ray.dashboard.modules.job.common.JobStatus#

The status of the job.

entrypoint: str#

The entrypoint command for this job.

message: Optional[str] = None#

A message describing the status in more detail.

start_time: Optional[int] = None#

The time when the job was started. A Unix timestamp in ms.

end_time: Optional[int] = None#

The time when the job moved into a terminal state. A Unix timestamp in ms.

metadata: Optional[Dict[str, str]] = None#

Arbitrary user-provided metadata for the job.

runtime_env: Optional[Dict[str, Any]] = None#

The runtime environment for the job.

entrypoint_num_cpus: Optional[Union[int, float]] = None#

The quantity of CPU cores to reserve for the entrypoint command.

entrypoint_num_gpus: Optional[Union[int, float]] = None#

The number of GPUs to reserve for the entrypoint command.

entrypoint_resources: Optional[Dict[str, float]] = None#

The quantity of various custom resources to reserve for the entrypoint command.

driver_agent_http_address: Optional[str] = None#

The HTTP address of the JobAgent on the node the job entrypoint command is running on.

to_json() Dict[str, Any][source]#

Convert this object to a JSON-serializable dictionary.

Returns

A JSON-serializable dictionary representing the JobInfo object.

classmethod from_json(json_dict: Dict[str, Any]) None[source]#

Initialize this object from a JSON dictionary.

Parameters

json_dict – A JSON dictionary to use to initialize the JobInfo object.
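The start_time and end_time attributes of JobInfo are documented as Unix timestamps in milliseconds, so they need scaling before use with Python's datetime. A small conversion sketch follows. The helpers `ms_to_datetime` and `duration_seconds` and the sample values are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def ms_to_datetime(timestamp_ms: int) -> datetime:
    """Convert a Unix timestamp in milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


def duration_seconds(start_ms: Optional[int], end_ms: Optional[int]) -> Optional[float]:
    """Return the run time in seconds, or None if either timestamp is missing."""
    if start_ms is None or end_ms is None:
        return None  # the job has not started or has not reached a terminal state
    return (end_ms - start_ms) / 1000


# Made-up sample values in milliseconds.
sample_start_ms = 1647388711000
sample_end_ms = 1647388712500

started_at = ms_to_datetime(sample_start_ms)
elapsed = duration_seconds(sample_start_ms, sample_end_ms)
print(started_at.isoformat(), elapsed)
```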

JobDetails#

pydantic model ray.job_submission.JobDetails[source]#

Job data with extra details about its driver and its submission.

PublicAPI (beta): This API is in beta and may change before becoming stable.

JSON schema
{
   "title": "JobDetails",
   "description": "    Job data with extra details about its driver and its submission.\n\n**PublicAPI (beta):** This API is in beta and may change before becoming stable.",
   "type": "object",
   "properties": {
      "type": {
         "description": "The type of job.",
         "allOf": [
            {
               "$ref": "#/definitions/JobType"
            }
         ]
      },
      "entrypoint": {
         "title": "Entrypoint",
         "description": "The entrypoint command for this job.",
         "type": "string"
      },
      "job_id": {
         "title": "Job Id",
         "description": "The job ID. An ID that is created for every job that is launched in Ray. This can be used to fetch data about jobs using Ray Core APIs.",
         "type": "string"
      },
      "submission_id": {
         "title": "Submission Id",
         "description": "A submission ID is an ID created for every job submitted via the Ray Jobs API. It can be used to fetch data about jobs using the Ray Jobs API.",
         "type": "string"
      },
      "driver_info": {
         "title": "Driver Info",
         "description": "The driver related to this job. For jobs submitted via the Ray Jobs API, it is the last driver launched by that job submission, or None if there is no driver.",
         "allOf": [
            {
               "$ref": "#/definitions/DriverInfo"
            }
         ]
      },
      "status": {
         "description": "The status of the job.",
         "allOf": [
            {
               "$ref": "#/definitions/JobStatus"
            }
         ]
      },
      "message": {
         "title": "Message",
         "description": "A message describing the status in more detail.",
         "type": "string"
      },
      "error_type": {
         "title": "Error Type",
         "description": "Internal error or user script error.",
         "type": "string"
      },
      "start_time": {
         "title": "Start Time",
         "description": "The time when the job was started. A Unix timestamp in ms.",
         "type": "integer"
      },
      "end_time": {
         "title": "End Time",
         "description": "The time when the job moved into a terminal state. A Unix timestamp in ms.",
         "type": "integer"
      },
      "metadata": {
         "title": "Metadata",
         "description": "Arbitrary user-provided metadata for the job.",
         "type": "object",
         "additionalProperties": {
            "type": "string"
         }
      },
      "runtime_env": {
         "title": "Runtime Env",
         "description": "The runtime environment for the job.",
         "type": "object"
      },
      "driver_agent_http_address": {
         "title": "Driver Agent Http Address",
         "description": "The HTTP address of the JobAgent on the node the job entrypoint command is running on.",
         "type": "string"
      },
      "driver_node_id": {
         "title": "Driver Node Id",
         "description": "The node ID of the node the job entrypoint command is running on.",
         "type": "string"
      }
   },
   "required": [
      "type",
      "entrypoint",
      "status"
   ],
   "definitions": {
      "JobType": {
         "title": "JobType",
         "description": "An enumeration for describing the different job types.\n\n**PublicAPI (beta):** This API is in beta and may change before becoming stable.",
         "enum": [
            "SUBMISSION",
            "DRIVER"
         ],
         "type": "string"
      },
      "DriverInfo": {
         "title": "DriverInfo",
         "description": "A class for recording information about the driver related to the job.\n\n**PublicAPI (beta):** This API is in beta and may change before becoming stable.",
         "type": "object",
         "properties": {
            "id": {
               "title": "Id",
               "description": "The id of the driver",
               "type": "string"
            },
            "node_ip_address": {
               "title": "Node Ip Address",
               "description": "The IP address of the node the driver is running on.",
               "type": "string"
            },
            "pid": {
               "title": "Pid",
               "description": "The PID of the worker process the driver is using.",
               "type": "string"
            }
         },
         "required": [
            "id",
            "node_ip_address",
            "pid"
         ]
      },
      "JobStatus": {
         "title": "JobStatus",
         "description": "An enumeration for describing the status of a job.\n\nPublicAPI: This API is stable across Ray releases.",
         "enum": [
            "PENDING",
            "RUNNING",
            "STOPPED",
            "SUCCEEDED",
            "FAILED"
         ],
         "type": "string"
      }
   }
}

Fields
  • driver_agent_http_address (Optional[str])

  • driver_info (Optional[ray.dashboard.modules.job.pydantic_models.DriverInfo])

  • driver_node_id (Optional[str])

  • end_time (Optional[int])

  • entrypoint (str)

  • error_type (Optional[str])

  • job_id (Optional[str])

  • message (Optional[str])

  • metadata (Optional[Dict[str, str]])

  • runtime_env (Optional[Dict[str, Any]])

  • start_time (Optional[int])

  • status (ray.dashboard.modules.job.common.JobStatus)

  • submission_id (Optional[str])

  • type (ray.dashboard.modules.job.pydantic_models.JobType)

field driver_agent_http_address: Optional[str] = None#

The HTTP address of the JobAgent on the node the job entrypoint command is running on.

field driver_info: Optional[ray.dashboard.modules.job.pydantic_models.DriverInfo] = None#

The driver related to this job. For jobs submitted via the Ray Jobs API, it is the last driver launched by that job submission, or None if there is no driver.

field driver_node_id: Optional[str] = None#

The node ID of the node the job entrypoint command is running on.

field end_time: Optional[int] = None#

The time when the job moved into a terminal state. A Unix timestamp in ms.

field entrypoint: str [Required]#

The entrypoint command for this job.

field error_type: Optional[str] = None#

Internal error or user script error.

field job_id: Optional[str] = None#

The job ID. An ID that is created for every job that is launched in Ray. This can be used to fetch data about jobs using Ray Core APIs.

field message: Optional[str] = None#

A message describing the status in more detail.

field metadata: Optional[Dict[str, str]] = None#

Arbitrary user-provided metadata for the job.

field runtime_env: Optional[Dict[str, Any]] = None#

The runtime environment for the job.

field start_time: Optional[int] = None#

The time when the job was started. A Unix timestamp in ms.

field status: ray.dashboard.modules.job.common.JobStatus [Required]#

The status of the job.

field submission_id: Optional[str] = None#

A submission ID is an ID created for every job submitted via the Ray Jobs API. It can be used to fetch data about jobs using the Ray Jobs API.

field type: ray.dashboard.modules.job.pydantic_models.JobType [Required]#

The type of job.
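The JobDetails schema marks only type, entrypoint, and status as required, while every other field may be absent. A sketch of checking a JSON-style dict against that list is shown below. The helper `missing_required_fields` and the sample payloads are hypothetical, and the check is a simplification of what the pydantic model would validate.

```python
from typing import Any, Dict, List

# Required fields as listed in the JobDetails JSON schema.
REQUIRED_FIELDS = ("type", "entrypoint", "status")


def missing_required_fields(payload: Dict[str, Any]) -> List[str]:
    """Return the required JobDetails fields that are absent or None in payload."""
    return [name for name in REQUIRED_FIELDS if payload.get(name) is None]


complete_payload = {
    "type": "SUBMISSION",
    "entrypoint": "python script.py",
    "status": "RUNNING",
    "submission_id": "raysubmit_example",
}
partial_payload = {"entrypoint": "python script.py", "job_id": "03000000"}

print(missing_required_fields(complete_payload))
print(missing_required_fields(partial_payload))
```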

JobType#

class ray.job_submission.JobType(value)[source]

An enumeration for describing the different job types.

PublicAPI (beta): This API is in beta and may change before becoming stable.

SUBMISSION = 'SUBMISSION'

A job that was initiated by the Ray Jobs API.

DRIVER = 'DRIVER'

A job that was initiated by a driver script.

DriverInfo#

pydantic model ray.job_submission.DriverInfo[source]#

A class for recording information about the driver related to the job.

PublicAPI (beta): This API is in beta and may change before becoming stable.

JSON schema
{
   "title": "DriverInfo",
   "description": "A class for recording information about the driver related to the job.\n\n**PublicAPI (beta):** This API is in beta and may change before becoming stable.",
   "type": "object",
   "properties": {
      "id": {
         "title": "Id",
         "description": "The id of the driver",
         "type": "string"
      },
      "node_ip_address": {
         "title": "Node Ip Address",
         "description": "The IP address of the node the driver is running on.",
         "type": "string"
      },
      "pid": {
         "title": "Pid",
         "description": "The PID of the worker process the driver is using.",
         "type": "string"
      }
   },
   "required": [
      "id",
      "node_ip_address",
      "pid"
   ]
}

Fields
  • id (str)

  • node_ip_address (str)

  • pid (str)

field id: str [Required]#

The ID of the driver.

field node_ip_address: str [Required]#

The IP address of the node the driver is running on.

field pid: str [Required]#

The PID of the worker process the driver is using.