Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 63 additions & 42 deletions packages/core/src/session/runner/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import { toLLMMessages } from "./to-llm-message"
import { MAX_STEPS_PROMPT } from "./max-steps"
import { Snapshot } from "../../snapshot"

// Retry budget handed to `runProvider` as its initial `remaining` count: how many
// additional provider attempts may follow a retryable provider error.
const MAX_PROVIDER_RETRIES = 2
// Base backoff in milliseconds. The wait before a retry is
// `PROVIDER_RETRY_DELAY_MS * 2 ** attempt`, so it doubles with each attempt.
const PROVIDER_RETRY_DELAY_MS = 500

/**
* Runs one durable coding-agent Session until it settles.
*
Expand Down Expand Up @@ -222,54 +225,72 @@ export const layer = Layer.effect(
const publish = (event: LLMEvent, outputPaths: ReadonlyArray<string> = []) =>
withPublication(publisher.publish(event, outputPaths))
let overflowFailure: ProviderErrorEvent | undefined
const providerStream = llm.stream(request).pipe(
Stream.runForEach((event) =>
Effect.gen(function* () {
if (overflowFailure || publisher.hasProviderError()) return
if (LLMEvent.is.providerError(event)) {
if (isContextOverflowFailure(event) && !publisher.hasAssistantStarted()) {
overflowFailure = event
return
}
}
yield* publish(event)
if (event.type !== "tool-call" || event.providerExecuted) return
if (!toolMaterialization) {
yield* withPublication(publisher.failUnsettledTools("Tools are disabled after the maximum agent steps"))
return
}
needsContinuation = true
const assistantMessageID = yield* publisher.assistantMessageID(event.id)
yield* Effect.uninterruptibleMask((restore) =>
restore(
toolMaterialization.settle({
sessionID: session.id,
agent: agent.id,
assistantMessageID,
call: event,
}),
).pipe(
Effect.flatMap((settlement) =>
publish(
LLMEvent.toolResult({
id: event.id,
name: event.name,
result: settlement.result,
output: settlement.output,
// Runs the provider stream for `request` once and, when that attempt ends with a
// held-back retryable provider error, sleeps and runs it again.
// `remaining` is the number of retries still allowed; `attempt` is the zero-based
// retry index used to scale the backoff delay.
const runProvider: (remaining: number, attempt: number) => Effect.Effect<void, LLMError> = Effect.fnUntraced(
function* (remaining: number, attempt: number) {
// Retryable provider error captured during this attempt; it is held back instead of published.
let retryFailure: ProviderErrorEvent | undefined
yield* llm.stream(request).pipe(
Stream.runForEach((event) =>
Effect.gen(function* () {
// Drop every later event once a failure has been captured or a provider error was published.
if (overflowFailure || retryFailure || publisher.hasProviderError()) return
// Provider errors are only intercepted while `publisher.hasAssistantStarted()` is false.
if (LLMEvent.is.providerError(event) && !publisher.hasAssistantStarted()) {
// Context overflow is checked first, so it is never treated as a retry.
if (isContextOverflowFailure(event)) {
overflowFailure = event
return
}
if (event.retryable) {
retryFailure = event
return
}
}
yield* publish(event)
// Only tool calls that the provider did not execute itself are settled below.
if (event.type !== "tool-call" || event.providerExecuted) return
if (!toolMaterialization) {
yield* withPublication(
publisher.failUnsettledTools("Tools are disabled after the maximum agent steps"),
)
return
}
needsContinuation = true
const assistantMessageID = yield* publisher.assistantMessageID(event.id)
yield* Effect.uninterruptibleMask((restore) =>
restore(
toolMaterialization.settle({
sessionID: session.id,
agent: agent.id,
assistantMessageID,
call: event,
}),
settlement.outputPaths ?? [],
).pipe(
Effect.flatMap((settlement) =>
publish(
LLMEvent.toolResult({
id: event.id,
name: event.name,
result: settlement.result,
output: settlement.output,
}),
settlement.outputPaths ?? [],
),
),
),
),
),
).pipe(FiberSet.run(toolFibers))
}),
),
Effect.ensuring(withPublication(publisher.flush())),
).pipe(FiberSet.run(toolFibers))
}),
),
Effect.ensuring(withPublication(publisher.flush())),
)
// No retryable failure was captured during this attempt: nothing left to do.
if (!retryFailure) return
// Retry budget exhausted: publish the error that was held back.
if (remaining === 0) {
yield* publish(retryFailure)
return
}
// Wait `PROVIDER_RETRY_DELAY_MS * 2 ** attempt` milliseconds, then retry with one fewer retry left.
yield* Effect.sleep(`${PROVIDER_RETRY_DELAY_MS * 2 ** attempt} millis`)
yield* runProvider(remaining - 1, attempt + 1)
},
)

return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const stream = yield* restore(providerStream).pipe(Effect.exit)
const stream = yield* restore(runProvider(MAX_PROVIDER_RETRIES, 0)).pipe(Effect.exit)
const failure =
stream._tag === "Failure" ? Option.getOrUndefined(Cause.findErrorOption(stream.cause)) : undefined
if (
Expand Down
57 changes: 57 additions & 0 deletions packages/core/test/session-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import { ModelV2 } from "@opencode-ai/core/model"
import { Location } from "@opencode-ai/core/location"
import { ProviderV2 } from "@opencode-ai/core/provider"
import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect"
import * as TestClock from "effect/testing/TestClock"
import { asc, eq } from "drizzle-orm"
import { testEffect } from "./lib/effect"

Expand Down Expand Up @@ -2943,6 +2944,62 @@ describe("SessionRunnerLLM", () => {
}),
)

// Scripts one retryable provider error followed by a complete text response and
// checks that the runner issues a second request and records only the recovered
// assistant message.
it.effect("retries transient provider errors before durable assistant output", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: Prompt.make({ text: "Recover transient stream" }), resume: false })

// Empty the recorded requests so the length assertion below counts only this run.
requests.length = 0
// First scripted response: a retryable provider error. Second: a full text step.
responses = [
[LLMEvent.providerError({ message: "stream_read_error", retryable: true })],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.textStart({ id: "text-after-retry" }),
LLMEvent.textDelta({ id: "text-after-retry", text: "Recovered" }),
LLMEvent.textEnd({ id: "text-after-retry" }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]

// Fork the run so the test clock can be advanced while it sleeps; 500 ms equals
// the first retry delay (PROVIDER_RETRY_DELAY_MS * 2 ** 0).
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Effect.yieldNow
yield* TestClock.adjust("500 millis")
yield* Fiber.join(run)

// One failed attempt plus one retry.
expect(requests).toHaveLength(2)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Recover transient stream" },
{ type: "assistant", finish: "stop", content: [{ type: "text", text: "Recovered" }] },
])
}),
)

// Scripts three retryable provider errors in a row and checks that the runner
// stops after three requests and records the error on the assistant message.
it.effect("bounds transient provider retries", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: Prompt.make({ text: "Bound transient retries" }), resume: false })

// Empty the recorded requests so the length assertion below counts only this run.
requests.length = 0
// Every scripted response is the same retryable provider error.
responses = Array.from({ length: 3 }, () => [
LLMEvent.providerError({ message: "server_error", retryable: true }),
])

// Fork the run so the test clock can be advanced while it sleeps; 1500 ms covers
// both backoff waits (500 ms for attempt 0 plus 1000 ms for attempt 1).
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Effect.yieldNow
yield* TestClock.adjust("1500 millis")
yield* Fiber.join(run)

// The initial attempt plus MAX_PROVIDER_RETRIES (2) retries.
expect(requests).toHaveLength(3)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Bound transient retries" },
{ type: "assistant", finish: "error", error: { type: "unknown", message: "server_error" } },
])
}),
)

it.effect("does not recover context overflow after durable assistant output", () =>
Effect.gen(function* () {
yield* setup
Expand Down
29 changes: 23 additions & 6 deletions packages/llm/src/protocols/openai-responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ type OpenAIResponsesStreamItem = Schema.Schema.Type<typeof OpenAIResponsesStream
// `response.failed` carries them under `response.error`. We capture both so
// the parser can surface a useful provider-error message in either path.
const OpenAIResponsesErrorPayload = Schema.Struct({
type: optionalNull(Schema.String),
code: optionalNull(Schema.String),
message: optionalNull(Schema.String),
param: optionalNull(Schema.String),
Expand All @@ -224,6 +225,7 @@ const OpenAIResponsesEvent = Schema.Struct({
[Schema.Record(Schema.String, Schema.Unknown)],
),
),
error: optionalNull(OpenAIResponsesErrorPayload),
code: Schema.optional(Schema.String),
message: Schema.optional(Schema.String),
param: Schema.optional(Schema.String),
Expand Down Expand Up @@ -593,10 +595,10 @@ type StepResult = readonly [ParserState, ReadonlyArray<LLMEvent>]
const NO_EVENTS: StepResult["1"] = []

// `response.completed` / `response.incomplete` are clean finishes that emit a
// `finish` event; `response.failed` and `error` are hard failures that emit a
// `provider-error`. All four end the stream — kept in one set so `step` and
// the protocol's `terminal` predicate stay in sync.
const TERMINAL_TYPES = new Set(["response.completed", "response.incomplete", "response.failed", "error"])

const onOutputTextDelta = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
if (!event.delta) return [state, NO_EVENTS]
Expand Down Expand Up @@ -880,19 +882,34 @@ const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): Step
// the bare message — production rate limits and context-length failures used
// to be indistinguishable from generic stream drops.
// Returns `code: message` when both are known, otherwise whichever one is
// present, otherwise `fallback`. Top-level `message` / `code` win; the nested
// payload is taken from `error` first and from `response.error` second.
const providerErrorMessage = (event: OpenAIResponsesEvent, fallback: string): string => {
  const nested = event.error ?? event.response?.error ?? undefined
  // `||` rather than `??` on purpose: an empty string must fall through to the next source.
  const message = event.message || nested?.message || undefined
  const code = event.code || nested?.code || undefined
  if (message && code) return `${code}: ${message}`
  return message || code || fallback
}

// Error `code` / `type` values treated as transient. Hoisted so the list is
// built once instead of on every provider error.
const RETRYABLE_PROVIDER_ERROR_IDS: ReadonlySet<string> = new Set([
  "internal_error",
  "rate_limit_error",
  "rate_limit_exceeded",
  "server_error",
  "server_is_overloaded",
  "service_unavailable_error",
  "stream_read_error",
  "upstream_error",
])

// Builds the `provider-error` event for a failed stream event. The nested
// payload is read from `error` first and from `response.error` second. The event
// is classified as `context-overflow` when the code or message indicates one,
// and flagged `retryable` when the code or nested type is a known transient id.
// Both optional fields are omitted entirely when they do not apply.
const providerError = (event: OpenAIResponsesEvent, fallback: string) => {
  const nested = event.error ?? event.response?.error ?? undefined
  // `||` rather than `??` on purpose: an empty string counts as "not provided".
  const code = event.code || nested?.code || undefined
  const type = nested?.type || undefined
  const message = providerErrorMessage(event, fallback)
  const contextOverflow = code === "context_length_exceeded" || isContextOverflow(message)
  const retryable = [code, type].some((id) => id !== undefined && RETRYABLE_PROVIDER_ERROR_IDS.has(id))
  return LLMEvent.providerError({
    message,
    ...(contextOverflow ? { classification: "context-overflow" as const } : {}),
    ...(retryable ? { retryable: true } : {}),
  })
}

Expand Down
61 changes: 57 additions & 4 deletions packages/llm/test/provider/openai-responses.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,9 @@ describe("OpenAI Responses route", () => {
// sometimes-generic provider message. The bare message alone meant
// production errors like rate limits were indistinguishable from
// unrelated stream failures.
expect(response.events).toEqual([{ type: "provider-error", message: "rate_limit_exceeded: Slow down" }])
expect(response.events).toEqual([
{ type: "provider-error", message: "rate_limit_exceeded: Slow down", retryable: true },
])
}),
)

Expand All @@ -1337,7 +1339,7 @@ describe("OpenAI Responses route", () => {
Effect.provide(fixedResponse(sseEvents({ type: "error", code: "internal_error" }))),
)

expect(response.events).toEqual([{ type: "provider-error", message: "internal_error" }])
expect(response.events).toEqual([{ type: "provider-error", message: "internal_error", retryable: true }])
}),
)

Expand All @@ -1347,7 +1349,7 @@ describe("OpenAI Responses route", () => {
Effect.provide(fixedResponse(sseEvents({ type: "error", code: "internal_error", message: "" }))),
)

expect(response.events).toEqual([{ type: "provider-error", message: "internal_error" }])
expect(response.events).toEqual([{ type: "provider-error", message: "internal_error", retryable: true }])
}),
)

Expand All @@ -1371,7 +1373,9 @@ describe("OpenAI Responses route", () => {
),
)

expect(response.events).toEqual([{ type: "provider-error", message: "server_error: Upstream model unavailable" }])
expect(response.events).toEqual([
{ type: "provider-error", message: "server_error: Upstream model unavailable", retryable: true },
])
}),
)

Expand Down Expand Up @@ -1420,6 +1424,55 @@ describe("OpenAI Responses route", () => {
}),
)

// An `error` event whose details live in a nested `error` envelope must surface
// as a single provider-error with a `code: message` text and `retryable: true`.
it.effect("surfaces and marks transient nested error envelopes retryable", () =>
  Effect.gen(function* () {
    const transport = fixedResponse(
      sseEvents({
        type: "error",
        error: {
          type: "upstream_error",
          code: "stream_read_error",
          message: "The upstream stream ended unexpectedly",
        },
      }),
    )

    const { events } = yield* LLMClient.generate(request).pipe(Effect.provide(transport))

    expect(events).toEqual([
      {
        type: "provider-error",
        message: "stream_read_error: The upstream stream ended unexpectedly",
        retryable: true,
      },
    ])
  }),
)

// An `error` event is terminal: a text delta that follows it in the same stream
// must not produce any further events.
it.effect("stops parsing after a terminal error event", () =>
  Effect.gen(function* () {
    const transport = fixedResponse(
      sseEvents(
        {
          type: "error",
          error: { type: "server_error", code: "server_error", message: "Transient failure" },
        },
        { type: "response.output_text.delta", item_id: "ignored", delta: "must not publish" },
      ),
    )

    const { events } = yield* LLMClient.generate(request).pipe(Effect.provide(transport))

    expect(events).toEqual([{ type: "provider-error", message: "server_error: Transient failure", retryable: true }])
  }),
)

it.effect("falls back to a stable default when both error and response are absent", () =>
Effect.gen(function* () {
const response = yield* LLMClient.generate(request).pipe(
Expand Down
Loading