-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Expand file tree
/
Copy pathtest_reasoning_delta_stream_event.py
More file actions
147 lines (111 loc) · 4.9 KB
/
test_reasoning_delta_stream_event.py
File metadata and controls
147 lines (111 loc) · 4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Tests for ReasoningDeltaEvent stream event (issue #825)."""
from __future__ import annotations
import pytest
from agents import Agent, Runner
from agents.stream_events import ReasoningDeltaEvent, RawResponsesStreamEvent
from openai.types.responses.response_reasoning_item import ResponseReasoningItem, Summary
from .fake_model import FakeModel
from .test_responses import get_text_message
def _make_reasoning_item(text: str) -> ResponseReasoningItem:
return ResponseReasoningItem(
id="rs_test",
type="reasoning",
summary=[Summary(text=text, type="summary_text")],
)
@pytest.mark.asyncio
async def test_reasoning_delta_event_emitted_during_streaming() -> None:
"""ReasoningDeltaEvent is emitted when the model streams a reasoning summary delta."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("Let me think..."),
get_text_message("Answer"),
])
agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")
reasoning_deltas: list[ReasoningDeltaEvent] = []
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
reasoning_deltas.append(event)
assert len(reasoning_deltas) >= 1
assert all(isinstance(e.delta, str) for e in reasoning_deltas)
assert all(isinstance(e.snapshot, str) for e in reasoning_deltas)
assert all(e.type == "reasoning_delta" for e in reasoning_deltas)
@pytest.mark.asyncio
async def test_reasoning_delta_snapshot_accumulates() -> None:
"""The snapshot field grows monotonically across delta events."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("Hello world"),
get_text_message("done"),
])
agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")
snapshots: list[str] = []
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
snapshots.append(event.snapshot)
# Each snapshot must be at least as long as the previous one
for i in range(1, len(snapshots)):
assert len(snapshots[i]) >= len(snapshots[i - 1])
# Last snapshot must contain the full reasoning text
if snapshots:
assert "Hello world" in snapshots[-1]
@pytest.mark.asyncio
async def test_no_reasoning_delta_event_without_reasoning() -> None:
"""ReasoningDeltaEvent is not emitted when there is no reasoning in the response."""
model = FakeModel()
model.set_next_output([get_text_message("plain text answer")])
agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")
async for event in result.stream_events():
assert not isinstance(event, ReasoningDeltaEvent), (
"Got unexpected ReasoningDeltaEvent for a plain text response"
)
@pytest.mark.asyncio
async def test_reasoning_delta_event_type_field() -> None:
"""ReasoningDeltaEvent.type is always 'reasoning_delta'."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("some reasoning"),
get_text_message("answer"),
])
agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")
found = False
async for event in result.stream_events():
if isinstance(event, ReasoningDeltaEvent):
assert event.type == "reasoning_delta"
found = True
break
assert found, "Expected at least one ReasoningDeltaEvent but none were emitted"
@pytest.mark.asyncio
async def test_raw_response_events_still_emitted_alongside_reasoning_delta() -> None:
"""RawResponsesStreamEvent is still emitted even when ReasoningDeltaEvent is also emitted."""
model = FakeModel()
model.set_next_output([
_make_reasoning_item("thinking"),
get_text_message("result"),
])
agent = Agent(name="A", model=model)
result = Runner.run_streamed(agent, input="hi")
raw_events: list[RawResponsesStreamEvent] = []
reasoning_events: list[ReasoningDeltaEvent] = []
async for event in result.stream_events():
if isinstance(event, RawResponsesStreamEvent):
raw_events.append(event)
elif isinstance(event, ReasoningDeltaEvent):
reasoning_events.append(event)
# Both types should be present
assert len(raw_events) > 0
assert len(reasoning_events) > 0
@pytest.mark.asyncio
async def test_reasoning_delta_event_importable_from_agents() -> None:
"""ReasoningDeltaEvent can be imported directly from the agents package."""
from agents import ReasoningDeltaEvent as RDE
assert RDE is ReasoningDeltaEvent
def test_reasoning_delta_event_dataclass() -> None:
"""ReasoningDeltaEvent is a proper dataclass with expected fields."""
event = ReasoningDeltaEvent(delta="chunk", snapshot="full chunk")
assert event.delta == "chunk"
assert event.snapshot == "full chunk"
assert event.type == "reasoning_delta"