Skip to content

Commit c4a2fcb

Browse files
authored
feat: add Responses websocket model and stream_ws example (#2530)
1 parent 02e8b03 commit c4a2fcb

17 files changed

+4025
-70
lines changed

examples/basic/stream_ws.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
"""Responses websocket streaming example with function tools, agent-as-tool, and approval.
2+
3+
This example shows a user-facing websocket workflow using
4+
`responses_websocket_session(...)`:
5+
- Streaming output (including reasoning summary deltas when available)
6+
- Regular function tools
7+
- An `Agent.as_tool(...)` specialist agent
8+
- HITL approval for a sensitive tool call
9+
- A follow-up turn using `previous_response_id` on the same trace
10+
11+
Required environment variable:
12+
- `OPENAI_API_KEY`
13+
14+
Optional environment variables:
15+
- `OPENAI_MODEL` (defaults to `gpt-5.2-codex`)
16+
- `OPENAI_BASE_URL`
17+
- `OPENAI_WEBSOCKET_BASE_URL`
18+
- `EXAMPLES_INTERACTIVE_MODE=auto` (auto-approve HITL prompts for scripted runs)
19+
"""
20+
21+
import asyncio
22+
import os
23+
from typing import Any
24+
25+
from openai.types.shared import Reasoning
26+
27+
from agents import (
28+
Agent,
29+
ModelSettings,
30+
ResponsesWebSocketSession,
31+
function_tool,
32+
responses_websocket_session,
33+
trace,
34+
)
35+
from examples.auto_mode import confirm_with_fallback
36+
37+
38+
@function_tool
def lookup_order(order_id: str) -> dict[str, Any]:
    """Return deterministic order data for the demo."""
    # Fixed catalog of demo orders; the fallback record is returned for any
    # id we do not recognize so the agent always gets a well-formed payload.
    known_orders: dict[str, dict[str, Any]] = {
        "ORD-1001": {
            "order_id": "ORD-1001",
            "status": "delivered",
            "delivered_days_ago": 3,
            "amount": 49.99,
            "currency": "USD",
            "item": "Wireless Mouse",
        },
        "ORD-2002": {
            "order_id": "ORD-2002",
            "status": "delivered",
            "delivered_days_ago": 12,
            "amount": 129.0,
            "currency": "USD",
            "item": "Keyboard",
        },
    }
    unknown_order: dict[str, Any] = {
        "order_id": order_id,
        "status": "unknown",
        "delivered_days_ago": 999,
        "amount": 0.0,
        "currency": "USD",
        "item": "unknown",
    }
    return known_orders.get(order_id, unknown_order)
71+
72+
@function_tool(needs_approval=True)
def submit_refund(order_id: str, amount: float, reason: str) -> dict[str, Any]:
    """Create a refund request. This tool requires approval."""
    # The demo order gets a stable ticket id; anything else derives one from
    # the last four characters of the order id.
    if order_id == "ORD-1001":
        ticket = "RF-1001"
    else:
        ticket = f"RF-{order_id[-4:]}"
    receipt: dict[str, Any] = {
        "refund_ticket": ticket,
        "order_id": order_id,
        "amount": amount,
        "reason": reason,
        "status": "approved_pending_processing",
    }
    return receipt
83+
84+
85+
def ask_approval(question: str) -> bool:
    """Prompt for approval (or auto-approve in examples auto mode).

    Returns:
        True when the user approves (or when `EXAMPLES_INTERACTIVE_MODE=auto`
        falls back to the default), False otherwise.
    """
    # Fix: `default=True` means pressing Enter (and the scripted auto mode)
    # approves, so the hint must read [Y/n]; the original [y/N] hint
    # contradicted the actual default.
    return confirm_with_fallback(f"[approval] {question} [Y/n]: ", default=True)
88+
89+
90+
async def run_streamed_turn(
    ws: ResponsesWebSocketSession,
    agent: Agent[Any],
    prompt: str,
    *,
    previous_response_id: str | None = None,
) -> tuple[str, str]:
    """Run one streamed turn and handle HITL approvals if needed.

    Args:
        ws: Shared websocket session; every (re)run of this turn goes through it.
        agent: The agent to run.
        prompt: The user message for this turn.
        previous_response_id: Pass the prior turn's response id to continue the
            same conversation server-side.

    Returns:
        A ``(last_response_id, final_output_text)`` tuple.

    Raises:
        RuntimeError: If the run completes without producing a response id.
    """
    print(f"\nUser: {prompt}\n")

    result = ws.run_streamed(
        agent,
        prompt,
        previous_response_id=previous_response_id,
    )
    # Track whether we have printed the section headers yet, so each appears
    # at most once per turn.
    printed_reasoning = False
    printed_output = False

    # Keep re-running until the run finishes without interruptions: each pass
    # drains the event stream, then resolves any pending HITL approvals.
    while True:
        async for event in result.stream_events():
            if event.type == "raw_response_event":
                raw = event.data
                # Reasoning summary deltas stream before the answer text.
                if raw.type == "response.reasoning_summary_text.delta":
                    if not printed_reasoning:
                        print("Reasoning:")
                        printed_reasoning = True
                    print(raw.delta, end="", flush=True)
                elif raw.type == "response.output_text.delta":
                    # Separate the reasoning section from the answer section.
                    if printed_reasoning and not printed_output:
                        print("\n")
                    if not printed_output:
                        print("Assistant:")
                        printed_output = True
                    print(raw.delta, end="", flush=True)
                continue

            if event.type != "run_item_stream_event":
                continue

            # Higher-level run items: surface tool calls and their results.
            item = event.item
            if item.type == "tool_call_item":
                tool_name = getattr(item.raw_item, "name", "unknown")
                tool_args = getattr(item.raw_item, "arguments", "")
                print(f"\n[tool call] {tool_name}({tool_args})")
            elif item.type == "tool_call_output_item":
                print(f"[tool result] {item.output}")

        if printed_reasoning or printed_output:
            print("\n")

        # No pending approvals: the turn is complete.
        if not result.interruptions:
            break

        # Resolve each pending approval on a run state, then resume the run
        # from that state over the same websocket session.
        state = result.to_state()
        for interruption in result.interruptions:
            question = f"Approve {interruption.name} with args {interruption.arguments}?"
            if ask_approval(question):
                state.approve(interruption)
            else:
                state.reject(interruption)

        result = ws.run_streamed(agent, state)

    if result.last_response_id is None:
        raise RuntimeError("The streamed run completed without a response_id.")

    final_output = str(result.final_output)
    print(f"response_id: {result.last_response_id}")
    print(f"final_output: {final_output}\n")
    return result.last_response_id, final_output
160+
161+
162+
async def main() -> None:
    """Build the demo agents and run two streamed turns over one websocket session."""
    model_name = os.getenv("OPENAI_MODEL", "gpt-5.2-codex")
    # Specialist agent exposed to the support agent via Agent.as_tool(...).
    policy_agent = Agent(
        name="RefundPolicySpecialist",
        instructions=(
            "You are a refund policy specialist. The policy is simple: orders delivered "
            "within 7 days are eligible for a full refund, and older delivered orders "
            "are not. Return a short answer with eligibility and a one-line reason."
        ),
        model=model_name,
        model_settings=ModelSettings(max_tokens=120),
    )

    # Top-level agent: combines a plain function tool, the specialist-as-tool,
    # and an approval-gated tool (submit_refund).
    support_agent = Agent(
        name="SupportAgent",
        instructions=(
            "You are a support agent. For refund requests, do this in order: "
            "1) call lookup_order, 2) call refund_policy_specialist, 3) if the user "
            "asked to proceed and the order is eligible, call submit_refund. "
            "When asked for only the refund ticket, return only the ticket token "
            "(for example RF-1001)."
        ),
        tools=[
            lookup_order,
            policy_agent.as_tool(
                tool_name="refund_policy_specialist",
                tool_description="Check refund eligibility and explain the policy decision.",
            ),
            submit_refund,
        ],
        model=model_name,
        model_settings=ModelSettings(
            max_tokens=200,
            reasoning=Reasoning(effort="medium", summary="detailed"),
        ),
    )

    try:
        # You can skip this helper and call Runner.run_streamed(...) directly.
        # It will still work, but each run will create/connect again unless you manually
        # reuse the same RunConfig/provider. This helper makes that reuse easy across turns
        # (and nested agent-as-tool runs) so the websocket connection can stay warm.
        async with responses_websocket_session() as ws:
            # Both turns share one trace so they show up as a single workflow.
            with trace("Responses WS support example") as current_trace:
                print(f"Using model={model_name}")
                print(f"trace_id={current_trace.trace_id}")

                first_response_id, _ = await run_streamed_turn(
                    ws,
                    support_agent,
                    (
                        "Customer wants a refund for order ORD-1001 because the mouse arrived "
                        "damaged. Please check the order, ask the refund policy specialist, and "
                        "if it is eligible submit the refund. Reply with only the refund ticket."
                    ),
                )

                # Follow-up turn continues the same conversation server-side via
                # previous_response_id instead of resending history.
                await run_streamed_turn(
                    ws,
                    support_agent,
                    "What refund ticket did you just create? Reply with only the ticket.",
                    previous_response_id=first_response_id,
                )
    except RuntimeError as exc:
        # Friendly message for accounts/models without websocket access;
        # anything else is re-raised unchanged.
        if "closed before any response events" in str(exc):
            print(
                "\nWebsocket mode closed before sending events. This usually means the "
                "feature is not enabled for this account/model yet."
            )
            return
        raise
233+
234+
235+
if __name__ == "__main__":
    # Script entry point: run the async demo to completion.
    asyncio.run(main())

src/agents/__init__.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,10 @@
8282
from .models.multi_provider import MultiProvider
8383
from .models.openai_chatcompletions import OpenAIChatCompletionsModel
8484
from .models.openai_provider import OpenAIProvider
85-
from .models.openai_responses import OpenAIResponsesModel
85+
from .models.openai_responses import OpenAIResponsesModel, OpenAIResponsesWSModel
8686
from .prompts import DynamicPromptFunction, GenerateDynamicPromptData, Prompt
8787
from .repl import run_demo_loop
88+
from .responses_websocket_session import ResponsesWebSocketSession, responses_websocket_session
8889
from .result import RunResult, RunResultStreaming
8990
from .run import (
9091
ReasoningItemIdPolicy,
@@ -246,6 +247,15 @@ def set_default_openai_api(api: Literal["chat_completions", "responses"]) -> Non
246247
_config.set_default_openai_api(api)
247248

248249

250+
def set_default_openai_responses_transport(transport: Literal["http", "websocket"]) -> None:
    """Set the default transport for OpenAI Responses API requests.

    By default, the Responses API uses the HTTP transport. Set this to ``"websocket"`` to use
    websocket transport when the OpenAI provider resolves a Responses model.

    Raises:
        ValueError: If ``transport`` is not ``"http"`` or ``"websocket"``
            (validated in ``_config``).
    """
    # Thin public wrapper; validation and the actual state change live in _config.
    _config.set_default_openai_responses_transport(transport)
257+
258+
249259
def enable_verbose_stdout_logging():
250260
"""Enables verbose logging to stdout. This is useful for debugging."""
251261
logger = logging.getLogger("openai.agents")
@@ -276,6 +286,7 @@ def enable_verbose_stdout_logging():
276286
"MultiProvider",
277287
"OpenAIProvider",
278288
"OpenAIResponsesModel",
289+
"OpenAIResponsesWSModel",
279290
"AgentOutputSchema",
280291
"AgentOutputSchemaBase",
281292
"Computer",
@@ -350,6 +361,7 @@ def enable_verbose_stdout_logging():
350361
"RunErrorHandlers",
351362
"RunResult",
352363
"RunResultStreaming",
364+
"ResponsesWebSocketSession",
353365
"RunConfig",
354366
"ReasoningItemIdPolicy",
355367
"ToolErrorFormatter",
@@ -446,6 +458,8 @@ def enable_verbose_stdout_logging():
446458
"set_default_openai_key",
447459
"set_default_openai_client",
448460
"set_default_openai_api",
461+
"set_default_openai_responses_transport",
462+
"responses_websocket_session",
449463
"set_tracing_export_api_key",
450464
"enable_verbose_stdout_logging",
451465
"gen_trace_id",

src/agents/_config.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,11 @@ def set_default_openai_api(api: Literal["chat_completions", "responses"]) -> Non
2424
_openai_shared.set_use_responses_by_default(False)
2525
else:
2626
_openai_shared.set_use_responses_by_default(True)
27+
28+
29+
def set_default_openai_responses_transport(transport: Literal["http", "websocket"]) -> None:
    """Validate and apply the default Responses API transport.

    Raises:
        ValueError: If ``transport`` is neither ``"http"`` nor ``"websocket"``.
    """
    if transport in ("http", "websocket"):
        _openai_shared.set_default_openai_responses_transport(transport)
        return
    raise ValueError(
        "Invalid OpenAI Responses transport. Expected one of: 'http', 'websocket'."
    )

src/agents/models/_openai_shared.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
from __future__ import annotations
22

3+
from typing import Literal
4+
35
from openai import AsyncOpenAI
46

7+
OpenAIResponsesTransport = Literal["http", "websocket"]
8+
59
_default_openai_key: str | None = None
610
_default_openai_client: AsyncOpenAI | None = None
711
_use_responses_by_default: bool = True
12+
# Source of truth for the default Responses transport.
13+
_default_openai_responses_transport: OpenAIResponsesTransport = "http"
14+
# Backward-compatibility shim for internal code/tests that still mutate the legacy flag directly.
15+
_use_responses_websocket_by_default: bool = False
816

917

1018
def set_default_openai_key(key: str) -> None:
@@ -32,3 +40,29 @@ def set_use_responses_by_default(use_responses: bool) -> None:
3240

3341
def get_use_responses_by_default() -> bool:
3442
return _use_responses_by_default
43+
44+
45+
def set_use_responses_websocket_by_default(use_responses_websocket: bool) -> None:
    """Legacy boolean setter; forwards to the transport-based setter."""
    transport: OpenAIResponsesTransport = "websocket" if use_responses_websocket else "http"
    set_default_openai_responses_transport(transport)


def get_use_responses_websocket_by_default() -> bool:
    """Legacy boolean getter derived from the current default transport."""
    current = get_default_openai_responses_transport()
    return current == "websocket"


def set_default_openai_responses_transport(transport: OpenAIResponsesTransport) -> None:
    """Record the default transport and keep the legacy flag in sync."""
    global _default_openai_responses_transport
    global _use_responses_websocket_by_default
    _use_responses_websocket_by_default = transport == "websocket"
    _default_openai_responses_transport = transport


def get_default_openai_responses_transport() -> OpenAIResponsesTransport:
    """Return the default transport, honoring direct writes to the legacy flag."""
    global _default_openai_responses_transport
    # Respect direct writes to the legacy private flag (used in tests) by syncing on read.
    if _use_responses_websocket_by_default:
        legacy_transport: OpenAIResponsesTransport = "websocket"
    else:
        legacy_transport = "http"
    if legacy_transport != _default_openai_responses_transport:
        _default_openai_responses_transport = legacy_transport
    return _default_openai_responses_transport

src/agents/models/interface.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ def include_data(self) -> bool:
3636
class Model(abc.ABC):
3737
"""The base interface for calling an LLM."""
3838

39+
async def close(self) -> None:
    """Dispose of any resources this model is holding.

    The base implementation holds nothing and does nothing; models that keep
    persistent connections can override this.
    """
46+
3947
@abc.abstractmethod
4048
async def get_response(
4149
self,
@@ -123,3 +131,11 @@ def get_model(self, model_name: str | None) -> Model:
123131
Returns:
124132
The model.
125133
"""
134+
135+
async def aclose(self) -> None:
    """Dispose of any resources this provider is holding.

    The base implementation holds nothing and does nothing; providers that cache
    persistent models or network connections can override this.
    """

0 commit comments

Comments
 (0)