Skip to content

Commit 4c95899

Browse files
committed
Trim comments and docstrings in the listen driver changes
Comment-only pass over the branch's additions: keep the non-inferable invariants and motivations (receive-order rationale, sub-resource admission, the wake-snapshot race, peek/commit semantics) at one to three lines each, tighten docstrings to Google style with Raises sections kept, and drop narration the code already states. No code changes.
1 parent 2f20580 commit 4c95899

15 files changed

Lines changed: 147 additions & 403 deletions

src/mcp/client/client.py

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -675,28 +675,19 @@ def listen(
675675
) -> AbstractAsyncContextManager[Subscription]:
676676
"""Open a `subscriptions/listen` stream of typed change events (2026-07-28 only).
677677
678-
The keyword arguments mirror the wire `SubscriptionFilter` field for
679-
field; `resource_subscriptions` names exact resource URIs to watch.
680-
Entering waits for the server's acknowledgment - the subset it agreed
681-
to deliver is `sub.honored` - then the handle yields events:
678+
Keyword args mirror the wire `SubscriptionFilter`; entering waits for the ack (honored subset: `sub.honored`):
682679
683680
async with client.listen(tools_list_changed=True) as sub:
684681
async for event in sub:
685682
tools = await client.list_tools() # refetch on change
686683
687-
A graceful server close ends the loop; an abrupt drop raises
688-
`SubscriptionLost`. Either way the stream is gone and there is no
689-
replay: a client that keeps watching re-listens and refetches.
690-
Exiting the context ends the subscription. Multiple subscriptions may
691-
be open concurrently.
684+
A graceful close ends the loop; an abrupt drop raises `SubscriptionLost`. No replay: re-listen and refetch.
692685
693686
Raises:
694-
ListenNotSupportedError: The negotiated protocol version predates
695-
2026-07-28 (use `subscribe_resource` and `message_handler` there).
696-
MCPError: The server rejected the request, or the connection
697-
failed before the acknowledgment arrived.
687+
ListenNotSupportedError: The negotiated protocol version predates 2026-07-28.
688+
MCPError: The server rejected the request or the connection failed first.
698689
SubscriptionLost: The stream ended before it was acknowledged.
699-
TimeoutError: The session's read timeout elapsed before the acknowledgment.
690+
TimeoutError: The read timeout elapsed before the acknowledgment.
700691
"""
701692
return _listen(
702693
self.session,
@@ -710,14 +701,9 @@ def listen(
710701
async def _evict_for_listen_event(self, event: ServerEvent) -> None:
711702
"""Finish response-cache eviction before a listen consumer can refetch.
712703
713-
The eviction wrapper on message_handler runs on a spawned path; the
714-
consumer's iterator would otherwise wake first, refetch through a
715-
still-warm entry, and - events being deduplicated level triggers -
716-
never get a second wake to correct it. The event's notification still
717-
tees to that wrapper, so the same eviction fires twice; that is
718-
deliberate (eviction is idempotent, and the tee-path one covers
719-
non-iterating consumers sharing this cache). Same containment boundary
720-
as `_evicting_message_handler`: a cache fault must not block delivery.
704+
Without it the iterator wakes first and refetches a still-warm entry, with no
705+
corrective wake (events are deduplicated level triggers). The tee path repeats
706+
the eviction; deliberate: idempotent, and it covers non-iterating consumers.
721707
"""
722708
cache = self._response_cache
723709
assert cache is not None # installed as the event barrier only when a cache exists

src/mcp/client/session.py

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,7 @@ def __init__(
363363
self._negotiated_version: str | None = None
364364
self._stamp: Callable[[dict[str, Any], CallOptions], None] = _preconnect_stamp
365365
self._task_group: anyio.abc.TaskGroup | None = None
366-
# subscriptions/listen demux routes, keyed by the listen request's id
367-
# (verbatim-typed: a plain dict already keeps 1 and "1" distinct).
368-
# Fed by `_intercept_notification` on the dispatcher's receive path;
369-
# membership alone decides ack consumption, so a closed subscription
370-
# releases its id completely - raw escape-hatch listens (never
371-
# registered here) keep receiving their acks via message_handler.
366+
# subscriptions/listen demux routes; membership decides ack consumption (raw listens are never registered)
372367
self._listen_routes: dict[RequestId, ListenRoute] = {}
373368
if dispatcher is not None:
374369
if read_stream is not None or write_stream is not None:
@@ -1257,64 +1252,44 @@ def _unregister_listen_route(self, request_id: RequestId) -> None:
12571252
self._listen_routes.pop(request_id, None)
12581253

12591254
def _settle_listen_routes_closed(self) -> None:
1260-
"""End every open subscription as lost: the session is gone.
1261-
1262-
Without this, a consumer iterating in a sibling task would park forever -
1263-
the driver task dies by cancellation and can never settle its route.
1264-
"""
1255+
"""Settle all open listen routes as lost on session exit; cancelled driver tasks cannot."""
12651256
closed = MCPError(code=CONNECTION_CLOSED, message="Connection closed")
12661257
for route in self._listen_routes.values():
12671258
route.settle("lost", error=closed)
12681259
self._listen_routes.clear()
12691260

12701261
def _intercept_notification(self, method: str, params: Mapping[str, Any] | None) -> bool:
1271-
"""Wire-order listen demux, run by the dispatcher on its receive path.
1272-
1273-
Route bookkeeping must advance in receive order relative to the listen
1274-
request's own result: the result resolves synchronously on this same
1275-
path, so an ack or event handled on the spawned `_on_notify` path could
1276-
lose the race and be dropped after the stream settled - a graceful
1277-
close swallowing the events that preceded it. Only the synchronous
1278-
bookkeeping happens here; the user-facing tee (message_handler,
1279-
logging) stays on the spawned path.
1280-
1281-
Returns True to consume the frame: an ack for a live driver route is
1282-
driver state, never surfaced. Raw escape-hatch listens have no route
1283-
registered and keep observing their acks via message_handler - as does
1284-
a stray ack for an already-closed driver id, whose registration is gone.
1285-
1286-
The `as_request_id` guard is not a tripwire: this reads raw wire
1287-
dicts, where a non-id (even unhashable) `_meta` value is
1288-
constructible and would fail the dict lookup.
1262+
"""Wire-order listen demux, run synchronously on the dispatcher's receive path.
1263+
1264+
Bookkeeping must advance in receive order with the listen result (resolved on
1265+
this same path); the spawned `_on_notify` path would race it and drop events.
1266+
Returns True to consume the frame: a live route's ack is driver state, never surfaced.
12891267
"""
12901268
if not self._listen_routes:
12911269
return False
12921270
if method == "notifications/cancelled":
12931271
request_id = cancelled_request_id_from_params(params)
12941272
if request_id is not None and (listen_route := self._listen_routes.get(request_id)) is not None:
1295-
# A server-sent cancel naming one of our listen requests is
1296-
# that stream's teardown signal (the stream-transport spelling).
1273+
# a server-sent cancel naming a listen request is that stream's teardown signal
12971274
listen_route.settle("lost")
12981275
return False # _on_notify swallows every cancelled either way (v1 parity)
12991276
if params is None:
13001277
return False
13011278
meta = params.get("_meta")
13021279
if not isinstance(meta, Mapping):
13031280
return False
1281+
# as_request_id is not a tripwire: raw wire _meta can carry a non-id (even unhashable) value
13041282
subscription_id = as_request_id(cast("Mapping[str, Any]", meta).get(SUBSCRIPTION_ID_META_KEY))
13051283
if subscription_id is None or (listen_route := self._listen_routes.get(subscription_id)) is None:
13061284
return False
13071285
if method == "notifications/subscriptions/acknowledged":
13081286
raw_filter = params.get("notifications")
13091287
if raw_filter is None:
1310-
# The wire shape requires `notifications`; a frame without it is
1311-
# malformed, not an empty filter - leave it to the spawned
1312-
# path's validation warning rather than fabricating honored={}.
1288+
# malformed, not an empty filter: leave it to the spawned path's validation warning
13131289
return False
13141290
try:
13151291
honored = types.SubscriptionFilter.model_validate(raw_filter)
13161292
except ValidationError:
1317-
# Malformed ack: leave it to the spawned path's validation warning.
13181293
return False
13191294
listen_route.set_acked(honored)
13201295
return True
@@ -1356,9 +1331,7 @@ async def _on_notify(
13561331
logger.warning("Failed to validate notification: %s", method, exc_info=True)
13571332
return
13581333
if isinstance(notification, types.CancelledNotification):
1359-
# Never surfaced to message_handler (v1 parity): the dispatcher
1360-
# already applied any in-flight cancellation, and a cancel naming
1361-
# one of our listen streams was settled by the wire-order intercept.
1334+
# Never surfaced (v1 parity): the dispatcher already applied it; listen cancels settled by the intercept.
13621335
return
13631336
try:
13641337
if isinstance(notification, types.LoggingMessageNotification):

src/mcp/client/streamable_http.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -443,10 +443,8 @@ async def _handle_sse_response(
443443
logger.info("SSE stream disconnected, reconnecting...")
444444
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
445445
else:
446-
# Not resumable: the response can never arrive. Resolve the waiter
447-
# instead of leaving the request pending for the session's lifetime
448-
# (a listen stream's consumer would otherwise hang instead of
449-
# learning the subscription is lost).
446+
# Not resumable: resolve the waiter, else a listen stream's consumer
447+
# would hang forever instead of learning the subscription is lost.
450448
await self._resolve_abandoned_request(
451449
ctx.read_stream_writer, original_request_id, "SSE stream ended without a response"
452450
)
@@ -456,9 +454,7 @@ async def _resolve_abandoned_request(
456454
) -> None:
457455
"""Resolve a request whose response can never arrive with a synthesized error.
458456
459-
Best-effort, like the dispatcher's own resolution writes: the read
460-
stream closing first means the session is being torn down and nobody
461-
is waiting on this request anymore.
457+
Best-effort: a closed read stream means the session is tearing down.
462458
"""
463459
error_data = ErrorData(code=CONNECTION_CLOSED, message=message)
464460
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=request_id, error=error_data))
@@ -475,16 +471,13 @@ async def _handle_reconnection(
475471
attempt: int = 0,
476472
) -> None:
477473
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
478-
# Only requests reconnect: every caller reaches here from a request's
479-
# response stream (`_handle_sse_response` asserts the same), so the
480-
# original id to map responses onto is always present.
474+
# Only requests reconnect: every caller arrives from a request's response stream.
481475
assert isinstance(ctx.session_message.message, JSONRPCRequest)
482476
original_request_id = ctx.session_message.message.id
483477

484478
if attempt >= MAX_RECONNECTION_ATTEMPTS:
485-
# Give up AND resolve: without a synthesized error the waiter is
486-
# pending for the session's lifetime, and a request with no read
487-
# timeout (a listen stream) would hang its caller forever.
479+
# Resolve on give-up: a request with no read timeout (a listen
480+
# stream) would otherwise hang its caller forever.
488481
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
489482
await self._resolve_abandoned_request(
490483
ctx.read_stream_writer,

0 commit comments

Comments
 (0)