diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9f3dd5e0b..143d7bbe5 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -466,11 +466,27 @@ async def post_writer( read_stream_writer=read_stream_writer, ) - async def handle_request_async(): - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) + async def handle_request_async() -> None: + try: + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) + except anyio.get_cancelled_exc_class(): + raise + except Exception as exc: + if isinstance(message, JSONRPCRequest): + with contextlib.suppress(Exception): + error_data = ErrorData( + code=INTERNAL_ERROR, + message=str(exc) or "Connection error", + ) + error_msg = SessionMessage( + JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data) + ) + await read_stream_writer.send(error_msg) + else: + logger.warning(f"Failed to send notification: {exc}") # If this is a request, start a new task to handle it if isinstance(message, JSONRPCRequest): diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index f8ca30441..3c24d4b63 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -56,6 +56,8 @@ CallToolRequestParams, CallToolResult, InitializeResult, + JSONRPCError, + JSONRPCNotification, JSONRPCRequest, ListToolsResult, PaginatedRequestParams, @@ -2316,3 +2318,100 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_connection_error_forwarded_to_read_stream(): + """Test that connection errors in post_writer are forwarded to the read + stream instead of crashing the task group. + + When the server is unreachable, _handle_post_request raises a connection + error. The post_writer should catch it and send it through the read stream + so the client session can handle it gracefully. + """ + # Use a port that is not listening to trigger a connection error + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + _, port = s.getsockname() + # Port is now closed — nothing is listening + + async with streamable_http_client(f"http://127.0.0.1:{port}/mcp") as ( + read_stream, + write_stream, + ): + async with read_stream, write_stream: + # Send an initialize request — this will fail to connect + init_message = SessionMessage( + JSONRPCRequest( + jsonrpc="2.0", + id="init-err", + method="initialize", + params={ + "protocolVersion": types.LATEST_PROTOCOL_VERSION, + "capabilities": {}, + "clientInfo": {"name": "test", "version": "1.0"}, + }, + ) + ) + await write_stream.send(init_message) + + # The connection error should be forwarded as a JSONRPCError + # so that send_request can route it to the correct response stream + with anyio.fail_after(5): + result = await read_stream.receive() + assert isinstance(result, SessionMessage) + assert isinstance(result.message, JSONRPCError) + assert result.message.id == "init-err" + + +@pytest.mark.anyio +async def test_notification_connection_error_logged_not_forwarded(): + """Test that notification connection errors are logged and do not crash + the post_writer loop. + + Unlike request errors (which must be forwarded as JSONRPCError to unblock + send_request), notification errors are simply logged since nothing waits + for a notification response. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + _, port = s.getsockname() + + async with streamable_http_client(f"http://127.0.0.1:{port}/mcp") as ( + read_stream, + write_stream, + ): + async with read_stream, write_stream: + # Send a notification — this will fail to connect but should + # not crash the post_writer + notification = SessionMessage( + JSONRPCNotification( + jsonrpc="2.0", + method="notifications/initialized", + ) + ) + await write_stream.send(notification) + + # Send a request after the notification to verify post_writer + # is still alive + request = SessionMessage( + JSONRPCRequest( + jsonrpc="2.0", + id="after-notif", + method="initialize", + params={ + "protocolVersion": types.LATEST_PROTOCOL_VERSION, + "capabilities": {}, + "clientInfo": {"name": "test", "version": "1.0"}, + }, + ) + ) + await write_stream.send(request) + + # The request error should arrive (proving post_writer survived + # the notification error) + with anyio.fail_after(5): + result = await read_stream.receive() + assert isinstance(result, SessionMessage) + assert isinstance(result.message, JSONRPCError) + assert result.message.id == "after-notif"