# Source code for ray.autoscaler.sdk

"""IMPORTANT: this is an experimental interface and not currently stable."""

from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
import json
import os
import tempfile

from ray.autoscaler._private import commands
from ray.autoscaler._private.event_system import (  # noqa: F401
    CreateClusterEvent,  # noqa: F401
    global_event_system)
from ray.autoscaler._private.cli_logger import cli_logger


def create_or_update_cluster(cluster_config: Union[dict, str],
                             *,
                             no_restart: bool = False,
                             restart_only: bool = False,
                             no_config_cache: bool = False) -> Dict[str, Any]:
    """Create or updates an autoscaling Ray cluster from a config json.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        no_restart (bool): Whether to skip restarting Ray services during the
            update. This avoids interrupting running jobs and can be used to
            dynamically adjust autoscaler configuration.
        restart_only (bool): Whether to skip running setup commands and only
            restart Ray. This cannot be used with ``no_restart``.
        no_config_cache (bool): Whether to disable the config cache and fully
            resolve all environment settings from the Cloud provider again.
    """
    with _as_config_file(cluster_config) as config_file:
        return commands.create_or_update_cluster(
            config_file=config_file,
            override_min_workers=None,
            override_max_workers=None,
            no_restart=no_restart,
            restart_only=restart_only,
            yes=True,
            override_cluster_name=None,
            no_config_cache=no_config_cache,
            redirect_command_output=None,
            use_login_shells=True)

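# Usage sketch (illustrative, not executed on import): the config fields
# below are a minimal assumption, not a complete provider config.
#
#   config = {
#       "cluster_name": "example",
#       "provider": {"type": "aws", "region": "us-west-2"},
#       "max_workers": 2,
#   }
#   create_or_update_cluster(config, no_restart=False)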

def teardown_cluster(cluster_config: Union[dict, str],
                     workers_only: bool = False,
                     keep_min_workers: bool = False) -> None:
    """Destroys all nodes of a Ray cluster described by a config json.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        workers_only (bool): Whether to keep the head node running and only
            teardown worker nodes.
        keep_min_workers (bool): Whether to keep min_workers (as specified
            in the YAML) still running.
    """
    with _as_config_file(cluster_config) as config_file:
        return commands.teardown_cluster(
            config_file=config_file,
            yes=True,
            workers_only=workers_only,
            override_cluster_name=None,
            keep_min_workers=keep_min_workers)

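# Usage sketch (illustrative): tear down only the worker nodes of the
# cluster described by a hypothetical "cluster.yaml", keeping the head
# node and min_workers running.
#
#   teardown_cluster("cluster.yaml", workers_only=True,
#                    keep_min_workers=True)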

def run_on_cluster(cluster_config: Union[dict, str],
                   *,
                   cmd: Optional[str] = None,
                   run_env: str = "auto",
                   tmux: bool = False,
                   stop: bool = False,
                   no_config_cache: bool = False,
                   port_forward: Optional[commands.Port_forward] = None,
                   with_output: bool = False) -> Optional[str]:
    """Runs a command on the specified cluster.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        cmd (str): The command to run, or None for a no-op command.
        run_env (str): Whether to run the command on the host or in a
            container. Select between "auto", "host" and "docker".
        tmux (bool): Whether to run in a tmux session.
        stop (bool): Whether to stop the cluster after the command finishes.
        no_config_cache (bool): Whether to disable the config cache and fully
            resolve all environment settings from the Cloud provider again.
        port_forward ((int, int) or list[(int, int)]): Port(s) to forward.
        with_output (bool): Whether to capture command output.

    Returns:
        The output of the command as a string if ``with_output`` is True,
        otherwise None.
    """
    with _as_config_file(cluster_config) as config_file:
        return commands.exec_cluster(
            config_file,
            cmd=cmd,
            run_env=run_env,
            screen=False,
            tmux=tmux,
            stop=stop,
            start=False,
            override_cluster_name=None,
            no_config_cache=no_config_cache,
            port_forward=port_forward,
            with_output=with_output)

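# Usage sketch (illustrative): run a shell command on the head node and
# capture its output. "cluster.yaml" is a hypothetical config path.
#
#   output = run_on_cluster(
#       "cluster.yaml", cmd="ray status", with_output=True)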

def rsync(cluster_config: Union[dict, str],
          *,
          source: Optional[str],
          target: Optional[str],
          down: bool,
          ip_address: Optional[str] = None,
          use_internal_ip: bool = False,
          no_config_cache: bool = False,
          should_bootstrap: bool = True):
    """Rsyncs files to or from the cluster.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.
        source (str): rsync source argument.
        target (str): rsync target argument.
        down (bool): Whether we're syncing remote -> local.
        ip_address (str): Address of the node to sync with.
        use_internal_ip (bool): Whether the provided ip_address is a
            private (internal) IP rather than a public one.
        no_config_cache (bool): Whether to disable the config cache and fully
            resolve all environment settings from the Cloud provider again.
        should_bootstrap (bool): Whether to bootstrap the cluster config
            before syncing.

    Raises:
        RuntimeError: If the cluster head node is not found.
    """
    with _as_config_file(cluster_config) as config_file:
        return commands.rsync(
            config_file=config_file,
            source=source,
            target=target,
            override_cluster_name=None,
            down=down,
            ip_address=ip_address,
            use_internal_ip=use_internal_ip,
            no_config_cache=no_config_cache,
            all_nodes=False,
            should_bootstrap=should_bootstrap)

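# Usage sketch (illustrative): push local sources up to the head node, then
# pull results back down. All paths are hypothetical.
#
#   rsync("cluster.yaml", source="./src/", target="/home/ray/src/",
#         down=False)
#   rsync("cluster.yaml", source="/home/ray/out/", target="./out/",
#         down=True)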

def get_head_node_ip(cluster_config: Union[dict, str]) -> str:
    """Returns head node IP for given configuration file if exists.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.

    Returns:
        The ip address of the cluster head node.

    Raises:
        RuntimeError: If the cluster is not found.
    """
    with _as_config_file(cluster_config) as config_file:
        return commands.get_head_node_ip(config_file)


def get_worker_node_ips(cluster_config: Union[dict, str]) -> List[str]:
    """Returns worker node IPs for given configuration file.

    Args:
        cluster_config (Union[str, dict]): Either the config dict of the
            cluster, or a path pointing to a file containing the config.

    Returns:
        List of worker node ip addresses.

    Raises:
        RuntimeError: If the cluster is not found.
    """
    with _as_config_file(cluster_config) as config_file:
        return commands.get_worker_node_ips(config_file)

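# Usage sketch (illustrative): build a simple inventory of cluster nodes
# from a hypothetical "cluster.yaml".
#
#   head_ip = get_head_node_ip("cluster.yaml")
#   worker_ips = get_worker_node_ips("cluster.yaml")
#   all_ips = [head_ip] + worker_ips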

def request_resources(num_cpus: Optional[int] = None,
                      bundles: Optional[List[dict]] = None) -> None:
    """Command the autoscaler to scale to accommodate the specified requests.

    The cluster will immediately attempt to scale to accommodate the
    requested resources, bypassing normal upscaling speed constraints. This
    takes into account existing resource usage.

    For example, suppose you call ``request_resources(num_cpus=100)`` and
    there are 45 currently running tasks, each requiring 1 CPU. Then, enough
    nodes will be added so that up to 100 tasks can run concurrently. It does
    **not** add enough nodes so that 145 tasks can run.

    This call is only a hint to the autoscaler. The actual resulting cluster
    size may be slightly larger or smaller than expected depending on the
    internal bin packing algorithm and max worker count restrictions.

    Args:
        num_cpus (int): Scale the cluster to ensure this number of CPUs are
            available. This request is persistent until another call to
            request_resources() is made to override.
        bundles (List[ResourceDict]): Scale the cluster to ensure this set of
            resource shapes can fit. This request is persistent until another
            call to request_resources() is made to override.

    Examples:
        >>> # Request 1000 CPUs.
        >>> request_resources(num_cpus=1000)
        >>> # Request 64 CPUs and also fit a 1-GPU/4-CPU task.
        >>> request_resources(num_cpus=64, bundles=[{"GPU": 1, "CPU": 4}])
        >>> # Same as requesting num_cpus=3.
        >>> request_resources(bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}])
    """
    return commands.request_resources(num_cpus, bundles)


def configure_logging(log_style: Optional[str] = None,
                      color_mode: Optional[str] = None,
                      verbosity: Optional[int] = None):
    """Configures logging for cluster command calls.

    Args:
        log_style (str): If 'pretty', outputs with formatting and color.
            If 'record', outputs record-style without formatting.
            'auto' defaults to 'pretty', and disables pretty logging if stdin
            is *not* a TTY. Defaults to "auto".
        color_mode (str): Can be "true", "false", or "auto". Enables or
            disables `colorful`. If `color_mode` is "auto", it is set to
            `not stdout.isatty()`.
        verbosity (int): Output verbosity (0, 1, 2, 3). Low verbosity will
            disable `verbose` and `very_verbose` messages.
    """
    cli_logger.configure(
        log_style=log_style, color_mode=color_mode, verbosity=verbosity)


@contextmanager
def _as_config_file(cluster_config: Union[dict, str]) -> Iterator[str]:
    if isinstance(cluster_config, dict):
        tmp = tempfile.NamedTemporaryFile("w", prefix="autoscaler-sdk-tmp-")
        tmp.write(json.dumps(cluster_config))
        tmp.flush()
        cluster_config = tmp.name
    if not os.path.exists(cluster_config):
        raise ValueError("Cluster config not found {}".format(cluster_config))
    yield cluster_config


def bootstrap_config(cluster_config: Dict[str, Any],
                     no_config_cache: bool = False) -> Dict[str, Any]:
    """Validate and add provider-specific fields to the config.

    For example, IAM/authentication may be added here."""
    return commands._bootstrap_config(cluster_config, no_config_cache)


def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
    """Fill out default values for a cluster_config based on the provider."""
    from ray.autoscaler._private.util import fillout_defaults
    return fillout_defaults(config)


def register_callback_handler(
        event_name: str,
        callback: Union[Callable[[Dict], None], List[Callable[[Dict], None]]],
) -> None:
    """Registers a callback handler for autoscaler events.

    Args:
        event_name (str): Event that the callback should be called on. See
            CreateClusterEvent for details on the events available to be
            registered against.
        callback (Callable): Callable object that is invoked when the
            specified event occurs.
    """
    global_event_system.add_callback_handler(event_name, callback)


def get_docker_host_mount_location(cluster_name: str) -> str:
    """Return the host path that Docker mounts attach to."""
    docker_mount_prefix = "/tmp/ray_tmp_mount/{cluster_name}"
    return docker_mount_prefix.format(cluster_name=cluster_name)
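

# Usage sketch (illustrative): register a callback for a cluster-creation
# event. CreateClusterEvent.up_started is assumed to be one of the members
# defined in event_system; any member can be used the same way.
#
#   def _on_up_started(event_data: dict) -> None:
#       print("cluster launch started:", event_data)
#
#   register_callback_handler(CreateClusterEvent.up_started, _on_up_started)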