Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
integration-tests:
name: SDK Server Contract Tests
runs-on: ubuntu-latest
timeout-minutes: 15
timeout-minutes: 25

steps:
- name: Checkout SDK repository
Expand Down
9 changes: 6 additions & 3 deletions capiscio_sdk/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def connect(self) -> AgentIdentity:
# Step 3: Initialize identity via capiscio-core Init RPC (one call does everything)
# If keys already exist locally, we recover the DID without calling core.
did = self._init_identity()
self.did = did
logger.info(f"DID: {did}")

# Step 3.5: Activate agent on server
Expand Down Expand Up @@ -804,11 +805,13 @@ def _setup_badge(self):
from .badge_keeper import BadgeKeeper
from .simple_guard import SimpleGuard

# Set up SimpleGuard with correct parameters
# Set up SimpleGuard — keys are already loaded in gRPC server
# from _init_identity(), so skip file-based PEM loading
guard = SimpleGuard(
base_dir=str(self.keys_dir.parent),
agent_id=self.agent_id,
agent_id=self.did,
dev_mode=self.dev_mode,
signing_kid=self.did,
keys_preloaded=True,
)

# Set up BadgeKeeper with correct parameters
Expand Down
48 changes: 44 additions & 4 deletions capiscio_sdk/integrations/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""FastAPI integration for Capiscio SimpleGuard."""
from typing import Callable, Awaitable, Any, Dict, List, Optional, TYPE_CHECKING
from typing import Callable, Awaitable, Any, Dict, List, Optional, Union, TYPE_CHECKING
try:
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
Expand All @@ -25,7 +25,9 @@ class CapiscioMiddleware(BaseHTTPMiddleware):

Args:
app: The ASGI application.
guard: SimpleGuard instance for verification.
guard: SimpleGuard instance, or a callable returning one (for lazy binding).
When a callable is provided, the guard is resolved on first request.
This allows registering middleware at module level before connect() runs.
exclude_paths: List of paths to skip verification (e.g., ["/health", "/.well-known/agent-card.json"]).
config: Optional SecurityConfig to control enforcement behavior.
emitter: Optional EventEmitter for auto-event emission. When provided,
Expand All @@ -42,14 +44,21 @@ class CapiscioMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app: ASGIApp,
guard: SimpleGuard,
guard: Union[SimpleGuard, Callable[[], Optional[SimpleGuard]], None] = None,
exclude_paths: Optional[List[str]] = None,
*, # Force config to be keyword-only
config: Optional["SecurityConfig"] = None,
emitter: Optional[EventEmitter] = None,
) -> None:
super().__init__(app)
self.guard = guard
self._guard_factory: Optional[Callable[[], Optional[SimpleGuard]]] = None
# Treat as factory if it's a plain callable without guard interface,
# OR if it's a class/type (e.g. passing SimpleGuard itself, not an instance)
if guard is not None and callable(guard) and (isinstance(guard, type) or not hasattr(guard, 'verify_inbound')):
self._guard_factory = guard
self._guard: Optional[SimpleGuard] = None
else:
self._guard = guard
self.config = config
self.exclude_paths = exclude_paths or []
self._emitter = emitter
Expand All @@ -60,6 +69,21 @@ def __init__(

logger.info(f"CapiscioMiddleware initialized: exclude_paths={self.exclude_paths}, require_signatures={self.require_signatures}, fail_mode={self.fail_mode}, auto_events={emitter is not None}")

@property
def guard(self) -> Optional[SimpleGuard]:
"""Resolve guard lazily if a factory was provided."""
if self._guard is None and self._guard_factory is not None:
self._guard = self._guard_factory()
return self._guard

def set_guard(self, guard: SimpleGuard) -> None:
"""Set or replace the guard instance after construction.

Useful for binding the guard in a lifespan handler after connect().
"""
self._guard = guard
self._guard_factory = None

async def dispatch(
self,
request: Request,
Expand All @@ -76,6 +100,22 @@ async def dispatch(
logger.debug(f"CapiscioMiddleware: SKIPPING verification for {path}")
return await call_next(request)

# If guard is not yet bound (lazy binding), fail closed to avoid unverified access
if self.guard is None:
logger.error("CapiscioMiddleware: guard not bound or unavailable; blocking request")
request.state.agent = None
request.state.agent_id = None
self._auto_emit(EventEmitter.EVENT_VERIFICATION_FAILED, {
"method": request.method,
"path": path,
"reason": "guard_unavailable",
"duration_ms": 0.0,
})
return JSONResponse(
{"error": "CapiscIO guard is not available. Requests cannot be verified at this time."},
status_code=503,
)

request_start = time.perf_counter()

# Auto-event: request.received
Expand Down
111 changes: 61 additions & 50 deletions capiscio_sdk/simple_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def __init__(
rpc_address: Optional[str] = None,
agent_id: Optional[str] = None,
badge_token: Optional[str] = None,
signing_kid: Optional[str] = None,
keys_preloaded: bool = False,
) -> None:
"""
Initialize SimpleGuard.
Expand All @@ -54,12 +56,17 @@ def __init__(
rpc_address: gRPC server address. If None, auto-starts local server.
agent_id: Explicit agent DID. If None:
- In dev_mode: Auto-generates did:key from keypair
- Otherwise: Loaded from agent-card.json
- Otherwise: Loaded from agent-card.json (deprecated)
badge_token: Pre-obtained badge token to use for identity. When set,
make_headers() will use this token instead of signing.
signing_kid: Explicit key ID for signing. When provided with agent_id,
skips agent-card.json entirely.
keys_preloaded: If True, skip file-based key loading (keys already
loaded in gRPC server, e.g. from CapiscIO.connect()).
"""
self.dev_mode = dev_mode
self._explicit_agent_id = agent_id
self._explicit_signing_kid = signing_kid
self._badge_token = badge_token

# 1. Safety Check
Expand All @@ -69,26 +76,29 @@ def __init__(
"This is insecure! Disable dev_mode in production."
)

# 2. Resolve base_dir
# 2. Resolve base_dir (skip walking for agent-card.json when identity params provided)
self.project_root = self._resolve_project_root(base_dir)
self.keys_dir = self.project_root / "capiscio_keys"
self.trusted_dir = self.keys_dir / "trusted"
self.agent_card_path = self.project_root / "agent-card.json"

# 3. Connect to gRPC server
self._client = CapiscioRPCClient(address=rpc_address)
self._client.connect()

# 4. Load or generate agent identity
# 4. Resolve agent identity
self.agent_id: str
self.signing_kid: str
self._load_or_generate_card()
self._resolve_identity()

# 5. Load or generate keys via gRPC (may update agent_id with did:key)
self._load_or_generate_keys()
if not keys_preloaded:
self._load_or_generate_keys()
else:
logger.info(f"Keys preloaded in gRPC server, skipping file-based key loading")
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log call uses an f-string but doesn't interpolate anything. Consider using a plain string to avoid unnecessary formatting overhead and keep logging consistent.

Suggested change
logger.info(f"Keys preloaded in gRPC server, skipping file-based key loading")
logger.info("Keys preloaded in gRPC server, skipping file-based key loading")

Copilot uses AI. Check for mistakes.

# 6. Load trust store
self._setup_trust_store()
if not keys_preloaded:
self._setup_trust_store()

def sign_outbound(self, payload: Dict[str, Any], body: Optional[bytes] = None) -> str:
"""
Expand Down Expand Up @@ -201,9 +211,17 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()

def _resolve_project_root(self, base_dir: Optional[Union[str, Path]]) -> Path:
"""Walk up the directory tree to find agent-card.json or stop at root."""
"""Resolve the project root directory.

When agent_id is provided explicitly, uses base_dir (or cwd) directly
without walking up the tree looking for agent-card.json.
"""
current = Path(base_dir or os.getcwd()).resolve()

# When identity params are provided, don't walk looking for agent-card.json
if self._explicit_agent_id:
return current

search_path = current
while search_path != search_path.parent:
if (search_path / "agent-card.json").exists():
Expand All @@ -212,18 +230,31 @@ def _resolve_project_root(self, base_dir: Optional[Union[str, Path]]) -> Path:

return current

def _load_or_generate_card(self) -> None:
"""Load agent-card.json or generate a minimal one in dev_mode."""
# If explicit agent_id was provided, use it
def _resolve_identity(self) -> None:
"""Resolve agent identity from explicit params, agent-card.json (legacy), or dev defaults.

Priority order:
1. Explicit agent_id + signing_kid params (preferred — no file needed)
2. Explicit agent_id only (signing_kid defaults to "key-0")
3. Legacy agent-card.json file (deprecated)
4. Dev mode auto-generation
"""
# Case 1 & 2: Explicit agent_id provided
if self._explicit_agent_id:
self.agent_id = self._explicit_agent_id
self.signing_kid = "key-0" # Will be updated when keys are generated/loaded
self.signing_kid = self._explicit_signing_kid or "key-0"
logger.info(f"Using explicit agent_id: {self.agent_id}")
return

if self.agent_card_path.exists():

# Case 3: Legacy agent-card.json (deprecated path)
agent_card_path = self.project_root / "agent-card.json"
if agent_card_path.exists():
logger.warning(
"Loading identity from agent-card.json is deprecated. "
"Pass agent_id and signing_kid to SimpleGuard() directly."
)
try:
with open(self.agent_card_path, "r") as f:
with open(agent_card_path, "r") as f:
data = json.load(f)
self.agent_id = data.get("agent_id")
keys = data.get("public_keys", [])
Expand All @@ -233,15 +264,25 @@ def _load_or_generate_card(self) -> None:

if not self.agent_id or not self.signing_kid:
raise ConfigurationError("agent-card.json missing 'agent_id' or 'public_keys[0].kid'.")
except ConfigurationError:
raise
except Exception as e:
raise ConfigurationError(f"Failed to load agent-card.json: {e}")
elif self.dev_mode:
return

# Case 4: Dev mode — placeholder until key generation
if self.dev_mode:
logger.info("Dev Mode: Will generate did:key identity from keypair")
# Placeholder - will be updated with did:key after key generation
self.agent_id = "local-dev-agent" # Temporary, replaced in _load_or_generate_keys
self.agent_id = "local-dev-agent"
self.signing_kid = "local-dev-key"
else:
raise ConfigurationError(f"agent-card.json not found at {self.project_root}")
return

raise ConfigurationError(
"No agent identity configured. Either:\n"
" - Pass agent_id (and optionally signing_kid) to SimpleGuard()\n"
" - Use dev_mode=True for auto-generated identity\n"
" - Use CapiscIO.connect() which handles identity automatically"
)

def _load_or_generate_keys(self) -> None:
"""Load keys or generate them in dev_mode via gRPC.
Expand Down Expand Up @@ -290,39 +331,9 @@ def _load_or_generate_keys(self) -> None:
# Save public key
with open(public_key_path, "w") as f:
f.write(key_info["public_key_pem"])

# Update agent-card.json with JWK
self._update_agent_card_with_pem(key_info["public_key_pem"])
else:
raise ConfigurationError(f"private.pem not found at {private_key_path}")

def _update_agent_card_with_pem(self, public_key_pem: str) -> None:
"""Helper to write agent-card.json with the generated key."""
# For simplicity, just create a minimal card
# In production, would convert PEM to JWK
card_data = {
"agent_id": self.agent_id,
"public_keys": [{
"kty": "OKP",
"crv": "Ed25519",
"kid": self.signing_kid,
"use": "sig",
# Note: x would need to be extracted from PEM
}],
"protocolVersion": "0.3.0",
"name": "Local Dev Agent",
"description": "Auto-generated by SimpleGuard",
"url": "http://localhost:8000",
"version": "0.1.0",
"provider": {
"organization": "Local Dev"
}
}

with open(self.agent_card_path, "w") as f:
json.dump(card_data, f, indent=2)
logger.info(f"Created agent-card.json at {self.agent_card_path}")

def _setup_trust_store(self) -> None:
"""Ensure trust store exists and add self-trust in dev_mode."""
if not self.trusted_dir.exists() and self.dev_mode:
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ def test_setup_badge_success(self, tmp_path):
auto_badge=True,
dev_mode=False,
)
connector.did = "did:key:z6Mktest"

mock_keeper = MagicMock()
mock_keeper.get_current_badge.return_value = "badge-jwt"
Expand Down
69 changes: 69 additions & 0 deletions tests/unit/test_fastapi_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,75 @@ async def test_endpoint(request: Request):
assert data["agent_id"] is None


class TestLazyGuardBinding:
"""Tests for lazy guard binding (callable and set_guard)."""

def test_middleware_with_callable_guard(self):
"""Test that guard can be a callable that returns the SimpleGuard."""
mock_guard = MagicMock()
mock_guard.agent_id = "test-agent"
mock_guard.verify_inbound.return_value = {"iss": "did:key:caller", "sub": "test"}

app = FastAPI()
app.add_middleware(
CapiscioMiddleware,
guard=lambda: mock_guard,
exclude_paths=["/health"],
)

@app.post("/test")
async def test_endpoint(request: Request):
return {"agent_id": getattr(request.state, 'agent_id', None)}

client = TestClient(app)
headers = {"X-Capiscio-Badge": "mock.jws.token", "Content-Type": "application/json"}
response = client.post("/test", json={}, headers=headers)
assert response.status_code == 200
assert response.json()["agent_id"] == "did:key:caller"

def test_middleware_with_none_guard_passes_through(self):
"""Test that None guard returns 503 (fail closed)."""
app = FastAPI()
app.add_middleware(
CapiscioMiddleware,
guard=None,
exclude_paths=["/health"],
)

@app.post("/test")
async def test_endpoint(request: Request):
return {
"agent": getattr(request.state, 'agent', 'not-set'),
"agent_id": getattr(request.state, 'agent_id', 'not-set'),
}

client = TestClient(app)
response = client.post("/test", json={})
assert response.status_code == 503
assert "guard is not available" in response.json()["error"]

Comment on lines +457 to +477
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test name says it "passes_through" but the assertions verify the middleware fails closed with a 503 when guard=None. Please rename the test (and/or docstring) to reflect the actual behavior to avoid confusion when reading failures.

Copilot uses AI. Check for mistakes.
def test_middleware_callable_returning_none(self):
"""Test that callable returning None (guard not ready) returns 503."""
app = FastAPI()
app.add_middleware(
CapiscioMiddleware,
guard=lambda: None,
exclude_paths=["/health"],
)

@app.post("/test")
async def test_endpoint(request: Request):
return {
"agent": getattr(request.state, 'agent', 'not-set'),
"agent_id": getattr(request.state, 'agent_id', 'not-set'),
}

client = TestClient(app)
response = client.post("/test", json={})
assert response.status_code == 503
assert "guard is not available" in response.json()["error"]


class TestAutoEvents:
"""Tests for middleware auto-event emission."""

Expand Down
Loading
Loading