Deploying on Spark Standalone cluster#

This document describes a couple of high-level steps to run Ray clusters on a Spark Standalone cluster.

Running a basic example#

This example Spark application starts a Ray cluster on Spark, executes Ray application code, and then shuts down the Ray cluster it started.

1) Create a Python file that contains the Spark application code. Assume the Python file name is ‘ray-on-spark-example1.py’.

import ray
from pyspark.sql import SparkSession
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Ray on spark example 1") \
        .config("spark.task.cpus", "4") \
        .getOrCreate()

    # Set up a Ray cluster on this Spark application. This creates a background
    # Spark job in which each Spark task launches one Ray worker node.
    # The Ray head node is launched on the Spark application driver side.
    # The resources (CPU / GPU / memory) allocated to each Ray worker node are
    # equal to the resources allocated to the corresponding Spark task.
    setup_ray_cluster(max_worker_nodes=MAX_NUM_WORKER_NODES)

    # You can run any Ray application code here; it is executed
    # on the Ray cluster set up above.
    # You don't need to set an address for `ray.init`;
    # it connects to the cluster created above automatically.
    ray.init()
    ...

    # Terminate the Ray cluster explicitly.
    # If you don't call it, the Ray cluster is still terminated
    # when the Spark application terminates.
    shutdown_ray_cluster()
2) Submit the Spark application above to the Spark standalone cluster.

#!/bin/bash
spark-submit \
  --master spark://{spark_master_IP}:{spark_master_port} \
  path/to/ray-on-spark-example1.py
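
As one illustration of what could stand in for the `...` placeholder in the script above, here is a minimal sketch of a Ray workload. The names `run_squares` and `square` are hypothetical and not part of the original example. The sketch assumes it is called after `setup_ray_cluster` and `ray.init()` have run, as in the script.

```python
def run_squares(n=4):
    """Run a few trivial Ray tasks and return their results as a list."""
    # Imported lazily so this helper can be defined without Ray installed.
    import ray

    @ray.remote
    def square(x):
        return x * x

    # Each .remote() call is scheduled as a task on the connected Ray cluster;
    # ray.get blocks until all results are available.
    return ray.get([square.remote(i) for i in range(n)])
```

Calling `run_squares()` between `ray.init()` and `shutdown_ray_cluster()` would run the tasks on the Ray worker nodes launched by the Spark job.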

Creating a long running ray cluster on spark cluster#

This example Spark application starts a long-running Ray cluster on Spark. The created Ray cluster can be accessed by remote Python processes.

1) Create a Python file that contains the Spark application code. Assume the Python file name is ‘long-running-ray-cluster-on-spark.py’.

from pyspark.sql import SparkSession
import time
from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("long running ray cluster on spark") \
        .config("spark.task.cpus", "4") \
        .getOrCreate()

    # setup_ray_cluster returns a tuple of
    # (address, remote_connection_address).
    _, remote_connection_address = setup_ray_cluster(
        max_worker_nodes=MAX_NUM_WORKER_NODES
    )
    print("Ray cluster is set up, you can connect to this ray cluster "
          f"via address {remote_connection_address}")

    # Sleep until the Spark application is terminated;
    # at that point, the Ray cluster is also terminated.
    while True:
        time.sleep(10)
2) Submit the Spark application above to the Spark standalone cluster.

#!/bin/bash
spark-submit \
  --master spark://{spark_master_IP}:{spark_master_port} \
  path/to/long-running-ray-cluster-on-spark.py
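
A remote Python process could then connect to the long-running cluster through the `ray://` address printed by the application. The following is a minimal sketch of that step. The helper name `list_cluster_resources` is hypothetical, and the address argument is whatever the running application reported.

```python
def list_cluster_resources(remote_connection_address):
    """Connect to the long-running cluster and return its total resources."""
    # Imported lazily so this helper can be defined without Ray installed.
    import ray

    # An address starting with "ray://" makes ray.init connect as a remote client.
    ray.init(remote_connection_address)
    try:
        return ray.cluster_resources()
    finally:
        # Disconnect this client process when done.
        ray.shutdown()
```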

Ray on Spark APIs#

ray.util.spark.setup_ray_cluster(*, max_worker_nodes: int, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, ray_temp_root_dir: str | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0, **kwargs) -> Tuple[str, str]#

Set up a Ray cluster on the Spark cluster by starting a Ray head node on the Spark application’s driver node. After the head node is created, a background Spark job is started that generates an instance of RayClusterOnSpark, which contains the configuration for the Ray cluster that runs on the Spark cluster’s worker nodes. After the Ray cluster is set up, the “RAY_ADDRESS” environment variable is set to the cluster address, so you can call ray.init() without specifying the Ray cluster address to connect to the cluster. To shut down the cluster, call ray.util.spark.shutdown_ray_cluster(). Note: If the active Ray cluster has not been shut down, you cannot create a new Ray cluster.

Parameters:
  • max_worker_nodes – The maximum number of Ray worker nodes to start for the Ray cluster. You can set max_worker_nodes to ray.util.spark.MAX_NUM_WORKER_NODES, which represents a Ray cluster configuration that uses all available resources configured for the Spark application. To create a Spark application that is intended to exclusively run a shared, non-scaling Ray cluster, it is recommended to set this argument to ray.util.spark.MAX_NUM_WORKER_NODES.

  • min_worker_nodes – The minimum number of worker nodes (default None). If “min_worker_nodes” is None or equal to “max_worker_nodes”, autoscaling is disabled and the Ray cluster is launched with a fixed number of “max_worker_nodes” Ray worker nodes. Otherwise, autoscaling is enabled.

  • num_cpus_worker_node – Number of CPUs available to each Ray worker node. If not provided and Spark stage scheduling is supported, the ‘num_cpus_worker_node’ value equals the number of CPU cores per Spark worker node; otherwise the Spark application configuration ‘spark.task.cpus’ is used instead. Limitation: only Spark version >= 3.4 or Databricks Runtime 12.x supports setting this argument.

  • num_cpus_head_node – Number of CPUs available to the Ray head node. If not provided, a global mode Ray cluster uses the number of CPU cores in the Spark driver node; otherwise 0 is used. A value of 0 means that tasks requiring CPU resources are not scheduled to the Ray head node.

  • num_gpus_worker_node – Number of GPUs available to each Ray worker node. If not provided and Spark stage scheduling is supported, the ‘num_gpus_worker_node’ value equals the number of GPUs per Spark worker node; otherwise the Spark application configuration ‘spark.task.resource.gpu.amount’ is used instead. This argument is only available on a Spark cluster that is configured with ‘gpu’ resources. Limitation: only Spark version >= 3.4 or Databricks Runtime 12.x supports setting this argument.

  • num_gpus_head_node – Number of GPUs available to the Ray head node. If not provided, a global mode Ray cluster uses the number of GPUs in the Spark driver node; otherwise 0 is used. This argument is only available on a Spark cluster whose Spark driver node has GPUs.

  • memory_worker_node – Optional[int]: Heap memory configured for each Ray worker node. This sets the --memory option when the Ray node is started with the ray start command.

  • memory_head_node – Optional[int]: Heap memory configured for the Ray head node. This sets the --memory option when the Ray node is started with the ray start command.

  • object_store_memory_worker_node – Object store memory available to each Ray worker node. The value is capped at “dev_shm_available_size * 0.8 / num_tasks_per_spark_worker”. The default value is “0.3 * spark_worker_physical_memory * 0.8 / num_tasks_per_spark_worker”.

  • object_store_memory_head_node – Object store memory available to the Ray head node. The value is capped at “dev_shm_available_size * 0.8”. The default value is “0.3 * spark_driver_physical_memory * 0.8”.

  • head_node_options – A dict of extra options for the Ray head node. These options are passed to the ray start script. Note that you need to convert ray start option keys from --foo-bar format to foo_bar format. For flag options (e.g. ‘--disable-usage-stats’), set the value to None in the option dict, like {"disable_usage_stats": None}. Note: Short name options (e.g. ‘-v’) are not supported.

  • worker_node_options – A dict of extra options for the Ray worker nodes. These options are passed to the ray start script. Note that you need to convert ray start option keys from --foo-bar format to foo_bar format. For flag options (e.g. ‘--disable-usage-stats’), set the value to None in the option dict, like {"disable_usage_stats": None}. Note: Short name options (e.g. ‘-v’) are not supported.

  • ray_temp_root_dir – A local disk path to store the ray temporary data. The created cluster will create a subdirectory “ray-{head_port}-{random_suffix}” beneath this path.

  • strict_mode – Boolean flag to fast-fail initialization of the Ray cluster if the available Spark cluster does not have sufficient resources to fulfill the resource allocation for memory, CPU and GPU. When set to True, if the requested resources are not available for the recommended minimum functionality, an exception is raised that details the inadequate Spark cluster configuration settings. When set to False (the default), a warning is raised instead.

  • collect_log_to_path – If specified, the logs of the Ray head and worker nodes are collected to this path after the nodes terminate. On Databricks Runtime, we recommend specifying a local path that starts with ‘/dbfs/’, because that path is mounted on centralized storage and the stored data persists after the Databricks Spark cluster terminates.

  • autoscale_upscaling_speed – If autoscaling is enabled, the number of nodes allowed to be pending, expressed as a multiple of the current number of nodes. The higher the value, the more aggressive upscaling will be. For example, if this is set to 1.0, the cluster can grow in size by at most 100% at any time, so if the cluster currently has 20 nodes, at most 20 pending launches are allowed. The minimum number of pending launches is 5 regardless of this setting. The default value is 1.0 and the minimum value is 1.0.

  • autoscale_idle_timeout_minutes – If autoscaling is enabled, the number of minutes that need to pass before an idle worker node is removed by the autoscaler. The smaller the value, the more aggressive downscaling will be. Worker nodes are considered idle when they hold no active tasks, actors, or referenced objects (either in-memory or spilled to disk). This parameter does not affect the head node. The default value is 1.0 and the minimum value is 0.
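
Several of the parameter rules above are plain arithmetic or key rewriting, so they can be sketched in a few lines of Python. The helpers below are hypothetical illustrations of the rules as described in this list, not functions from ray.util.spark. Truncating the pending-launch bound to an integer, and applying the /dev/shm cap to both requested and default values, are assumptions of this sketch.

```python
from typing import Dict, Optional


def autoscaling_enabled(max_worker_nodes: int, min_worker_nodes: Optional[int]) -> bool:
    """Autoscaling is off when min is None or equal to max, and on otherwise."""
    return min_worker_nodes is not None and min_worker_nodes != max_worker_nodes


def max_pending_launches(current_nodes: int, upscaling_speed: float = 1.0) -> int:
    """Pending-launch bound: a multiple of the current node count, never below 5."""
    # Truncation to an integer is an assumption of this sketch.
    return max(5, int(upscaling_speed * current_nodes))


def effective_object_store_memory(
    requested: Optional[float],
    physical_memory: float,
    dev_shm_available: float,
    tasks_per_worker: int = 1,
) -> float:
    """Object store size for one Ray node, limited by the /dev/shm-based cap."""
    default = 0.3 * physical_memory * 0.8 / tasks_per_worker
    cap = dev_shm_available * 0.8 / tasks_per_worker
    value = default if requested is None else requested
    return min(value, cap)


def to_node_options(cli_options: Dict[str, Optional[str]]) -> Dict[str, Optional[str]]:
    """Rewrite `--foo-bar` keys as `foo_bar`; flag options keep the value None."""
    converted = {}
    for key, value in cli_options.items():
        if not key.startswith("--"):
            # Short names such as `-v` are not supported.
            raise ValueError(f"unsupported option name: {key}")
        converted[key[2:].replace("-", "_")] = value
    return converted
```

For instance, `to_node_options({"--disable-usage-stats": None})` produces a dict in the form expected by head_node_options and worker_node_options.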

Returns:

Returns a tuple of (address, remote_connection_address). “address” is in the format “<ray_head_node_ip>:<port>”, and “remote_connection_address” is in the format “ray://<ray_head_node_ip>:<ray-client-server-port>”. If your client runs on a machine that also hosts a Ray cluster node locally, you can connect to the Ray cluster via ray.init(address); otherwise, you can connect to the Ray cluster via ray.init(remote_connection_address).
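
The choice between the two returned addresses can be sketched as a small helper. The function name `choose_ray_address` and the sample IP and ports in the usage line are hypothetical; only the tuple layout comes from the description above.

```python
from typing import Tuple


def choose_ray_address(addresses: Tuple[str, str], client_on_cluster_node: bool) -> str:
    """Pick which element of the setup_ray_cluster return value to pass to ray.init."""
    address, remote_connection_address = addresses
    # A client on a machine that hosts a Ray cluster node uses the plain address;
    # any other client uses the ray:// remote connection address.
    return address if client_on_cluster_node else remote_connection_address


# Made-up values, for illustration only.
example = ("10.0.0.1:6379", "ray://10.0.0.1:10001")
remote_target = choose_ray_address(example, client_on_cluster_node=False)
```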

ray.util.spark.shutdown_ray_cluster() -> None#

Shut down the active ray cluster.
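
Because a new Ray cluster cannot be created while an active one is still running, it can help to guarantee that the shutdown call runs even when the workload fails. The following is a minimal sketch of one way to do that with a context manager. `managed_ray_cluster` is a hypothetical helper, and its `setup` and `shutdown` parameters exist only so that the pattern can be exercised without a Spark cluster.

```python
from contextlib import contextmanager


@contextmanager
def managed_ray_cluster(setup=None, shutdown=None, **setup_kwargs):
    """Set up a Ray on Spark cluster and always shut it down on exit."""
    if setup is None or shutdown is None:
        # Imported lazily so the helper can be defined without Ray installed.
        from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

        setup = setup or setup_ray_cluster
        shutdown = shutdown or shutdown_ray_cluster

    addresses = setup(**setup_kwargs)
    try:
        yield addresses
    finally:
        # Runs even if the body raises, so the cluster is not left active.
        shutdown()
```

With Ray and Spark available, `with managed_ray_cluster(max_worker_nodes=2) as addresses:` would wrap the workload and release the cluster afterwards.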

ray.util.spark.setup_global_ray_cluster(*, max_worker_nodes: int, is_blocking: bool = True, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0)#

Set up a global mode cluster. The global Ray on Spark cluster means:
  • You can only create one active global Ray on Spark cluster at a time.

  • On a Databricks cluster, the global Ray cluster can be used by all users. By contrast, a non-global Ray cluster can only be used by the current notebook user.

  • It stays up persistently, without automatic shutdown.

  • On a Databricks notebook, you can connect to the global cluster by calling ray.init() without specifying its address; the global cluster is discovered automatically if it is up.

All arguments are the same as in the setup_ray_cluster API, except that:
  • The ray_temp_root_dir argument is not supported. A global mode Ray cluster always uses the default Ray temporary directory path.

  • A new argument “is_blocking” (default True) is added. If “is_blocking” is True, the call blocks until it is interrupted; once it is interrupted, the global Ray on Spark cluster is shut down and the setup_global_ray_cluster call terminates. If “is_blocking” is False, the call returns immediately once Ray cluster setup completes.
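
As a rough illustration of the non-blocking mode, the sketch below starts a global mode cluster and returns as soon as setup completes. The wrapper name `start_global_cluster` is hypothetical; only the `max_worker_nodes` and `is_blocking` arguments come from the signature above.

```python
def start_global_cluster(max_worker_nodes):
    """Start a global mode Ray on Spark cluster without blocking the caller."""
    # Imported lazily so this helper can be defined without Ray installed.
    from ray.util.spark import setup_global_ray_cluster

    # is_blocking=False returns once setup completes, instead of keeping
    # the call blocked until it is interrupted.
    setup_global_ray_cluster(
        max_worker_nodes=max_worker_nodes,
        is_blocking=False,
    )
```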