Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion capiscio_sdk/_rpc/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""gRPC client wrapper for capiscio-core."""

import sys
from typing import Generator, Optional

import grpc
Expand Down Expand Up @@ -95,7 +96,10 @@ def connect(self, timeout: float = 10.0) -> "CapiscioRPCClient":
self._process_manager = get_process_manager()
address = self._process_manager.ensure_running(timeout=timeout)
elif address is None:
address = "unix:///tmp/capiscio.sock"
if sys.platform == "win32":
address = "localhost:50051"
else:
address = "unix:///tmp/capiscio.sock"

# Create channel
if address.startswith("unix://"):
Expand Down
118 changes: 91 additions & 27 deletions capiscio_sdk/_rpc/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import platform
import shutil
import socket
import stat
import subprocess
import sys
Expand Down Expand Up @@ -245,6 +246,41 @@ def ensure_running(
binary = self._download_binary()
self._binary_path = binary

# Windows doesn't support Unix sockets — use TCP instead
if sys.platform == "win32":
return self._start_tcp(binary, timeout)
else:
return self._start_unix_socket(binary, socket_path, timeout)

def _start_tcp(self, binary: Path, timeout: float) -> str:
"""Start the gRPC server with a TCP listener (used on Windows)."""
port = self._find_free_port()
addr = f"localhost:{port}"
cmd = [str(binary), "rpc", "--address", addr]

try:
popen_kwargs = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
}
if sys.platform == "win32":
popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
else:
popen_kwargs["start_new_session"] = True
self._process = subprocess.Popen(cmd, **popen_kwargs)
Comment on lines +261 to +270
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _start_tcp, the child process is long-lived but stdout/stderr are piped without any reader. If capiscio-core logs enough data, the OS pipe buffer can fill and block the server process. Consider redirecting these streams to DEVNULL/a log file, or starting a background drain thread if you need to capture output for errors.

Copilot uses AI. Check for mistakes.
except Exception as e:
raise RuntimeError(f"Failed to start capiscio server: {e}") from e

self._tcp_address = addr
self._wait_grpc_ready(addr, timeout)
self._drain_pipes()
self._started = True
return self.address

def _start_unix_socket(
self, binary: Path, socket_path: Optional[Path], timeout: float
) -> str:
"""Start the gRPC server with a Unix socket listener."""
# Set up socket path
self._socket_path = socket_path or DEFAULT_SOCKET_PATH

Expand All @@ -268,7 +304,7 @@ def ensure_running(
except Exception as e:
raise RuntimeError(f"Failed to start capiscio server: {e}") from e

# Wait for socket to appear
# Wait for socket file to appear
start_time = time.time()
while time.time() - start_time < timeout:
if self._socket_path.exists():
Expand All @@ -277,10 +313,11 @@ def ensure_running(
# Check if process died
if self._process.poll() is not None:
stdout, stderr = self._process.communicate()
self.stop()
raise RuntimeError(
f"capiscio server exited unexpectedly:\n"
f"stdout: {stdout.decode()}\n"
f"stderr: {stderr.decode()}"
f"stdout: {stdout.decode(errors='replace') if stdout else ''}\n"
f"stderr: {stderr.decode(errors='replace') if stderr else ''}"
)

time.sleep(0.1)
Expand All @@ -294,33 +331,60 @@ def ensure_running(

# Socket exists — verify gRPC is actually accepting connections
remaining = timeout - (time.time() - start_time)
if remaining > 0:
import grpc
addr = f"unix://{self._socket_path}"
deadline = time.time() + remaining
while time.time() < deadline:
time_left = deadline - time.time()
if time_left <= 0:
break
channel = grpc.insecure_channel(addr)
try:
grpc.channel_ready_future(channel).result(timeout=min(1.0, time_left))
break
except grpc.FutureTimeoutError:
time.sleep(0.1)
except Exception:
time.sleep(0.1)
finally:
channel.close()
else:
addr = f"unix://{self._socket_path}"
self._wait_grpc_ready(addr, remaining)
self._drain_pipes()
self._started = True
return self.address

@staticmethod
def _find_free_port() -> int:
"""Find a free TCP port by binding to port 0."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]

def _drain_pipes(self) -> None:
"""Close piped stdout/stderr to prevent OS buffer fill on long-lived processes."""
if self._process is not None:
if self._process.stdout:
self._process.stdout.close()
if self._process.stderr:
self._process.stderr.close()

def _wait_grpc_ready(self, addr: str, remaining: float) -> None:
"""Wait for the gRPC server to accept connections."""
if remaining <= 0:
return
import grpc
deadline = time.time() + remaining
while time.time() < deadline:
# Check if process died
if self._process is not None and self._process.poll() is not None:
stdout, stderr = self._process.communicate()
self.stop()
raise RuntimeError(
f"capiscio server socket appeared but gRPC not ready "
f"within {timeout}s at {self._socket_path}"
f"capiscio server exited unexpectedly:\n"
f"stdout: {stdout.decode(errors='replace') if stdout else ''}\n"
f"stderr: {stderr.decode(errors='replace') if stderr else ''}"
)

self._started = True
return self.address
time_left = deadline - time.time()
if time_left <= 0:
break
channel = grpc.insecure_channel(addr)
try:
grpc.channel_ready_future(channel).result(timeout=min(1.0, time_left))
return
except grpc.FutureTimeoutError:
time.sleep(0.1)
except Exception:
time.sleep(0.1)
finally:
channel.close()
self.stop()
raise RuntimeError(
f"capiscio server gRPC not ready within timeout at {addr}"
)

def stop(self) -> None:
"""Stop the gRPC server process."""
Expand Down
88 changes: 88 additions & 0 deletions tests/unit/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import platform
import pytest
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch, mock_open

Expand Down Expand Up @@ -216,3 +217,90 @@ def test_binary_download_triggered_when_not_found(self):
# Just verify the method exists and can be mocked for integration
assert hasattr(pm, "_download_binary")
assert callable(pm._download_binary)

def test_find_free_port(self):
"""Test that _find_free_port returns a valid port number."""
port = ProcessManager._find_free_port()
assert isinstance(port, int)
assert 1 <= port <= 65535

def test_ensure_running_delegates_to_tcp_on_windows(self):
"""Test ensure_running uses TCP when sys.platform is win32."""
pm = ProcessManager()
mock_binary = MagicMock()

with patch.object(pm, "find_binary", return_value=mock_binary):
with patch("capiscio_sdk._rpc.process.sys.platform", "win32"):
with patch.object(pm, "_start_tcp", return_value="localhost:9999") as mock_tcp:
result = pm.ensure_running()
mock_tcp.assert_called_once_with(mock_binary, 5.0)
assert result == "localhost:9999"

def test_ensure_running_delegates_to_unix_socket_on_posix(self):
"""Test ensure_running uses Unix socket when not on Windows."""
pm = ProcessManager()
mock_binary = MagicMock()

with patch.object(pm, "find_binary", return_value=mock_binary):
with patch("capiscio_sdk._rpc.process.sys.platform", "darwin"):
with patch.object(
pm, "_start_unix_socket", return_value="unix:///tmp/test.sock"
) as mock_unix:
result = pm.ensure_running()
mock_unix.assert_called_once_with(mock_binary, None, 5.0)
assert result == "unix:///tmp/test.sock"

def test_start_tcp_spawns_with_address_flag(self):
"""Test _start_tcp spawns the binary with --address flag."""
pm = ProcessManager()
mock_binary = Path("/tmp/capiscio")

with patch.object(ProcessManager, "_find_free_port", return_value=54321):
with patch("subprocess.Popen") as mock_popen:
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_popen.return_value = mock_proc

with patch.object(pm, "_wait_grpc_ready"):
pm._start_tcp(mock_binary, timeout=5.0)

# Verify it used --address, not --socket
call_args = mock_popen.call_args
cmd = call_args[0][0]
assert cmd == ["/tmp/capiscio", "rpc", "--address", "localhost:54321"]
assert pm._tcp_address == "localhost:54321"

def test_start_tcp_uses_platform_appropriate_process_isolation(self):
"""Test _start_tcp uses correct process isolation per platform."""
pm = ProcessManager()
mock_binary = Path("/tmp/capiscio")

with patch.object(ProcessManager, "_find_free_port", return_value=12345):
with patch("subprocess.Popen") as mock_popen:
mock_proc = MagicMock()
mock_proc.poll.return_value = None
mock_popen.return_value = mock_proc

with patch.object(pm, "_wait_grpc_ready"):
pm._start_tcp(mock_binary, timeout=5.0)

call_kwargs = mock_popen.call_args[1]
if sys.platform == "win32":
import subprocess
assert call_kwargs["creationflags"] == subprocess.CREATE_NEW_PROCESS_GROUP
else:
assert call_kwargs["start_new_session"] is True

def test_address_property_returns_tcp_when_set(self):
"""Test address property returns TCP address when set."""
pm = ProcessManager()
pm._tcp_address = "localhost:50051"
assert pm.address == "localhost:50051"

def test_address_property_returns_unix_by_default(self):
"""Test address property returns Unix socket by default."""
pm = ProcessManager()
pm._tcp_address = None
pm._socket_path = None
from capiscio_sdk._rpc.process import DEFAULT_SOCKET_PATH
assert pm.address == f"unix://{DEFAULT_SOCKET_PATH}"
Loading