Streaming lets you subscribe to updates of the agent run as it proceeds. This can be useful for showing the end-user progress updates and partial responses.
To stream, you can call [Runner.run_streamed()][agents.run.Runner.run_streamed], which will give you a [RunResultStreaming][agents.result.RunResultStreaming]. Calling `result.stream_events()` gives you an async stream of [StreamEvent][agents.stream_events.StreamEvent] objects, which are described below.
Keep consuming `result.stream_events()` until the async iterator finishes. A streaming run is not complete until the iterator ends, and post-processing such as session persistence, approval bookkeeping, or history compaction can finish after the last visible token arrives. When the loop exits, `result.is_complete` reflects the final run state.
[RawResponsesStreamEvent][agents.stream_events.RawResponsesStreamEvent] are raw events passed directly from the LLM. They are in OpenAI Responses API format, which means each event has a `type` (like `response.created`, `response.output_text.delta`, etc.) and data. These events are useful if you want to stream response messages to the user as soon as they are generated.
Computer-tool raw events keep the same preview-vs-GA distinction as stored results. Preview flows stream `computer_call` items with one `action`, while gpt-5.4 can stream `computer_call` items with batched `actions[]`. The higher-level [RunItemStreamEvent][agents.stream_events.RunItemStreamEvent] surface does not add a special computer-only event name for this: both shapes still surface as `tool_called`, and the screenshot result comes back as `tool_output` wrapping a `computer_call_output` item.
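If you consume these raw `computer_call` items yourself, it can help to normalize both shapes into a single list of actions before dispatching them. The helper below is an illustrative sketch, not part of the SDK; it assumes the items are plain dicts with either an `action` field (preview) or an `actions` field (batched):

```python
def computer_call_actions(item: dict) -> list[dict]:
    """Return the actions carried by a computer_call item,
    regardless of which API shape produced it."""
    # GA shape: batched `actions[]` on a single computer_call item.
    if item.get("actions") is not None:
        return list(item["actions"])
    # Preview shape: a single `action` per computer_call item.
    action = item.get("action")
    return [action] if action is not None else []
```

Downstream code can then iterate over the returned list without branching on which API shape the model emitted.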
For example, this will output the text generated by the LLM token-by-token.
```python
import asyncio

from openai.types.responses import ResponseTextDeltaEvent

from agents import Agent, Runner


async def main():
    agent = Agent(
        name="Joker",
        instructions="You are a helpful assistant.",
    )

    result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())
```

Streaming is compatible with runs that pause for tool approval. If a tool requires approval, `result.stream_events()` finishes and pending approvals are exposed in [RunResultStreaming.interruptions][agents.result.RunResultStreaming.interruptions]. Convert the result to a [RunState][agents.run_state.RunState] with `result.to_state()`, approve or reject the interruption, and then resume with `Runner.run_streamed(...)`.
```python
result = Runner.run_streamed(agent, "Delete temporary files if they are no longer needed.")
async for _event in result.stream_events():
    pass

if result.interruptions:
    state = result.to_state()
    for interruption in result.interruptions:
        state.approve(interruption)

    result = Runner.run_streamed(agent, state)
    async for _event in result.stream_events():
        pass
```

For a full pause/resume walkthrough, see the human-in-the-loop guide.
If you need to stop a streaming run in the middle, call [result.cancel()][agents.result.RunResultStreaming.cancel]. By default this stops the run immediately. To let the current turn finish cleanly before stopping, call `result.cancel(mode="after_turn")` instead.
A streamed run is not complete until `result.stream_events()` finishes. The SDK may still be persisting session items, finalizing approval state, or compacting history after the last visible token.
If you are manually continuing from [result.to_input_list(mode="normalized")][agents.result.RunResultBase.to_input_list] and `cancel(mode="after_turn")` stopped after a tool turn, continue that unfinished turn by rerunning `result.last_agent` with the normalized input instead of appending a fresh user turn right away.
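One way to decide between the two cases is to inspect the tail of the normalized input list. The helper below is a hypothetical heuristic, not SDK API; it assumes Responses-style item dicts with a `type` field, where a trailing tool call or tool output means the model never produced a final message for that turn:

```python
def turn_is_unfinished(items: list[dict]) -> bool:
    """Heuristic: the last normalized item being a tool call or its
    output suggests the run was cancelled mid-tool-loop."""
    if not items:
        return False
    return items[-1].get("type") in {"function_call", "function_call_output"}
```

When this returns `True`, rerun the last agent with the items as-is; otherwise it is safe to append the next user message.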
- If a streamed run stopped for tool approval, do not treat that as a new turn. Finish draining the stream, inspect `result.interruptions`, and resume from `result.to_state()` instead.
- Use [RunConfig.session_input_callback][agents.run.RunConfig.session_input_callback] to customize how retrieved session history and the new user input are merged before the next model call. If you rewrite new-turn items there, the rewritten version is what gets persisted for that turn.
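As a sketch of such a callback (the exact signature should be checked against the SDK reference; here it is assumed to receive the retrieved session history and the new turn's items, and to return the merged list that goes to the model):

```python
def keep_recent_history(history: list, new_input: list) -> list:
    """Bound the prompt by keeping only the 20 most recent stored
    items before appending the new user turn."""
    return history[-20:] + new_input
```

You would then pass it along the lines of `RunConfig(session_input_callback=keep_recent_history)`.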
[RunItemStreamEvent][agents.stream_events.RunItemStreamEvent]s are higher level events. They inform you when an item has been fully generated. This allows you to push progress updates at the level of "message generated", "tool ran", etc., instead of each token. Similarly, [AgentUpdatedStreamEvent][agents.stream_events.AgentUpdatedStreamEvent] gives you updates when the current agent changes (e.g. as the result of a handoff).
`RunItemStreamEvent.name` uses a fixed set of semantic event names:
- `message_output_created`
- `handoff_requested`
- `handoff_occured`
- `tool_called`
- `tool_search_called`
- `tool_search_output_created`
- `tool_output`
- `reasoning_item_created`
- `mcp_approval_requested`
- `mcp_approval_response`
- `mcp_list_tools`
`handoff_occured` is intentionally misspelled for backward compatibility.
When you use hosted tool search, `tool_search_called` is emitted when the model issues a tool-search request and `tool_search_output_created` is emitted when the Responses API returns the loaded subset.
For example, this will ignore raw events and stream updates to the user.
```python
import asyncio
import random

from agents import Agent, ItemHelpers, Runner, function_tool


@function_tool
def how_many_jokes() -> int:
    return random.randint(1, 10)


async def main():
    agent = Agent(
        name="Joker",
        instructions="First call the `how_many_jokes` tool, then tell that many jokes.",
        tools=[how_many_jokes],
    )

    result = Runner.run_streamed(
        agent,
        input="Hello",
    )
    print("=== Run starting ===")

    async for event in result.stream_events():
        # We'll ignore the raw responses event deltas
        if event.type == "raw_response_event":
            continue
        # When the agent updates, print that
        elif event.type == "agent_updated_stream_event":
            print(f"Agent updated: {event.new_agent.name}")
            continue
        # When items are generated, print them
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print("-- Tool was called")
            elif event.item.type == "tool_call_output_item":
                print(f"-- Tool output: {event.item.output}")
            elif event.item.type == "message_output_item":
                print(f"-- Message output:\n {ItemHelpers.text_message_output(event.item)}")
            else:
                pass  # Ignore other event types

    print("=== Run complete ===")


if __name__ == "__main__":
    asyncio.run(main())
```