# Some code in this file is from
# https://github.com/ionelmc/python-remote-pdb/blob/07d563331c4ab9eb45731bb272b158816d98236e/src/remote_pdb.py
# (BSD 2-Clause "Simplified" License)
import errno
import inspect
import json
import logging
import os
import re
import select
import socket
import sys
import time
import traceback
import uuid
from pdb import Pdb
from typing import Callable
import setproctitle
import ray
from ray._private import ray_constants
from ray.experimental.internal_kv import _internal_kv_del, _internal_kv_put
from ray.util.annotations import DeveloperAPI
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def _cry(message, stderr=sys.__stderr__):
print(message, file=stderr)
stderr.flush()
class _LF2CRLF_FileWrapper(object):
def __init__(self, connection):
self.connection = connection
self.stream = fh = connection.makefile("rw")
self.read = fh.read
self.readline = fh.readline
self.readlines = fh.readlines
self.close = fh.close
self.flush = fh.flush
self.fileno = fh.fileno
if hasattr(fh, "encoding"):
self._send = lambda data: connection.sendall(
data.encode(fh.encoding, errors="replace")
)
else:
self._send = connection.sendall
@property
def encoding(self):
return self.stream.encoding
def __iter__(self):
return self.stream.__iter__()
def write(self, data, nl_rex=re.compile("\r?\n")):
data = nl_rex.sub("\r\n", data)
self._send(data)
def writelines(self, lines, nl_rex=re.compile("\r?\n")):
for line in lines:
self.write(line, nl_rex)
class _PdbWrap(Pdb):
"""Wrap PDB to run a custom exit hook on continue."""
def __init__(self, exit_hook: Callable[[], None]):
self._exit_hook = exit_hook
Pdb.__init__(self)
def do_continue(self, arg):
self._exit_hook()
return Pdb.do_continue(self, arg)
do_c = do_cont = do_continue
class _RemotePdb(Pdb):
"""
This will run pdb as a ephemeral telnet service. Once you connect no one
else can connect. On construction this object will block execution till a
client has connected.
Based on https://github.com/tamentis/rpdb I think ...
To use this::
RemotePdb(host="0.0.0.0", port=4444).set_trace()
Then run: telnet 127.0.0.1 4444
"""
active_instance = None
def __init__(
self,
breakpoint_uuid,
host,
port,
ip_address,
patch_stdstreams=False,
quiet=False,
):
self._breakpoint_uuid = breakpoint_uuid
self._quiet = quiet
self._patch_stdstreams = patch_stdstreams
self._listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self._listen_socket.bind((host, port))
self._ip_address = ip_address
def listen(self):
if not self._quiet:
_cry(
"RemotePdb session open at %s:%s, "
"use 'ray debug' to connect..."
% (self._ip_address, self._listen_socket.getsockname()[1])
)
self._listen_socket.listen(1)
connection, address = self._listen_socket.accept()
if not self._quiet:
_cry(f"RemotePdb accepted connection from {address}")
self.handle = _LF2CRLF_FileWrapper(connection)
Pdb.__init__(
self,
completekey="tab",
stdin=self.handle,
stdout=self.handle,
skip=["ray.*"],
)
self.backup = []
if self._patch_stdstreams:
for name in (
"stderr",
"stdout",
"__stderr__",
"__stdout__",
"stdin",
"__stdin__",
):
self.backup.append((name, getattr(sys, name)))
setattr(sys, name, self.handle)
_RemotePdb.active_instance = self
def __restore(self):
if self.backup and not self._quiet:
_cry("Restoring streams: %s ..." % self.backup)
for name, fh in self.backup:
setattr(sys, name, fh)
self.handle.close()
_RemotePdb.active_instance = None
def do_quit(self, arg):
self.__restore()
return Pdb.do_quit(self, arg)
do_q = do_exit = do_quit
def do_continue(self, arg):
self.__restore()
self.handle.connection.close()
return Pdb.do_continue(self, arg)
do_c = do_cont = do_continue
def set_trace(self, frame=None):
if frame is None:
frame = sys._getframe().f_back
try:
Pdb.set_trace(self, frame)
except IOError as exc:
if exc.errno != errno.ECONNRESET:
raise
def post_mortem(self, traceback=None):
# See https://github.com/python/cpython/blob/
# 022bc7572f061e1d1132a4db9d085b29707701e7/Lib/pdb.py#L1617
try:
t = sys.exc_info()[2]
self.reset()
Pdb.interaction(self, None, t)
except IOError as exc:
if exc.errno != errno.ECONNRESET:
raise
def do_remote(self, arg):
"""remote
Skip into the next remote call.
"""
# Tell the next task to drop into the debugger.
ray._private.worker.global_worker.debugger_breakpoint = self._breakpoint_uuid
# Tell the debug loop to connect to the next task.
data = json.dumps(
{
"job_id": ray.get_runtime_context().get_job_id(),
}
)
_internal_kv_put(
"RAY_PDB_CONTINUE_{}".format(self._breakpoint_uuid),
data,
namespace=ray_constants.KV_NAMESPACE_PDB,
)
self.__restore()
self.handle.connection.close()
return Pdb.do_continue(self, arg)
def do_get(self, arg):
"""get
Skip to where the current task returns to.
"""
ray._private.worker.global_worker.debugger_get_breakpoint = (
self._breakpoint_uuid
)
self.__restore()
self.handle.connection.close()
return Pdb.do_continue(self, arg)
def _connect_ray_pdb(
    host=None,
    port=None,
    patch_stdstreams=False,
    quiet=None,
    breakpoint_uuid=None,
    debugger_external=False,
):
    """
    Opens a remote PDB on first available port.

    The breakpoint is advertised in the internal KV store under
    ``RAY_PDB_<breakpoint_uuid>`` while waiting for a client, and the entry is
    removed once ``listen()`` finishes -- whether it returned or raised.

    Args:
        host: Interface to bind to. Defaults to ``$REMOTE_PDB_HOST`` or
            ``127.0.0.1``; must be unset when ``debugger_external`` is true.
        port: Port to bind to. Defaults to ``$REMOTE_PDB_PORT`` or ``0``.
        patch_stdstreams: Forwarded to ``_RemotePdb``.
        quiet: Suppress informational messages. Defaults to true when
            ``$REMOTE_PDB_QUIET`` is set to any non-empty value.
        breakpoint_uuid: Identifier for the breakpoint; a random one is
            generated when falsy.
        debugger_external: Bind to ``0.0.0.0`` and advertise the worker's node
            IP instead of ``localhost``.

    Returns:
        The ``_RemotePdb`` instance, with a client connected.
    """
    if debugger_external:
        assert not host, "Cannot specify both host and debugger_external"
        host = "0.0.0.0"
    elif host is None:
        host = os.environ.get("REMOTE_PDB_HOST", "127.0.0.1")
    if port is None:
        port = int(os.environ.get("REMOTE_PDB_PORT", "0"))
    if quiet is None:
        quiet = bool(os.environ.get("REMOTE_PDB_QUIET", ""))
    if not breakpoint_uuid:
        breakpoint_uuid = uuid.uuid4().hex
    if debugger_external:
        ip_address = ray._private.worker.global_worker.node_ip_address
    else:
        ip_address = "localhost"
    rdb = _RemotePdb(
        breakpoint_uuid=breakpoint_uuid,
        host=host,
        port=port,
        ip_address=ip_address,
        patch_stdstreams=patch_stdstreams,
        quiet=quiet,
    )
    sockname = rdb._listen_socket.getsockname()
    pdb_address = "{}:{}".format(ip_address, sockname[1])
    # Index 0 is this function and 1 is its direct caller, so 2 is the frame
    # that invoked the caller.
    parentframeinfo = inspect.getouterframes(inspect.currentframe())[2]
    runtime_context = ray.get_runtime_context()
    data = {
        "proctitle": setproctitle.getproctitle(),
        "pdb_address": pdb_address,
        "filename": parentframeinfo.filename,
        "lineno": parentframeinfo.lineno,
        "traceback": "\n".join(traceback.format_exception(*sys.exc_info())),
        "timestamp": time.time(),
        "job_id": runtime_context.get_job_id(),
        "node_id": runtime_context.get_node_id(),
        "worker_id": runtime_context.get_worker_id(),
        "actor_id": runtime_context.get_actor_id(),
        "task_id": runtime_context.get_task_id(),
    }
    kv_key = "RAY_PDB_{}".format(breakpoint_uuid)
    _internal_kv_put(
        kv_key,
        json.dumps(data),
        overwrite=True,
        namespace=ray_constants.KV_NAMESPACE_PDB,
    )
    try:
        rdb.listen()
    except BaseException:
        # Nobody will ever attach to this debugger; release its port.
        rdb._listen_socket.close()
        raise
    finally:
        # Always withdraw the advertisement so that no stale breakpoint stays
        # listed if waiting for the client was interrupted.
        _internal_kv_del(kv_key, namespace=ray_constants.KV_NAMESPACE_PDB)
    return rdb
# [docs]  (stray documentation-viewer link marker; commented out so it is not evaluated as code)
@DeveloperAPI
def set_trace(breakpoint_uuid=None):
    """Interrupt the flow of the program and drop into the Ray debugger.
    Can be used within a Ray task or actor.

    The ``RAY_DEBUG`` environment variable (default ``"1"``) selects the
    backend: ``"1"`` delegates to ``ray.util.ray_debugpy.set_trace``,
    ``"legacy"`` opens a remote pdb session, and any other value makes this
    call a no-op.
    """
    debug_mode = os.environ.get("RAY_DEBUG", "1")
    if debug_mode == "1":
        return ray.util.ray_debugpy.set_trace(breakpoint_uuid)
    if debug_mode != "legacy":
        return None

    worker = ray._private.worker.global_worker
    # If there is an active debugger already, we do not want to
    # start another one, so "set_trace" is just a no-op in that case.
    if not worker.debugger_breakpoint == b"":
        return None

    # Must be read here so that it refers to the code that called set_trace().
    caller_frame = sys._getframe().f_back
    uuid_text = breakpoint_uuid.decode() if breakpoint_uuid else None
    remote_debugger = _connect_ray_pdb(
        host=None,
        port=None,
        patch_stdstreams=False,
        quiet=None,
        breakpoint_uuid=uuid_text,
        debugger_external=worker.ray_debugger_external,
    )
    remote_debugger.set_trace(frame=caller_frame)
def _driver_set_trace():
"""The breakpoint hook to use for the driver.
This disables Ray driver logs temporarily so that the PDB console is not
spammed: https://github.com/ray-project/ray/issues/18172
"""
if os.environ.get("RAY_DEBUG", "1") == "1":
return ray.util.ray_debugpy.set_trace()
if os.environ.get("RAY_DEBUG", "1") == "legacy":
print("*** Temporarily disabling Ray worker logs ***")
ray._private.worker._worker_logs_enabled = False
def enable_logging():
print("*** Re-enabling Ray worker logs ***")
ray._private.worker._worker_logs_enabled = True
pdb = _PdbWrap(enable_logging)
frame = sys._getframe().f_back
pdb.set_trace(frame)
def _is_ray_debugger_post_mortem_enabled():
return os.environ.get("RAY_DEBUG_POST_MORTEM", "0") == "1"
def _post_mortem():
    """Start a post-mortem debugging session.

    With ``RAY_DEBUG`` set to ``"1"`` (the default) this delegates to
    ``ray.util.ray_debugpy._post_mortem``; for every other value it opens a
    remote pdb session and runs its ``post_mortem()``.
    """
    if os.environ.get("RAY_DEBUG", "1") != "1":
        worker = ray._private.worker.global_worker
        remote_debugger = _connect_ray_pdb(
            host=None,
            port=None,
            patch_stdstreams=False,
            quiet=None,
            debugger_external=worker.ray_debugger_external,
        )
        remote_debugger.post_mortem()
        return None
    return ray.util.ray_debugpy._post_mortem()
def _connect_pdb_client(host, port):
    """Bridge the local terminal to a remote pdb session at ``host:port``.

    Bytes received from the socket are decoded and written to ``sys.stdout``;
    each line read from ``sys.stdin`` is sent to the socket. The function
    returns when the remote side closes the connection, and the socket is
    closed on every exit path.

    Args:
        host: Host name or address of the remote debugger.
        port: TCP port of the remote debugger.
    """
    import codecs

    if sys.platform == "win32":
        import msvcrt

    # Decode incrementally so that a multi-byte character split across two
    # recv() calls is reassembled instead of raising UnicodeDecodeError.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    # Cleared once stdin reports EOF; otherwise select() would keep returning
    # stdin as readable and the loop would spin.
    stdin_open = True

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        while True:
            # Get the list of sockets which are readable.
            if sys.platform == "win32":
                ready_to_read = select.select([s], [], [], 1)[0]
                if stdin_open:
                    if msvcrt.kbhit():
                        ready_to_read.append(sys.stdin)
                    if not ready_to_read and not sys.stdin.isatty():
                        # in tests, when using pexpect, the pipe makes
                        # the msvcrt.kbhit() trick fail. Assume we are waiting
                        # for stdin, since this will block waiting for input
                        ready_to_read.append(sys.stdin)
            else:
                watched = [sys.stdin, s] if stdin_open else [s]
                ready_to_read, _, _ = select.select(watched, [], [])

            for source in ready_to_read:
                if source is s:
                    # Incoming message from remote debugger.
                    data = s.recv(4096)
                    if not data:
                        # Remote closed: emit whatever the decoder still holds.
                        tail = decoder.decode(b"", final=True)
                        if tail:
                            sys.stdout.write(tail)
                            sys.stdout.flush()
                        return
                    sys.stdout.write(decoder.decode(data))
                    sys.stdout.flush()
                else:
                    # User entered a message.
                    msg = sys.stdin.readline()
                    if not msg:
                        # EOF on stdin: keep showing remote output until the
                        # debugger closes the connection.
                        stdin_open = False
                        continue
                    # sendall() retries until the whole line has been sent.
                    s.sendall(msg.encode())