diff --git a/capiscio_sdk/_rpc/client.py b/capiscio_sdk/_rpc/client.py index b3b3f11..b7135b3 100644 --- a/capiscio_sdk/_rpc/client.py +++ b/capiscio_sdk/_rpc/client.py @@ -1,5 +1,6 @@ """gRPC client wrapper for capiscio-core.""" +import sys from typing import Generator, Optional import grpc @@ -95,7 +96,10 @@ def connect(self, timeout: float = 10.0) -> "CapiscioRPCClient": self._process_manager = get_process_manager() address = self._process_manager.ensure_running(timeout=timeout) elif address is None: - address = "unix:///tmp/capiscio.sock" + if sys.platform == "win32": + address = "localhost:50051" + else: + address = "unix:///tmp/capiscio.sock" # Create channel if address.startswith("unix://"): diff --git a/capiscio_sdk/_rpc/process.py b/capiscio_sdk/_rpc/process.py index aaab9f4..dd7ff7a 100644 --- a/capiscio_sdk/_rpc/process.py +++ b/capiscio_sdk/_rpc/process.py @@ -5,6 +5,7 @@ import os import platform import shutil +import socket import stat import subprocess import sys @@ -245,6 +246,41 @@ def ensure_running( binary = self._download_binary() self._binary_path = binary + # Windows doesn't support Unix sockets — use TCP instead + if sys.platform == "win32": + return self._start_tcp(binary, timeout) + else: + return self._start_unix_socket(binary, socket_path, timeout) + + def _start_tcp(self, binary: Path, timeout: float) -> str: + """Start the gRPC server with a TCP listener (used on Windows).""" + port = self._find_free_port() + addr = f"localhost:{port}" + cmd = [str(binary), "rpc", "--address", addr] + + try: + popen_kwargs = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + } + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + else: + popen_kwargs["start_new_session"] = True + self._process = subprocess.Popen(cmd, **popen_kwargs) + except Exception as e: + raise RuntimeError(f"Failed to start capiscio server: {e}") from e + + self._tcp_address = addr + self._wait_grpc_ready(addr, timeout) + self._drain_pipes() + self._started = True + return self.address + + def _start_unix_socket( + self, binary: Path, socket_path: Optional[Path], timeout: float + ) -> str: + """Start the gRPC server with a Unix socket listener.""" # Set up socket path self._socket_path = socket_path or DEFAULT_SOCKET_PATH @@ -268,7 +304,7 @@ def ensure_running( except Exception as e: raise RuntimeError(f"Failed to start capiscio server: {e}") from e - # Wait for socket to appear + # Wait for socket file to appear start_time = time.time() while time.time() - start_time < timeout: if self._socket_path.exists(): @@ -277,10 +313,11 @@ def ensure_running( # Check if process died if self._process.poll() is not None: stdout, stderr = self._process.communicate() + self.stop() raise RuntimeError( f"capiscio server exited unexpectedly:\n" - f"stdout: {stdout.decode()}\n" - f"stderr: {stderr.decode()}" + f"stdout: {stdout.decode(errors='replace') if stdout else ''}\n" + f"stderr: {stderr.decode(errors='replace') if stderr else ''}" ) time.sleep(0.1) @@ -294,33 +331,60 @@ def ensure_running( # Socket exists — verify gRPC is actually accepting connections remaining = timeout - (time.time() - start_time) - if remaining > 0: - import grpc - addr = f"unix://{self._socket_path}" - deadline = time.time() + remaining - while time.time() < deadline: - time_left = deadline - time.time() - if time_left <= 0: - break - channel = grpc.insecure_channel(addr) - try: - grpc.channel_ready_future(channel).result(timeout=min(1.0, time_left)) - break - except grpc.FutureTimeoutError: - time.sleep(0.1) - except Exception: - time.sleep(0.1) - finally: - channel.close() - else: + addr = f"unix://{self._socket_path}" + self._wait_grpc_ready(addr, remaining) + self._drain_pipes() + self._started = True + return self.address + + @staticmethod + def _find_free_port() -> int: + """Find a free TCP port by binding to port 0.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + def _drain_pipes(self) -> None: + """Close piped stdout/stderr to prevent OS buffer fill on long-lived processes.""" + if self._process is not None: + if self._process.stdout: + self._process.stdout.close() + if self._process.stderr: + self._process.stderr.close() + + def _wait_grpc_ready(self, addr: str, remaining: float) -> None: + """Wait for the gRPC server to accept connections.""" + if remaining <= 0: + return + import grpc + deadline = time.time() + remaining + while time.time() < deadline: + # Check if process died + if self._process is not None and self._process.poll() is not None: + stdout, stderr = self._process.communicate() self.stop() raise RuntimeError( - f"capiscio server socket appeared but gRPC not ready " - f"within {timeout}s at {self._socket_path}" + f"capiscio server exited unexpectedly:\n" + f"stdout: {stdout.decode(errors='replace') if stdout else ''}\n" + f"stderr: {stderr.decode(errors='replace') if stderr else ''}" ) - - self._started = True - return self.address + time_left = deadline - time.time() + if time_left <= 0: + break + channel = grpc.insecure_channel(addr) + try: + grpc.channel_ready_future(channel).result(timeout=min(1.0, time_left)) + return + except grpc.FutureTimeoutError: + time.sleep(0.1) + except Exception: + time.sleep(0.1) + finally: + channel.close() + self.stop() + raise RuntimeError( + f"capiscio server gRPC not ready within timeout at {addr}" + ) def stop(self) -> None: """Stop the gRPC server process.""" diff --git a/tests/unit/test_process.py b/tests/unit/test_process.py index 197a38a..005086b 100644 --- a/tests/unit/test_process.py +++ b/tests/unit/test_process.py @@ -3,6 +3,7 @@ import os import platform import pytest +import sys from pathlib import Path from unittest.mock import MagicMock, patch, mock_open @@ -216,3 +217,90 @@ def test_binary_download_triggered_when_not_found(self): # Just verify the method exists and can be mocked for integration assert hasattr(pm, "_download_binary") assert callable(pm._download_binary) + + def test_find_free_port(self): + """Test that _find_free_port returns a valid port number.""" + port = ProcessManager._find_free_port() + assert isinstance(port, int) + assert 1 <= port <= 65535 + + def test_ensure_running_delegates_to_tcp_on_windows(self): + """Test ensure_running uses TCP when sys.platform is win32.""" + pm = ProcessManager() + mock_binary = MagicMock() + + with patch.object(pm, "find_binary", return_value=mock_binary): + with patch("capiscio_sdk._rpc.process.sys.platform", "win32"): + with patch.object(pm, "_start_tcp", return_value="localhost:9999") as mock_tcp: + result = pm.ensure_running() + mock_tcp.assert_called_once_with(mock_binary, 5.0) + assert result == "localhost:9999" + + def test_ensure_running_delegates_to_unix_socket_on_posix(self): + """Test ensure_running uses Unix socket when not on Windows.""" + pm = ProcessManager() + mock_binary = MagicMock() + + with patch.object(pm, "find_binary", return_value=mock_binary): + with patch("capiscio_sdk._rpc.process.sys.platform", "darwin"): + with patch.object( + pm, "_start_unix_socket", return_value="unix:///tmp/test.sock" + ) as mock_unix: + result = pm.ensure_running() + mock_unix.assert_called_once_with(mock_binary, None, 5.0) + assert result == "unix:///tmp/test.sock" + + def test_start_tcp_spawns_with_address_flag(self): + """Test _start_tcp spawns the binary with --address flag.""" + pm = ProcessManager() + mock_binary = Path("/tmp/capiscio") + + with patch.object(ProcessManager, "_find_free_port", return_value=54321): + with patch("subprocess.Popen") as mock_popen: + mock_proc = MagicMock() + mock_proc.poll.return_value = None + mock_popen.return_value = mock_proc + + with patch.object(pm, "_wait_grpc_ready"): + pm._start_tcp(mock_binary, timeout=5.0) + + # Verify it used --address, not --socket + call_args = mock_popen.call_args + cmd = call_args[0][0] + assert cmd == ["/tmp/capiscio", "rpc", "--address", "localhost:54321"] + assert pm._tcp_address == "localhost:54321" + + def test_start_tcp_uses_platform_appropriate_process_isolation(self): + """Test _start_tcp uses correct process isolation per platform.""" + pm = ProcessManager() + mock_binary = Path("/tmp/capiscio") + + with patch.object(ProcessManager, "_find_free_port", return_value=12345): + with patch("subprocess.Popen") as mock_popen: + mock_proc = MagicMock() + mock_proc.poll.return_value = None + mock_popen.return_value = mock_proc + + with patch.object(pm, "_wait_grpc_ready"): + pm._start_tcp(mock_binary, timeout=5.0) + + call_kwargs = mock_popen.call_args[1] + if sys.platform == "win32": + import subprocess + assert call_kwargs["creationflags"] == subprocess.CREATE_NEW_PROCESS_GROUP + else: + assert call_kwargs["start_new_session"] is True + + def test_address_property_returns_tcp_when_set(self): + """Test address property returns TCP address when set.""" + pm = ProcessManager() + pm._tcp_address = "localhost:50051" + assert pm.address == "localhost:50051" + + def test_address_property_returns_unix_by_default(self): + """Test address property returns Unix socket by default.""" + pm = ProcessManager() + pm._tcp_address = None + pm._socket_path = None + from capiscio_sdk._rpc.process import DEFAULT_SOCKET_PATH + assert pm.address == f"unix://{DEFAULT_SOCKET_PATH}"