Skip to content
91 changes: 89 additions & 2 deletions src/lib/init/wizard-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { randomBytes } from "node:crypto";

import { MastraClient } from "@mastra/client-js";
import {
addBreadcrumb,
captureException,
getTraceData,
setTag,
Expand Down Expand Up @@ -423,17 +424,72 @@ const MAX_RESUME_RETRIES = 3;
const RETRY_BACKOFF_MS = [2000, 4000, 8000];

type ResumeRetryArgs = {
run: { resumeAsync: (args: Record<string, unknown>) => Promise<unknown> };
run: {
resumeAsync: (args: Record<string, unknown>) => Promise<unknown>;
readonly runId: string;
};
workflow: {
runById: (runId: string, opts?: { fields?: string[] }) => Promise<unknown>;
};
stepId: string;
resumeData: Record<string, unknown>;
tracingOptions: Record<string, unknown>;
spin: SpinnerHandle;
ui: WizardUI;
};

/**
* Detect Mastra's "step not suspended" 500 — means the server already
* processed this step (our previous request succeeded but the response was
* dropped before we received it). The MastraClientError message embeds the
* server body, e.g.:
* "HTTP error! status: 500 - {"error":"This workflow step 'X' was not suspended..."}"
*/
function isStepAlreadyAdvancedError(err: unknown): boolean {
return err instanceof Error && err.message.includes("was not suspended");
}

/**
* Recover from a stale-step retry by fetching the current run state.
* If the workflow has already advanced (e.g. plan-codemods is now suspended),
* the returned WorkflowRunResult lets the main loop continue from the right step.
*/
async function tryRecoverCurrentRunState(
workflow: ResumeRetryArgs["workflow"],
runId: string
): Promise<WorkflowRunResult | null> {
try {
const raw = await withTimeout(
workflow.runById(runId, {
fields: ["steps", "activeStepsPath", "result"],
}),
Comment thread
betegon marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
API_TIMEOUT_MS,
"Run state recovery"
);
// runById returns activeStepsPath (Record<stepId, executionPath>) but
// not suspended (string[][]). The main loop reads result.suspended to
// find the active step; without it, stepId falls back to "unknown" and
// extractSuspendPayload iterates all steps — picking the first with any
// suspendPayload, which could be a completed step with stale D1 data.
// Derive suspended from the activeStepsPath keys so the lookup is
// deterministic: those keys are exactly the currently-active step IDs.
const state = raw as Record<string, unknown>;
if (!state.suspended && state.activeStepsPath) {
state.suspended = Object.keys(
state.activeStepsPath as Record<string, unknown>
).map((id) => [id]);
}
return assertWorkflowResult(state);
Comment thread
betegon marked this conversation as resolved.
Comment thread
betegon marked this conversation as resolved.
} catch {
return null;
}
Comment thread
sentry[bot] marked this conversation as resolved.
}

// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: retry loop branches across transient errors, stale-step recovery, and backoff
async function resumeWithRetry(
args: ResumeRetryArgs
): Promise<WorkflowRunResult> {
const { run, stepId, resumeData, tracingOptions, ui } = args;
const { run, workflow, stepId, resumeData, tracingOptions, spin, ui } = args;
let lastError: unknown;
for (let attempt = 0; attempt <= MAX_RESUME_RETRIES; attempt++) {
try {
Expand All @@ -458,6 +514,35 @@ async function resumeWithRetry(
return assertWorkflowResult(raw);
} catch (err) {
lastError = err;
// "Step not suspended" means the server processed our step but the
// response was dropped (network blip, CF response timeout, etc.).
// Retrying the same step will always 500. Fetch the current run state
// so the main loop can continue from whichever step is actually suspended.
if (isStepAlreadyAdvancedError(err)) {
ui.clearOverlay?.();
spin.message("Reconnecting...");
const recovered = await tryRecoverCurrentRunState(workflow, run.runId);
if (recovered) {
addBreadcrumb({
category: "wizard",
message: `stale-step recovery succeeded for ${stepId}`,
level: "info",
data: { stepId, runId: run.runId },
});
return recovered;
}
// Recovery failed — the step is confirmed not suspended and retrying
// it will always 500. Throw immediately instead of wasting 14s.
captureException(err, {
level: "warning",
tags: {
"wizard.stale_step_recovery": "failed",
"wizard.resume_step": stepId,
},
extra: { runId: run.runId },
});
throw err;
}
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
betegon marked this conversation as resolved.
if (attempt === MAX_RESUME_RETRIES) {
ui.clearOverlay?.();
throw err;
Expand Down Expand Up @@ -680,9 +765,11 @@ export async function runWizard(initialOptions: WizardOptions): Promise<void> {

result = await resumeWithRetry({
run,
workflow,
stepId: extracted.stepId,
resumeData,
tracingOptions,
spin,
ui,
});
}
Expand Down
Loading
Loading