From 6983993e942a3c0c2e1000ba84b2dbecc3a1f128 Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Wed, 15 Apr 2026 13:24:03 -0400 Subject: [PATCH] feat: Add `pty` support to Vercel sandbox --- examples/sandbox/extensions/README.md | 9 +- examples/sandbox/extensions/vercel_runner.py | 402 +++++++++++++++--- pyproject.toml | 2 +- .../extensions/sandbox/vercel/sandbox.py | 271 +++++++++++- tests/extensions/test_sandbox_vercel.py | 173 +++++++- uv.lock | 8 +- 6 files changed, 799 insertions(+), 66 deletions(-) diff --git a/examples/sandbox/extensions/README.md b/examples/sandbox/extensions/README.md index 837d9dfa28..05c2ba07b6 100644 --- a/examples/sandbox/extensions/README.md +++ b/examples/sandbox/extensions/README.md @@ -211,14 +211,17 @@ uv run python examples/sandbox/extensions/vercel_runner.py --stream Useful flags: +- `--demo pty` +- `--demo port` +- `--demo backend-checks` - `--workspace-persistence tar` - `--workspace-persistence snapshot` - `--runtime node22` - `--timeout-ms 120000` -The Vercel example stays on the non-PTY path on purpose. It covers command -execution, workspace materialization, and persistence verification without -depending on interactive websocket support. +The default Vercel run mirrors the other cloud runners and stays focused on the +standard agent flow. Use `--demo` for backend-specific checks such as PTY, +resume verification, and exposed-port validation. ## Daytona diff --git a/examples/sandbox/extensions/vercel_runner.py b/examples/sandbox/extensions/vercel_runner.py index 9d33bf1fe4..8a9ef0b102 100644 --- a/examples/sandbox/extensions/vercel_runner.py +++ b/examples/sandbox/extensions/vercel_runner.py @@ -1,9 +1,5 @@ """ -Minimal Vercel-backed sandbox example for manual validation. - -This mirrors the other cloud extension examples: it creates a tiny workspace, -verifies stop/resume persistence, then asks a sandboxed agent to inspect the -workspace through one shell tool. +Vercel-backed sandbox example for manual validation. """ from __future__ import annotations @@ -20,18 +16,19 @@ from pathlib import Path from typing import Literal, cast -from openai.types.responses import ResponseTextDeltaEvent +from openai.types.responses import ResponseCompletedEvent, ResponseTextDeltaEvent -from agents import ModelSettings, Runner -from agents.models.openai_provider import OpenAIProvider +from agents import ModelSettings, MultiProvider, Runner from agents.run import RunConfig from agents.sandbox import LocalSnapshotSpec, Manifest, SandboxAgent, SandboxRunConfig +from agents.sandbox.capabilities import Shell +from agents.sandbox.entries import File from agents.sandbox.session import BaseSandboxSession if __package__ is None or __package__ == "": sys.path.insert(0, str(Path(__file__).resolve().parents[3])) -from examples.sandbox.misc.example_support import text_manifest +from examples.sandbox.misc.example_support import text_manifest, tool_call_name from examples.sandbox.misc.workspace_shell import WorkspaceShellCapability try: @@ -43,11 +40,18 @@ ) from exc +DEFAULT_MODEL = "gpt-5.4" DEFAULT_QUESTION = "Summarize this cloud sandbox workspace in 2 sentences." +DEFAULT_PTY_QUESTION = ( + "Start an interactive Python session with `tty=true`. In that same session, compute " + "`5 + 5`, then add 5 more to the previous result. Briefly report the outputs and " + "confirm that you stayed in one Python process." +) SNAPSHOT_CHECK_PATH = Path("snapshot-check.txt") SNAPSHOT_CHECK_CONTENT = "vercel snapshot round-trip ok\n" LIVE_RESUME_CHECK_PATH = Path("live-resume-check.txt") LIVE_RESUME_CHECK_CONTENT = "vercel live resume ok\n" +PTY_CHECK_VALUE = "vercel pty round-trip ok" EXPOSED_PORT = 3000 PORT_CHECK_CONTENT = "

vercel exposed port ok

\n" PORT_CHECK_NODE_SERVER_PATH = Path(".port-check-server.js") @@ -93,8 +97,8 @@ def _build_manifest() -> Manifest: "handoff.md": ( "# Handoff\n\n" "- Customer: Northwind Traders.\n" - "- Goal: validate Vercel sandbox exec and persistence flows.\n" - "- Current status: non-PTY backend slice is wired and under test.\n" + "- Goal: validate Vercel sandbox exec, PTY, and persistence flows.\n" + "- Current status: backend slice is wired and under test.\n" ), "todo.md": ( "# Todo\n\n" @@ -105,6 +109,36 @@ def _build_manifest() -> Manifest: ) +def _build_pty_manifest() -> Manifest: + return Manifest( + entries={ + "README.md": File( + content=( + b"# Vercel PTY Agent Example\n\n" + b"This workspace is used by the Vercel PTY demo.\n" + ) + ), + } + ) + + +def _stream_event_banner(event_name: str, raw_item: object) -> str | None: + _ = raw_item + if event_name == "tool_called": + return "[tool call]" + if event_name == "tool_output": + return "[tool output]" + return None + + +def _raw_item_call_id(raw_item: object) -> str | None: + if isinstance(raw_item, dict): + call_id = raw_item.get("call_id") or raw_item.get("id") + else: + call_id = getattr(raw_item, "call_id", None) or getattr(raw_item, "id", None) + return call_id if isinstance(call_id, str) and call_id else None + + async def _read_text(session: BaseSandboxSession, path: Path) -> str: data = await session.read(path) text = cast(str | bytes, data.read()) @@ -134,6 +168,31 @@ def _require_vercel_credentials() -> None: ) +def _build_options( + *, + runtime: str | None, + timeout_ms: int | None, + workspace_persistence: Literal["tar", "snapshot"], + interactive: bool = False, + exposed_ports: tuple[int, ...] = (), +) -> VercelSandboxClientOptions: + return VercelSandboxClientOptions( + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + interactive=interactive, + exposed_ports=exposed_ports, + ) + + +def _build_run_config(*, sandbox: BaseSandboxSession, workflow_name: str) -> RunConfig: + return RunConfig( + sandbox=SandboxRunConfig(session=sandbox), + workflow_name=workflow_name, + model_provider=MultiProvider(openai_prefix_mode="model_id"), + ) + + async def _verify_stop_resume( *, manifest: Manifest, @@ -142,7 +201,7 @@ async def _verify_stop_resume( workspace_persistence: Literal["tar", "snapshot"], ) -> None: client = VercelSandboxClient() - options = VercelSandboxClientOptions( + options = _build_options( runtime=runtime, timeout_ms=timeout_ms, workspace_persistence=workspace_persistence, @@ -189,7 +248,7 @@ async def _verify_resume_running_sandbox( client = VercelSandboxClient() sandbox = await client.create( manifest=manifest, - options=VercelSandboxClientOptions( + options=_build_options( runtime=runtime, timeout_ms=timeout_ms, workspace_persistence=workspace_persistence, @@ -249,7 +308,7 @@ async def _verify_exposed_port( client = VercelSandboxClient() sandbox = await client.create( manifest=manifest, - options=VercelSandboxClientOptions( + options=_build_options( runtime=runtime, timeout_ms=timeout_ms, workspace_persistence=workspace_persistence, @@ -267,10 +326,7 @@ async def _verify_exposed_port( PORT_CHECK_PYTHON_SERVER_PATH, io.BytesIO(PORT_CHECK_PYTHON_SERVER_CONTENT.encode("utf-8")), ) - result = await sandbox.exec( - _port_check_server_command(), - shell=True, - ) + result = await sandbox.exec(_port_check_server_command(), shell=True) if not result.ok(): raise RuntimeError( f"Failed to start HTTP server for exposed port check: {result.stderr!r}" @@ -298,39 +354,173 @@ async def _verify_exposed_port( await sandbox.shutdown() -async def main( +async def _verify_pty_direct( *, - model: str, - question: str, + manifest: Manifest, runtime: str | None, timeout_ms: int | None, workspace_persistence: Literal["tar", "snapshot"], - stream: bool, ) -> None: - _require_env("OPENAI_API_KEY") - _require_vercel_credentials() - - manifest = _build_manifest() - - await _verify_stop_resume( + client = VercelSandboxClient() + sandbox = await client.create( manifest=manifest, - runtime=runtime, - timeout_ms=timeout_ms, - workspace_persistence=workspace_persistence, + options=_build_options( + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + interactive=True, + ), ) - await _verify_resume_running_sandbox( - manifest=manifest, - runtime=runtime, - timeout_ms=timeout_ms, - workspace_persistence=workspace_persistence, + + try: + await sandbox.start() + if not sandbox.supports_pty(): + raise RuntimeError("Interactive Vercel sandbox did not report PTY support.") + + started = await sandbox.pty_exec_start("sh", shell=False, yield_time_s=0.25) + process_id = started.process_id + if process_id is None: + raise RuntimeError( + f"PTY session exited too early during startup: output={started.output!r}, " + f"exit_code={started.exit_code!r}" + ) + + await sandbox.pty_write_stdin( + session_id=process_id, + chars=f"export PTY_CHECK_VALUE={PTY_CHECK_VALUE!r}\n", + yield_time_s=0.25, + ) + completed = await sandbox.pty_write_stdin( + session_id=process_id, + chars='printf "%s\\n" "$PTY_CHECK_VALUE"\nexit\n', + yield_time_s=0.5, + ) + + if completed.exit_code != 0: + raise RuntimeError( + f"PTY verification exited with {completed.exit_code}: {completed.output!r}" + ) + if PTY_CHECK_VALUE not in completed.output.decode("utf-8", errors="replace"): + raise RuntimeError( + f"PTY verification did not observe persisted shell state: {completed.output!r}" + ) + finally: + try: + await sandbox.pty_terminate_all() + finally: + await sandbox.shutdown() + + print(f"pty round-trip ok ({workspace_persistence})") + + +async def _run_pty_demo( + *, + model: str, + question: str, + runtime: str | None, + timeout_ms: int | None, + workspace_persistence: Literal["tar", "snapshot"], +) -> None: + agent = SandboxAgent( + name="Vercel PTY Demo", + model=model, + instructions=( + "Complete the task by interacting with the sandbox through the shell capability. " + "Keep the final answer concise. " + "Preserve process state when the task depends on it. If you start an interactive " + "program, continue using that same process instead of launching a second one." + ), + default_manifest=_build_pty_manifest(), + capabilities=[Shell()], + model_settings=ModelSettings(tool_choice="required"), ) - await _verify_exposed_port( - manifest=manifest, - runtime=runtime, - timeout_ms=timeout_ms, - workspace_persistence=workspace_persistence, + + client = VercelSandboxClient() + sandbox = await client.create( + manifest=agent.default_manifest, + options=_build_options( + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + interactive=True, + ), ) + try: + async with sandbox: + result = Runner.run_streamed( + agent, + question, + run_config=_build_run_config( + sandbox=sandbox, + workflow_name="Vercel PTY sandbox example", + ), + ) + + saw_text_delta = False + saw_any_text = False + tool_names_by_call_id: dict[str, str] = {} + + async for event in result.stream_events(): + if event.type == "raw_response_event" and isinstance( + event.data, ResponseTextDeltaEvent + ): + if not saw_text_delta: + print("assistant> ", end="", flush=True) + saw_text_delta = True + print(event.data.delta, end="", flush=True) + saw_any_text = True + continue + if event.type == "raw_response_event" and isinstance( + event.data, ResponseCompletedEvent + ): + continue + + if event.type != "run_item_stream_event": + continue + + raw_item = event.item.raw_item + banner = _stream_event_banner(event.name, raw_item) + if banner is None: + continue + + if saw_text_delta: + print() + saw_text_delta = False + + if event.name == "tool_called": + tool_name = tool_call_name(raw_item) + call_id = _raw_item_call_id(raw_item) + if call_id is not None and tool_name: + tool_names_by_call_id[call_id] = tool_name + if tool_name: + banner = f"{banner} {tool_name}" + elif event.name == "tool_output": + call_id = _raw_item_call_id(raw_item) + output_tool_name = tool_names_by_call_id.get(call_id or "") + if output_tool_name: + banner = f"{banner} {output_tool_name}" + + print(banner) + + if saw_text_delta: + print() + if not saw_any_text: + print(result.final_output) + finally: + await client.delete(sandbox) + + +async def _run_standard_agent( + *, + model: str, + question: str, + runtime: str | None, + timeout_ms: int | None, + workspace_persistence: Literal["tar", "snapshot"], + stream: bool, +) -> None: + manifest = _build_manifest() agent = SandboxAgent( name="Vercel Sandbox Assistant", model=model, @@ -348,24 +538,19 @@ async def main( client = VercelSandboxClient() sandbox = await client.create( manifest=manifest, - options=VercelSandboxClientOptions( + options=_build_options( runtime=runtime, timeout_ms=timeout_ms, workspace_persistence=workspace_persistence, ), ) - run_config = RunConfig( - model_provider=OpenAIProvider(), - sandbox=SandboxRunConfig(session=sandbox), - # Disable tracing because it does not currently work reliably with alternate - # upstreams such as AI Gateway, and provider config already comes from env. - tracing_disabled=True, - workflow_name="Vercel sandbox example", - ) - try: async with sandbox: + run_config = _build_run_config( + sandbox=sandbox, + workflow_name="Vercel sandbox example", + ) if not stream: result = await Runner.run(agent, question, run_config=run_config) print(result.final_output) @@ -381,6 +566,10 @@ async def main( print("assistant> ", end="", flush=True) saw_text_delta = True print(event.data.delta, end="", flush=True) + elif event.type == "raw_response_event" and isinstance( + event.data, ResponseCompletedEvent + ): + continue if saw_text_delta: print() @@ -388,10 +577,109 @@ async def main( await client.delete(sandbox) +async def main( + *, + model: str, + question: str, + runtime: str | None, + timeout_ms: int | None, + workspace_persistence: Literal["tar", "snapshot"], + stream: bool, + demo: str, +) -> None: + _require_env("OPENAI_API_KEY") + _require_vercel_credentials() + + manifest = _build_manifest() + + if demo == "snapshot": + await _verify_stop_resume( + manifest=manifest, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + return + + if demo == "resume": + await _verify_resume_running_sandbox( + manifest=manifest, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + return + + if demo == "port": + await _verify_exposed_port( + manifest=manifest, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + return + + if demo == "pty": + await _run_pty_demo( + model=model, + question=question, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + return + + if demo == "backend-checks": + await _verify_stop_resume( + manifest=manifest, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + await _verify_resume_running_sandbox( + manifest=manifest, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + await _verify_exposed_port( + manifest=manifest, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + await _verify_pty_direct( + manifest=manifest, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + ) + return + + await _run_standard_agent( + model=model, + question=question, + runtime=runtime, + timeout_ms=timeout_ms, + workspace_persistence=workspace_persistence, + stream=stream, + ) + + if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--model", default="gpt-5.4", help="Model name to use.") + parser = argparse.ArgumentParser( + description=( + "Run a Vercel sandbox agent with optional resume, exposed-port, and PTY demos." + ) + ) + parser.add_argument("--model", default=DEFAULT_MODEL, help="Model name to use.") parser.add_argument("--question", default=DEFAULT_QUESTION, help="Prompt to send to the agent.") + parser.add_argument( + "--demo", + default="agent", + choices=["agent", "snapshot", "resume", "port", "pty", "backend-checks"], + help="Which demo to run (default: agent).", + ) parser.add_argument( "--runtime", default=None, @@ -407,18 +695,22 @@ async def main( "--workspace-persistence", choices=("tar", "snapshot"), default="tar", - help="Workspace persistence mode to verify before the agent run.", + help="Workspace persistence mode for the Vercel sandbox.", ) parser.add_argument("--stream", action="store_true", default=False, help="Stream the response.") args = parser.parse_args() + default_question = ( + DEFAULT_PTY_QUESTION if args.demo == "pty" and args.question == DEFAULT_QUESTION else None + ) asyncio.run( main( model=args.model, - question=args.question, + question=default_question or args.question, runtime=args.runtime, timeout_ms=args.timeout_ms, workspace_persistence=cast(Literal["tar", "snapshot"], args.workspace_persistence), stream=args.stream, + demo=args.demo, ) ) diff --git a/pyproject.toml b/pyproject.toml index b016977639..ab6c5f4616 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ cloudflare = ["aiohttp>=3.12,<4"] e2b = ["e2b==2.20.0", "e2b-code-interpreter==2.4.1"] modal = ["modal==1.3.5"] runloop = ["runloop_api_client>=1.16.0,<2.0.0"] -vercel = ["vercel>=0.5.6,<0.6"] +vercel = ["vercel>=0.5.7,<0.6"] s3 = ["boto3>=1.34"] temporal = [ "temporalio==1.25.0", diff --git a/src/agents/extensions/sandbox/vercel/sandbox.py b/src/agents/extensions/sandbox/vercel/sandbox.py index 6bc14876a3..58e317a9e7 100644 --- a/src/agents/extensions/sandbox/vercel/sandbox.py +++ b/src/agents/extensions/sandbox/vercel/sandbox.py @@ -14,10 +14,14 @@ import asyncio import io import json +import logging import os import tarfile +import time import uuid +from collections import deque from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field from pathlib import Path, PurePosixPath from typing import Any, Literal, cast from urllib.parse import urlsplit @@ -31,6 +35,7 @@ SandboxStatus, SnapshotSource, ) +from vercel.sandbox.pty import AsyncPTYSession from ....sandbox.errors import ( ConfigurationError, @@ -51,6 +56,16 @@ from ....sandbox.session.base_sandbox_session import BaseSandboxSession from ....sandbox.session.dependencies import Dependencies from ....sandbox.session.manager import Instrumentation +from ....sandbox.session.pty_types import ( + PTY_PROCESSES_MAX, + PTY_PROCESSES_WARNING, + PtyExecUpdate, + allocate_pty_process_id, + clamp_pty_yield_time_ms, + process_id_to_prune_from_meta, + resolve_pty_write_yield_time_ms, + truncate_text_by_tokens, +) from ....sandbox.session.sandbox_client import BaseSandboxClient, BaseSandboxClientOptions from ....sandbox.snapshot import SnapshotBase, SnapshotSpec, resolve_snapshot from ....sandbox.types import ExecResult, ExposedPortEndpoint, User @@ -77,6 +92,19 @@ httpx.NetworkError, httpx.ProtocolError, ) +logger = logging.getLogger(__name__) + + +@dataclass +class _VercelPtySessionEntry: + session: Any + output_chunks: deque[bytes] = field(default_factory=deque) + output_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + output_notify: asyncio.Event = field(default_factory=asyncio.Event) + reader_task: asyncio.Task[None] | None = None + done: bool = False + exit_code: int | None = None + last_used: float = field(default_factory=time.monotonic) def _is_transient_create_error(exc: BaseException) -> bool: @@ -245,6 +273,9 @@ class VercelSandboxSession(BaseSandboxSession): state: VercelSandboxSessionState _sandbox: Any | None _token: str | None + _pty_lock: asyncio.Lock + _pty_sessions: dict[int, _VercelPtySessionEntry] + _reserved_pty_process_ids: set[int] def __init__( self, @@ -256,6 +287,9 @@ def __init__( self.state = state self._sandbox = sandbox self._token = token + self._pty_lock = asyncio.Lock() + self._pty_sessions = {} + self._reserved_pty_process_ids = set() @classmethod def from_state( @@ -268,7 +302,7 @@ def from_state( return cls(state=state, sandbox=sandbox, token=token) def supports_pty(self) -> bool: - return False + return self.state.interactive def _reject_user_arg(self, *, op: Literal["exec", "read", "write"], user: str | User) -> None: user_name = user.name if isinstance(user, User) else user @@ -453,8 +487,243 @@ async def running(self) -> bool: return bool(sandbox.status == SandboxStatus.RUNNING) async def shutdown(self) -> None: + await self.pty_terminate_all() await self._stop_attached_sandbox() + async def pty_exec_start( + self, + *command: str | Path, + timeout: float | None = None, + shell: bool | list[str] = True, + user: str | User | None = None, + tty: bool = False, + yield_time_s: float | None = None, + max_output_tokens: int | None = None, + ) -> PtyExecUpdate: + _ = tty + sandbox = await self._ensure_sandbox() + sanitized = self._prepare_exec_command(*command, shell=shell, user=user) + connect_timeout = 30.0 if timeout is None else timeout + + entry = _VercelPtySessionEntry(session=None) + registered = False + pruned: _VercelPtySessionEntry | None = None + process_count = 0 + + try: + session = await AsyncPTYSession.open( + sandbox, + sanitized, + cwd=self.state.manifest.root, + _connection_timeout=connect_timeout, + ) + await session.ready() + entry.session = session + entry.reader_task = asyncio.create_task(self._pty_reader(entry)) + + async with self._pty_lock: + process_id = allocate_pty_process_id(self._reserved_pty_process_ids) + self._reserved_pty_process_ids.add(process_id) + pruned = self._prune_pty_sessions_if_needed() + self._pty_sessions[process_id] = entry + process_count = len(self._pty_sessions) + registered = True + except TimeoutError as e: + if not registered: + await self._terminate_pty_entry(entry) + raise ExecTimeoutError(command=command, timeout_s=timeout, cause=e) from e + except ExecTimeoutError: + raise + except Exception as e: + if not registered: + await self._terminate_pty_entry(entry) + raise ExecTransportError( + command=command, + context={"backend": "vercel", "sandbox_id": self.state.sandbox_id}, + cause=e, + ) from e + + if pruned is not None: + await self._terminate_pty_entry(pruned) + + if process_count >= PTY_PROCESSES_WARNING: + logger.warning( + "PTY process count reached warning threshold: %s active sessions", + process_count, + ) + + yield_time_ms = 10_000 if yield_time_s is None else int(yield_time_s * 1000) + output, original_token_count = await self._collect_pty_output( + entry=entry, + yield_time_ms=clamp_pty_yield_time_ms(yield_time_ms), + max_output_tokens=max_output_tokens, + ) + return await self._finalize_pty_update( + process_id=process_id, + entry=entry, + output=output, + original_token_count=original_token_count, + ) + + async def pty_write_stdin( + self, + *, + session_id: int, + chars: str, + yield_time_s: float | None = None, + max_output_tokens: int | None = None, + ) -> PtyExecUpdate: + async with self._pty_lock: + entry = self._resolve_pty_session_entry( + pty_processes=self._pty_sessions, + session_id=session_id, + ) + + try: + if chars: + await entry.session.stream.send(chars.encode("utf-8")) + except Exception as e: + raise ExecTransportError( + command=("pty_write_stdin",), + context={ + "backend": "vercel", + "sandbox_id": self.state.sandbox_id, + "session_id": session_id, + }, + cause=e, + ) from e + + yield_time_ms = 250 if yield_time_s is None else int(yield_time_s * 1000) + output, original_token_count = await self._collect_pty_output( + entry=entry, + yield_time_ms=resolve_pty_write_yield_time_ms( + yield_time_ms=yield_time_ms, + input_empty=chars == "", + ), + max_output_tokens=max_output_tokens, + ) + entry.last_used = time.monotonic() + return await self._finalize_pty_update( + process_id=session_id, + entry=entry, + output=output, + original_token_count=original_token_count, + ) + + async def pty_terminate_all(self) -> None: + async with self._pty_lock: + entries = list(self._pty_sessions.values()) + self._pty_sessions.clear() + self._reserved_pty_process_ids.clear() + for entry in entries: + await self._terminate_pty_entry(entry) + + async def _pty_reader(self, entry: _VercelPtySessionEntry) -> None: + """Stream PTY output into the session buffer and capture process exit.""" + try: + async for data in entry.session.stream: + if not data: + continue + async with entry.output_lock: + entry.output_chunks.append(data) + entry.output_notify.set() + except Exception as e: + logger.debug("PTY reader output stream terminated with error: %s", e) + try: + finished = await entry.session.command.wait() + entry.exit_code = finished.exit_code + except Exception as e: + logger.debug("PTY reader terminated with error: %s", e) + finally: + entry.done = True + entry.output_notify.set() + + async def _collect_pty_output( + self, + *, + entry: _VercelPtySessionEntry, + yield_time_ms: int, + max_output_tokens: int | None, + ) -> tuple[bytes, int | None]: + deadline = time.monotonic() + (yield_time_ms / 1000) + output = bytearray() + + while True: + async with entry.output_lock: + while entry.output_chunks: + output.extend(entry.output_chunks.popleft()) + + if time.monotonic() >= deadline: + break + if entry.done: + async with entry.output_lock: + while entry.output_chunks: + output.extend(entry.output_chunks.popleft()) + break + + remaining_s = deadline - time.monotonic() + if remaining_s <= 0: + break + try: + await asyncio.wait_for(entry.output_notify.wait(), timeout=remaining_s) + except asyncio.TimeoutError: + break + entry.output_notify.clear() + + text = output.decode("utf-8", errors="replace") + truncated, original_token_count = truncate_text_by_tokens(text, max_output_tokens) + return truncated.encode("utf-8", errors="replace"), original_token_count + + async def _finalize_pty_update( + self, + *, + process_id: int, + entry: _VercelPtySessionEntry, + output: bytes, + original_token_count: int | None, + ) -> PtyExecUpdate: + exit_code = entry.exit_code if entry.done else None + live_process_id: int | None = process_id + + if entry.done: + async with self._pty_lock: + removed = self._pty_sessions.pop(process_id, None) + self._reserved_pty_process_ids.discard(process_id) + if removed is not None: + await self._terminate_pty_entry(removed) + live_process_id = None + + return PtyExecUpdate( + process_id=live_process_id, + output=output, + exit_code=exit_code, + original_token_count=original_token_count, + ) + + def _prune_pty_sessions_if_needed(self) -> _VercelPtySessionEntry | None: + if len(self._pty_sessions) < PTY_PROCESSES_MAX: + return None + meta = [(pid, entry.last_used, entry.done) for pid, entry in self._pty_sessions.items()] + pid = process_id_to_prune_from_meta(meta) + if pid is None: + return None + self._reserved_pty_process_ids.discard(pid) + return self._pty_sessions.pop(pid, None) + + async def _terminate_pty_entry(self, entry: _VercelPtySessionEntry) -> None: + if entry.session is None: + return + try: + if entry.reader_task is not None and not entry.reader_task.done(): + entry.reader_task.cancel() + try: + await entry.reader_task + except (asyncio.CancelledError, Exception): + pass + await entry.session.close() + except Exception as e: + logger.debug("PTY entry termination error (non-fatal): %s", e) + async def _persist_with_ephemeral_mounts_removed( self, operation: Callable[[], Awaitable[io.IOBase]], diff --git a/tests/extensions/test_sandbox_vercel.py b/tests/extensions/test_sandbox_vercel.py index 55460823dc..4ebd0204bf 100644 --- a/tests/extensions/test_sandbox_vercel.py +++ b/tests/extensions/test_sandbox_vercel.py @@ -1,16 +1,19 @@ from __future__ import annotations +import asyncio import builtins import importlib import io import sys import tarfile import types +from dataclasses import dataclass from pathlib import Path from typing import Any, Literal, cast import httpx import pytest +from anyio import EndOfStream from pydantic import BaseModel, PrivateAttr from agents.sandbox import Manifest @@ -24,6 +27,8 @@ from agents.sandbox.snapshot import NoopSnapshot, SnapshotBase from agents.sandbox.types import User +_ASYNCIO = asyncio + class _FakeNetworkPolicyRule(BaseModel): pass @@ -90,6 +95,98 @@ async def stderr(self) -> str: return self._stderr +@dataclass +class _FakePtyCommandFinished: + exit_code: int + + +class _FakePtyCommand: + def __init__(self, *, exit_code: int = 0) -> None: + self.exit_code = exit_code + self.wait_calls = 0 + + async def wait(self) -> _FakePtyCommandFinished: + self.wait_calls += 1 + return _FakePtyCommandFinished(exit_code=self.exit_code) + + +class _FakeAsyncPTYSession: + open_calls: list[dict[str, object]] = [] + next_session: _FakeAsyncPTYSession | None = None + + def __init__( + self, + *, + chunks: list[bytes] | None = None, + exit_code: int = 0, + ) -> None: + self.command = _FakePtyCommand(exit_code=exit_code) + self.ready_calls = 0 + self.write_calls: list[bytes] = [] + self.close_calls = 0 + self._queue: _ASYNCIO.Queue[bytes | None] = _ASYNCIO.Queue() + self._closed = False + self.stream = _FakePtyStream(self) + for chunk in chunks or []: + self.feed_output(chunk) + if chunks is not None: + self.finish() + + @classmethod + def reset(cls) -> None: + cls.open_calls = [] + cls.next_session = None + + @classmethod + async def open( + cls, sandbox: object, command: list[str], **kwargs: object + ) -> _FakeAsyncPTYSession: + cls.open_calls.append({"sandbox": sandbox, "command": list(command), **kwargs}) + if cls.next_session is None: + cls.next_session = _FakeAsyncPTYSession() + return cls.next_session + + async def ready(self) -> None: + self.ready_calls += 1 + + def feed_output(self, chunk: bytes) -> None: + self._queue.put_nowait(chunk) + + def finish(self) -> None: + self._queue.put_nowait(None) + + async def close(self) -> None: + self.close_calls += 1 + self._closed = True + self.finish() + + +class _FakePtyStream: + def __init__(self, session: _FakeAsyncPTYSession) -> None: + self._session = session + + def __aiter__(self) -> _FakePtyStream: + return self + + async def __anext__(self) -> bytes: + try: + return await self.receive() + except EndOfStream: + raise StopAsyncIteration from None + + async def send(self, data: bytes) -> None: + self._session.write_calls.append(data) + + async def receive(self, max_bytes: int = 65536) -> bytes: + chunk = await self._session._queue.get() + if chunk is None: + raise EndOfStream + return chunk[:max_bytes] + + async def aclose(self) -> None: + await self._session.close() + + class _FakeClient: def __init__(self) -> None: self.closed = False @@ -344,9 +441,11 @@ async def restore_after_snapshot( def _load_vercel_module(monkeypatch: pytest.MonkeyPatch) -> Any: _FakeAsyncSandbox.reset() + _FakeAsyncPTYSession.reset() fake_vercel = types.ModuleType("vercel") fake_vercel_sandbox = cast(Any, types.ModuleType("vercel.sandbox")) + fake_vercel_sandbox_pty = cast(Any, types.ModuleType("vercel.sandbox.pty")) fake_vercel_sandbox.AsyncSandbox = _FakeAsyncSandbox fake_vercel_sandbox.NetworkPolicy = NetworkPolicy fake_vercel_sandbox.NetworkPolicyCustom = NetworkPolicyCustom @@ -355,9 +454,11 @@ def _load_vercel_module(monkeypatch: pytest.MonkeyPatch) -> Any: fake_vercel_sandbox.Resources = Resources fake_vercel_sandbox.SandboxStatus = types.SimpleNamespace(RUNNING="running") fake_vercel_sandbox.SnapshotSource = SnapshotSource + fake_vercel_sandbox_pty.AsyncPTYSession = _FakeAsyncPTYSession monkeypatch.setitem(sys.modules, "vercel", fake_vercel) monkeypatch.setitem(sys.modules, "vercel.sandbox", fake_vercel_sandbox) + monkeypatch.setitem(sys.modules, "vercel.sandbox.pty", fake_vercel_sandbox_pty) sys.modules.pop("agents.extensions.sandbox.vercel.sandbox", None) sys.modules.pop("agents.extensions.sandbox.vercel", None) @@ -376,7 +477,7 @@ def test_vercel_package_re_exports_backend_symbols(monkeypatch: pytest.MonkeyPat assert package_module.VercelSandboxSessionState is vercel_module.VercelSandboxSessionState -def test_vercel_supports_pty_is_disabled_until_provider_methods_exist( +def test_vercel_supports_pty_when_interactive_and_sdk_support_is_available( monkeypatch: pytest.MonkeyPatch, ) -> None: vercel_module = _load_vercel_module(monkeypatch) @@ -397,7 +498,75 @@ def test_vercel_supports_pty_is_disabled_until_provider_methods_exist( ) assert not vercel_module.VercelSandboxSession.from_state(noninteractive).supports_pty() - assert not vercel_module.VercelSandboxSession.from_state(interactive).supports_pty() + assert vercel_module.VercelSandboxSession.from_state(interactive).supports_pty() + + +@pytest.mark.asyncio +async def test_vercel_pty_exec_start_tracks_live_session(monkeypatch: pytest.MonkeyPatch) -> None: + vercel_module = _load_vercel_module(monkeypatch) + state = vercel_module.VercelSandboxSessionState( + session_id="00000000-0000-0000-0000-000000000201", + manifest=Manifest(root="/vercel/sandbox/project"), + snapshot=NoopSnapshot(id="snapshot"), + sandbox_id="sandbox-pty-live", + interactive=True, + ) + sandbox = _FakeAsyncSandbox(sandbox_id="sandbox-pty-live") + session = vercel_module.VercelSandboxSession.from_state(state, sandbox=sandbox) + _FakeAsyncPTYSession.next_session = _FakeAsyncPTYSession() + + update = await session.pty_exec_start("python", "-i", shell=False, yield_time_s=0.25) + + assert update.process_id is not None + assert update.exit_code is None + assert update.output == b"" + assert _FakeAsyncPTYSession.open_calls == [ + { + "sandbox": sandbox, + "command": ["python", "-i"], + "cwd": "/vercel/sandbox/project", + "_connection_timeout": 30.0, + } + ] + assert _FakeAsyncPTYSession.next_session.ready_calls == 1 + + await session.pty_terminate_all() + assert _FakeAsyncPTYSession.next_session.close_calls == 1 + + +@pytest.mark.asyncio +async def test_vercel_pty_write_collects_output_and_exit(monkeypatch: pytest.MonkeyPatch) -> None: + vercel_module = _load_vercel_module(monkeypatch) + state = vercel_module.VercelSandboxSessionState( + session_id="00000000-0000-0000-0000-000000000202", + manifest=Manifest(root="/vercel/sandbox/project"), + snapshot=NoopSnapshot(id="snapshot"), + sandbox_id="sandbox-pty-exit", + interactive=True, + ) + sandbox = _FakeAsyncSandbox(sandbox_id="sandbox-pty-exit") + session = vercel_module.VercelSandboxSession.from_state(state, sandbox=sandbox) + fake_pty = _FakeAsyncPTYSession() + _FakeAsyncPTYSession.next_session = fake_pty + + started = await session.pty_exec_start("python", "-i", shell=False, yield_time_s=0.25) + assert started.process_id is not None + + fake_pty.feed_output(b"hello from pty\n") + fake_pty.command.exit_code = 7 + fake_pty.finish() + + update = await session.pty_write_stdin( + session_id=cast(int, started.process_id), + chars="print('x')\n", + yield_time_s=0.25, + ) + + assert fake_pty.write_calls == [b"print('x')\n"] + assert update.process_id is None + assert update.exit_code == 7 + assert update.output == b"hello from pty\n" + assert fake_pty.close_calls == 1 @pytest.mark.asyncio diff --git a/uv.lock b/uv.lock index bf401d354a..a59ac90ba2 100644 --- a/uv.lock +++ b/uv.lock @@ -2562,7 +2562,7 @@ requires-dist = [ { name = "textual", marker = "extra == 'temporal'", specifier = ">=8.2.3,<8.3" }, { name = "types-requests", specifier = ">=2.0,<3" }, { name = "typing-extensions", specifier = ">=4.12.2,<5" }, - { name = "vercel", marker = "extra == 'vercel'", specifier = ">=0.5.6,<0.6" }, + { name = "vercel", marker = "extra == 'vercel'", specifier = ">=0.5.7,<0.6" }, { name = "websockets", specifier = ">=15.0,<16" }, { name = "websockets", marker = "extra == 'realtime'", specifier = ">=15.0,<16" }, { name = "websockets", marker = "extra == 'voice'", specifier = ">=15.0,<16" }, @@ -4203,7 +4203,7 @@ wheels = [ [[package]] name = "vercel" -version = "0.5.6" +version = "0.5.7" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -4215,9 +4215,9 @@ dependencies = [ { name = "vercel-workers", marker = "python_full_version >= '3.12'" }, { name = "websockets" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/73/2a/acf30370e110c839b198cdf08ccfbacc9e11db91fc5c0b185805b318232b/vercel-0.5.6.tar.gz", hash = "sha256:c5aacd81739ff22771f9c3bba6b764de1589e25fefce6ce5ded32261128f8710", size = 115452, upload-time = "2026-04-13T21:52:40.815Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d7/68/a671ebc656afbb5e25fb88c681b61511cc13670ea771c87b2f711782022b/vercel-0.5.7.tar.gz", hash = "sha256:8070ea1b33962adfed98498f9273f24ea2066a20c74d38643d479d8280801c6e", size = 118597, upload-time = "2026-04-15T17:58:20.424Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/bb/70/0bf6374905d8b7eccea8f33e67c8ec8b8ffcb5eb54c40fff52edbc976514/vercel-0.5.6-py3-none-any.whl", hash = "sha256:9f5f6c2f7bcec642809338bc1c507ea91b41b977ed3be16f4e24bd5065b8a1ee", size = 135164, upload-time = "2026-04-13T21:52:39.15Z" }, + { url = "https://files.pythonhosted.org/packages/c7/2e/bacf1ccc0ec95464a68398e64bf5e36f859cd51f3e379623f103802f85f1/vercel-0.5.7-py3-none-any.whl", hash = "sha256:90eb2689c34e403db2170fec3eb47e1a91092c200d91baf4b4501fb3e2a44d28", size = 139698, upload-time = "2026-04-15T17:58:18.945Z" }, ] [[package]]