diff --git a/README.md b/README.md index e00de1e84..03ed87d51 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ informal introduction to the features and their implementation. - [Customizing the Sandbox](#customizing-the-sandbox) - [Passthrough Modules](#passthrough-modules) - [Invalid Module Members](#invalid-module-members) + - [Debugging Workflows with `breakpoint()` / `pdb`](#debugging-workflows-with-breakpoint--pdb) - [Known Sandbox Issues](#known-sandbox-issues) - [Global Import/Builtins](#global-importbuiltins) - [Sandbox is not Secure](#sandbox-is-not-secure) @@ -1241,6 +1242,80 @@ my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_ See the API for more details on exact fields and their meaning. +##### Debugging Workflows with `breakpoint()` / `pdb` + +Setting `debug_mode=True` on the `Worker` (or `TEMPORAL_DEBUG=1` in the environment) routes workflow activations +onto the asyncio main thread instead of a worker thread pool. This lets `breakpoint()` and `pdb.set_trace()` +inside workflow code open an interactive REPL — without it, pdb hangs because its `input()` call would run on a +thread that does not own the controlling TTY. + +A minimal runnable example: + +```python +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker + + +@workflow.defn +class DebugMeWorkflow: + @workflow.run + async def run(self) -> str: + x = 42 + breakpoint() # interactive pdb prompt opens at this line + return f"x was {x}" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + async with Worker( + client, + task_queue="debug-me", + workflows=[DebugMeWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + DebugMeWorkflow.run, + id="debug-me-wf", + task_queue="debug-me", + task_timeout=timedelta(minutes=10), # see caveat below + ) + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Run with `python debug_me.py`, or under pytest with `pytest -s` (the `-s` flag disables pytest's stdin +capture). At the `(Pdb)` prompt you'll land at the line where `breakpoint()` was called, with workflow +locals in scope. Try `p x`, `n`, `c`, `q`. + +**Quitting cleanly.** Typing `q` or hitting Ctrl-D continues the workflow rather than raising `BdbQuit` +(which would fail the workflow task). To genuinely abort, kill the outer process with Ctrl-C. + +Two caveats when pausing at a breakpoint inside a workflow: + +1. **Workflow task timeout.** Temporal expires a workflow task after ~10 seconds by default. If you sit at the + `(Pdb)` prompt longer than that, the server reassigns the task and your workflow replays from the start when + you continue — re-hitting the breakpoint. Pass `task_timeout=timedelta(minutes=N)` to `execute_workflow` / + `start_workflow` to give yourself debugging headroom: + + ```python + await client.execute_workflow(MyWorkflow.run, ..., task_timeout=timedelta(minutes=10)) + ``` + +2. **Deterministic replay.** Workflows are deterministic and replay from history; any wall-clock pause violates + that contract. For post-mortem debugging without these caveats, use the [Replayer](#replayer) on a recorded + history instead of live debugging. + +Calling `breakpoint()` from sandboxed workflow code without `debug_mode` raises a sandbox +`RestrictedWorkflowAccessError` with a message pointing at `debug_mode=True`, so the failure mode is loud +and the fix is obvious. + ##### Known Sandbox Issues Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved. diff --git a/temporalio/worker/_debugger.py b/temporalio/worker/_debugger.py new file mode 100644 index 000000000..6f64da5c4 --- /dev/null +++ b/temporalio/worker/_debugger.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import dataclasses +import sys +from types import FrameType, TracebackType + +import temporalio.workflow +from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner + +from ._workflow_instance import WorkflowRunner + +__all__ = [ + "_install_workflow_breakpoint_hook", + "_relax_sandbox_for_debugger", + "_temporal_workflow_breakpoint_hook", +] + +_ORIGINAL_BREAKPOINTHOOK = sys.breakpointhook + + +def _build_workflow_pdb_class() -> type: + """Build a Pdb subclass that suspends sandbox restrictions during the REPL. + + pdb's cmdloop touches ``readline.get_completer`` and other + sandbox-restricted internals each time it interacts with the user; we + bracket each interaction with ``_sandbox_unrestricted.value = True`` and + restore the previous value afterwards. Outside the REPL the sandbox + stays intact. + + ``pdb`` is imported lazily because it's a debug-only dependency that + pulls in ``cmd``/``bdb``/``linecache``; no reason to pay that cost at + worker import time. + """ + import pdb + + from temporalio.workflow._sandbox import _sandbox_unrestricted + + class _WorkflowPdb(pdb.Pdb): + # The `interaction` signature differs across Python versions: 3.10-3.12 + # typeshed names the second parameter `traceback: TracebackType | None`, + # while 3.13+ renames it `tb_or_exc` and widens the type to include + # `BaseException`. No single signature satisfies both stubs, so we + # suppress the override check. + def interaction( # type: ignore[override] + self, + frame: FrameType | None, + tb_or_exc: TracebackType | BaseException | None, + ) -> None: + prev = getattr(_sandbox_unrestricted, "value", False) + _sandbox_unrestricted.value = True + try: + super().interaction(frame, tb_or_exc) # type: ignore[arg-type] + finally: + _sandbox_unrestricted.value = prev + + # Override `q`/`quit`/`exit`/EOF (Ctrl-D) to behave like `continue`. + # Default pdb raises `BdbQuit`, which propagates as an uncaught + # exception out of workflow.run, fails the workflow task, and + # triggers a server retry storm during teardown. For a debug + # session the user almost always wants "stop debugging and let the + # workflow finish" — that's `continue`. Users who truly want to + # abort can Ctrl-C the outer shell. + def do_quit(self, arg: str) -> bool | None: + self.message( + "[Temporal] 'q'/Ctrl-D continues the workflow. " + "Ctrl-C the outer shell to abort." + ) + return self.do_continue(arg) + + do_q = do_exit = do_quit + do_EOF = do_quit + + return _WorkflowPdb + + +def _temporal_workflow_breakpoint_hook(*args: object, **kwargs: object) -> object: + """``sys.breakpointhook`` that handles ``breakpoint()`` inside workflows. + + Only installed when ``debug_mode`` is enabled on the Worker. From inside + a workflow activation: drops the user into a custom Pdb at the workflow's + own frame, with sandbox restrictions suspended during the REPL. From + anywhere else (test code, helpers, etc.): delegates to whatever hook was + previously installed. + """ + if not temporalio.workflow.in_workflow(): + # Not inside a workflow activation — let pytest's wrapper, ipdb, or + # whatever else is configured handle it. + return _ORIGINAL_BREAKPOINTHOOK(*args, **kwargs) + # Inside a workflow: drop the user into pdb at the caller's frame (the + # workflow's `run` method, where breakpoint() was actually written) rather + # than landing inside this hook. Bypassing the configured breakpoint hook + # also avoids pytest's pdb wrapper, which assumes a test-code context and + # touches sandbox-restricted internals during its terminal-writer setup. + # `sandbox_unrestricted()` lifts member checks for the duration of the + # REPL so pdb's own initialization (readline, etc.) isn't blocked. + # `skip` tells pdb not to stop in our hook frame or the contextlib + # plumbing — without it pdb's first step lands at the `with` teardown + # instead of the user's next workflow line. + caller_frame = sys._getframe(1) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + pdb_cls = _build_workflow_pdb_class() + pdb_cls( + skip=[ + "temporalio.worker._debugger", + "temporalio.workflow._sandbox", + "contextlib", + ] + ).set_trace(caller_frame) + return None + + +def _install_workflow_breakpoint_hook() -> None: + """Set ``sys.breakpointhook`` to the workflow hook if it isn't already.""" + if sys.breakpointhook is not _temporal_workflow_breakpoint_hook: + sys.breakpointhook = _temporal_workflow_breakpoint_hook + + +def _relax_sandbox_for_debugger(workflow_runner: WorkflowRunner) -> WorkflowRunner: + """Allow ``breakpoint()`` past the sandbox so it can reach the worker hook. + + The sandbox flags ``breakpoint`` as non-deterministic by default; without + this relaxation the call raises before our breakpoint hook can run. + Once inside the hook, the hook itself enters ``sandbox_unrestricted()`` + for the duration of the debugger session, so pdb's internals (readline, + os.environ, etc.) aren't blocked either — without permanently dropping + sandbox checks for the rest of workflow execution. + """ + if not isinstance(workflow_runner, SandboxedWorkflowRunner): + return workflow_runner + + restrictions = workflow_runner.restrictions + invalid = restrictions.invalid_module_members + builtins_matcher = invalid.children.get("__builtins__") + if builtins_matcher is None: + return workflow_runner + + # `breakpoint` may sit either in `children` (as a leaf matcher with a + # custom error message) or in `use` (the legacy flat form). Strip from + # whichever shape is present. + has_child = "breakpoint" in builtins_matcher.children + has_use = "breakpoint" in builtins_matcher.use + if not (has_child or has_use): + return workflow_runner + + new_children = { + k: v for k, v in builtins_matcher.children.items() if k != "breakpoint" + } + new_use = set(builtins_matcher.use) - {"breakpoint"} + new_builtins = dataclasses.replace( + builtins_matcher, children=new_children, use=new_use + ) + new_invalid = dataclasses.replace( + invalid, children={**invalid.children, "__builtins__": new_builtins} + ) + new_restrictions = dataclasses.replace( + restrictions, invalid_module_members=new_invalid + ) + return dataclasses.replace(workflow_runner, restrictions=new_restrictions) diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 9e2ac9c7b..9ca802c40 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -32,6 +32,10 @@ from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner from . import _command_aware_visitor +from ._debugger import ( + _install_workflow_breakpoint_hook, + _relax_sandbox_for_debugger, +) from ._interceptor import ( Interceptor, WorkflowInboundInterceptor, @@ -49,6 +53,7 @@ # Set to true to log all activations and completions LOG_PROTOS = False + # Value was chosen abitrarily as a small number that allows some concurrency and prevents # large numbers of concurrent external storage operations causing resource contention. # This default limit is per workflow task activation and does not limit the total number @@ -111,6 +116,13 @@ def __init__( ), ) + # In debug mode, also lift the sandbox restriction on breakpoint() + # and install the workflow-aware breakpoint hook so pdb works in + # workflow code. Outside of debug mode neither happens. + self._debug_mode = debug_mode + if self._debug_mode: + workflow_runner = _relax_sandbox_for_debugger(workflow_runner) + _install_workflow_breakpoint_hook() self._workflow_runner = workflow_runner self._unsandboxed_workflow_runner = unsandboxed_workflow_runner @@ -145,7 +157,7 @@ def __init__( # If debug mode is enabled, disable deadlock detection # otherwise set to 2 seconds - self._deadlock_timeout_seconds = None if debug_mode else 2 + self._deadlock_timeout_seconds = None if self._debug_mode else 2 # Keep track of workflows that could not be evicted self._could_not_evict_count = 0 @@ -255,6 +267,34 @@ async def drain_poll_queue(self) -> None: except PollShutdownError: return + async def _activate_inline_for_debug( + self, + loop: asyncio.AbstractEventLoop, + workflow: _RunningWorkflow, + act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: + # Indirect through call_soon + a future so the activation runs outside + # the dispatch task's __step() context. Python 3.14 refuses to enter a + # task while another on the same thread is mid-step; suspending at the + # await below clears that state so workflow.activate can step its own + # task without collision. + future: asyncio.Future = loop.create_future() + + def run_inline() -> None: + # _run_once clears the running-loop registration on exit; restore + # the main loop so later code sees the right one. + main_loop = asyncio._get_running_loop() + try: + completion = workflow.activate(act) + future.set_result(completion) + except BaseException as e: + future.set_exception(e) + finally: + asyncio._set_running_loop(main_loop) + + loop.call_soon(run_inline) + return await future + async def _handle_activation( self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation ) -> None: @@ -344,35 +384,43 @@ async def _handle_activation( ) self._running_workflows[act.run_id] = workflow - # Run activation in separate thread so we can check if it's - # deadlocked - activate_task = asyncio.get_running_loop().run_in_executor( - self._workflow_task_executor, - workflow.activate, - act, - ) - - # Run activation task with deadlock timeout - try: - completion = await asyncio.wait_for( - activate_task, self._deadlock_timeout_seconds + if self._debug_mode: + # Inline on the main thread so pdb / breakpoint() can read + # stdin. The loop blocks during the activation — that's the + # intended single-stepping semantic. + completion = await self._activate_inline_for_debug( + asyncio.get_running_loop(), workflow, act ) - except asyncio.TimeoutError: - # Need to create the deadlock exception up here so it - # captures the trace now instead of later after we may have - # interrupted it - deadlock_exc = _DeadlockError.from_deadlocked_workflow( - workflow.instance, self._deadlock_timeout_seconds + else: + # Run activation in separate thread so we can check if it's + # deadlocked + activate_task = asyncio.get_running_loop().run_in_executor( + self._workflow_task_executor, + workflow.activate, + act, ) - # When we deadlock, we will raise an exception to fail - # the task. But before we do that, we want to try to - # interrupt the thread and put this activation task on - # the workflow so that the successive eviction can wait - # on it before trying to evict. - workflow.attempt_deadlock_interruption() - # Set the task and raise - workflow.deadlocked_activation_task = activate_task - raise deadlock_exc from None + + # Run activation task with deadlock timeout + try: + completion = await asyncio.wait_for( + activate_task, self._deadlock_timeout_seconds + ) + except asyncio.TimeoutError: + # Need to create the deadlock exception up here so it + # captures the trace now instead of later after we may have + # interrupted it + deadlock_exc = _DeadlockError.from_deadlocked_workflow( + workflow.instance, self._deadlock_timeout_seconds + ) + # When we deadlock, we will raise an exception to fail + # the task. But before we do that, we want to try to + # interrupt the thread and put this activation task on + # the workflow so that the successive eviction can wait + # on it before trying to evict. + workflow.attempt_deadlock_interruption() + # Set the task and raise + workflow.deadlocked_activation_task = activate_task + raise deadlock_exc from None except Exception as err: if isinstance(err, _DeadlockError): @@ -590,22 +638,27 @@ async def _handle_cache_eviction( handle_eviction_task: asyncio.Future | None = None while True: try: - # We only create the eviction task if we haven't already or - # it is done. This is because if it already is running and - # timed out, it's still running (and holding on to a - # thread). But if did complete running but failed with - # another error, we want to re-create the task. - if not handle_eviction_task or handle_eviction_task.done(): - handle_eviction_task = ( - asyncio.get_running_loop().run_in_executor( - self._workflow_task_executor, - workflow.activate, - act, + if self._debug_mode: + await self._activate_inline_for_debug( + asyncio.get_running_loop(), workflow, act + ) + else: + # We only create the eviction task if we haven't already or + # it is done. This is because if it already is running and + # timed out, it's still running (and holding on to a + # thread). But if did complete running but failed with + # another error, we want to re-create the task. + if not handle_eviction_task or handle_eviction_task.done(): + handle_eviction_task = ( + asyncio.get_running_loop().run_in_executor( + self._workflow_task_executor, + workflow.activate, + act, + ) ) + await asyncio.wait_for( + handle_eviction_task, self._deadlock_timeout_seconds ) - await asyncio.wait_for( - handle_eviction_task, self._deadlock_timeout_seconds - ) # Break if it succeeds break except BaseException as err: diff --git a/temporalio/worker/workflow_sandbox/_importer.py b/temporalio/worker/workflow_sandbox/_importer.py index 42f0e06b2..1ab0a1dd6 100644 --- a/temporalio/worker/workflow_sandbox/_importer.py +++ b/temporalio/worker/workflow_sandbox/_importer.py @@ -83,7 +83,15 @@ def restrict_built_in(name: str, orig: Any, *args: Any, **kwargs: Any): ) and not temporalio.workflow.unsafe.is_sandbox_unrestricted() ): - raise RestrictedWorkflowAccessError(f"__builtins__.{name}") + # If a per-builtin child matcher carries a custom + # leaf_message (e.g. directing the user to debug_mode for + # breakpoint()), surface that instead of the generic + # pass-through-modules advice. + child = builtin_matcher.children.get(name) + override_message = child.leaf_message if child else None + raise RestrictedWorkflowAccessError( + f"__builtins__.{name}", override_message=override_message + ) return orig(*args, **kwargs) for k in dir(builtins): diff --git a/temporalio/worker/workflow_sandbox/_restrictions.py b/temporalio/worker/workflow_sandbox/_restrictions.py index d53ceabd6..78b7a0363 100644 --- a/temporalio/worker/workflow_sandbox/_restrictions.py +++ b/temporalio/worker/workflow_sandbox/_restrictions.py @@ -551,11 +551,19 @@ def _public_callables(parent: Any, *, exclude: set[str] = set()) -> set[str]: SandboxRestrictions.invalid_module_members_default = SandboxMatcher( children={ "__builtins__": SandboxMatcher( - use={ - "breakpoint", - "input", - "open", + children={ + "breakpoint": SandboxMatcher( + match_self=True, + only_runtime=True, + leaf_message=( + "breakpoint() inside workflow code requires " + "debug_mode=True on the Worker (or the " + "TEMPORAL_DEBUG environment variable). Without it, " + "the call cannot reach the debugger." + ), + ), }, + use={"input", "open"}, # Too many things use open() at import time, e.g. pytest's assertion # rewriter only_runtime=True, diff --git a/tests/worker/test_breakpoint_hang.py b/tests/worker/test_breakpoint_hang.py new file mode 100644 index 000000000..29f3cb3f7 --- /dev/null +++ b/tests/worker/test_breakpoint_hang.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +import pdb +import threading +import uuid +from types import FrameType +from typing import Any +from unittest.mock import patch + +import pytest + +from temporalio import workflow +from temporalio.client import Client, WorkflowFailureError +from temporalio.worker import Worker +from temporalio.worker.workflow_sandbox._restrictions import ( + RestrictedWorkflowAccessError, +) + + +@workflow.defn(sandboxed=False) +class ThreadCaptureWorkflow: + """Returns the name of the thread the workflow runs on. + + `sandboxed=False` so `threading.current_thread()` isn't intercepted — + these tests are about thread placement, not sandbox behavior. + """ + + @workflow.run + async def run(self) -> str: + return threading.current_thread().name + + +async def test_workflow_runs_on_pool_thread_without_debug_mode(client: Client): + """Production behavior unchanged: workflows run on `temporal_workflow_*`.""" + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ThreadCaptureWorkflow], + ): + thread_name = await client.execute_workflow( + ThreadCaptureWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + main_name = threading.main_thread().name + assert thread_name != main_name, ( + f"workflow ran on the main thread ({main_name!r}) — production behavior changed" + ) + assert thread_name.startswith("temporal_workflow_"), ( + f"expected pool thread, got {thread_name!r}" + ) + + +async def test_workflow_runs_on_main_thread_in_debug_mode(client: Client): + """debug_mode=True moves workflow activation to the asyncio main thread + so pdb's input() reaches the controlling TTY.""" + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ThreadCaptureWorkflow], + debug_mode=True, + ): + thread_name = await client.execute_workflow( + ThreadCaptureWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + main_name = threading.main_thread().name + assert thread_name == main_name, ( + f"expected workflow on main thread ({main_name!r}) in debug mode; " + f"got {thread_name!r}" + ) + + +@workflow.defn +class SandboxedBreakpointWorkflow: + """Sandboxed workflow that calls breakpoint() — verifies the fix works + without requiring users to switch to UnsandboxedWorkflowRunner.""" + + @workflow.run + async def run(self) -> str: + bird = "chicken" + breakpoint() + return f"bird was {bird}" + + +async def test_breakpoint_works_in_sandboxed_workflow_in_debug_mode(client: Client): + """breakpoint() inside a sandboxed workflow reaches the debugger when + debug_mode=True — no need to switch to UnsandboxedWorkflowRunner. + + Patches `pdb.Pdb.set_trace` with a stub so CI doesn't hang on an + interactive prompt. Reaching the stub on `MainThread` with the + workflow's `run` frame proves the full path (sandbox relaxation -> + our hook -> pdb) works through the sandbox. Also verifies workflow + locals (`bird`) are visible in the captured frame. + """ + captured: dict[str, object] = {} + + def stub_set_trace(_self: pdb.Pdb, frame: FrameType | None = None) -> None: + captured["thread"] = threading.current_thread().name + captured["frame_name"] = frame.f_code.co_name if frame else None + captured["bird"] = frame.f_locals.get("bird") if frame else None + captured["called"] = True + + task_queue = f"tq-{uuid.uuid4()}" + with patch.object(pdb.Pdb, "set_trace", stub_set_trace): + async with Worker( + client, + task_queue=task_queue, + workflows=[SandboxedBreakpointWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + SandboxedBreakpointWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "bird was chicken", ( + f"workflow did not complete; breakpoint() likely raised inside the sandbox: " + f"result={result!r}" + ) + assert captured.get("called"), "pdb.Pdb.set_trace was never reached" + assert captured["thread"] == threading.main_thread().name, ( + f"breakpoint landed on {captured['thread']!r}, not the main thread" + ) + assert captured["frame_name"] == "run", ( + f"breakpoint stopped at frame {captured['frame_name']!r}, " + f"expected the workflow's `run` method" + ) + assert captured["bird"] == "chicken", ( + f"workflow local `bird` not visible in pdb frame: got {captured['bird']!r}" + ) + + +async def test_breakpoint_quit_continues_workflow_in_debug_mode(client: Client): + """Typing `q` (or hitting Ctrl-D) in a workflow pdb session should + continue the workflow rather than failing the workflow task with + BdbQuit. The hook overrides `do_quit`/`do_EOF` to call `do_continue` + instead, so a debug session ends cleanly. + + Drives pdb via `cmdqueue` so no real stdin is needed. The first + iteration of cmdloop sees `q`, which dispatches to our overridden + `do_quit` -> `do_continue`. The workflow then completes normally. + """ + captured: dict[str, object] = {} + + class _AutoQuitPdb(pdb.Pdb): + """Pdb subclass that pre-queues `q` and captures frame state on + entry to `interaction`.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.cmdqueue = ["q"] + + def interaction( # type: ignore[override] + self, frame: FrameType | None, traceback: Any + ) -> Any: + if frame is not None: + captured["frame_name"] = frame.f_code.co_name + captured["bird"] = frame.f_locals.get("bird") + return super().interaction(frame, traceback) + + task_queue = f"tq-{uuid.uuid4()}" + with patch("pdb.Pdb", _AutoQuitPdb): + async with Worker( + client, + task_queue=task_queue, + workflows=[SandboxedBreakpointWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + SandboxedBreakpointWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "bird was chicken", ( + f"workflow did not complete after `q`; `BdbQuit` likely propagated: " + f"result={result!r}" + ) + assert captured.get("frame_name") == "run", ( + f"pdb didn't stop in workflow.run frame: got {captured.get('frame_name')!r}" + ) + assert captured.get("bird") == "chicken", ( + f"workflow local `bird` not visible at pdb breakpoint: " + f"got {captured.get('bird')!r}" + ) + + +async def test_sandboxed_breakpoint_points_at_debug_mode(client: Client): + """Without `debug_mode`, calling `breakpoint()` in a sandboxed workflow + should raise the sandbox's restricted-access error with a message that + directs the user at `debug_mode=True` (rather than the generic + pass-through advice that doesn't apply here). + + `workflow_failure_exception_types=[RestrictedWorkflowAccessError]` makes + the sandbox error terminal for the workflow execution instead of + triggering Temporal's normal task-retry loop, so the test gets the + failure surfaced promptly without waiting on a timeout. + """ + task_queue = f"tq-{uuid.uuid4()}" + with pytest.raises(WorkflowFailureError) as exc_info: + async with Worker( + client, + task_queue=task_queue, + workflows=[SandboxedBreakpointWorkflow], + workflow_failure_exception_types=[RestrictedWorkflowAccessError], + ): + await client.execute_workflow( + SandboxedBreakpointWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + cause_msg = str(exc_info.value.cause) + assert "debug_mode=True" in cause_msg, ( + f"sandbox error didn't point at debug_mode: {cause_msg!r}" + )