Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
ToolCallOutputItem,
TResponseInputItem,
)
from .lifecycle import AgentHooks, RunHooks
from .lifecycle import AgentHooks, AgentHooksBase, RunHooks, RunHooksBase, TurnControl
from .memory import (
OpenAIConversationsSession,
OpenAIResponsesCompactionArgs,
Expand Down Expand Up @@ -361,7 +361,10 @@ def enable_verbose_stdout_logging():
"ReasoningItem",
"ItemHelpers",
"RunHooks",
"RunHooksBase",
"AgentHooks",
"AgentHooksBase",
"TurnControl",
"Session",
"SessionABC",
"SessionSettings",
Expand Down
102 changes: 101 additions & 1 deletion src/agents/lifecycle.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Any, Generic, Optional
from __future__ import annotations

from typing import Any, Generic, Literal, Optional, Union

from typing_extensions import TypeVar

Expand All @@ -9,10 +11,26 @@

TAgent = TypeVar("TAgent", bound=AgentBase, default=AgentBase)

TurnControl = Literal["continue", "stop"]
"""Return value for :meth:`RunHooksBase.on_turn_start` / :meth:`AgentHooksBase.on_turn_start`.

* ``"continue"`` (default / ``None``) – proceed with the turn as normal.
* ``"stop"`` – abort the run gracefully after this hook returns, exactly as if
``max_turns`` had been reached. The model is **not** called for this turn and
:meth:`on_turn_end` is **not** fired.
"""


class RunHooksBase(Generic[TContext, TAgent]):
"""A class that receives callbacks on various lifecycle events in an agent run. Subclass and
override the methods you need.

Turn-lifecycle hooks
--------------------
:meth:`on_turn_start` and :meth:`on_turn_end` fire once per iteration of the
agent loop. :meth:`on_turn_start` may return ``"stop"`` to halt the run
gracefully before the LLM is called for that turn (useful for implementing
custom turn-budget logic, external kill-switches, etc.).
"""

async def on_llm_start(
Expand Down Expand Up @@ -86,12 +104,57 @@ async def on_tool_end(
"""Called immediately after a local tool is invoked."""
pass

async def on_turn_start(
self,
context: RunContextWrapper[TContext],
agent: TAgent,
turn_number: int,
) -> Union[TurnControl, None]:
"""Called at the start of each agent turn, before the LLM is invoked.

Returning ``"stop"`` (or raising :class:`StopAgentRun`) will halt the run
gracefully — the model is **not** called for this turn and
:meth:`on_turn_end` is **not** fired. Returning ``None`` or ``"continue"``
proceeds normally.

Args:
context: The run context wrapper.
agent: The current agent.
turn_number: The 1-indexed turn number (increments each time through the
agent loop).

Returns:
``None`` / ``"continue"`` to proceed, or ``"stop"`` to halt the run.
"""
return None

async def on_turn_end(
self,
context: RunContextWrapper[TContext],
agent: TAgent,
turn_number: int,
) -> None:
"""Called at the end of each agent turn, after all tool calls for that turn complete.

Args:
context: The run context wrapper.
agent: The current agent.
turn_number: The 1-indexed turn number.
"""
pass


class AgentHooksBase(Generic[TContext, TAgent]):
"""A class that receives callbacks on various lifecycle events for a specific agent. You can
set this on `agent.hooks` to receive events for that specific agent.

Subclass and override the methods you need.

Turn-lifecycle hooks
--------------------
:meth:`on_turn_start` and :meth:`on_turn_end` fire once per iteration of the
agent loop. :meth:`on_turn_start` may return ``"stop"`` to halt the run
gracefully before the LLM is called for that turn.
"""

async def on_start(self, context: AgentHookContext[TContext], agent: TAgent) -> None:
Expand Down Expand Up @@ -148,6 +211,43 @@ async def on_tool_end(
"""Called immediately after a local tool is invoked."""
pass

async def on_turn_start(
self,
context: RunContextWrapper[TContext],
agent: TAgent,
turn_number: int,
) -> Union[TurnControl, None]:
"""Called at the start of each agent turn, before the LLM is invoked.

Returning ``"stop"`` halts the run gracefully before the model is called.
Returning ``None`` or ``"continue"`` proceeds normally.

Args:
context: The run context wrapper.
agent: The current agent.
turn_number: The 1-indexed turn number (increments each time through the
agent loop).

Returns:
``None`` / ``"continue"`` to proceed, or ``"stop"`` to halt the run.
"""
return None

async def on_turn_end(
self,
context: RunContextWrapper[TContext],
agent: TAgent,
turn_number: int,
) -> None:
"""Called at the end of each agent turn, after all tool calls for that turn complete.

Args:
context: The run context wrapper.
agent: The current agent.
turn_number: The 1-indexed turn number.
"""
pass

async def on_llm_start(
self,
context: RunContextWrapper[TContext],
Expand Down
32 changes: 31 additions & 1 deletion src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
from .tracing import Span, SpanError, agent_span, get_current_trace
from .tracing.context import TraceCtxManager, create_trace_for_run
from .tracing.span_data import AgentSpanData
from .util import _error_tracing
from .util import _coro, _error_tracing

DEFAULT_AGENT_RUNNER: AgentRunner = None # type: ignore
# the value is set at the end of the module
Expand Down Expand Up @@ -968,6 +968,25 @@ def _with_reasoning_item_id_policy(result: RunResult) -> RunResult:

logger.debug("Running agent %s (turn %s)", current_agent.name, current_turn)

run_hook_control, agent_hook_control = await asyncio.gather(
hooks.on_turn_start(context_wrapper, current_agent, current_turn),
(
current_agent.hooks.on_turn_start(
context_wrapper, current_agent, current_turn
)
if current_agent.hooks
else _coro.noop_coroutine()
),
)
if run_hook_control == "stop" or agent_hook_control == "stop":
logger.debug(
"Turn %s: on_turn_start hook requested stop; halting run.",
current_turn,
)
raise MaxTurnsExceeded(
f"Run halted by on_turn_start hook at turn {current_turn}"
)

if session_persistence_enabled:
try:
last_saved_input_snapshot_for_rewind = (
Expand Down Expand Up @@ -1093,6 +1112,17 @@ def _with_reasoning_item_id_policy(result: RunResult) -> RunResult:
last_saved_input_snapshot_for_rewind = None
should_run_agent_start_hooks = False

await asyncio.gather(
hooks.on_turn_end(context_wrapper, current_agent, current_turn),
(
current_agent.hooks.on_turn_end(
context_wrapper, current_agent, current_turn
Comment on lines +1115 to +1119
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Defer turn-end hook until interrupted turn actually completes

In the sync runner, on_turn_end is fired immediately after run_single_turn returns, even when next_step is NextStepInterruption (tool approval pending). In that path the turn has not actually finished—resolve_interrupted_turn continues the same turn later—so this reports a false turn completion and the hook is never re-fired at the real end. Any hook that compacts state, records per-turn metrics, or enforces turn-level cancellation will run too early for approval-based workflows.

Useful? React with 👍 / 👎.

)
if current_agent.hooks
else _coro.noop_coroutine()
),
)

model_responses.append(turn_result.model_response)
original_input = turn_result.original_input
# For model input, use new_step_items (filtered on handoffs).
Expand Down
34 changes: 34 additions & 0 deletions src/agents/run_internal/run_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,29 @@ async def _save_stream_items_without_count(
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
break

run_hook_control, agent_hook_control = await asyncio.gather(
hooks.on_turn_start(context_wrapper, current_agent, current_turn),
(
current_agent.hooks.on_turn_start(
context_wrapper, current_agent, current_turn
)
if current_agent.hooks
else _coro.noop_coroutine()
),
)
if run_hook_control == "stop" or agent_hook_control == "stop":
logger.debug(
"Turn %s: on_turn_start hook requested stop; halting run.",
current_turn,
)
streamed_result._max_turns_handled = True
streamed_result.current_turn = current_turn
if run_state is not None:
run_state._current_turn = current_turn
run_state._current_step = None
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
break

if current_turn == 1:
all_input_guardrails = starting_agent.input_guardrails + (
run_config.input_guardrails or []
Expand Down Expand Up @@ -909,6 +932,17 @@ async def _save_stream_items_without_count(
tool_use_tracker
)

await asyncio.gather(
hooks.on_turn_end(context_wrapper, current_agent, current_turn),
(
current_agent.hooks.on_turn_end(
context_wrapper, current_agent, current_turn
Comment on lines +935 to +939
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid emitting turn-end for paused streaming turns

The streaming loop has the same premature on_turn_end call before branching on turn_result.next_step. If the turn pauses with NextStepInterruption, the hook still fires even though approvals and remaining tool execution happen only after resume, so observers get an incorrect "turn complete" signal and never receive a true completion callback for that turn.

Useful? React with 👍 / 👎.

)
if current_agent.hooks
else _coro.noop_coroutine()
),
)

streamed_result.raw_responses = streamed_result.raw_responses + [
turn_result.model_response
]
Expand Down
Loading