diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index fd3563a077..bdf703dfdc 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -231,7 +231,10 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { private _requestResponseMap: Map = new Map(); private _initialized: boolean = false; private _enableJsonResponse: boolean = false; - private _standaloneSseStreamId: string = '_GET_stream'; + private get _standaloneSseStreamId(): string { + const hardcode = '_GET_stream'; + return this.sessionId === undefined ? hardcode : `${hardcode}_${this.sessionId}`; + } private _eventStore?: EventStore; private _onsessioninitialized?: ((sessionId: string) => void | Promise) | undefined; private _onsessionclosed?: ((sessionId: string) => void | Promise) | undefined; diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index 7a23dd56bb..549d908914 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -707,6 +707,78 @@ describe('Zod v4', () => { }); }); + describe('HTTPServerTransport - Standalone SSE Stream Isolation', () => { + // 2 transports sharing one EventStore must record distinct `streamId` when + // server-initiated notifications take the standalone-SSE branch in `send()` + // (streamableHttp.ts: `requestId === undefined` -> `storeEvent(this._standaloneSseStreamId, message)`). + + const standaloneNotification: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/tools/list_changed' + }; + + const makeRecordingEventStore = (): { + store: EventStore; + calls: { streamId: StreamId; message: JSONRPCMessage }[]; + } => { + const calls: { streamId: StreamId; message: JSONRPCMessage }[] = []; + const store: EventStore = { + async storeEvent(streamId, message) { + calls.push({ streamId, message }); + return `${streamId}_${calls.length}`; + }, + async replayEventsAfter() { + return ''; + } + }; + return { store, calls }; + }; + + const makeTransport = (sessionIdValue: string | undefined, store: EventStore): WebStandardStreamableHTTPServerTransport => { + const transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: sessionIdValue === undefined ? undefined : () => sessionIdValue, + eventStore: store + }); + // Model the post-initialized state without running the full handshake; + // `send()` does not require `start()`. + if (sessionIdValue !== undefined) { + transport.sessionId = sessionIdValue; + } + return transport; + }; + + const HARDCODED_STREAM_ID = '_GET_stream'; + + it('uses distinct streamIds for 2 sessions sharing 1 EventStore', async () => { + const { store, calls } = makeRecordingEventStore(); + const transportA = makeTransport('sess-A', store); + const transportB = makeTransport('sess-B', store); + try { + await transportA.send(standaloneNotification); + await transportB.send(standaloneNotification); + expect(calls).toHaveLength(2); + const streamIds = new Set(calls.map(c => c.streamId)); + expect(streamIds.size).toBe(2); + expect(streamIds.has(HARDCODED_STREAM_ID)).toBe(false); + } finally { + await transportA.close(); + await transportB.close(); + } + }); + + it('falls back to the literal "_GET_stream" in stateless mode', async () => { + const { store, calls } = makeRecordingEventStore(); + const transport = makeTransport(undefined, store); + try { + await transport.send(standaloneNotification); + expect(calls).toHaveLength(1); + expect(calls[0]!.streamId).toBe(HARDCODED_STREAM_ID); + } finally { + await transport.close(); + } + }); + }); + describe('HTTPServerTransport - Protocol Version Validation', () => { let transport: WebStandardStreamableHTTPServerTransport; let mcpServer: McpServer;