import json
import logging
import os
from copy import deepcopy
from dataclasses import asdict, is_dataclass
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import ray
from ray._private.ray_constants import DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS
from ray._private.runtime_env.conda import get_uri as get_conda_uri
from ray._private.runtime_env.pip import get_uri as get_pip_uri
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN
from ray._private.thirdparty.dacite import from_dict
from ray.core.generated.runtime_env_common_pb2 import (
RuntimeEnvConfig as ProtoRuntimeEnvConfig,
)
from ray.util.annotations import PublicAPI
logger = logging.getLogger(__name__)
[docs]
@PublicAPI(stability="stable")
class RuntimeEnvConfig(dict):
"""Used to specify configuration options for a runtime environment.
The config is not included when calculating the runtime_env hash,
which means that two runtime_envs with the same options but different
configs are considered the same for caching purposes.
Args:
setup_timeout_seconds: The timeout of runtime environment
creation, timeout is in seconds. The value `-1` means disable
timeout logic, except `-1`, `setup_timeout_seconds` cannot be
less than or equal to 0. The default value of `setup_timeout_seconds`
is 600 seconds.
eager_install: Indicates whether to install the runtime environment
on the cluster at `ray.init()` time, before the workers are leased.
This flag is set to `True` by default.
"""
known_fields: Set[str] = {"setup_timeout_seconds", "eager_install", "log_files"}
_default_config: Dict = {
"setup_timeout_seconds": DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS,
"eager_install": True,
"log_files": [],
}
def __init__(
self,
setup_timeout_seconds: int = DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS,
eager_install: bool = True,
log_files: Optional[List[str]] = None,
):
super().__init__()
if not isinstance(setup_timeout_seconds, int):
raise TypeError(
"setup_timeout_seconds must be of type int, "
f"got: {type(setup_timeout_seconds)}"
)
elif setup_timeout_seconds <= 0 and setup_timeout_seconds != -1:
raise ValueError(
"setup_timeout_seconds must be greater than zero "
f"or equals to -1, got: {setup_timeout_seconds}"
)
self["setup_timeout_seconds"] = setup_timeout_seconds
if not isinstance(eager_install, bool):
raise TypeError(
f"eager_install must be a boolean. got {type(eager_install)}"
)
self["eager_install"] = eager_install
if log_files is not None:
if not isinstance(log_files, list):
raise TypeError(
"log_files must be a list of strings or None, got "
f"{log_files} with type {type(log_files)}."
)
for file_name in log_files:
if not isinstance(file_name, str):
raise TypeError("Each item in log_files must be a string.")
else:
log_files = self._default_config["log_files"]
self["log_files"] = log_files
@staticmethod
def parse_and_validate_runtime_env_config(
config: Union[Dict, "RuntimeEnvConfig"]
) -> "RuntimeEnvConfig":
if isinstance(config, RuntimeEnvConfig):
return config
elif isinstance(config, Dict):
unknown_fields = set(config.keys()) - RuntimeEnvConfig.known_fields
if len(unknown_fields):
logger.warning(
"The following unknown entries in the runtime_env_config "
f"dictionary will be ignored: {unknown_fields}."
)
config_dict = dict()
for field in RuntimeEnvConfig.known_fields:
if field in config:
config_dict[field] = config[field]
return RuntimeEnvConfig(**config_dict)
else:
raise TypeError(
"runtime_env['config'] must be of type dict or RuntimeEnvConfig, "
f"got: {type(config)}"
)
@classmethod
def default_config(cls):
return RuntimeEnvConfig(**cls._default_config)
def build_proto_runtime_env_config(self) -> ProtoRuntimeEnvConfig:
runtime_env_config = ProtoRuntimeEnvConfig()
runtime_env_config.setup_timeout_seconds = self["setup_timeout_seconds"]
runtime_env_config.eager_install = self["eager_install"]
if self["log_files"] is not None:
runtime_env_config.log_files.extend(self["log_files"])
return runtime_env_config
@classmethod
def from_proto(cls, runtime_env_config: ProtoRuntimeEnvConfig):
setup_timeout_seconds = runtime_env_config.setup_timeout_seconds
# Cause python class RuntimeEnvConfig has validate to avoid
# setup_timeout_seconds equals zero, so setup_timeout_seconds
# on RuntimeEnvConfig is zero means other Language(except python)
# dosn't assign value to setup_timeout_seconds. So runtime_env_agent
# assign the default value to setup_timeout_seconds.
if setup_timeout_seconds == 0:
setup_timeout_seconds = cls._default_config["setup_timeout_seconds"]
return cls(
setup_timeout_seconds=setup_timeout_seconds,
eager_install=runtime_env_config.eager_install,
log_files=list(runtime_env_config.log_files),
)
def to_dict(self) -> Dict:
return dict(deepcopy(self))
# Due to circular reference, field config can only be assigned a value here
OPTION_TO_VALIDATION_FN[
"config"
] = RuntimeEnvConfig.parse_and_validate_runtime_env_config
[docs]
@PublicAPI
class RuntimeEnv(dict):
"""This class is used to define a runtime environment for a job, task,
or actor.
See :ref:`runtime-environments` for detailed documentation.
This class can be used interchangeably with an unstructured dictionary
in the relevant API calls.
Can specify a runtime environment whole job, whether running a script
directly on the cluster, using Ray Job submission, or using Ray Client:
.. code-block:: python
from ray.runtime_env import RuntimeEnv
# Starting a single-node local Ray cluster
ray.init(runtime_env=RuntimeEnv(...))
.. code-block:: python
from ray.runtime_env import RuntimeEnv
# Connecting to remote cluster using Ray Client
ray.init("ray://123.456.7.89:10001", runtime_env=RuntimeEnv(...))
Can specify different runtime environments per-actor or per-task using
``.options()`` or the ``@ray.remote`` decorator:
.. code-block:: python
from ray.runtime_env import RuntimeEnv
# Invoke a remote task that will run in a specified runtime environment.
f.options(runtime_env=RuntimeEnv(...)).remote()
# Instantiate an actor that will run in a specified runtime environment.
actor = SomeClass.options(runtime_env=RuntimeEnv(...)).remote()
# Specify a runtime environment in the task definition. Future invocations via
# `g.remote()` will use this runtime environment unless overridden by using
# `.options()` as above.
@ray.remote(runtime_env=RuntimeEnv(...))
def g():
pass
# Specify a runtime environment in the actor definition. Future instantiations
# via `MyClass.remote()` will use this runtime environment unless overridden by
# using `.options()` as above.
@ray.remote(runtime_env=RuntimeEnv(...))
class MyClass:
pass
Here are some examples of RuntimeEnv initialization:
.. code-block:: python
# Example for using conda
RuntimeEnv(conda={
"channels": ["defaults"], "dependencies": ["codecov"]})
RuntimeEnv(conda="pytorch_p36") # Found on DLAMIs
# Example for using container
RuntimeEnv(
container={"image": "anyscale/ray-ml:nightly-py38-cpu",
"run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]})
# Example for set env_vars
RuntimeEnv(env_vars={"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"})
# Example for set pip
RuntimeEnv(
pip={"packages":["tensorflow", "requests"], "pip_check": False,
"pip_version": "==22.0.2;python_version=='3.8.11'"})
Args:
py_modules: List of URIs (either in the GCS or external
storage), each of which is a zip file that will be unpacked and
inserted into the PYTHONPATH of the workers.
working_dir: URI (either in the GCS or external storage) of a zip
file that will be unpacked in the directory of each task/actor.
pip: Either a list of pip packages, a string
containing the path to a pip requirements.txt file, or a python
dictionary that has three fields: 1) ``packages`` (required, List[str]): a
list of pip packages, 2) ``pip_check`` (optional, bool): whether enable
pip check at the end of pip install, defaults to False.
3) ``pip_version`` (optional, str): the version of pip, Ray will spell
the package name "pip" in front of the ``pip_version`` to form the final
requirement string, the syntax of a requirement specifier is defined in
full in PEP 508.
conda: Either the conda YAML config, the name of a
local conda env (e.g., "pytorch_p36"), or the path to a conda
environment.yaml file.
The Ray dependency will be automatically injected into the conda
env to ensure compatibility with the cluster Ray. The conda name
may be mangled automatically to avoid conflicts between runtime
envs.
This field cannot be specified at the same time as the 'pip' field.
To use pip with conda, please specify your pip dependencies within
the conda YAML config:
https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html#create-env-file-manually
container: Require a given (Docker) container image,
The Ray worker process will run in a container with this image.
The `run_options` list spec is here:
https://docs.docker.com/engine/reference/run/
env_vars: Environment variables to set.
worker_process_setup_hook: (Experimental) The setup hook that's
called after workers start and before Tasks and Actors are scheduled.
A module name (string type) or callable (function) can be passed.
When a module name is passed, Ray worker should be able to access the
module name. When a callable is passed, callable should be serializable.
When a runtime env is specified by job submission API,
only a module name (string) is allowed.
nsight: Dictionary mapping nsight profile option name to it's value.
config: config for runtime environment. Either
a dict or a RuntimeEnvConfig. Field: (1) setup_timeout_seconds, the
timeout of runtime environment creation, timeout is in seconds.
"""
known_fields: Set[str] = {
"py_modules",
"java_jars",
"working_dir",
"conda",
"pip",
"container",
"excludes",
"env_vars",
"_ray_release",
"_ray_commit",
"_inject_current_ray",
"config",
# TODO(SongGuyang): We add this because the test
# `test_experimental_package_github` set a `docker`
# field which is not supported. We should remove it
# with the test.
"docker",
"worker_process_setup_hook",
"_nsight",
"mpi",
"image_uri",
}
extensions_fields: Set[str] = {
"_ray_release",
"_ray_commit",
"_inject_current_ray",
}
def __init__(
self,
*,
py_modules: Optional[List[str]] = None,
working_dir: Optional[str] = None,
pip: Optional[List[str]] = None,
conda: Optional[Union[Dict[str, str], str]] = None,
container: Optional[Dict[str, str]] = None,
env_vars: Optional[Dict[str, str]] = None,
worker_process_setup_hook: Optional[Union[Callable, str]] = None,
nsight: Optional[Union[str, Dict[str, str]]] = None,
config: Optional[Union[Dict, RuntimeEnvConfig]] = None,
_validate: bool = True,
mpi: Optional[Dict] = None,
image_uri: Optional[str] = None,
**kwargs,
):
super().__init__()
runtime_env = kwargs
if py_modules is not None:
runtime_env["py_modules"] = py_modules
if working_dir is not None:
runtime_env["working_dir"] = working_dir
if pip is not None:
runtime_env["pip"] = pip
if conda is not None:
runtime_env["conda"] = conda
if nsight is not None:
runtime_env["_nsight"] = nsight
if container is not None:
runtime_env["container"] = container
if env_vars is not None:
runtime_env["env_vars"] = env_vars
if config is not None:
runtime_env["config"] = config
if worker_process_setup_hook is not None:
runtime_env["worker_process_setup_hook"] = worker_process_setup_hook
if mpi is not None:
runtime_env["mpi"] = mpi
if image_uri is not None:
runtime_env["image_uri"] = image_uri
if runtime_env.get("java_jars"):
runtime_env["java_jars"] = runtime_env.get("java_jars")
self.update(runtime_env)
# Blindly trust that the runtime_env has already been validated.
# This is dangerous and should only be used internally (e.g., on the
# deserialization codepath.
if not _validate:
return
if self.get("conda") and self.get("pip"):
raise ValueError(
"The 'pip' field and 'conda' field of "
"runtime_env cannot both be specified.\n"
f"specified pip field: {self['pip']}\n"
f"specified conda field: {self['conda']}\n"
"To use pip with conda, please only set the 'conda' "
"field, and specify your pip dependencies "
"within the conda YAML config dict: see "
"https://conda.io/projects/conda/en/latest/"
"user-guide/tasks/manage-environments.html"
"#create-env-file-manually"
)
if self.get("container"):
invalid_keys = set(runtime_env.keys()) - {"container", "config", "env_vars"}
if len(invalid_keys):
raise ValueError(
"The 'container' field currently cannot be used "
"together with other fields of runtime_env. "
f"Specified fields: {invalid_keys}"
)
if self.get("image_uri"):
invalid_keys = set(runtime_env.keys()) - {"image_uri", "config", "env_vars"}
if len(invalid_keys):
raise ValueError(
"The 'image_uri' field currently cannot be used "
"together with other fields of runtime_env. "
f"Specified fields: {invalid_keys}"
)
for option, validate_fn in OPTION_TO_VALIDATION_FN.items():
option_val = self.get(option)
if option_val is not None:
del self[option]
self[option] = option_val
if "_ray_commit" not in self:
if self.get("pip") or self.get("conda"):
self["_ray_commit"] = ray.__commit__
# Used for testing wheels that have not yet been merged into master.
# If this is set to True, then we do not inject Ray into the conda
# or pip dependencies.
if "_inject_current_ray" not in self:
if "RAY_RUNTIME_ENV_LOCAL_DEV_MODE" in os.environ:
self["_inject_current_ray"] = True
# NOTE(architkulkarni): This allows worker caching code in C++ to check
# if a runtime env is empty without deserializing it. This is a catch-
# all; for validated inputs we won't set the key if the value is None.
if all(val is None for val in self.values()):
self.clear()
def __setitem__(self, key: str, value: Any) -> None:
if is_dataclass(value):
jsonable_type = asdict(value)
else:
jsonable_type = value
RuntimeEnvPluginSchemaManager.validate(key, jsonable_type)
res_value = jsonable_type
if key in RuntimeEnv.known_fields and key in OPTION_TO_VALIDATION_FN:
res_value = OPTION_TO_VALIDATION_FN[key](jsonable_type)
if res_value is None:
return
return super().__setitem__(key, res_value)
def set(self, name: str, value: Any) -> None:
self.__setitem__(name, value)
def get(self, name, default=None, data_class=None):
if name not in self:
return default
if not data_class:
return self.__getitem__(name)
else:
return from_dict(data_class=data_class, data=self.__getitem__(name))
@classmethod
def deserialize(cls, serialized_runtime_env: str) -> "RuntimeEnv": # noqa: F821
return cls(_validate=False, **json.loads(serialized_runtime_env))
def serialize(self) -> str:
# To ensure the accuracy of Proto, `__setitem__` can only guarantee the
# accuracy of a certain field, not the overall accuracy
runtime_env = type(self)(_validate=True, **self)
return json.dumps(
runtime_env,
sort_keys=True,
)
def to_dict(self) -> Dict:
runtime_env_dict = dict(deepcopy(self))
# Replace strongly-typed RuntimeEnvConfig with a dict to allow the returned
# dict to work properly as a field in a dataclass. Details in issue #26986
if runtime_env_dict.get("config"):
runtime_env_dict["config"] = runtime_env_dict["config"].to_dict()
return runtime_env_dict
def has_working_dir(self) -> bool:
return self.get("working_dir") is not None
def working_dir_uri(self) -> Optional[str]:
return self.get("working_dir")
def py_modules_uris(self) -> List[str]:
if "py_modules" in self:
return list(self["py_modules"])
return []
def conda_uri(self) -> Optional[str]:
if "conda" in self:
return get_conda_uri(self)
return None
def pip_uri(self) -> Optional[str]:
if "pip" in self:
return get_pip_uri(self)
return None
[docs]
def plugin_uris(self) -> List[str]:
"""Not implemented yet, always return a empty list"""
return []
def working_dir(self) -> str:
return self.get("working_dir", "")
def py_modules(self) -> List[str]:
if "py_modules" in self:
return list(self["py_modules"])
return []
def java_jars(self) -> List[str]:
if "java_jars" in self:
return list(self["java_jars"])
return []
def mpi(self) -> Optional[Union[str, Dict[str, str]]]:
return self.get("mpi", None)
def nsight(self) -> Optional[Union[str, Dict[str, str]]]:
return self.get("_nsight", None)
def env_vars(self) -> Dict:
return self.get("env_vars", {})
def has_conda(self) -> str:
if self.get("conda"):
return True
return False
def conda_env_name(self) -> str:
if not self.has_conda() or not isinstance(self["conda"], str):
return None
return self["conda"]
def conda_config(self) -> str:
if not self.has_conda() or not isinstance(self["conda"], dict):
return None
return json.dumps(self["conda"], sort_keys=True)
def has_pip(self) -> bool:
if self.get("pip"):
return True
return False
def virtualenv_name(self) -> Optional[str]:
if not self.has_pip() or not isinstance(self["pip"], str):
return None
return self["pip"]
def pip_config(self) -> Dict:
if not self.has_pip() or isinstance(self["pip"], str):
return {}
# Parse and validate field pip on method `__setitem__`
self["pip"] = self["pip"]
return self["pip"]
def get_extension(self, key) -> Optional[str]:
if key not in RuntimeEnv.extensions_fields:
raise ValueError(
f"Extension key must be one of {RuntimeEnv.extensions_fields}, "
f"got: {key}"
)
return self.get(key)
def has_py_container(self) -> bool:
if self.get("container"):
return True
return False
def py_container_image(self) -> Optional[str]:
if not self.has_py_container():
return None
return self["container"].get("image", "")
def py_container_worker_path(self) -> Optional[str]:
if not self.has_py_container():
return None
return self["container"].get("worker_path", "")
def py_container_run_options(self) -> List:
if not self.has_py_container():
return None
return self["container"].get("run_options", [])
def image_uri(self) -> Optional[str]:
return self.get("image_uri")
def plugins(self) -> List[Tuple[str, Any]]:
result = list()
for key, value in self.items():
if key not in self.known_fields:
result.append((key, value))
return result
def _merge_runtime_env(
parent: Optional[RuntimeEnv],
child: Optional[RuntimeEnv],
override: bool = False,
) -> Optional[RuntimeEnv]:
"""Merge the parent and child runtime environments.
If override = True, the child's runtime env overrides the parent's
runtime env in the event of a conflict.
Merging happens per key (i.e., "conda", "pip", ...), but
"env_vars" are merged per env var key.
It returns None if Ray fails to merge runtime environments because
of a conflict and `override = False`.
Args:
parent: Parent runtime env.
child: Child runtime env.
override: If True, the child's runtime env overrides
conflicting fields.
Returns:
The merged runtime env's if Ray successfully merges them.
None if the runtime env's conflict. Empty dict if
parent and child are both None.
"""
if parent is None:
parent = {}
if child is None:
child = {}
parent = deepcopy(parent)
child = deepcopy(child)
parent_env_vars = parent.pop("env_vars", {})
child_env_vars = child.pop("env_vars", {})
if not override:
if set(parent.keys()).intersection(set(child.keys())):
return None
if set(parent_env_vars.keys()).intersection(set(child_env_vars.keys())): # noqa
return None
parent.update(child)
parent_env_vars.update(child_env_vars)
if parent_env_vars:
parent["env_vars"] = parent_env_vars
return parent