diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index 9e29ae71ee9e..86d38359649d 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -37,6 +37,9 @@ import { toLLMMessages } from "./to-llm-message" import { MAX_STEPS_PROMPT } from "./max-steps" import { Snapshot } from "../../snapshot" +const MAX_PROVIDER_RETRIES = 2 +const PROVIDER_RETRY_DELAY_MS = 500 + /** * Runs one durable coding-agent Session until it settles. * @@ -222,54 +225,72 @@ export const layer = Layer.effect( const publish = (event: LLMEvent, outputPaths: ReadonlyArray = []) => withPublication(publisher.publish(event, outputPaths)) let overflowFailure: ProviderErrorEvent | undefined - const providerStream = llm.stream(request).pipe( - Stream.runForEach((event) => - Effect.gen(function* () { - if (overflowFailure || publisher.hasProviderError()) return - if (LLMEvent.is.providerError(event)) { - if (isContextOverflowFailure(event) && !publisher.hasAssistantStarted()) { - overflowFailure = event - return - } - } - yield* publish(event) - if (event.type !== "tool-call" || event.providerExecuted) return - if (!toolMaterialization) { - yield* withPublication(publisher.failUnsettledTools("Tools are disabled after the maximum agent steps")) - return - } - needsContinuation = true - const assistantMessageID = yield* publisher.assistantMessageID(event.id) - yield* Effect.uninterruptibleMask((restore) => - restore( - toolMaterialization.settle({ - sessionID: session.id, - agent: agent.id, - assistantMessageID, - call: event, - }), - ).pipe( - Effect.flatMap((settlement) => - publish( - LLMEvent.toolResult({ - id: event.id, - name: event.name, - result: settlement.result, - output: settlement.output, + const runProvider: (remaining: number, attempt: number) => Effect.Effect = Effect.fnUntraced( + function* (remaining: number, attempt: number) { + let retryFailure: ProviderErrorEvent | undefined + yield* llm.stream(request).pipe( + Stream.runForEach((event) => + Effect.gen(function* () { + if (overflowFailure || retryFailure || publisher.hasProviderError()) return + if (LLMEvent.is.providerError(event) && !publisher.hasAssistantStarted()) { + if (isContextOverflowFailure(event)) { + overflowFailure = event + return + } + if (event.retryable) { + retryFailure = event + return + } + } + yield* publish(event) + if (event.type !== "tool-call" || event.providerExecuted) return + if (!toolMaterialization) { + yield* withPublication( + publisher.failUnsettledTools("Tools are disabled after the maximum agent steps"), + ) + return + } + needsContinuation = true + const assistantMessageID = yield* publisher.assistantMessageID(event.id) + yield* Effect.uninterruptibleMask((restore) => + restore( + toolMaterialization.settle({ + sessionID: session.id, + agent: agent.id, + assistantMessageID, + call: event, }), - settlement.outputPaths ?? [], + ).pipe( + Effect.flatMap((settlement) => + publish( + LLMEvent.toolResult({ + id: event.id, + name: event.name, + result: settlement.result, + output: settlement.output, + }), + settlement.outputPaths ?? [], + ), + ), ), - ), - ), - ).pipe(FiberSet.run(toolFibers)) - }), - ), - Effect.ensuring(withPublication(publisher.flush())), + ).pipe(FiberSet.run(toolFibers)) + }), + ), + Effect.ensuring(withPublication(publisher.flush())), + ) + if (!retryFailure) return + if (remaining === 0) { + yield* publish(retryFailure) + return + } + yield* Effect.sleep(`${PROVIDER_RETRY_DELAY_MS * 2 ** attempt} millis`) + yield* runProvider(remaining - 1, attempt + 1) + }, ) return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { - const stream = yield* restore(providerStream).pipe(Effect.exit) + const stream = yield* restore(runProvider(MAX_PROVIDER_RETRIES, 0)).pipe(Effect.exit) const failure = stream._tag === "Failure" ? Option.getOrUndefined(Cause.findErrorOption(stream.cause)) : undefined if ( diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 6df50ac57575..21e95094fb24 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -54,6 +54,7 @@ import { ModelV2 } from "@opencode-ai/core/model" import { Location } from "@opencode-ai/core/location" import { ProviderV2 } from "@opencode-ai/core/provider" import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect" +import * as TestClock from "effect/testing/TestClock" import { asc, eq } from "drizzle-orm" import { testEffect } from "./lib/effect" @@ -2943,6 +2944,62 @@ describe("SessionRunnerLLM", () => { }), ) + it.effect("retries transient provider errors before durable assistant output", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + yield* session.prompt({ sessionID, prompt: Prompt.make({ text: "Recover transient stream" }), resume: false }) + + requests.length = 0 + responses = [ + [LLMEvent.providerError({ message: "stream_read_error", retryable: true })], + [ + LLMEvent.stepStart({ index: 0 }), + LLMEvent.textStart({ id: "text-after-retry" }), + LLMEvent.textDelta({ id: "text-after-retry", text: "Recovered" }), + LLMEvent.textEnd({ id: "text-after-retry" }), + LLMEvent.stepFinish({ index: 0, reason: "stop" }), + LLMEvent.finish({ reason: "stop" }), + ], + ] + + const run = yield* session.resume(sessionID).pipe(Effect.forkChild) + yield* Effect.yieldNow + yield* TestClock.adjust("500 millis") + yield* Fiber.join(run) + + expect(requests).toHaveLength(2) + expect(yield* session.context(sessionID)).toMatchObject([ + { type: "user", text: "Recover transient stream" }, + { type: "assistant", finish: "stop", content: [{ type: "text", text: "Recovered" }] }, + ]) + }), + ) + + it.effect("bounds transient provider retries", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + yield* session.prompt({ sessionID, prompt: Prompt.make({ text: "Bound transient retries" }), resume: false }) + + requests.length = 0 + responses = Array.from({ length: 3 }, () => [ + LLMEvent.providerError({ message: "server_error", retryable: true }), + ]) + + const run = yield* session.resume(sessionID).pipe(Effect.forkChild) + yield* Effect.yieldNow + yield* TestClock.adjust("1500 millis") + yield* Fiber.join(run) + + expect(requests).toHaveLength(3) + expect(yield* session.context(sessionID)).toMatchObject([ + { type: "user", text: "Bound transient retries" }, + { type: "assistant", finish: "error", error: { type: "unknown", message: "server_error" } }, + ]) + }), + ) + it.effect("does not recover context overflow after durable assistant output", () => Effect.gen(function* () { yield* setup diff --git a/packages/llm/src/protocols/openai-responses.ts b/packages/llm/src/protocols/openai-responses.ts index be395b053f61..900618f4a2b1 100644 --- a/packages/llm/src/protocols/openai-responses.ts +++ b/packages/llm/src/protocols/openai-responses.ts @@ -201,6 +201,7 @@ type OpenAIResponsesStreamItem = Schema.Schema.Type] const NO_EVENTS: StepResult["1"] = [] // `response.completed` / `response.incomplete` are clean finishes that emit a -// `finish` event; `response.failed` is a hard failure that emits a -// `provider-error`. All three end the stream — kept in one set so `step` and +// `finish` event; `response.failed` and `error` are hard failures that emit a +// `provider-error`. All four end the stream — kept in one set so `step` and // the protocol's `terminal` predicate stay in sync. -const TERMINAL_TYPES = new Set(["response.completed", "response.incomplete", "response.failed"]) +const TERMINAL_TYPES = new Set(["response.completed", "response.incomplete", "response.failed", "error"]) const onOutputTextDelta = (state: ParserState, event: OpenAIResponsesEvent): StepResult => { if (!event.delta) return [state, NO_EVENTS] @@ -880,7 +882,7 @@ const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): Step // the bare message — production rate limits and context-length failures used // to be indistinguishable from generic stream drops. const providerErrorMessage = (event: OpenAIResponsesEvent, fallback: string): string => { - const nested = event.response?.error ?? undefined + const nested = event.error ?? event.response?.error ?? undefined const message = event.message || nested?.message || undefined const code = event.code || nested?.code || undefined if (message && code) return `${code}: ${message}` @@ -888,11 +890,26 @@ const providerErrorMessage = (event: OpenAIResponsesEvent, fallback: string): st } const providerError = (event: OpenAIResponsesEvent, fallback: string) => { - const code = event.code || event.response?.error?.code || undefined + const nested = event.error ?? event.response?.error ?? undefined + const code = event.code || nested?.code || undefined + const type = nested?.type || undefined const message = providerErrorMessage(event, fallback) + const retryable = [ + "internal_error", + "rate_limit_error", + "rate_limit_exceeded", + "server_error", + "server_is_overloaded", + "service_unavailable_error", + "stream_read_error", + "upstream_error", + ].some((value) => value === code || value === type) return LLMEvent.providerError({ message, - classification: code === "context_length_exceeded" || isContextOverflow(message) ? "context-overflow" : undefined, + ...(code === "context_length_exceeded" || isContextOverflow(message) + ? { classification: "context-overflow" as const } + : {}), + ...(retryable ? { retryable: true } : {}), }) } diff --git a/packages/llm/test/provider/openai-responses.test.ts b/packages/llm/test/provider/openai-responses.test.ts index 06434e1c0291..26757d018e70 100644 --- a/packages/llm/test/provider/openai-responses.test.ts +++ b/packages/llm/test/provider/openai-responses.test.ts @@ -1327,7 +1327,9 @@ describe("OpenAI Responses route", () => { // sometimes-generic provider message. The bare message alone meant // production errors like rate limits were indistinguishable from // unrelated stream failures. - expect(response.events).toEqual([{ type: "provider-error", message: "rate_limit_exceeded: Slow down" }]) + expect(response.events).toEqual([ + { type: "provider-error", message: "rate_limit_exceeded: Slow down", retryable: true }, + ]) }), ) @@ -1337,7 +1339,7 @@ describe("OpenAI Responses route", () => { Effect.provide(fixedResponse(sseEvents({ type: "error", code: "internal_error" }))), ) - expect(response.events).toEqual([{ type: "provider-error", message: "internal_error" }]) + expect(response.events).toEqual([{ type: "provider-error", message: "internal_error", retryable: true }]) }), ) @@ -1347,7 +1349,7 @@ describe("OpenAI Responses route", () => { Effect.provide(fixedResponse(sseEvents({ type: "error", code: "internal_error", message: "" }))), ) - expect(response.events).toEqual([{ type: "provider-error", message: "internal_error" }]) + expect(response.events).toEqual([{ type: "provider-error", message: "internal_error", retryable: true }]) }), ) @@ -1371,7 +1373,9 @@ describe("OpenAI Responses route", () => { ), ) - expect(response.events).toEqual([{ type: "provider-error", message: "server_error: Upstream model unavailable" }]) + expect(response.events).toEqual([ + { type: "provider-error", message: "server_error: Upstream model unavailable", retryable: true }, + ]) }), ) @@ -1420,6 +1424,55 @@ describe("OpenAI Responses route", () => { }), ) + it.effect("surfaces and marks transient nested error envelopes retryable", () => + Effect.gen(function* () { + const response = yield* LLMClient.generate(request).pipe( + Effect.provide( + fixedResponse( + sseEvents({ + type: "error", + error: { + type: "upstream_error", + code: "stream_read_error", + message: "The upstream stream ended unexpectedly", + }, + }), + ), + ), + ) + + expect(response.events).toEqual([ + { + type: "provider-error", + message: "stream_read_error: The upstream stream ended unexpectedly", + retryable: true, + }, + ]) + }), + ) + + it.effect("stops parsing after a terminal error event", () => + Effect.gen(function* () { + const response = yield* LLMClient.generate(request).pipe( + Effect.provide( + fixedResponse( + sseEvents( + { + type: "error", + error: { type: "server_error", code: "server_error", message: "Transient failure" }, + }, + { type: "response.output_text.delta", item_id: "ignored", delta: "must not publish" }, + ), + ), + ), + ) + + expect(response.events).toEqual([ + { type: "provider-error", message: "server_error: Transient failure", retryable: true }, + ]) + }), + ) + it.effect("falls back to a stable default when both error and response are absent", () => Effect.gen(function* () { const response = yield* LLMClient.generate(request).pipe(