diff --git a/CONTEXT.md b/CONTEXT.md index 5e5955d344a1..e6e597799809 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -162,12 +162,14 @@ _Avoid_: Response envelope - SDK executes Server's assembled `HttpRouter` in memory. It opens no listener and performs no network I/O, while preserving Server routing, middleware, codecs, handlers, and errors. - The Effect Client and SDK re-export their decoded datatype facade from Schema so callers do not depend on internal package locations or Core's versioned names. - A capability intended for both networked and **Embedded OpenCode** belongs in the authoritative public `HttpApi`; embedded-only same-process capabilities extend **Embedded OpenCode** separately. -- `sessions.events({ sessionID, after })` is a public durable Session event stream. It verifies the Session, replays durable events after the optional aggregate sequence, continues with newly committed durable events, excludes live-only fragments, and is transported as SSE in both networked and embedded modes. +- `sessions.events({ sessionID, after })` is one public Session-scoped event stream. It verifies the Session, captures a fixed durable cutoff after registering observation, replays durable events in `(after, cutoff]`, emits one authoritative process-local `session.activity` value, then continues with committed durable events and live-only Session output fragments. Only durable events carry aggregate-sequence cursor metadata. - `events.subscribe()` is a distinct public instance-wide live stream for Session and non-Session activity. It has no replay guarantee and includes connection, heartbeat, and instance-disposal lifecycle events; consumers recover from disconnection by refreshing authoritative state. - A Session ID is not an optional filter on `events.subscribe()`: instance-wide live events and durable Session events have different schemas, replay guarantees, cursors, lifecycle events, and failure behavior. - The initial common OpenCode Client does not expose server-global event aggregation. `events.subscribe()` is bounded to the connected OpenCode instance or workspace; any future cross-instance administrative stream requires a separately designed API. - `events.subscribe()` does not automatically reconnect after transport loss. The live-only stream fails with `ClientError`; consumers refresh authoritative state before explicitly opening a new subscription because events missed during disconnection cannot be replayed. -- `sessions.events({ sessionID, after })` returns the generated HTTP client's cold durable event stream and does not build reconnection policy into the endpoint or client constructor. Transport loss fails the stream with `ClientError`. Callers may compose an explicit resuming stream above it by retaining the last observed durable sequence and opening a new subscription with `after`; any reusable resume helper remains a separate API design question. +- `sessions.events({ sessionID, after })` returns the generated HTTP client's cold Session event stream and does not build reconnection policy into the endpoint or client constructor. Transport loss fails the stream with `ClientError`. Callers resume with the last observed durable sequence; live-only fragments are not replayed, and every connection receives a fresh authoritative activity value. +- The Session event stream treats database rows as authoritative and process notifications as bounded, non-blocking wakeups. Before emitting an observed live-only fragment it drains later durable rows, preserving causal durable start boundaries without exposing an additional fence field. +- Aggregate deletion currently removes the Session's durable event rows and sequence, including deletion history. The Session event stream therefore cannot promise replay after deletion until retention or a typed history-expired outcome is designed. - The stable `sessions.list(...)` design returns a **Page** in both networked and **Embedded OpenCode**; embedded execution does not define a separate unbounded array-returning list operation. The beta client currently preserves the existing HTTP `{ data, cursor }` envelope until emitter-level Page projection is implemented. - Session list cursors are opaque branded values carrying continuation query and ordering state. Consumers pass them back unchanged and do not inspect storage anchors or encoded filter fields. - A Session list continuation accepts only its opaque cursor. Scope, filters, ordering, and page size are fixed by the initial query and carried by that cursor. diff --git a/packages/client/src/generated/types.ts b/packages/client/src/generated/types.ts index f426a50f87a9..fbc9d47b84cd 100644 --- a/packages/client/src/generated/types.ts +++ b/packages/client/src/generated/types.ts @@ -601,6 +601,14 @@ export type SessionsEventsInput = { } export type SessionsEventsOutput = + | { + readonly id: string + readonly metadata?: { readonly [x: string]: unknown } + readonly type: "session.activity" + readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } + readonly location?: { readonly directory: string; readonly workspaceID?: string } + readonly data: { readonly sessionID: string; readonly active: boolean } + } | { readonly id: string readonly metadata?: { readonly [x: string]: unknown } @@ -810,6 +818,20 @@ export type SessionsEventsOutput = readonly textID: string } } + | { + readonly id: string + readonly metadata?: { readonly [x: string]: unknown } + readonly type: "session.next.text.delta" + readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } + readonly location?: { readonly directory: string; readonly workspaceID?: string } + readonly data: { + readonly timestamp: number + readonly sessionID: string + readonly assistantMessageID: string + readonly textID: string + readonly delta: string + } + } | { readonly id: string readonly metadata?: { readonly [x: string]: unknown } @@ -824,6 +846,49 @@ export type SessionsEventsOutput = readonly text: string } } + | { + readonly id: string + readonly metadata?: { readonly [x: string]: unknown } + readonly type: "session.next.reasoning.started" + readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } + readonly location?: { readonly directory: string; readonly workspaceID?: string } + readonly data: { + readonly timestamp: number + readonly sessionID: string + readonly assistantMessageID: string + readonly reasoningID: string + readonly providerMetadata?: { readonly [x: string]: { readonly [x: string]: unknown } } + } + } + | { + readonly id: string + readonly metadata?: { readonly [x: string]: unknown } + readonly type: "session.next.reasoning.delta" + readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } + readonly location?: { readonly directory: string; readonly workspaceID?: string } + readonly data: { + readonly timestamp: number + readonly sessionID: string + readonly assistantMessageID: string + readonly reasoningID: string + readonly delta: string + } + } + | { + readonly id: string + readonly metadata?: { readonly [x: string]: unknown } + readonly type: "session.next.reasoning.ended" + readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } + readonly location?: { readonly directory: string; readonly workspaceID?: string } + readonly data: { + readonly timestamp: number + readonly sessionID: string + readonly assistantMessageID: string + readonly reasoningID: string + readonly text: string + readonly providerMetadata?: { readonly [x: string]: { readonly [x: string]: unknown } } + } + } | { readonly id: string readonly metadata?: { readonly [x: string]: unknown } @@ -838,6 +903,20 @@ export type SessionsEventsOutput = readonly name: string } } + | { + readonly id: string + readonly metadata?: { readonly [x: string]: unknown } + readonly type: "session.next.tool.input.delta" + readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } + readonly location?: { readonly directory: string; readonly workspaceID?: string } + readonly data: { + readonly timestamp: number + readonly sessionID: string + readonly assistantMessageID: string + readonly callID: string + readonly delta: string + } + } | { readonly id: string readonly metadata?: { readonly [x: string]: unknown } @@ -932,35 +1011,6 @@ export type SessionsEventsOutput = } } } - | { - readonly id: string - readonly metadata?: { readonly [x: string]: unknown } - readonly type: "session.next.reasoning.started" - readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } - readonly location?: { readonly directory: string; readonly workspaceID?: string } - readonly data: { - readonly timestamp: number - readonly sessionID: string - readonly assistantMessageID: string - readonly reasoningID: string - readonly providerMetadata?: { readonly [x: string]: { readonly [x: string]: unknown } } - } - } - | { - readonly id: string - readonly metadata?: { readonly [x: string]: unknown } - readonly type: "session.next.reasoning.ended" - readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } - readonly location?: { readonly directory: string; readonly workspaceID?: string } - readonly data: { - readonly timestamp: number - readonly sessionID: string - readonly assistantMessageID: string - readonly reasoningID: string - readonly text: string - readonly providerMetadata?: { readonly [x: string]: { readonly [x: string]: unknown } } - } - } | { readonly id: string readonly metadata?: { readonly [x: string]: unknown } @@ -994,6 +1044,19 @@ export type SessionsEventsOutput = readonly reason: "auto" | "manual" } } + | { + readonly id: string + readonly metadata?: { readonly [x: string]: unknown } + readonly type: "session.next.compaction.delta" + readonly durable?: { readonly aggregateID: string; readonly seq: number; readonly version: number } + readonly location?: { readonly directory: string; readonly workspaceID?: string } + readonly data: { + readonly timestamp: number + readonly sessionID: string + readonly messageID: string + readonly text: string + } + } | { readonly id: string readonly metadata?: { readonly [x: string]: unknown } diff --git a/packages/client/test/effect.test.ts b/packages/client/test/effect.test.ts index 6ca73c22f46a..12d1316c17c9 100644 --- a/packages/client/test/effect.test.ts +++ b/packages/client/test/effect.test.ts @@ -22,7 +22,7 @@ test("session methods retain decoded Effect inputs and outputs", async () => { return Effect.succeed( HttpClientResponse.fromWeb( request, - new Response(`data: ${JSON.stringify(modelSwitchedEvent)}\n\n`, { + new Response(`data: ${JSON.stringify(modelSwitchedEvent)}\n\ndata: ${JSON.stringify(activityEvent)}\n\n`, { headers: { "content-type": "text/event-stream" }, }), ), @@ -93,6 +93,8 @@ test("session methods retain decoded Effect inputs and outputs", async () => { expect(DateTime.toEpochMillis(result.admitted.timeCreated)).toBe(1_717_171_717_000) expect(result.context).toEqual([]) expect(DateTime.toEpochMillis(result.events[0].data.timestamp)).toBe(1_717_171_717_000) + expect(result.events[1]).toEqual(activityEvent) + expect(result.events[1]).not.toHaveProperty("durable") expect(result.message).toEqual(expect.objectContaining({ id: "msg_model", type: "model-switched" })) }) @@ -145,3 +147,9 @@ const modelSwitchedEvent = { model: { id: "claude", providerID: "anthropic" }, }, } + +const activityEvent = { + id: "evt_activity", + type: "session.activity" as const, + data: { sessionID: "ses_test", active: false }, +} diff --git a/packages/client/test/promise.test.ts b/packages/client/test/promise.test.ts index 952a3fc0d78a..953dc73733c9 100644 --- a/packages/client/test/promise.test.ts +++ b/packages/client/test/promise.test.ts @@ -25,9 +25,12 @@ test("session methods use the public HTTP contract", async () => { const url = typeof input === "string" ? input : input instanceof URL ? input.href : input.url requests.push({ url, init }) if (url.includes("/event")) { - return new Response(`data: ${JSON.stringify(modelSwitchedEvent)}\n\n`, { - headers: { "content-type": "text/event-stream" }, - }) + return new Response( + `data: ${JSON.stringify(modelSwitchedEvent)}\n\ndata: ${JSON.stringify(activityEvent)}\n\n`, + { + headers: { "content-type": "text/event-stream" }, + }, + ) } if (url.includes("/prompt")) return Response.json(admission) if (url.includes("/context")) return Response.json({ data: [] }) @@ -65,7 +68,8 @@ test("session methods use the public HTTP contract", async () => { expect(created.id).toBe("ses_test") expect(admitted.id).toBe("msg_test") expect(context).toEqual([]) - expect(events).toEqual([modelSwitchedEvent]) + expect(events).toEqual([modelSwitchedEvent, activityEvent]) + expect(events[1]).not.toHaveProperty("durable") expect(message).toEqual(modelSwitchedMessage) expect(requests.map((request) => [request.init?.method, request.url])).toEqual([ ["GET", "http://localhost:3000/api/session?limit=10&order=desc"], @@ -153,3 +157,9 @@ const modelSwitchedEvent = { model: { id: "claude", providerID: "anthropic" }, }, } + +const activityEvent = { + id: "evt_activity", + type: "session.activity", + data: { sessionID: "ses_test", active: false }, +} diff --git a/packages/core/src/event.ts b/packages/core/src/event.ts index 5324c319b076..d089627f772a 100644 --- a/packages/core/src/event.ts +++ b/packages/core/src/event.ts @@ -1,9 +1,9 @@ export * as EventV2 from "./event" -import { Cause, Context, Effect, Layer, Option, PubSub, Schema, Stream } from "effect" +import { Cause, Context, Effect, Layer, Option, PubSub, Queue, Schema, Scope, Stream } from "effect" import { Event } from "@opencode-ai/schema/event" import type { Data, Definition, Payload } from "@opencode-ai/schema/event" -import { and, asc, eq, gt } from "drizzle-orm" +import { and, asc, eq, gt, lte } from "drizzle-orm" import { Database } from "./database/database" import { EventSequenceTable, EventTable } from "./event/sql" import { Location } from "./location" @@ -67,6 +67,19 @@ export interface Interface { readonly subscribe: (definition: D) => Stream.Stream> readonly all: () => Stream.Stream readonly durable: (input: { readonly aggregateID: string; readonly after?: number }) => Stream.Stream + readonly observeAggregate: (input: { + readonly aggregateID: string + readonly after?: number + readonly live: (event: Payload) => boolean + }) => Effect.Effect< + { + readonly replay: ReadonlyArray + readonly updates: Stream.Stream + readonly offer: (event: Payload, position?: "after" | "before") => boolean + }, + never, + Scope.Scope + > /** @deprecated Use `all()` and consume the returned stream. */ readonly listen: (listener: Subscriber) => Effect.Effect readonly project: (definition: D, projector: Subscriber) => Effect.Effect @@ -94,12 +107,12 @@ export const layerWith = (options?: LayerOptions) => Effect.gen(function* () { const pubsub = { all: yield* PubSub.unbounded(), - durable: new Map>>(), + durable: new Map>>(), typed: new Map>(), } const projectors = new Map() // TODO: Bind durable projectors to exact type+version before supporting incompatible historical payloads. - const listeners = new Array() + const listeners = new Set() const { db } = yield* Database.Service const getOrCreate = (definition: Definition) => @@ -275,7 +288,7 @@ export const layerWith = (options?: LayerOptions) => if (committed) { yield* Effect.forEach( pubsub.durable.get(committed.aggregateID) ?? [], - (wake) => PubSub.publish(wake, undefined), + (wake) => PubSub.publish(wake, committed.seq), { discard: true }, ) } @@ -472,13 +485,19 @@ export const layerWith = (options?: LayerOptions) => } } - const readAfter = (aggregateID: string, after: number) => + const readAfter = (aggregateID: string, after: number, through?: number) => (options?.beforeAggregateRead?.(aggregateID) ?? Effect.void).pipe( Effect.andThen( db .select() .from(EventTable) - .where(and(eq(EventTable.aggregate_id, aggregateID), gt(EventTable.seq, after))) + .where( + and( + eq(EventTable.aggregate_id, aggregateID), + gt(EventTable.seq, after), + through === undefined ? undefined : lte(EventTable.seq, through), + ), + ) .orderBy(asc(EventTable.seq)) .all(), ), @@ -498,7 +517,7 @@ export const layerWith = (options?: LayerOptions) => const subscribeDurable = (aggregateID: string) => Effect.gen(function* () { - const wake = yield* PubSub.sliding(1) + const wake = yield* PubSub.sliding(1) const subscription = yield* PubSub.subscribe(wake) yield* Effect.acquireRelease( Effect.sync(() => { @@ -516,34 +535,83 @@ export const layerWith = (options?: LayerOptions) => return subscription }) + const aggregateDrain = ( + aggregateID: string, + after: number, + ): ((through?: number) => Effect.Effect>) => { + let sequence = after + return (through?: number) => + through !== undefined && through <= sequence + ? Effect.succeed>([]) + : Effect.suspend(() => readAfter(aggregateID, sequence, through)).pipe( + Effect.tap((events) => + Effect.sync(() => { + sequence = events.at(-1)?.durable?.seq ?? sequence + }), + ), + ) + } + const durable = (input: { readonly aggregateID: string; readonly after?: number }): Stream.Stream => Stream.unwrap( Effect.gen(function* () { const wakes = yield* subscribeDurable(input.aggregateID) - let sequence = input.after ?? -1 - const read = Effect.suspend(() => readAfter(input.aggregateID, sequence)).pipe( - Effect.tap((events) => - Effect.sync(() => { - sequence = events.at(-1)?.durable?.seq ?? sequence - }), - ), - ) - const historical = yield* read - const live = Stream.fromSubscription(wakes).pipe( - Stream.mapEffect(() => read), - Stream.flattenIterable, - ) + const drain = aggregateDrain(input.aggregateID, input.after ?? -1) + const historical = yield* drain() + const live = Stream.fromSubscription(wakes).pipe(Stream.mapEffect(drain), Stream.flattenIterable) return Stream.concat(Stream.fromIterable(historical), live) }), ) + const observeAggregate: Interface["observeAggregate"] = (input) => + Effect.gen(function* () { + type Signal = + | { readonly _tag: "durable" } + | { readonly _tag: "transient"; readonly event: Payload; readonly position: "after" | "before" } + const signals = yield* Queue.dropping(256) + const wakes = yield* subscribeDurable(input.aggregateID) + let durableQueued = false + let durableThrough = -1 + const offer = (event: Payload, position: "after" | "before" = "after") => { + const offered = Queue.offerUnsafe(signals, { _tag: "transient", event, position }) + if (!offered) Queue.endUnsafe(signals) + return offered + } + const unsubscribe = yield* listen((event) => + Effect.sync(() => { + if (input.live(event)) offer(event) + }), + ) + yield* Effect.addFinalizer(() => unsubscribe.pipe(Effect.andThen(Queue.shutdown(signals)))) + yield* Stream.runForEach(Stream.fromSubscription(wakes), (sequence) => + Effect.sync(() => { + durableThrough = Math.max(durableThrough, sequence) + if (durableQueued) return + durableQueued = Queue.offerUnsafe(signals, { _tag: "durable" }) + }), + ).pipe(Effect.forkScoped) + + const cutoff = yield* latestSequence(db, input.aggregateID) + const replay = yield* readAfter(input.aggregateID, input.after ?? -1, cutoff) + const drain = aggregateDrain(input.aggregateID, cutoff) + const updates = Stream.fromQueue(signals).pipe( + Stream.mapEffect((signal) => { + if (signal._tag === "durable") { + durableQueued = false + return drain(durableThrough) + } + if (signal.position === "before") return Effect.succeed([signal.event]) + return drain().pipe(Effect.map((events) => [...events, signal.event])) + }), + Stream.flattenIterable, + ) + return { replay, updates, offer } + }) + const listen = (listener: Subscriber): Effect.Effect => Effect.sync(() => { - listeners.push(listener) - return Effect.sync(() => { - const index = listeners.indexOf(listener) - if (index >= 0) listeners.splice(index, 1) - }) + listeners.add(listener) + return Effect.sync(() => listeners.delete(listener)).pipe(Effect.asVoid) }) const project = (definition: D, projector: Subscriber): Effect.Effect => @@ -558,6 +626,7 @@ export const layerWith = (options?: LayerOptions) => subscribe, all: streamAll, durable, + observeAggregate, listen, project, replay, diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 97745b41bf81..90b50869e8eb 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -128,7 +128,7 @@ export interface Interface { readonly events: (input: { sessionID: SessionSchema.ID after?: number - }) => Stream.Stream + }) => Stream.Stream readonly switchAgent: (input: { sessionID: SessionSchema.ID; agent: string }) => Effect.Effect readonly switchModel: (input: { sessionID: SessionSchema.ID @@ -185,7 +185,11 @@ export const layer = Layer.unwrap( const store = yield* SessionStore.Service const locations = yield* LocationServiceMap const decodeMessage = Schema.decodeUnknownEffect(SessionMessage.Message) - const isDurableSessionEvent = Schema.is(SessionEvent.Durable) + const sessionEventTypes = new Set(SessionEvent.Definitions.map((definition) => definition.type)) + const isSessionEvent = (event: EventV2.Payload): event is SessionEvent.Event => + sessionEventTypes.has(event.type) + const isDurableSessionEvent = (event: EventV2.Payload): event is SessionEvent.DurableEvent => + event.durable !== undefined && isSessionEvent(event) const decode = (row: typeof SessionMessageTable.$inferSelect) => decodeMessage({ ...row.data, id: row.id, type: row.type }).pipe( Effect.mapError( @@ -341,10 +345,34 @@ export const layer = Layer.unwrap( }), events: (input) => Stream.unwrap( - result - .get(input.sessionID) - .pipe(Effect.as(events.durable({ aggregateID: input.sessionID, after: input.after }))), - ).pipe(Stream.filter((event): event is SessionEvent.DurableEvent => isDurableSessionEvent(event))), + Effect.gen(function* () { + yield* result.get(input.sessionID) + const activity = yield* execution.activity(input.sessionID) + const observed = yield* events.observeAggregate({ + aggregateID: input.sessionID, + after: input.after, + live: (event) => + event.durable === undefined && isSessionEvent(event) && event.data.sessionID === input.sessionID, + }) + const initialActivity = SessionEvent.makeActivity( + input.sessionID, + yield* activity.attach((active) => + observed.offer(SessionEvent.makeActivity(input.sessionID, active), active ? "before" : "after"), + ), + ) + return Stream.fromIterable(observed.replay.filter(isDurableSessionEvent)).pipe( + Stream.concat(Stream.make(initialActivity)), + Stream.concat( + observed.updates.pipe( + Stream.filter( + (event): event is SessionEvent.StreamEvent => + event.type === SessionEvent.Activity.type || isSessionEvent(event), + ), + ), + ), + ) + }), + ), prompt: Effect.fn("V2Session.prompt")((input) => Effect.uninterruptible( Effect.gen(function* () { diff --git a/packages/core/src/session/execution.ts b/packages/core/src/session/execution.ts index f92929e6ee9d..1873215922e5 100644 --- a/packages/core/src/session/execution.ts +++ b/packages/core/src/session/execution.ts @@ -1,12 +1,15 @@ export * as SessionExecution from "./execution" -import { Context, Effect, Layer } from "effect" +import { Context, Effect, Layer, Scope } from "effect" import { SessionRunner } from "./runner/index" +import { SessionRunCoordinator } from "./run-coordinator" import { SessionSchema } from "./schema" export interface Interface { /** Snapshots active execution owned by this process. */ readonly active: Effect.Effect> + /** Observes foreground ownership with an authoritative initial snapshot. */ + readonly activity: (sessionID: SessionSchema.ID) => Effect.Effect /** Starts execution while idle or joins the active execution. */ readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect /** Registers newly recorded work. Repeated wakeups may coalesce. */ @@ -23,6 +26,7 @@ export const noopLayer = Layer.succeed( Service, Service.of({ active: Effect.succeed(new Set()), + activity: () => Effect.succeed({ attach: () => Effect.succeed(false) }), resume: () => Effect.void, wake: () => Effect.void, interrupt: () => Effect.void, diff --git a/packages/core/src/session/execution/local.ts b/packages/core/src/session/execution/local.ts index bd1be745393e..0ee2b50a54e4 100644 --- a/packages/core/src/session/execution/local.ts +++ b/packages/core/src/session/execution/local.ts @@ -29,6 +29,7 @@ export const layer = Layer.effect( return SessionExecution.Service.of({ active: coordinator.active, + activity: coordinator.activity, interrupt: coordinator.interrupt, resume: coordinator.run, wake: coordinator.wake, diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index 2f89aff9e3d2..13b65ba53db3 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -2,10 +2,17 @@ export * as SessionRunCoordinator from "./run-coordinator" import { Deferred, Effect, Exit, Fiber, FiberSet, Scope } from "effect" +export interface Activity { + /** Installs a synchronous observer and snapshots current ownership atomically. */ + readonly attach: (observer: (active: boolean) => void) => Effect.Effect +} + /** Serializes execution for each key while allowing different keys to run concurrently. */ export interface Coordinator { /** Snapshots keys with an execution owned by this coordinator. */ readonly active: Effect.Effect> + /** Registers transition observation before taking its authoritative snapshot. */ + readonly activity: (key: Key) => Effect.Effect /** Starts execution while idle or joins the active execution. */ readonly run: (key: Key) => Effect.Effect /** Registers one coalesced follow-up after newly recorded work. */ @@ -26,6 +33,7 @@ export const make = (options: { }): Effect.Effect, never, Scope.Scope> => Effect.gen(function* () { const active = new Map>() + const activityObservers = new Map void>>() const fork = yield* FiberSet.makeRuntime() const makeEntry = (): Entry => ({ @@ -34,6 +42,10 @@ export const make = (options: { stopping: false, }) + const notifyActivity = (key: Key, value: boolean) => { + for (const observer of activityObservers.get(key) ?? []) observer(value) + } + const start = (key: Key, entry: Entry, force: boolean, successor = false) => { const ready = Deferred.makeUnsafe() const owner = fork( @@ -56,8 +68,10 @@ export const make = (options: { } const successor = entry.pendingWake ? makeEntry() : undefined - if (successor === undefined) active.delete(key) - else { + if (successor === undefined) { + active.delete(key) + notifyActivity(key, false) + } else { active.set(key, successor) start(key, successor, false, true) } @@ -74,6 +88,7 @@ export const make = (options: { const next = makeEntry() active.set(key, next) + notifyActivity(key, true) start(key, next, true) return restore(Deferred.await(next.done)) }) @@ -88,6 +103,7 @@ export const make = (options: { const next = makeEntry() active.set(key, next) + notifyActivity(key, true) start(key, next, false) }) @@ -100,5 +116,33 @@ export const make = (options: { return Fiber.interrupt(entry.owner) }) - return { active: Effect.sync(() => new Set(active.keys())), run, wake, interrupt } + const activity = (key: Key) => + Effect.gen(function* () { + let attached: ((active: boolean) => void) | undefined + const observer = (value: boolean) => { + attached?.(value) + } + yield* Effect.acquireRelease( + Effect.sync(() => { + const observers = activityObservers.get(key) ?? new Set() + observers.add(observer) + activityObservers.set(key, observers) + }), + () => + Effect.sync(() => { + const observers = activityObservers.get(key) + observers?.delete(observer) + if (observers?.size === 0) activityObservers.delete(key) + }), + ) + return { + attach: (next: (active: boolean) => void) => + Effect.sync(() => { + attached = next + return active.has(key) + }), + } + }) + + return { active: Effect.sync(() => new Set(active.keys())), activity, run, wake, interrupt } }) diff --git a/packages/core/test/event.test.ts b/packages/core/test/event.test.ts index e2b2a5df046c..32734919238d 100644 --- a/packages/core/test/event.test.ts +++ b/packages/core/test/event.test.ts @@ -418,6 +418,127 @@ describe("EventV2", () => { }), ) + it.effect("observes a fixed replay cutoff and delivers commits during replay as updates", () => + Effect.gen(function* () { + const readStarted = yield* Deferred.make() + const continueRead = yield* Deferred.make() + let pause = true + const eventLayer = EventV2.layerWith({ + beforeAggregateRead: () => + pause + ? Deferred.succeed(readStarted, undefined).pipe(Effect.andThen(Deferred.await(continueRead))) + : Effect.void, + }).pipe(Layer.provide(Database.defaultLayer)) + + yield* Effect.gen(function* () { + const events = yield* EventV2.Service + const aggregateID = Session.ID.create() + const observer = yield* events.observeAggregate({ aggregateID, live: () => false }).pipe(Effect.forkScoped) + yield* Deferred.await(readStarted) + + pause = false + yield* events.publish(DurableMessage, durableData(aggregateID, "after cutoff")) + yield* Deferred.succeed(continueRead, undefined) + const observed = yield* Fiber.join(observer) + const update = yield* observed.updates.pipe(Stream.take(1), Stream.runCollect) + + expect(observed.replay).toEqual([]) + expect(Array.from(update).map((event) => [event.durable?.seq, event.data])).toEqual([ + [0, durableData(aggregateID, "after cutoff")], + ]) + }).pipe(Effect.provide(Layer.mergeAll(Database.defaultLayer, eventLayer))) + }), + ) + + it.effect("observes replay commits that are not republished", () => + Effect.gen(function* () { + const events = yield* EventV2.Service + const aggregateID = Session.ID.create() + const observed = yield* events.observeAggregate({ aggregateID, live: () => false }) + const update = yield* observed.updates.pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) + + yield* events.replay( + { + id: EventV2.ID.create(), + aggregateID, + seq: 0, + type: EventV2.versionedType(DurableMessage.type, 1), + data: durableData(aggregateID, "replayed"), + }, + { publish: false }, + ) + + expect(Array.from(yield* Fiber.join(update)).map((event) => event.data)).toEqual([ + durableData(aggregateID, "replayed"), + ]) + }), + ) + + it.effect("drains causally preceding durable rows before a live payload", () => + Effect.gen(function* () { + const events = yield* EventV2.Service + const aggregateID = Session.ID.create() + const observed = yield* events.observeAggregate({ + aggregateID, + live: (event) => event.type === Message.type, + }) + const updates = yield* observed.updates.pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped) + + yield* events.publish(DurableMessage, durableData(aggregateID, "start")) + const live = yield* events.publish(Message, { text: "delta" }) + + expect(Array.from(yield* Fiber.join(updates))).toEqual([ + expect.objectContaining({ durable: expect.objectContaining({ seq: 0 }) }), + live, + ]) + }), + ) + + it.effect("coalesces saturated observer signals without losing durable rows", () => + Effect.gen(function* () { + const events = yield* EventV2.Service + const aggregateID = Session.ID.create() + const count = 300 + const observed = yield* events.observeAggregate({ aggregateID, live: () => false }) + + for (let index = 0; index < count; index++) { + yield* events.publish(DurableMessage, durableData(aggregateID, String(index))) + } + const updates = yield* observed.updates.pipe(Stream.take(count), Stream.runCollect) + + expect(Array.from(updates, (event) => event.durable?.seq)).toEqual( + Array.from({ length: count }, (_, index) => index), + ) + }), + ) + + it.effect("coalesces durable notifications without repeated empty reads", () => + Effect.gen(function* () { + let reads = 0 + const eventLayer = EventV2.layerWith({ + beforeAggregateRead: () => + Effect.sync(() => { + reads++ + }), + }).pipe(Layer.provide(Database.defaultLayer)) + + yield* Effect.gen(function* () { + const events = yield* EventV2.Service + const aggregateID = Session.ID.create() + const count = 20 + const observed = yield* events.observeAggregate({ aggregateID, live: () => false }) + + for (let index = 0; index < count; index++) { + yield* events.publish(DurableMessage, durableData(aggregateID, String(index))) + } + const updates = yield* observed.updates.pipe(Stream.take(count), Stream.runCollect) + + expect(updates).toHaveLength(count) + expect(reads).toBe(2) + }).pipe(Effect.provide(Layer.mergeAll(Database.defaultLayer, eventLayer))) + }), + ) + it.effect("coalesces durable aggregate wakes while draining every committed event", () => Effect.gen(function* () { const events = yield* EventV2.Service diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index 9ff0e2249034..3e921b8a1595 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -1,5 +1,5 @@ import { describe, expect } from "bun:test" -import { DateTime, Effect, Fiber, Layer, Stream } from "effect" +import { DateTime, Deferred, Effect, Fiber, Layer, Stream } from "effect" import { eq } from "drizzle-orm" import { Database } from "@opencode-ai/core/database/database" import { EventV2 } from "@opencode-ai/core/event" @@ -23,10 +23,29 @@ const executionCalls: SessionV2.ID[] = [] const interruptCalls: SessionV2.ID[] = [] const wakeCalls: SessionV2.ID[] = [] const activeSessions = new Set() +let activityObserver: ((active: boolean) => void) | undefined +let activityState = false +const setActivity = (active: boolean) => + Effect.sync(() => { + activityState = active + activityObserver?.(active) + }) const execution = Layer.succeed( SessionExecution.Service, SessionExecution.Service.of({ active: Effect.sync(() => new Set(activeSessions)), + activity: () => + Effect.sync(() => { + activityState = false + activityObserver = undefined + return { + attach: (observer: (active: boolean) => void) => + Effect.sync(() => { + activityObserver = observer + return activityState + }), + } + }), resume: (sessionID) => Effect.sync(() => { executionCalls.push(sessionID) @@ -179,7 +198,7 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service - const fiber = yield* session.events({ sessionID }).pipe(Stream.take(4), Stream.runCollect, Effect.forkScoped) + const fiber = yield* session.events({ sessionID }).pipe(Stream.take(5), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow yield* session.prompt({ sessionID, prompt: Prompt.make({ text: "First" }), resume: false }) @@ -188,6 +207,7 @@ describe("SessionV2.prompt", () => { const streamed = Array.from(yield* Fiber.join(fiber)) expect(streamed.map((event) => [event.durable?.seq, event.type])).toEqual([ + [undefined, "session.activity"], [0, "session.next.prompt.admitted"], [1, "session.next.prompt.admitted"], [2, "session.next.prompted"], @@ -196,10 +216,122 @@ describe("SessionV2.prompt", () => { expect( Array.from( yield* session - .events({ sessionID, after: streamed[0]!.durable?.seq }) - .pipe(Stream.take(1), Stream.runCollect), + .events({ sessionID, after: streamed[2]?.durable?.seq }) + .pipe(Stream.take(3), Stream.runCollect), ).map((event) => [event.durable?.seq, event.type]), - ).toEqual([[1, "session.next.prompt.admitted"]]) + ).toEqual([ + [2, "session.next.prompted"], + [3, "session.next.prompted"], + [undefined, "session.activity"], + ]) + }), + ) + + it.effect("replays history, emits activity, then fences live deltas behind durable starts", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + const events = yield* EventV2.Service + const assistantMessageID = SessionMessage.ID.create() + yield* events.publish(SessionEvent.Text.Started, { + sessionID, + assistantMessageID, + textID: "text-1", + timestamp: yield* DateTime.now, + }) + const streamed = yield* session.events({ sessionID }).pipe(Stream.take(3), Stream.runCollect, Effect.forkScoped) + yield* Effect.yieldNow + yield* events.publish(SessionEvent.Text.Delta, { + sessionID, + assistantMessageID, + textID: "text-1", + delta: "hello", + timestamp: yield* DateTime.now, + }) + + expect(Array.from(yield* Fiber.join(streamed)).map((event) => event.type)).toEqual([ + "session.next.text.started", + "session.activity", + "session.next.text.delta", + ]) + }), + ) + + it.effect("orders activity around the durable rows owned by a run", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + const events = yield* EventV2.Service + const assistantMessageID = SessionMessage.ID.create() + const streamed = yield* session.events({ sessionID }).pipe(Stream.take(5), Stream.runCollect, Effect.forkScoped) + yield* Effect.yieldNow + + yield* setActivity(true) + yield* events.publish(SessionEvent.Text.Started, { + sessionID, + assistantMessageID, + textID: "text-activity", + timestamp: yield* DateTime.now, + }) + yield* events.publish(SessionEvent.Text.Ended, { + sessionID, + assistantMessageID, + textID: "text-activity", + text: "done", + timestamp: yield* DateTime.now, + }) + yield* setActivity(false) + + expect(Array.from(yield* Fiber.join(streamed)).map((event) => [event.type, event.data])).toEqual([ + ["session.activity", { sessionID, active: false }], + ["session.activity", { sessionID, active: true }], + ["session.next.text.started", expect.objectContaining({ sessionID })], + ["session.next.text.ended", expect.objectContaining({ sessionID })], + ["session.activity", { sessionID, active: false }], + ]) + }), + ) + + it.effect("terminates the Session stream when live event buffering saturates", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + const events = yield* EventV2.Service + const initial = yield* Deferred.make() + const release = yield* Deferred.make() + let seenInitial = false + const streamed = yield* session.events({ sessionID }).pipe( + Stream.mapEffect((event) => { + if (!seenInitial) { + seenInitial = true + return Deferred.succeed(initial, undefined).pipe(Effect.as(event)) + } + return Deferred.await(release).pipe(Effect.as(event)) + }), + Stream.runCollect, + Effect.forkScoped, + ) + yield* Deferred.await(initial) + + const timestamp = yield* DateTime.now + const assistantMessageID = SessionMessage.ID.create() + yield* Effect.forEach( + Array.from({ length: 1_000 }, (_, index) => index), + (index) => + events.publish(SessionEvent.Text.Delta, { + sessionID, + assistantMessageID, + textID: "text-saturation", + delta: String(index), + timestamp, + }), + { concurrency: "unbounded", discard: true }, + ) + yield* Deferred.succeed(release, undefined) + + const result = Array.from(yield* Fiber.join(streamed)) + expect(result[0]?.type).toBe("session.activity") + expect(result.length).toBeLessThan(1_001) }), ) diff --git a/packages/core/test/session-run-coordinator.test.ts b/packages/core/test/session-run-coordinator.test.ts index dfbeda664c54..67b0bba2bfa1 100644 --- a/packages/core/test/session-run-coordinator.test.ts +++ b/packages/core/test/session-run-coordinator.test.ts @@ -99,6 +99,27 @@ describe("SessionRunCoordinator", () => { ), ) + it.effect("reflects activity races in the snapshot or subsequent transitions", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const gate = yield* Deferred.make() + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => Deferred.succeed(started, undefined).pipe(Effect.andThen(Deferred.await(gate))), + }) + const activity = yield* coordinator.activity("session") + + const run = yield* coordinator.run("session").pipe(Effect.forkChild) + yield* Deferred.await(started) + const transitions = new Array() + expect(yield* activity.attach((active) => transitions.push(active))).toBeTrue() + yield* Deferred.succeed(gate, undefined) + yield* Fiber.join(run) + expect(transitions).toEqual([false]) + }), + ), + ) + it.effect("cleans active executions after failure and defect", () => Effect.scoped( Effect.gen(function* () { @@ -137,7 +158,6 @@ describe("SessionRunCoordinator", () => { expect(Array.from(yield* coordinator.active)).toEqual([]) }), ) - it.effect("coalesces wakes received during active execution", () => Effect.scoped( Effect.gen(function* () { diff --git a/packages/core/test/session-runner-recorded.test.ts b/packages/core/test/session-runner-recorded.test.ts index da8699fbd2ff..2ea3a1c55c0a 100644 --- a/packages/core/test/session-runner-recorded.test.ts +++ b/packages/core/test/session-runner-recorded.test.ts @@ -96,6 +96,7 @@ const execution = Layer.effect( }) return SessionExecution.Service.of({ active: coordinator.active, + activity: coordinator.activity, resume: coordinator.run, wake: coordinator.wake, interrupt: coordinator.interrupt, diff --git a/packages/core/test/session-runner-tool-events.test.ts b/packages/core/test/session-runner-tool-events.test.ts index f96ea4dea2b0..f1b252caed4d 100644 --- a/packages/core/test/session-runner-tool-events.test.ts +++ b/packages/core/test/session-runner-tool-events.test.ts @@ -28,6 +28,7 @@ const capture = () => { }), subscribe: () => Stream.empty, all: () => Stream.empty, + observeAggregate: () => Effect.die("not implemented"), durable: () => Stream.empty, listen: () => Effect.succeed(Effect.void), project: () => Effect.void, diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index c9594e8244f1..8b77d10d0f65 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -255,6 +255,7 @@ const execution = Layer.effect( }) return SessionExecution.Service.of({ active: coordinator.active, + activity: coordinator.activity, resume: coordinator.run, wake: coordinator.wake, interrupt: coordinator.interrupt, diff --git a/packages/protocol/src/groups/session.ts b/packages/protocol/src/groups/session.ts index f1f908e6ac88..f623d1bdb80d 100644 --- a/packages/protocol/src/groups/session.ts +++ b/packages/protocol/src/groups/session.ts @@ -295,7 +295,7 @@ export const makeSessionGroup = (sessionLo query: { after: Schema.NumberFromString.pipe(Schema.decodeTo(NonNegativeInt), Schema.optional), }, - success: HttpApiSchema.StreamSse({ data: SessionEvent.Durable }), + success: HttpApiSchema.StreamSse({ data: SessionEvent.Stream }), error: SessionNotFoundError, }) .middleware(sessionLocationMiddleware) @@ -303,7 +303,8 @@ export const makeSessionGroup = (sessionLo OpenApi.annotations({ identifier: "v2.session.events", summary: "Subscribe to session events", - description: "Replay durable events after an aggregate sequence, then continue with new durable events.", + description: + "Replay durable events after an aggregate sequence, emit current process-local activity, then continue with durable and live-only Session events.", }), ), ) diff --git a/packages/schema/src/session-event.ts b/packages/schema/src/session-event.ts index f58c3dcce025..cb5f359c7a92 100644 --- a/packages/schema/src/session-event.ts +++ b/packages/schema/src/session-event.ts @@ -51,6 +51,21 @@ const stepSettlementOptions = { export const UnknownError = SessionMessage.UnknownError export type UnknownError = SessionMessage.UnknownError +export const Activity = Event.define({ + type: "session.activity", + schema: { + sessionID: SessionID, + active: Schema.Boolean, + }, +}) +export type Activity = typeof Activity.Type + +export const makeActivity = (sessionID: SessionID, active: boolean): Activity => ({ + id: Event.ID.create(), + type: Activity.type, + data: { sessionID, active }, +}) + export const AgentSwitched = Event.define({ type: "session.next.agent.switched", ...options, @@ -517,3 +532,6 @@ export type DurableEvent = typeof Durable.Type export const All = Schema.Union(Definitions, { mode: "oneOf" }).pipe(Schema.toTaggedUnion("type")) export type Event = typeof All.Type export type Type = Event["type"] + +export const Stream = Schema.Union([Activity, All], { mode: "oneOf" }).pipe(Schema.toTaggedUnion("type")) +export type StreamEvent = typeof Stream.Type diff --git a/packages/sdk/js/src/v2/gen/sdk.gen.ts b/packages/sdk/js/src/v2/gen/sdk.gen.ts index 9beb8099226f..8b974d99f2c6 100644 --- a/packages/sdk/js/src/v2/gen/sdk.gen.ts +++ b/packages/sdk/js/src/v2/gen/sdk.gen.ts @@ -5713,7 +5713,7 @@ export class Session3 extends HeyApiClient { /** * Subscribe to session events * - * Replay durable events after an aggregate sequence, then continue with new durable events. + * Replay durable events after an aggregate sequence, emit current process-local activity, then continue with durable and live-only Session events. */ public events( parameters: { diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index ae610a3f129c..2f61039abdba 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -4036,6 +4036,24 @@ export type SessionMessage = | SessionMessageAssistant | SessionMessageCompaction +export type SessionActivity = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.activity" + durable?: { + aggregateID: string + seq: number | "NaN" | "Infinity" | "-Infinity" + version: number | "NaN" | "Infinity" | "-Infinity" + } + location?: LocationRef + data: { + sessionID: string + active: boolean + } +} + export type SessionNextAgentSwitched = { id: string metadata?: { @@ -4313,6 +4331,27 @@ export type SessionNextTextStarted = { } } +export type SessionNextTextDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.text.delta" + durable?: { + aggregateID: string + seq: number | "NaN" | "Infinity" | "-Infinity" + version: number | "NaN" | "Infinity" | "-Infinity" + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + textID: string + delta: string + } +} + export type SessionNextTextEnded = { id: string metadata?: { @@ -4334,6 +4373,70 @@ export type SessionNextTextEnded = { } } +export type SessionNextReasoningStarted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.reasoning.started" + durable?: { + aggregateID: string + seq: number | "NaN" | "Infinity" | "-Infinity" + version: number | "NaN" | "Infinity" | "-Infinity" + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + reasoningID: string + providerMetadata?: LlmProviderMetadata + } +} + +export type SessionNextReasoningDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.reasoning.delta" + durable?: { + aggregateID: string + seq: number | "NaN" | "Infinity" | "-Infinity" + version: number | "NaN" | "Infinity" | "-Infinity" + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + reasoningID: string + delta: string + } +} + +export type SessionNextReasoningEnded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.reasoning.ended" + durable?: { + aggregateID: string + seq: number | "NaN" | "Infinity" | "-Infinity" + version: number | "NaN" | "Infinity" | "-Infinity" + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + reasoningID: string + text: string + providerMetadata?: LlmProviderMetadata + } +} + export type SessionNextToolInputStarted = { id: string metadata?: { @@ -4355,6 +4458,27 @@ export type SessionNextToolInputStarted = { } } +export type SessionNextToolInputDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.input.delta" + durable?: { + aggregateID: string + seq: number | "NaN" | "Infinity" | "-Infinity" + version: number | "NaN" | "Infinity" | "-Infinity" + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + delta: string + } +} + export type SessionNextToolInputEnded = { id: string metadata?: { @@ -4484,33 +4608,12 @@ export type SessionNextToolFailed = { } } -export type SessionNextReasoningStarted = { - id: string - metadata?: { - [key: string]: unknown - } - type: "session.next.reasoning.started" - durable?: { - aggregateID: string - seq: number | "NaN" | "Infinity" | "-Infinity" - version: number | "NaN" | "Infinity" | "-Infinity" - } - location?: LocationRef - data: { - timestamp: number - sessionID: string - assistantMessageID: string - reasoningID: string - providerMetadata?: LlmProviderMetadata - } -} - -export type SessionNextReasoningEnded = { +export type SessionNextRetried = { id: string metadata?: { [key: string]: unknown } - type: "session.next.reasoning.ended" + type: "session.next.retried" durable?: { aggregateID: string seq: number | "NaN" | "Infinity" | "-Infinity" @@ -4520,19 +4623,17 @@ export type SessionNextReasoningEnded = { data: { timestamp: number sessionID: string - assistantMessageID: string - reasoningID: string - text: string - providerMetadata?: LlmProviderMetadata + attempt: number + error: SessionNextRetryError } } -export type SessionNextRetried = { +export type SessionNextCompactionStarted = { id: string metadata?: { [key: string]: unknown } - type: "session.next.retried" + type: "session.next.compaction.started" durable?: { aggregateID: string seq: number | "NaN" | "Infinity" | "-Infinity" @@ -4542,17 +4643,17 @@ export type SessionNextRetried = { data: { timestamp: number sessionID: string - attempt: number - error: SessionNextRetryError + messageID: string + reason: "auto" | "manual" } } -export type SessionNextCompactionStarted = { +export type SessionNextCompactionDelta = { id: string metadata?: { [key: string]: unknown } - type: "session.next.compaction.started" + type: "session.next.compaction.delta" durable?: { aggregateID: string seq: number | "NaN" | "Infinity" | "-Infinity" @@ -4563,7 +4664,7 @@ export type SessionNextCompactionStarted = { timestamp: number sessionID: string messageID: string - reason: "auto" | "manual" + text: string } } diff --git a/specs/v2/session.md b/specs/v2/session.md index 54daa6971373..00a16790f406 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -172,11 +172,11 @@ Inbox promotion coalesces pending steers in durable admission order. Once contin Eager local-tool execution is intentionally unbounded in the current local slice. This minimizes tool latency but does not increase SQLite settlement throughput: Session-event publication remains serialized per provider turn. Before broadening exposure, revisit per-turn call limits, output truncation, and operational backpressure using observed workloads. The `session.next.*` event schemas remain experimental and unshipped; databases created by earlier experimental builds are disposable rather than compatibility targets. -The synchronized `session.next.*` event family and projected Session-message model predate this branch. This slice refines their replay contract: projected Session messages retain their source aggregate sequence so canonical context ordering and `sessions.messages(...)` pagination follow durable event order even when caller-supplied IDs or timestamps do not. Consumers can use `sessions.events({ sessionID, after? })` to replay durable `session.next.*` events after an aggregate sequence cursor, then tail durable events without a race. Live-only text, reasoning, and tool-input fragments remain available through EventV2 subscriptions for connected renderers; they are intentionally absent from the replayable Session stream. +The synchronized `session.next.*` event family and projected Session-message model predate this branch. Projected Session messages retain their source aggregate sequence so canonical context ordering and `sessions.messages(...)` pagination follow durable event order even when caller-supplied IDs or timestamps do not. Consumers use `sessions.events({ sessionID, after? })` for one Session-scoped stream containing replayable lifecycle/output events, process-local activity, and connected live-only output fragments. -The first `sessions.events(...)` contract is durable-only during both replay and live tailing. This keeps one cursor equal to one persisted aggregate sequence and is sufficient for reconnect-safe consumers. A later UI-facing API may optionally interleave live-only deltas while connected, but those fragments must remain explicitly ephemeral: they cannot advance the durable cursor, replay after reconnect, or be mistaken for publication boundaries. +The stream registers observation, captures one fixed durable cutoff `B`, emits durable rows in `(after, B]` in aggregate-sequence order, and then emits exactly one authoritative `session.activity` value. Only events carrying `durable` metadata advance the cursor. Activity and text, reasoning, compaction, or tool-input deltas are process-local and never replay after reconnect. Activity means foreground execution ownership in this process; `false` does not describe a terminal outcome, and background work does not activate its parent Session. -Durable event tail wakeups are advisory and edge-triggered. Each active tail owns one sliding-capacity-1 dirty signal for its aggregate and re-queries SQLite after a wake. Repeated commits coalesce while the tail is busy because durable rows, not in-memory notifications, preserve every event and sequence. Subscribe and register the dirty signal before historical replay, then remove it when the tail closes, so replay handoff cannot miss a commit and inactive aggregates retain no wake state. +Durable event tail notifications are advisory. Each active tail uses bounded non-blocking observation and re-queries SQLite after a wake. Repeated commits may coalesce because durable rows, not in-memory notifications, preserve every event and sequence. A live-only fragment is emitted only after draining later durable rows, which fences dependent deltas behind their committed start events. If a live consumer cannot keep up, the stream closes rather than blocking Session execution or SQLite writers. Aggregate deletion still removes durable history and remains a documented replay limitation rather than being redesigned here. Event replay owner claims are separate from clustered Session execution ownership. The former already fences synchronized projection reconstruction; the latter still needs distributed active-run acquisition, stale-runtime rejection, interruption, and placement orchestration.