Skip to content

Commit 6814a54

Browse files
authored
fix: #2540 reattach resumed traces without duplicate trace starts (#2547)
1 parent 4df07f0 commit 6814a54

8 files changed

Lines changed: 719 additions & 32 deletions

File tree

src/agents/run.py

Lines changed: 13 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -107,8 +107,8 @@
107107
from .run_state import RunState
108108
from .tool import dispose_resolved_computers
109109
from .tool_guardrails import ToolInputGuardrailResult, ToolOutputGuardrailResult
110-
from .tracing import Span, SpanError, agent_span, get_current_trace, trace
111-
from .tracing.context import TraceCtxManager
110+
from .tracing import Span, SpanError, agent_span, get_current_trace
111+
from .tracing.context import TraceCtxManager, create_trace_for_run
112112
from .tracing.span_data import AgentSpanData
113113
from .util import _error_tracing
114114

@@ -549,6 +549,8 @@ async def run(
549549
metadata=trace_metadata,
550550
tracing=trace_config,
551551
disabled=run_config.tracing_disabled,
552+
trace_state=run_state._trace_state if run_state is not None else None,
553+
reattach_resumed_trace=is_resumed_state,
552554
):
553555
if is_resumed_state and run_state is not None:
554556
run_state.set_trace(get_current_trace())
@@ -1519,17 +1521,15 @@ def run_streamed(
15191521
# If there's already a trace, we don't create a new one. In addition, we can't end the
15201522
# trace here, because the actual work is done in `stream_events` and this method ends
15211523
# before that.
1522-
new_trace = (
1523-
None
1524-
if get_current_trace()
1525-
else trace(
1526-
workflow_name=trace_workflow_name,
1527-
trace_id=trace_id,
1528-
group_id=trace_group_id,
1529-
metadata=trace_metadata,
1530-
tracing=trace_config,
1531-
disabled=run_config.tracing_disabled,
1532-
)
1524+
new_trace = create_trace_for_run(
1525+
workflow_name=trace_workflow_name,
1526+
trace_id=trace_id,
1527+
group_id=trace_group_id,
1528+
metadata=trace_metadata,
1529+
tracing=trace_config,
1530+
disabled=run_config.tracing_disabled,
1531+
trace_state=run_state._trace_state if run_state is not None else None,
1532+
reattach_resumed_trace=is_resumed_state,
15331533
)
15341534
if run_state is not None:
15351535
run_state.set_trace(new_trace or get_current_trace())

src/agents/run_state.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@
9999
# 2. Keep older readable versions in SUPPORTED_SCHEMA_VERSIONS for backward reads.
100100
# 3. to_json() always emits CURRENT_SCHEMA_VERSION.
101101
# 4. Forward compatibility is intentionally fail-fast (older SDKs reject newer versions).
102-
CURRENT_SCHEMA_VERSION = "1.2"
103-
SUPPORTED_SCHEMA_VERSIONS = frozenset({"1.0", "1.1", CURRENT_SCHEMA_VERSION})
102+
CURRENT_SCHEMA_VERSION = "1.3"
103+
SUPPORTED_SCHEMA_VERSIONS = frozenset({"1.0", "1.1", "1.2", CURRENT_SCHEMA_VERSION})
104104

105105
_FUNCTION_OUTPUT_ADAPTER: TypeAdapter[FunctionCallOutput] = TypeAdapter(FunctionCallOutput)
106106
_COMPUTER_OUTPUT_ADAPTER: TypeAdapter[ComputerCallOutput] = TypeAdapter(ComputerCallOutput)

src/agents/tracing/context.py

Lines changed: 97 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,88 @@
44

55
from .config import TracingConfig
66
from .create import get_current_trace, trace
7-
from .traces import Trace
7+
from .traces import (
8+
Trace,
9+
TraceState,
10+
_hash_tracing_api_key,
11+
_trace_id_was_started,
12+
reattach_trace,
13+
)
14+
15+
16+
def _get_tracing_api_key(tracing: TracingConfig | None) -> str | None:
17+
return tracing.get("api_key") if tracing is not None else None
18+
19+
20+
def _trace_state_matches_effective_settings(
21+
*,
22+
trace_state: TraceState,
23+
workflow_name: str,
24+
trace_id: str | None,
25+
group_id: str | None,
26+
metadata: dict[str, Any] | None,
27+
tracing: TracingConfig | None,
28+
) -> bool:
29+
if trace_state.trace_id is None or trace_state.trace_id != trace_id:
30+
return False
31+
if trace_state.workflow_name != workflow_name:
32+
return False
33+
if trace_state.group_id != group_id:
34+
return False
35+
if trace_state.metadata != metadata:
36+
return False
37+
tracing_api_key = _get_tracing_api_key(tracing)
38+
if trace_state.tracing_api_key is not None:
39+
return trace_state.tracing_api_key == tracing_api_key
40+
if trace_state.tracing_api_key_hash is not None:
41+
# A fingerprint lets stripped RunState snapshots prove the caller
42+
# re-supplied the same explicit key.
43+
return trace_state.tracing_api_key_hash == _hash_tracing_api_key(tracing_api_key)
44+
return tracing_api_key is None
45+
46+
47+
def create_trace_for_run(
48+
*,
49+
workflow_name: str,
50+
trace_id: str | None,
51+
group_id: str | None,
52+
metadata: dict[str, Any] | None,
53+
tracing: TracingConfig | None,
54+
disabled: bool,
55+
trace_state: TraceState | None = None,
56+
reattach_resumed_trace: bool = False,
57+
) -> Trace | None:
58+
"""Return a trace object for this run when one is not already active."""
59+
current_trace = get_current_trace()
60+
if current_trace:
61+
return None
62+
63+
if (
64+
reattach_resumed_trace
65+
and not disabled
66+
and trace_state is not None
67+
and _trace_id_was_started(trace_state.trace_id)
68+
and _trace_state_matches_effective_settings(
69+
trace_state=trace_state,
70+
workflow_name=workflow_name,
71+
trace_id=trace_id,
72+
group_id=group_id,
73+
metadata=metadata,
74+
tracing=tracing,
75+
)
76+
):
77+
# Reuse the live key because secure snapshots may persist only the
78+
# fingerprint, not the secret itself.
79+
return reattach_trace(trace_state, tracing_api_key=_get_tracing_api_key(tracing))
80+
81+
return trace(
82+
workflow_name=workflow_name,
83+
trace_id=trace_id,
84+
group_id=group_id,
85+
metadata=metadata,
86+
tracing=tracing,
87+
disabled=disabled,
88+
)
889

990

1091
class TraceCtxManager:
@@ -18,6 +99,8 @@ def __init__(
1899
metadata: dict[str, Any] | None,
19100
tracing: TracingConfig | None,
20101
disabled: bool,
102+
trace_state: TraceState | None = None,
103+
reattach_resumed_trace: bool = False,
21104
):
22105
self.trace: Trace | None = None
23106
self.workflow_name = workflow_name
@@ -26,18 +109,21 @@ def __init__(
26109
self.metadata = metadata
27110
self.tracing = tracing
28111
self.disabled = disabled
112+
self.trace_state = trace_state
113+
self.reattach_resumed_trace = reattach_resumed_trace
29114

30115
def __enter__(self) -> TraceCtxManager:
31-
current_trace = get_current_trace()
32-
if not current_trace:
33-
self.trace = trace(
34-
workflow_name=self.workflow_name,
35-
trace_id=self.trace_id,
36-
group_id=self.group_id,
37-
metadata=self.metadata,
38-
tracing=self.tracing,
39-
disabled=self.disabled,
40-
)
116+
self.trace = create_trace_for_run(
117+
workflow_name=self.workflow_name,
118+
trace_id=self.trace_id,
119+
group_id=self.group_id,
120+
metadata=self.metadata,
121+
tracing=self.tracing,
122+
disabled=self.disabled,
123+
trace_state=self.trace_state,
124+
reattach_resumed_trace=self.reattach_resumed_trace,
125+
)
126+
if self.trace:
41127
assert self.trace is not None
42128
self.trace.start(mark_as_current=True)
43129
return self

0 commit comments

Comments
 (0)