Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 77 additions & 26 deletions capiscio_sdk/_rpc/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import shutil
import stat
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional, Tuple
Expand Down Expand Up @@ -152,6 +153,7 @@ def _download_binary(self) -> Path:
"""Download the capiscio-core binary for the current platform.

Downloads from GitHub releases to ~/.capiscio/bin/<version>/.
Retries up to 3 times with exponential backoff.
Returns the path to the executable.
"""
os_name, arch_name = self._get_platform_info()
Expand All @@ -164,30 +166,50 @@ def _download_binary(self) -> Path:
filename = f"capiscio-{os_name}-{arch_name}{ext}"
url = f"https://github.com/{GITHUB_REPO}/releases/download/v{CORE_VERSION}/{filename}"

sys.stderr.write(
f"capiscio-core v{CORE_VERSION} not found. "
f"Downloading for {os_name}/{arch_name}...\n"
)
sys.stderr.flush()
logger.info("Downloading capiscio-core v%s for %s/%s...", CORE_VERSION, os_name, arch_name)

target_path.parent.mkdir(parents=True, exist_ok=True)
try:
with httpx.stream("GET", url, follow_redirects=True, timeout=60.0) as resp:
resp.raise_for_status()
with open(target_path, "wb") as f:
for chunk in resp.iter_bytes(chunk_size=8192):
f.write(chunk)
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
with httpx.stream("GET", url, follow_redirects=True, timeout=60.0) as resp:
resp.raise_for_status()
with open(target_path, "wb") as f:
for chunk in resp.iter_bytes(chunk_size=8192):
f.write(chunk)

# Make executable
st = os.stat(target_path)
os.chmod(target_path, st.st_mode | stat.S_IEXEC)
# Make executable
st = os.stat(target_path)
os.chmod(target_path, st.st_mode | stat.S_IEXEC)

logger.info("Installed capiscio-core v%s at %s", CORE_VERSION, target_path)
return target_path
sys.stderr.write(f"Installed capiscio-core v{CORE_VERSION} at {target_path}\n")
sys.stderr.flush()
logger.info("Installed capiscio-core v%s at %s", CORE_VERSION, target_path)
return target_path

except Exception as e:
if target_path.exists():
target_path.unlink()
raise RuntimeError(
f"Failed to download capiscio-core from {url}: {e}\n"
"You can also set CAPISCIO_BINARY to point to an existing binary."
) from e
except Exception as e:
if target_path.exists():
target_path.unlink()
if attempt < max_attempts:
delay = 2 ** (attempt - 1)
logger.warning(
"Download attempt %d/%d failed: %s. Retrying in %ds...",
attempt, max_attempts, e, delay,
)
time.sleep(delay)
else:
raise RuntimeError(
f"Failed to download capiscio-core from {url} "
f"after {max_attempts} attempts: {e}\n"
"You can also set CAPISCIO_BINARY to point to an existing binary."
) from e
# unreachable, but keeps type checker happy
raise RuntimeError("Download failed")

def ensure_running(
self,
Expand Down Expand Up @@ -250,8 +272,7 @@ def ensure_running(
start_time = time.time()
while time.time() - start_time < timeout:
if self._socket_path.exists():
self._started = True
return self.address
break

# Check if process died
if self._process.poll() is not None:
Expand All @@ -263,13 +284,43 @@ def ensure_running(
)

time.sleep(0.1)
else:
# Timeout - kill process and raise
self.stop()
raise RuntimeError(
f"capiscio server did not start within {timeout}s. "
f"Socket not found at {self._socket_path}"
)

# Timeout - kill process and raise
self.stop()
raise RuntimeError(
f"capiscio server did not start within {timeout}s. "
f"Socket not found at {self._socket_path}"
)
# Socket exists — verify gRPC is actually accepting connections
remaining = timeout - (time.time() - start_time)
if remaining > 0:
import grpc
addr = f"unix://{self._socket_path}"
deadline = time.time() + remaining
while time.time() < deadline:
time_left = deadline - time.time()
if time_left <= 0:
break
channel = grpc.insecure_channel(addr)
try:
grpc.channel_ready_future(channel).result(timeout=min(1.0, time_left))
break
except grpc.FutureTimeoutError:
time.sleep(0.1)
except Exception:
time.sleep(0.1)
Comment on lines +306 to +312
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the gRPC readiness loop, the channel is only closed on the success path. If channel_ready_future(...).result(...) raises (including FutureTimeoutError), the channel stays open, which can leak file descriptors/sockets and keep piling up across retries. Consider creating the channel once per iteration and always closing it in a finally block, and add a small sleep/backoff on FutureTimeoutError to avoid a tight spin loop.

Suggested change
try:
channel = grpc.insecure_channel(addr)
grpc.channel_ready_future(channel).result(timeout=min(1.0, remaining))
channel.close()
break
except grpc.FutureTimeoutError:
pass
except Exception:
time.sleep(0.1)
channel = grpc.insecure_channel(addr)
try:
iter_remaining = deadline - time.time()
if iter_remaining <= 0:
break
grpc.channel_ready_future(channel).result(timeout=min(1.0, iter_remaining))
break
except grpc.FutureTimeoutError:
# Backoff slightly to avoid a tight spin loop while waiting for readiness
time.sleep(0.1)
except Exception:
time.sleep(0.1)
finally:
channel.close()

Copilot uses AI. Check for mistakes.
finally:
channel.close()
else:
self.stop()
raise RuntimeError(
f"capiscio server socket appeared but gRPC not ready "
f"within {timeout}s at {self._socket_path}"
)

self._started = True
return self.address

def stop(self) -> None:
"""Stop the gRPC server process."""
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,18 @@ def test_download_binary_already_cached(self, mock_stream):
mock_stream.assert_not_called()
assert result == mock_path

@patch("capiscio_sdk._rpc.process.time.sleep")
@patch("httpx.stream")
def test_download_binary_http_error(self, mock_stream):
"""Test download handles HTTP errors."""
def test_download_binary_http_error(self, mock_stream, mock_sleep):
"""Test download handles HTTP errors with retries."""
pm = ProcessManager()

with patch("capiscio_sdk._rpc.process.platform.system", return_value="Linux"):
with patch("capiscio_sdk._rpc.process.platform.machine", return_value="x86_64"):
with patch.object(ProcessManager, "_get_cached_binary_path") as mock_cached:
mock_path = MagicMock()
mock_path.exists.side_effect = [False, False] # Not exists before download, not exists after cleanup
# exists() called: once before loop, then once per attempt for cleanup
mock_path.exists.return_value = False
mock_path.parent = MagicMock()
mock_cached.return_value = mock_path

Expand Down
Loading