Source code for ray.data.context

import os
import threading
from typing import TYPE_CHECKING, Any, Dict, Optional

import ray
from ray._private.ray_constants import env_integer
from ray.util.annotations import DeveloperAPI
from ray.util.scheduling_strategies import SchedulingStrategyT

if TYPE_CHECKING:
    from ray.data._internal.execution.interfaces import ExecutionOptions

# The context singleton on this process.
_default_context: "Optional[DataContext]" = None
_context_lock = threading.Lock()

# An estimate of what fraction of the object store a Dataset can use without too high
# a risk of triggering spilling. This is used to generate user warnings only.
ESTIMATED_SAFE_MEMORY_FRACTION = 0.25

# The max target block size in bytes for reads and transformations.  We choose
# 128MiB: With streaming execution and num_cpus many concurrent tasks, the
# memory footprint will be about 2 * num_cpus * target_max_block_size ~= RAM *
# DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION * 0.3 (default object store memory
# fraction set by Ray core), assuming typical memory:core ratio of 4:1.
DEFAULT_TARGET_MAX_BLOCK_SIZE = 128 * 1024 * 1024
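# For example, on a hypothetical 32-CPU worker node, streaming execution would
# keep roughly 2 * 32 * 128 MiB = 8 GiB of block data in flight at this default.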

# The max target block size in bytes for shuffle ops (random_shuffle, sort,
# repartition). Set a higher target block size because we have to materialize
# all input blocks anyway, so there is no performance advantage to having
# smaller blocks. Setting a larger block size allows avoiding overhead from an
# excessive number of partitions.
# We choose 512MiB, which is 8x smaller than the typical memory:core ratio of
# 4 GiB per CPU.
DEFAULT_SHUFFLE_TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024

# Dataset will avoid creating blocks smaller than this size in bytes on read.
# This takes precedence over DEFAULT_MIN_PARALLELISM.
DEFAULT_TARGET_MIN_BLOCK_SIZE = 1 * 1024 * 1024

# Default buffer size when doing streaming reads from local or remote storage.
# This default appears to work well with most file sizes on remote storage
# systems, whose read performance is very sensitive to the buffer size.
DEFAULT_STREAMING_READ_BUFFER_SIZE = 32 * 1024 * 1024

# Whether pandas block format is enabled.
# TODO (kfstorm): Remove this once stable.
DEFAULT_ENABLE_PANDAS_BLOCK = True

# Whether to enable stage-fusion optimizations for dataset pipelines.
DEFAULT_OPTIMIZE_FUSE_STAGES = True

# Whether to enable stage-reorder optimizations for dataset pipelines.
DEFAULT_OPTIMIZE_REORDER_STAGES = True

# Whether to additionally fuse read stages.
DEFAULT_OPTIMIZE_FUSE_READ_STAGES = True

# Whether to additionally fuse prior map tasks with shuffle stages.
DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True

# Minimum amount of parallelism to auto-detect for a dataset. Note that the min
# block size config takes precedence over this.
DEFAULT_MIN_PARALLELISM = 200

# Whether to use an actor-based block prefetcher.
DEFAULT_ACTOR_PREFETCHER_ENABLED = False

# Whether to use push-based shuffle by default.
DEFAULT_USE_PUSH_BASED_SHUFFLE = bool(
    os.environ.get("RAY_DATA_PUSH_BASED_SHUFFLE", None)
)
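# Note that ``bool`` is applied to the raw string above, so launching the
# driver with any non-empty value (e.g. ``RAY_DATA_PUSH_BASED_SHUFFLE=1``, or
# even ``=0``) enables push-based shuffle; leave the variable unset to disable.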

# The default global scheduling strategy. Note that for tasks with large args,
# DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS applies.
DEFAULT_SCHEDULING_STRATEGY = "SPREAD"

# Default scheduling strategy for tasks with large args. This enables locality-based
# scheduling in Ray for tasks where arg data transfer is a bottleneck.
DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS = "DEFAULT"

# Size in bytes after which point task arguments are considered large. Choose a value
# here at which point data transfer overhead becomes significant in comparison to
# task scheduling (i.e., low tens of ms).
DEFAULT_LARGE_ARGS_THRESHOLD = 50 * 1024 * 1024

# Whether to use Polars for tabular dataset sorts, groupbys, and aggregations.
DEFAULT_USE_POLARS = False

# Whether to use the new executor backend.
DEFAULT_NEW_EXECUTION_BACKEND = bool(
    int(os.environ.get("RAY_DATA_NEW_EXECUTION_BACKEND", "1"))
)

# Whether to use the streaming executor. This only has an effect if the new execution
# backend is enabled.
DEFAULT_USE_STREAMING_EXECUTOR = bool(
    int(os.environ.get("RAY_DATA_USE_STREAMING_EXECUTOR", "1"))
)

# Whether to eagerly free memory (new backend only).
DEFAULT_EAGER_FREE = bool(int(os.environ.get("RAY_DATA_EAGER_FREE", "1")))

# Whether to trace allocations / eager free (new backend only). This adds significant
# performance overheads and should only be used for debugging.
DEFAULT_TRACE_ALLOCATIONS = bool(int(os.environ.get("RAY_DATA_TRACE_ALLOCATIONS", "0")))

# Whether to estimate the in-memory decoded data size for data sources.
DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED = True

# Whether to automatically cast NumPy ndarray columns in Pandas DataFrames to tensor
# extension columns.
DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING = True

# Whether to automatically print Dataset stats after execution.
# If disabled, users can still manually print stats with Dataset.stats().
DEFAULT_AUTO_LOG_STATS = False

# Whether to enable optimizer.
DEFAULT_OPTIMIZER_ENABLED = bool(
    int(os.environ.get("RAY_DATA_NEW_EXECUTION_OPTIMIZER", "1"))
)

# Whether to use distributed tqdm (experimental). Controlled by the RAY_TQDM
# env var; enabled by default.
DEFAULT_USE_RAY_TQDM = bool(int(os.environ.get("RAY_TQDM", "1")))

# Use this to prefix important warning messages for the user.
WARN_PREFIX = "⚠️ "

# Use this to prefix important success messages for the user.
OK_PREFIX = "✔️ "

# Default batch size for batch transformations.
DEFAULT_BATCH_SIZE = 4096

# Default batch size for batch transformations in strict mode.
STRICT_MODE_DEFAULT_BATCH_SIZE = 1024

# Whether to enable progress bars.
DEFAULT_ENABLE_PROGRESS_BARS = not bool(
    env_integer("RAY_DATA_DISABLE_PROGRESS_BARS", 0)
)

# Whether to enable get_object_locations for metrics.
DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS = False


@DeveloperAPI
class DataContext:
    """Singleton for shared Dataset resources and configurations.

    This object is automatically propagated to workers and can be retrieved
    from the driver and remote workers via DataContext.get_current().
    """

    def __init__(
        self,
        target_max_block_size: int,
        target_shuffle_max_block_size: int,
        target_min_block_size: int,
        streaming_read_buffer_size: int,
        enable_pandas_block: bool,
        optimize_fuse_stages: bool,
        optimize_fuse_read_stages: bool,
        optimize_fuse_shuffle_stages: bool,
        optimize_reorder_stages: bool,
        actor_prefetcher_enabled: bool,
        use_push_based_shuffle: bool,
        pipeline_push_based_shuffle_reduce_tasks: bool,
        scheduling_strategy: SchedulingStrategyT,
        scheduling_strategy_large_args: SchedulingStrategyT,
        large_args_threshold: int,
        use_polars: bool,
        new_execution_backend: bool,
        use_streaming_executor: bool,
        eager_free: bool,
        decoding_size_estimation: bool,
        min_parallelism: int,
        enable_tensor_extension_casting: bool,
        enable_auto_log_stats: bool,
        trace_allocations: bool,
        optimizer_enabled: bool,
        execution_options: "ExecutionOptions",
        use_ray_tqdm: bool,
        enable_progress_bars: bool,
        enable_get_object_locations_for_metrics: bool,
    ):
        """Private constructor (use get_current() instead)."""
        self.target_max_block_size = target_max_block_size
        self.target_shuffle_max_block_size = target_shuffle_max_block_size
        self.target_min_block_size = target_min_block_size
        self.streaming_read_buffer_size = streaming_read_buffer_size
        self.enable_pandas_block = enable_pandas_block
        self.optimize_fuse_stages = optimize_fuse_stages
        self.optimize_fuse_read_stages = optimize_fuse_read_stages
        self.optimize_fuse_shuffle_stages = optimize_fuse_shuffle_stages
        self.optimize_reorder_stages = optimize_reorder_stages
        self.actor_prefetcher_enabled = actor_prefetcher_enabled
        self.use_push_based_shuffle = use_push_based_shuffle
        self.pipeline_push_based_shuffle_reduce_tasks = (
            pipeline_push_based_shuffle_reduce_tasks
        )
        self.scheduling_strategy = scheduling_strategy
        self.scheduling_strategy_large_args = scheduling_strategy_large_args
        self.large_args_threshold = large_args_threshold
        self.use_polars = use_polars
        self.new_execution_backend = new_execution_backend
        self.use_streaming_executor = use_streaming_executor
        self.eager_free = eager_free
        self.decoding_size_estimation = decoding_size_estimation
        self.min_parallelism = min_parallelism
        self.enable_tensor_extension_casting = enable_tensor_extension_casting
        self.enable_auto_log_stats = enable_auto_log_stats
        self.trace_allocations = trace_allocations
        self.optimizer_enabled = optimizer_enabled
        # TODO: expose execution options in Dataset public APIs.
        self.execution_options = execution_options
        self.use_ray_tqdm = use_ray_tqdm
        self.enable_progress_bars = enable_progress_bars
        self.enable_get_object_locations_for_metrics = (
            enable_get_object_locations_for_metrics
        )
        # The additional Ray remote args that should be added to
        # the task-pool-based data tasks.
        self._task_pool_data_task_remote_args: Dict[str, Any] = {}
        # The extra key-value style configs.
        # These configs are managed by individual components or plugins via
        # `set_config`, `get_config` and `remove_config`.
        # The reason why we use a dict instead of individual fields is to
        # decouple the DataContext from the plugin implementations, as well as
        # to avoid circular dependencies.
        self._kv_configs: Dict[str, Any] = {}

    @staticmethod
    def get_current() -> "DataContext":
        """Get or create a singleton context.

        If the context has not yet been created in this process, it will be
        initialized with default settings.
        """
        global _default_context
        with _context_lock:
            if _default_context is None:
                _default_context = DataContext(
                    target_max_block_size=DEFAULT_TARGET_MAX_BLOCK_SIZE,
                    target_shuffle_max_block_size=DEFAULT_SHUFFLE_TARGET_MAX_BLOCK_SIZE,
                    target_min_block_size=DEFAULT_TARGET_MIN_BLOCK_SIZE,
                    streaming_read_buffer_size=DEFAULT_STREAMING_READ_BUFFER_SIZE,
                    enable_pandas_block=DEFAULT_ENABLE_PANDAS_BLOCK,
                    optimize_fuse_stages=DEFAULT_OPTIMIZE_FUSE_STAGES,
                    optimize_fuse_read_stages=DEFAULT_OPTIMIZE_FUSE_READ_STAGES,
                    optimize_fuse_shuffle_stages=DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES,
                    optimize_reorder_stages=DEFAULT_OPTIMIZE_REORDER_STAGES,
                    actor_prefetcher_enabled=DEFAULT_ACTOR_PREFETCHER_ENABLED,
                    use_push_based_shuffle=DEFAULT_USE_PUSH_BASED_SHUFFLE,
                    # NOTE(swang): We have to pipeline reduce tasks right now
                    # because of a scheduling bug at large scale.
                    # See https://github.com/ray-project/ray/issues/25412.
                    pipeline_push_based_shuffle_reduce_tasks=True,
                    scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY,
                    scheduling_strategy_large_args=(
                        DEFAULT_SCHEDULING_STRATEGY_LARGE_ARGS
                    ),
                    large_args_threshold=DEFAULT_LARGE_ARGS_THRESHOLD,
                    use_polars=DEFAULT_USE_POLARS,
                    new_execution_backend=DEFAULT_NEW_EXECUTION_BACKEND,
                    use_streaming_executor=DEFAULT_USE_STREAMING_EXECUTOR,
                    eager_free=DEFAULT_EAGER_FREE,
                    decoding_size_estimation=DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED,
                    min_parallelism=DEFAULT_MIN_PARALLELISM,
                    enable_tensor_extension_casting=(
                        DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING
                    ),
                    enable_auto_log_stats=DEFAULT_AUTO_LOG_STATS,
                    trace_allocations=DEFAULT_TRACE_ALLOCATIONS,
                    optimizer_enabled=DEFAULT_OPTIMIZER_ENABLED,
                    execution_options=ray.data.ExecutionOptions(),
                    use_ray_tqdm=DEFAULT_USE_RAY_TQDM,
                    enable_progress_bars=DEFAULT_ENABLE_PROGRESS_BARS,
                    enable_get_object_locations_for_metrics=DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS,  # noqa: E501
                )

            return _default_context

    @staticmethod
    def _set_current(context: "DataContext") -> None:
        """Set the current context in a remote worker.

        This is used internally by Dataset to propagate the driver context to
        remote workers used for parallelization.
        """
        global _default_context
        _default_context = context

    def get_config(self, key: str, default: Any = None) -> Any:
        """Get the value for a key-value style config.

        Args:
            key: The key of the config.
            default: The default value to return if the key is not found.

        Returns:
            The value for the key, or the default value if the key is not found.
        """
        return self._kv_configs.get(key, default)

    def set_config(self, key: str, value: Any) -> None:
        """Set the value for a key-value style config.

        Args:
            key: The key of the config.
            value: The value of the config.
        """
        self._kv_configs[key] = value

    def remove_config(self, key: str) -> None:
        """Remove a key-value style config.

        Args:
            key: The key of the config.
        """
        self._kv_configs.pop(key, None)


# Backwards compatibility alias.
DatasetContext = DataContext
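
# Illustrative usage sketch (the attribute and config names below are only
# examples, not recommendations): the context singleton is typically fetched
# on the driver and mutated before datasets are built, e.g.
#
#     from ray.data.context import DataContext
#
#     ctx = DataContext.get_current()
#     ctx.target_max_block_size = 64 * 1024 * 1024    # example override
#     ctx.set_config("my_plugin.some_option", True)   # hypothetical plugin key
#     assert ctx.get_config("my_plugin.some_option") is True
#     ctx.remove_config("my_plugin.some_option")
#
# Per the class docstring above, the driver's context is automatically
# propagated to the remote workers that execute the dataset.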