2 changes: 1 addition & 1 deletion pyproject.toml
@@ -25,7 +25,7 @@ classifiers = [
     "Programming Language :: Python :: 3.14",
 ]
 dependencies = [
-    "anyio>=4.5",
+    "anyio>=4.9",
     "httpx>=0.27.1",
     "httpx-sse>=0.4",
     "pydantic>=2.12.0",
2 changes: 2 additions & 0 deletions src/mcp/client/sse.py
@@ -160,3 +160,5 @@ async def post_writer(endpoint_url: str):
     finally:
         await read_stream_writer.aclose()
         await write_stream.aclose()
+        await read_stream.aclose()
+        await write_stream_reader.aclose()
Comment on lines +163 to +164
🟣 Not blocking — follow-up note. The same pattern this PR fixes exists in the server-side transports: server/stdio.py, server/websocket.py, and server/sse.py each create 4 stream ends but only close 2 (via inner async with in reader/writer tasks), with no outer try/finally — read_stream and write_stream are never closed by the transport itself. Ironically, server/stdio.py doesn't follow the client/stdio.py pattern this PR cites as the correct reference. This likely doesn't cause the current CI flakiness (server tests run in subprocesses, and tests/server/test_stdio.py manually closes both streams via async with), but it's worth a follow-up issue to apply the same fix on the server side for consistency.

Extended reasoning

Summary

The server-side transport files have the exact same memory-stream leak pattern that this PR fixes on the client side. This is a pre-existing issue, not introduced by this PR, and does not need to block the PR — but it's directly relevant context that a reviewer should know about when approving a fix scoped to half of the affected code.

Affected files and the pattern

I verified each server transport against the code:

src/mcp/server/stdio.py (lines 52–83): Creates 4 stream ends via two create_memory_object_stream calls. The stdin_reader task uses async with read_stream_writer: and stdout_writer uses async with write_stream_reader:, so 2 ends are closed when the tasks complete. But read_stream and write_stream are yielded to the caller and never closed by the transport. There is no outer try/finally.

src/mcp/server/websocket.py (lines 28–58): Identical structure. ws_reader closes read_stream_writer, ws_writer closes write_stream_reader. read_stream and write_stream are never closed. No outer try/finally.

src/mcp/server/sse.py: Creates 6 stream ends (4 + an SSE pair). A subset is closed by inner task bodies; read_stream, write_stream, and sse_stream_reader are never explicitly closed.
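The paired-but-independent behavior that makes this a leak can be demonstrated directly with anyio — a minimal sketch, separate from the transport code above (names are illustrative):

```python
import anyio


async def demo() -> str:
    send, recv = anyio.create_memory_object_stream(0)
    await send.aclose()
    # Closing the send end does NOT close its paired receive end:
    # receive() now raises EndOfStream, but the receive end itself
    # stays open until aclose() is called (or GC emits a ResourceWarning).
    try:
        await recv.receive()
    except anyio.EndOfStream:
        state = "send closed, receive end still open"
    await recv.aclose()  # each end must be closed explicitly
    return state


print(anyio.run(demo))
```

This is exactly why closing 2 of 4 ends in the reader/writer tasks is not enough: the two ends yielded to the caller remain open.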

Comparison with the "correct" pattern

The PR description cites client/stdio.py as the reference correct pattern. Lines 208–211 of client/stdio.py close all 4 ends in the finally block:

await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
await write_stream_reader.aclose()

Ironically, server/stdio.py — the file most directly parallel to client/stdio.py — does not follow this pattern.

Why this doesn't cause current CI flakiness (addressing the counter-argument)

One might expect server-side leaks to cause the same knock-on ResourceWarning failures the PR describes. I investigated and this is largely not the case in the current test suite, for two reasons:

  1. Subprocess isolation: tests/shared/test_ws.py and tests/shared/test_sse.py run the server via multiprocessing.Process (see the server fixture in tests/shared/test_sse.py around line 119). Leaked memory streams live in the subprocess's memory and die with the subprocess when proc.kill() is called — they never trigger a ResourceWarning in the pytest process.

  2. Manual close in test code: tests/server/test_stdio.py runs stdio_server in-process, but the test explicitly wraps the yielded streams: async with read_stream: (line 30) and async with write_stream: (line 49). So the test compensates for the transport's missing cleanup.

So the CI evidence cited in the PR description (the client-side test_basic_resources → test_tool_progress cascade) is genuinely a client-side problem, and this PR fully addresses the observed flakiness.

Why it's still worth a follow-up

  • Defense in depth: If someone adds an in-process server test that doesn't manually async with the yielded streams, the leak surfaces.
  • API contract: Relying on callers to close streams that the transport created is brittle. The transport owns the resource; it should own the cleanup.
  • Consistency: After this PR, 4 client transports close all stream ends; 3 server transports don't. That asymmetry is a maintenance hazard.

Recommended fix (for the follow-up, not this PR)

Same pattern as this PR — add the missing aclose() calls in a finally block, or use the compound async with approach from client/websocket.py. Since anyio's aclose() is idempotent, double-close is safe when the caller has already closed its end.
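A sketch of what that follow-up could look like — condensed and hypothetical, using the same four stream names as the client transports (`transport` here stands in for the real server-side context managers):

```python
from contextlib import asynccontextmanager

import anyio


@asynccontextmanager
async def transport():
    # Hypothetical condensed server transport: create 4 stream ends,
    # then close ALL of them on exit, as client/stdio.py does.
    read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
    write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
    try:
        yield read_stream, write_stream
    finally:
        # aclose() is idempotent, so double-close is safe even when the
        # reader/writer tasks or the caller already closed their ends.
        await read_stream_writer.aclose()
        await write_stream.aclose()
        await read_stream.aclose()
        await write_stream_reader.aclose()


async def main() -> None:
    async with transport() as (read_stream, write_stream):
        pass  # even if the caller never touches the streams, nothing leaks


anyio.run(main)
```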

2 changes: 2 additions & 0 deletions src/mcp/client/streamable_http.py
@@ -577,3 +577,5 @@ def start_get_stream() -> None:
     finally:
         await read_stream_writer.aclose()
         await write_stream.aclose()
+        await read_stream.aclose()
+        await write_stream_reader.aclose()
13 changes: 9 additions & 4 deletions src/mcp/client/websocket.py
@@ -38,11 +38,10 @@ async def websocket_client(
     write_stream: MemoryObjectSendStream[SessionMessage]
     write_stream_reader: MemoryObjectReceiveStream[SessionMessage]
 
-    read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
-    write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
-
     # Connect using websockets, requesting the "mcp" subprotocol
     async with ws_connect(url, subprotocols=[Subprotocol("mcp")]) as ws:
+        read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
+        write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
 
         async def ws_reader():
             """Reads text messages from the WebSocket, parses them as JSON-RPC messages,
@@ -68,7 +67,13 @@ async def ws_writer():
             msg_dict = session_message.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
             await ws.send(json.dumps(msg_dict))
 
-        async with anyio.create_task_group() as tg:
+        async with (
+            read_stream_writer,
+            read_stream,
+            write_stream,
+            write_stream_reader,
+            anyio.create_task_group() as tg,
+        ):
             # Start reader and writer tasks
             tg.start_soon(ws_reader)
             tg.start_soon(ws_writer)
102 changes: 102 additions & 0 deletions tests/client/test_transport_stream_cleanup.py
@@ -0,0 +1,102 @@
"""Regression tests for memory stream leaks in client transports.

When a connection error occurs (404, 403, ConnectError), transport context
managers must close ALL 4 memory stream ends they created. anyio memory streams
are paired but independent — closing the writer does NOT close the reader.
Unclosed stream ends emit ResourceWarning on GC, which pytest promotes to a
test failure in whatever test happens to be running when GC triggers.

These tests force GC after the transport context exits, so any leaked stream
triggers a ResourceWarning immediately and deterministically here, rather than
nondeterministically in an unrelated later test.
"""

import gc
import sys
from collections.abc import Iterator
from contextlib import contextmanager

import httpx
import pytest

from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamable_http_client
from mcp.client.websocket import websocket_client


@contextmanager
def _assert_no_memory_stream_leak() -> Iterator[None]:
    """Fail if any anyio MemoryObject stream emits ResourceWarning during the block.

    Uses a custom sys.unraisablehook to capture ONLY MemoryObject stream leaks,
    ignoring unrelated resources (e.g. PipeHandle from flaky stdio tests on the
    same xdist worker). gc.collect() is forced after the block to make leaks
    deterministic.
    """
    leaked: list[str] = []
    old_hook = sys.unraisablehook

    def hook(args: "sys.UnraisableHookArgs") -> None:  # pragma: no cover
        # Only executes if a leak occurs (i.e. the bug is present).
        # args.object is the __del__ function (not the stream instance) when
        # unraisablehook fires from a finalizer, so check exc_value — the
        # actual ResourceWarning("Unclosed <MemoryObjectSendStream at ...>").
        # Non-MemoryObject unraisables (e.g. PipeHandle leaked by an earlier
        # flaky test on the same xdist worker) are deliberately ignored —
        # this test should not fail for another test's resource leak.
        if "MemoryObject" in str(args.exc_value):
            leaked.append(str(args.exc_value))

    sys.unraisablehook = hook
    try:
        yield
        gc.collect()
        assert not leaked, f"Memory streams leaked: {leaked}"
    finally:
        sys.unraisablehook = old_hook


@pytest.mark.anyio
async def test_sse_client_closes_all_streams_on_connection_error(free_tcp_port: int) -> None:
    """sse_client must close all 4 stream ends when the connection fails.

    Before the fix, only read_stream_writer and write_stream were closed in
    the finally block. read_stream and write_stream_reader were leaked.
    """
    with _assert_no_memory_stream_leak():
        # sse_client enters a task group BEFORE connecting, so anyio wraps the
        # ConnectError from aconnect_sse in an ExceptionGroup.
        with pytest.raises(Exception) as exc_info:  # noqa: B017
            async with sse_client(f"http://127.0.0.1:{free_tcp_port}/sse"):
                pytest.fail("should not reach here")  # pragma: no cover

        assert exc_info.group_contains(httpx.ConnectError)
        # exc_info holds the traceback → holds frame locals → keeps leaked
        # streams alive. Must drop it before gc.collect() can detect a leak.
        del exc_info


@pytest.mark.anyio
async def test_streamable_http_client_closes_all_streams_on_exit() -> None:
    """streamable_http_client must close all 4 stream ends on exit.

    Before the fix, read_stream was never closed — not even on the happy path.
    This test enters and exits the context without sending any messages, so no
    network connection is ever attempted (streamable_http connects lazily).
    """
    with _assert_no_memory_stream_leak():
        async with streamable_http_client("http://127.0.0.1:1/mcp"):
            pass


@pytest.mark.anyio
async def test_websocket_client_closes_all_streams_on_connection_error(free_tcp_port: int) -> None:
    """websocket_client must close all 4 stream ends when ws_connect fails.

    Before the fix, there was no try/finally at all — if ws_connect raised,
    all 4 streams were leaked.
    """
    with _assert_no_memory_stream_leak():
        with pytest.raises(OSError):
            async with websocket_client(f"ws://127.0.0.1:{free_tcp_port}/ws"):
                pytest.fail("should not reach here")  # pragma: no cover
6 changes: 0 additions & 6 deletions tests/shared/test_sse.py
@@ -544,12 +544,6 @@ def test_sse_server_transport_endpoint_validation(endpoint: str, expected_result
     assert sse._endpoint.startswith("/")
 
 
-# ResourceWarning filter: When mocking aconnect_sse, the sse_client's internal task
-# group doesn't receive proper cancellation signals, so the sse_reader task's finally
-# block (which closes read_stream_writer) doesn't execute. This is a test artifact -
-# the actual code path (`if not sse.data: continue`) IS exercised and works correctly.
-# Production code with real SSE connections cleans up properly.
-@pytest.mark.filterwarnings("ignore::ResourceWarning")
 @pytest.mark.anyio
 async def test_sse_client_handles_empty_keepalive_pings() -> None:
     """Test that SSE client properly handles empty data lines (keep-alive pings).
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default.