@@ -789,25 +789,37 @@ async def dispatch_stream_events() -> None:
789789 break
790790
791791 dispatch_task = asyncio .create_task (dispatch_stream_events ())
792+ stream_iteration_cancelled = False
792793
793794 try :
794795 from .stream_events import AgentUpdatedStreamEvent
795796
796797 current_agent = run_result_streaming .current_agent
797- async for event in run_result_streaming .stream_events ():
798- if isinstance (event , AgentUpdatedStreamEvent ):
799- current_agent = event .new_agent
800-
801- payload : AgentToolStreamEvent = {
802- "event" : event ,
803- "agent" : current_agent ,
804- "tool_call" : context .tool_call ,
805- }
806- await event_queue .put (payload )
798+ try :
799+ async for event in run_result_streaming .stream_events ():
800+ if isinstance (event , AgentUpdatedStreamEvent ):
801+ current_agent = event .new_agent
802+
803+ payload : AgentToolStreamEvent = {
804+ "event" : event ,
805+ "agent" : current_agent ,
806+ "tool_call" : context .tool_call ,
807+ }
808+ await event_queue .put (payload )
809+ except asyncio .CancelledError :
810+ stream_iteration_cancelled = True
811+ raise
807812 finally :
808- await event_queue .put (None )
809- await event_queue .join ()
810- await dispatch_task
813+ if stream_iteration_cancelled :
814+ dispatch_task .cancel ()
815+ try :
816+ await dispatch_task
817+ except asyncio .CancelledError :
818+ pass
819+ else :
820+ await event_queue .put (None )
821+ await event_queue .join ()
822+ await dispatch_task
811823 run_result = run_result_streaming
812824 else :
813825 run_result = await Runner .run (
0 commit comments