-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Expand file tree
/
Copy patherror_handlers.py
More file actions
204 lines (188 loc) · 6.93 KB
/
error_handlers.py
File metadata and controls
204 lines (188 loc) · 6.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from __future__ import annotations
import inspect
import json
from typing import Any
from openai.types.responses import ResponseOutputMessage, ResponseOutputText
from ..agent import Agent
from ..agent_output import _WRAPPER_DICT_KEY, AgentOutputSchema
from ..exceptions import MaxTurnsExceeded, ModelBehaviorError, UserError
from ..items import (
ItemHelpers,
MessageOutputItem,
ModelResponse,
RunItem,
TResponseInputItem,
)
from ..models.fake_id import FAKE_RESPONSES_ID
from ..run_context import RunContextWrapper, TContext
from ..run_error_handlers import (
RunErrorData,
RunErrorHandlerInput,
RunErrorHandlerResult,
RunErrorHandlers,
ToolNotFoundAction,
ToolNotFoundErrorHandlerInput,
)
from .items import ReasoningItemIdPolicy, run_item_to_input_item
from .turn_preparation import get_output_schema
def build_run_error_data(
*,
input: str | list[TResponseInputItem],
new_items: list[RunItem],
raw_responses: list[ModelResponse],
last_agent: Agent[Any],
reasoning_item_id_policy: ReasoningItemIdPolicy | None = None,
) -> RunErrorData:
history = ItemHelpers.input_to_new_input_list(input)
output = []
for item in new_items:
converted = run_item_to_input_item(item, reasoning_item_id_policy)
if converted is None:
continue
output.append(converted)
history = history + list(output)
return RunErrorData(
input=input,
new_items=list(new_items),
history=history,
output=output,
raw_responses=list(raw_responses),
last_agent=last_agent,
)
def format_final_output_text(agent: Agent[Any], final_output: Any) -> str:
output_schema = get_output_schema(agent)
if output_schema is None or output_schema.is_plain_text():
return str(final_output)
payload_value = final_output
if isinstance(output_schema, AgentOutputSchema) and output_schema._is_wrapped:
if isinstance(final_output, dict) and _WRAPPER_DICT_KEY in final_output:
payload_value = final_output
else:
payload_value = {_WRAPPER_DICT_KEY: final_output}
try:
if isinstance(output_schema, AgentOutputSchema):
payload_bytes = output_schema._type_adapter.dump_json(payload_value)
return (
payload_bytes.decode()
if isinstance(payload_bytes, bytes | bytearray)
else str(payload_bytes)
)
return json.dumps(payload_value, ensure_ascii=False)
except (TypeError, ValueError):
return str(final_output)
def validate_handler_final_output(agent: Agent[Any], final_output: Any) -> Any:
output_schema = get_output_schema(agent)
if output_schema is None or output_schema.is_plain_text():
return final_output
payload_value = final_output
if isinstance(output_schema, AgentOutputSchema) and output_schema._is_wrapped:
if isinstance(final_output, dict) and _WRAPPER_DICT_KEY in final_output:
payload_value = final_output
else:
payload_value = {_WRAPPER_DICT_KEY: final_output}
try:
if isinstance(output_schema, AgentOutputSchema):
payload_bytes = output_schema._type_adapter.dump_json(payload_value)
payload = (
payload_bytes.decode()
if isinstance(payload_bytes, bytes | bytearray)
else str(payload_bytes)
)
else:
payload = json.dumps(payload_value, ensure_ascii=False)
except TypeError as exc:
raise UserError("Invalid run error handler final_output for structured output.") from exc
except ValueError as exc:
raise UserError("Invalid run error handler final_output for structured output.") from exc
try:
return output_schema.validate_json(payload)
except ModelBehaviorError as exc:
raise UserError("Invalid run error handler final_output for structured output.") from exc
def create_message_output_item(agent: Agent[Any], output_text: str) -> MessageOutputItem:
message = ResponseOutputMessage(
id=FAKE_RESPONSES_ID,
type="message",
role="assistant",
content=[
ResponseOutputText(
text=output_text,
type="output_text",
annotations=[],
logprobs=[],
)
],
status="completed",
)
return MessageOutputItem(raw_item=message, agent=agent)
async def resolve_run_error_handler_result(
*,
error_handlers: RunErrorHandlers[TContext] | None,
error: MaxTurnsExceeded,
context_wrapper: RunContextWrapper[TContext],
run_data: RunErrorData,
) -> RunErrorHandlerResult | None:
if not error_handlers:
return None
handler = error_handlers.get("max_turns")
if handler is None:
return None
handler_input = RunErrorHandlerInput(
error=error,
context=context_wrapper,
run_data=run_data,
)
result = handler(handler_input)
if inspect.isawaitable(result):
result = await result
if result is None:
return None
if isinstance(result, RunErrorHandlerResult):
return result
if isinstance(result, dict):
if "final_output" in result:
allowed_keys = {"final_output", "include_in_history"}
extra_keys = set(result.keys()) - allowed_keys
if extra_keys:
raise UserError("Invalid run error handler result.")
try:
return RunErrorHandlerResult(**result)
except TypeError as exc:
raise UserError("Invalid run error handler result.") from exc
return RunErrorHandlerResult(final_output=result)
return RunErrorHandlerResult(final_output=result)
async def resolve_tool_not_found_action(
*,
error_handlers: RunErrorHandlers[TContext] | None,
tool_name: str,
available_tools: list[str],
agent: Agent[Any],
context_wrapper: RunContextWrapper[TContext],
run_data: RunErrorData,
) -> ToolNotFoundAction | None:
"""Invoke the ``tool_not_found`` handler (if configured) and normalize its return value.
Returns a :class:`ToolNotFoundAction` when the handler asks the runner to recover, or
``None`` when no handler is registered or the handler opts to re-raise.
"""
if not error_handlers:
return None
handler = error_handlers.get("tool_not_found")
if handler is None:
return None
handler_input = ToolNotFoundErrorHandlerInput(
tool_name=tool_name,
available_tools=available_tools,
agent=agent,
context=context_wrapper,
run_data=run_data,
)
result: Any = handler(handler_input)
if inspect.isawaitable(result):
result = await result
if result is None:
return None
if isinstance(result, ToolNotFoundAction):
return result
raise UserError(
"tool_not_found handler must return ToolNotFoundAction or None, "
f"got {type(result).__name__}."
)