import logging
import numpy as np
import os
import sys
from typing import Any, Optional, TYPE_CHECKING
import tree # pip install dm_tree
from ray.rllib.utils.annotations import DeveloperAPI, PublicAPI
from ray.rllib.utils.deprecation import Deprecated
from ray.rllib.utils.typing import (
TensorShape,
TensorStructType,
TensorType,
)
if TYPE_CHECKING:
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
logger = logging.getLogger(__name__)
@PublicAPI
def convert_to_tensor(
data: TensorStructType,
framework: str,
device: Optional[str] = None,
):
"""Converts any nested numpy struct into framework-specific tensors.
Args:
data: The input data (numpy) to convert to framework-specific tensors.
framework: The framework to convert to. Only "torch" and "tf2" allowed.
device: An optional device name (for torch only).
Returns:
The converted tensor struct matching the input data.
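
    Example (a minimal sketch; assumes PyTorch is installed):

    .. code-block:: python

        import numpy as np

        batch = {"obs": np.zeros((2, 4), np.float32), "rew": np.array([1.0, 0.5])}
        tensors = convert_to_tensor(batch, framework="torch")
        # `tensors` mirrors the input structure, but with torch.Tensor leaves.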
"""
if framework == "torch":
from ray.rllib.utils.torch_utils import convert_to_torch_tensor
return convert_to_torch_tensor(data, device=device)
elif framework == "tf2":
_, tf, _ = try_import_tf()
return tree.map_structure(lambda s: tf.convert_to_tensor(s), data)
raise NotImplementedError(
f"framework={framework} not supported in `convert_to_tensor()`!"
)
@PublicAPI
def get_device(config: "AlgorithmConfig", num_gpus_requested: int = 1):
"""Returns a single device (CPU or some GPU) depending on a config.
Args:
        config: An AlgorithmConfig from which to extract information about the
            device to use.
        num_gpus_requested: The number of GPUs actually requested, for example
            the value of `config.num_gpus_per_env_runner` when calling this
            function from an EnvRunner.
Returns:
A single device (or name) given `config` and `num_gpus_requested`.
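
    Example (a minimal sketch; assumes PyTorch is installed):

    .. code-block:: python

        from ray.rllib.algorithms.ppo import PPOConfig

        config = PPOConfig().framework("torch")
        device = get_device(config, num_gpus_requested=0)
        # -> torch.device("cpu"), because no GPUs were requested.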
"""
if config.framework_str == "torch":
torch, _ = try_import_torch()
# TODO (Kourosh): How do we handle model parallelism?
# TODO (Kourosh): Instead of using _TorchAccelerator, we should use the public
# API in ray.train but allow for session to be None without any errors raised.
if num_gpus_requested > 0:
from ray.air._internal.torch_utils import get_devices
            # `get_devices()` returns a list containing only the 0th device when
            # called from outside a Ray Train session. To give users the option
            # to run on a GPU of their choice, we honor `config.local_gpu_idx`
            # below.
devices = get_devices()
if len(devices) == 1:
return devices[0]
else:
assert config.local_gpu_idx < torch.cuda.device_count(), (
f"local_gpu_idx {config.local_gpu_idx} is not a valid GPU ID "
"or is not available."
)
# This is an index into the available CUDA devices. For example, if
# `os.environ["CUDA_VISIBLE_DEVICES"] = "1"` then
# `torch.cuda.device_count() = 1` and torch.device(0) maps to that GPU
# with ID=1 on the node.
return torch.device(config.local_gpu_idx)
else:
return torch.device("cpu")
else:
raise NotImplementedError(
f"`framework_str` {config.framework_str} not supported!"
)
@PublicAPI
def try_import_jax(error: bool = False):
"""Tries importing JAX and FLAX and returns both modules (or Nones).
Args:
error: Whether to raise an error if JAX/FLAX cannot be imported.
Returns:
        Tuple containing the jax and the flax modules.
Raises:
ImportError: If error=True and JAX is not installed.
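
    Example:

    .. code-block:: python

        jax, flax = try_import_jax()
        if jax is not None:
            x = jax.numpy.zeros((2, 3))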
"""
if "RLLIB_TEST_NO_JAX_IMPORT" in os.environ:
logger.warning("Not importing JAX for test purposes.")
return None, None
try:
import jax
import flax
except ImportError:
if error:
raise ImportError(
"Could not import JAX! RLlib requires you to "
"install at least one deep-learning framework: "
"`pip install [torch|tensorflow|jax]`."
)
return None, None
return jax, flax
@PublicAPI
def try_import_tf(error: bool = False):
"""Tries importing tf and returns the module (or None).
Args:
error: Whether to raise an error if tf cannot be imported.
Returns:
        Tuple containing:
        1) The tf1.x module (either from tf2.x.compat.v1 OR as tf1.x).
        2) The tf module (resulting from `import tensorflow`), either tf1.x
           or tf2.x.
        3) The actually installed tf version as int: 1 or 2.
Raises:
ImportError: If error=True and tf is not installed.
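
    Example:

    .. code-block:: python

        tf1, tf, tfv = try_import_tf()
        # `tf` is a falsy stub if TensorFlow is not installed, so a plain
        # truthiness check is safe here:
        if tf:
            x = tf.constant([1.0, 2.0])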
"""
tf_stub = _TFStub()
    # Make sure these are reset after each test case that uses them:
    # `del os.environ["RLLIB_TEST_NO_TF_IMPORT"]`.
    if "RLLIB_TEST_NO_TF_IMPORT" in os.environ:
        logger.warning("Not importing TensorFlow for test purposes.")
return None, tf_stub, None
if "TF_CPP_MIN_LOG_LEVEL" not in os.environ:
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
# Try to reuse already imported tf module. This will avoid going through
# the initial import steps below and thereby switching off v2_behavior
# (switching off v2 behavior twice breaks all-framework tests for eager).
was_imported = False
if "tensorflow" in sys.modules:
tf_module = sys.modules["tensorflow"]
was_imported = True
else:
try:
import tensorflow as tf_module
except ImportError:
if error:
raise ImportError(
"Could not import TensorFlow! RLlib requires you to "
"install at least one deep-learning framework: "
"`pip install [torch|tensorflow|jax]`."
)
return None, tf_stub, None
# Try "reducing" tf to tf.compat.v1.
try:
tf1_module = tf_module.compat.v1
tf1_module.logging.set_verbosity(tf1_module.logging.ERROR)
if not was_imported:
tf1_module.disable_v2_behavior()
tf1_module.enable_resource_variables()
tf1_module.logging.set_verbosity(tf1_module.logging.WARN)
# No compat.v1 -> return tf as is.
except AttributeError:
tf1_module = tf_module
if not hasattr(tf_module, "__version__"):
version = 1 # sphinx doc gen
else:
        version = 2 if tf_module.__version__.startswith("2.") else 1
return tf1_module, tf_module, version
# Fake module for tf.
class _TFStub:
def __init__(self) -> None:
self.keras = _KerasStub()
    def __bool__(self):
        # A stubbed tf module evaluates to False (e.g. in `if tf:` checks).
        return False
# Fake module for tf.keras.
class _KerasStub:
def __init__(self) -> None:
self.Model = _FakeTfClassStub
# Fake classes under keras (e.g for tf.keras.Model)
class _FakeTfClassStub:
def __init__(self, *a, **kw):
raise ImportError("Could not import `tensorflow`. Try pip install tensorflow.")
@DeveloperAPI
def tf_function(tf_module):
"""Conditional decorator for @tf.function.
    Use @tf_function(tf) instead of a bare @tf.function to avoid errors if tf
    is not installed or eager execution is active (in both cases, the function
    is returned undecorated).
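
    Example (a minimal sketch; assumes TensorFlow is installed):

    .. code-block:: python

        _, tf, _ = try_import_tf()

        @tf_function(tf)
        def times_two(x):
            return x * 2.0
    """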
# The actual decorator to use (pass in `tf` (which could be None)).
def decorator(func):
        # If tf is not installed or we are executing eagerly -> return the
        # function as-is (undecorated).
        if tf_module is None or tf_module.executing_eagerly():
return func
# If tf installed, return @tf.function-decorated function.
return tf_module.function(func)
return decorator
@PublicAPI
def try_import_tfp(error: bool = False):
"""Tries importing tfp and returns the module (or None).
Args:
error: Whether to raise an error if tfp cannot be imported.
Returns:
The tfp module.
Raises:
ImportError: If error=True and tfp is not installed.
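
    Example:

    .. code-block:: python

        tfp = try_import_tfp()
        if tfp is not None:
            dist = tfp.distributions.Normal(loc=0.0, scale=1.0)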
"""
if "RLLIB_TEST_NO_TF_IMPORT" in os.environ:
logger.warning("Not importing TensorFlow Probability for test purposes.")
return None
try:
import tensorflow_probability as tfp
return tfp
except ImportError as e:
if error:
raise e
return None
# Fake module for torch.nn.
class _NNStub:
def __init__(self, *a, **kw):
# Fake nn.functional module within torch.nn.
self.functional = None
self.Module = _FakeTorchClassStub
self.parallel = _ParallelStub()
# Fake class for e.g. torch.nn.Module to allow it to be inherited from.
class _FakeTorchClassStub:
def __init__(self, *a, **kw):
raise ImportError("Could not import `torch`. Try pip install torch.")
class _ParallelStub:
def __init__(self, *a, **kw):
self.DataParallel = _FakeTorchClassStub
self.DistributedDataParallel = _FakeTorchClassStub
@PublicAPI
def try_import_torch(error: bool = False):
"""Tries importing torch and returns the module (or None).
Args:
error: Whether to raise an error if torch cannot be imported.
Returns:
        Tuple consisting of the torch and the torch.nn modules.
Raises:
ImportError: If error=True and PyTorch is not installed.
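
    Example:

    .. code-block:: python

        torch, nn = try_import_torch()
        if torch is not None:
            layer = nn.Linear(4, 2)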
"""
if "RLLIB_TEST_NO_TORCH_IMPORT" in os.environ:
logger.warning("Not importing PyTorch for test purposes.")
return _torch_stubs()
try:
import torch
import torch.nn as nn
return torch, nn
except ImportError:
if error:
raise ImportError(
"Could not import PyTorch! RLlib requires you to "
"install at least one deep-learning framework: "
"`pip install [torch|tensorflow|jax]`."
)
return _torch_stubs()
def _torch_stubs():
nn = _NNStub()
return None, nn
@DeveloperAPI
def get_variable(
value: Any,
framework: str = "tf",
trainable: bool = False,
tf_name: str = "unnamed-variable",
torch_tensor: bool = False,
device: Optional[str] = None,
shape: Optional[TensorShape] = None,
dtype: Optional[TensorType] = None,
) -> Any:
"""Creates a tf variable, a torch tensor, or a python primitive.
Args:
value: The initial value to use. In the non-tf case, this will
be returned as is. In the tf case, this could be a tf-Initializer
object.
        framework: One of "tf", "tf2", "torch", or None.
        trainable: Whether the generated variable should be trainable (tf) /
            have requires_grad=True (torch). Default: False.
tf_name: For framework="tf": An optional name for the
tf.Variable.
torch_tensor: For framework="torch": Whether to actually create
a torch.tensor, or just a python value (default).
device: An optional torch device to use for
the created torch tensor.
shape: An optional shape to use iff `value`
does not have any (e.g. if it's an initializer w/o explicit value).
dtype: An optional dtype to use iff `value` does
not have any (e.g. if it's an initializer w/o explicit value).
This should always be a numpy dtype (e.g. np.float32, np.int64).
Returns:
A framework-specific variable (tf.Variable, torch.tensor, or
python primitive).
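
    Example (a minimal sketch; assumes PyTorch is installed):

    .. code-block:: python

        import numpy as np

        # Plain python primitive (returned as-is for framework="torch"
        # with torch_tensor=False, the default).
        lr = get_variable(0.001, framework="torch")  # -> 0.001
        # An actual (non-trainable) torch tensor.
        t = get_variable(
            np.array([1.0, 2.0]),
            framework="torch",
            torch_tensor=True,
            dtype=np.float32,
        )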
"""
if framework in ["tf2", "tf"]:
import tensorflow as tf
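        # Infer the dtype: use the explicit `dtype` arg if given, else
        # `value.dtype` if present, else tf.float32/tf.int32 for python
        # floats/ints (else None).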
dtype = dtype or getattr(
value,
"dtype",
tf.float32
if isinstance(value, float)
else tf.int32
if isinstance(value, int)
else None,
)
return tf.compat.v1.get_variable(
tf_name,
initializer=value,
dtype=dtype,
trainable=trainable,
**({} if shape is None else {"shape": shape}),
)
elif framework == "torch" and torch_tensor is True:
torch, _ = try_import_torch()
if not isinstance(value, np.ndarray):
value = np.array(value)
var_ = torch.from_numpy(value)
if dtype in [torch.float32, np.float32]:
var_ = var_.float()
elif dtype in [torch.int32, np.int32]:
var_ = var_.int()
elif dtype in [torch.float64, np.float64]:
var_ = var_.double()
if device:
var_ = var_.to(device)
var_.requires_grad = trainable
return var_
# torch or None: Return python primitive.
return value
@Deprecated(
old="rllib/utils/framework.py::get_activation_fn",
new="rllib/models/utils.py::get_activation_fn",
error=True,
)
def get_activation_fn(name: Optional[str] = None, framework: str = "tf"):
pass