Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
private _requestResponseMap: Map<RequestId, JSONRPCMessage> = new Map();
private _initialized: boolean = false;
private _enableJsonResponse: boolean = false;
private _standaloneSseStreamId: string = '_GET_stream';
private get _standaloneSseStreamId(): string {
const hardcode = '_GET_stream';
return this.sessionId === undefined ? hardcode : `${hardcode}_${this.sessionId}`;
}
private _eventStore?: EventStore;
private _onsessioninitialized?: ((sessionId: string) => void | Promise<void>) | undefined;
private _onsessionclosed?: ((sessionId: string) => void | Promise<void>) | undefined;
Expand Down
72 changes: 72 additions & 0 deletions packages/server/test/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,78 @@ describe('Zod v4', () => {
});
});

describe('HTTPServerTransport - Standalone SSE Stream Isolation', () => {
// 2 transports sharing one EventStore must record distinct `streamId` when
// server-initiated notifications take the standalone-SSE branch in `send()`
// (streamableHttp.ts: `requestId === undefined` -> `storeEvent(this._standaloneSseStreamId, message)`).

const standaloneNotification: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'notifications/tools/list_changed'
};

const makeRecordingEventStore = (): {
store: EventStore;
calls: { streamId: StreamId; message: JSONRPCMessage }[];
} => {
const calls: { streamId: StreamId; message: JSONRPCMessage }[] = [];
const store: EventStore = {
async storeEvent(streamId, message) {
calls.push({ streamId, message });
return `${streamId}_${calls.length}`;
},
async replayEventsAfter() {
return '';
}
};
return { store, calls };
};

const makeTransport = (sessionIdValue: string | undefined, store: EventStore): WebStandardStreamableHTTPServerTransport => {
const transport = new WebStandardStreamableHTTPServerTransport({
sessionIdGenerator: sessionIdValue === undefined ? undefined : () => sessionIdValue,
eventStore: store
});
// Model the post-initialized state without running the full handshake;
// `send()` does not require `start()`.
if (sessionIdValue !== undefined) {
transport.sessionId = sessionIdValue;
}
return transport;
};

const HARDCODED_STREAM_ID = '_GET_stream';

it('uses distinct streamIds for 2 sessions sharing 1 EventStore', async () => {
const { store, calls } = makeRecordingEventStore();
const transportA = makeTransport('sess-A', store);
const transportB = makeTransport('sess-B', store);
try {
await transportA.send(standaloneNotification);
await transportB.send(standaloneNotification);
expect(calls).toHaveLength(2);
const streamIds = new Set(calls.map(c => c.streamId));
expect(streamIds.size).toBe(2);
expect(streamIds.has(HARDCODED_STREAM_ID)).toBe(false);
} finally {
await transportA.close();
await transportB.close();
}
});

it('falls back to the literal "_GET_stream" in stateless mode', async () => {
const { store, calls } = makeRecordingEventStore();
const transport = makeTransport(undefined, store);
try {
await transport.send(standaloneNotification);
expect(calls).toHaveLength(1);
expect(calls[0]!.streamId).toBe(HARDCODED_STREAM_ID);
} finally {
await transport.close();
}
});
});

describe('HTTPServerTransport - Protocol Version Validation', () => {
let transport: WebStandardStreamableHTTPServerTransport;
let mcpServer: McpServer;
Expand Down
Loading