Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
87ded33
chore: create feature branch for #80
Abernaughty Apr 1, 2026
63277c8
feat(events): add TOOL_CALL event type for agent tool usage tracking …
Abernaughty Apr 1, 2026
38e32ad
feat(runner): add tools_config init, TOOL_CALL SSE events, tool usage…
Abernaughty Apr 1, 2026
73f3312
feat(orchestrator): agent tool binding - bind_tools, tool loop, init_…
Abernaughty Apr 1, 2026
2c8face
feat(orchestrator): full agent tool binding implementation (#80)
Abernaughty Apr 1, 2026
adc12f7
test(tool-binding): add 35 tests for agent tool binding (#80)
Abernaughty Apr 1, 2026
5689745
chore: remove branch placeholder
Abernaughty Apr 1, 2026
04c28c2
fix(test_e2e): convert node-level tests to async for developer_node/q…
Abernaughty Apr 1, 2026
33969b9
placeholder
Abernaughty Apr 1, 2026
5c7bac6
fix(orchestrator): sync node functions with _run_async bridge, fix sa…
Abernaughty Apr 1, 2026
29ad3c4
temp placeholder for test_e2e
Abernaughty Apr 1, 2026
9889eb7
fix(test_e2e): restore sync node calls — developer_node/qa_node are s…
Abernaughty Apr 1, 2026
240c09b
fix(test_tool_binding): sync node tests, fix patch target src.tools.l…
Abernaughty Apr 1, 2026
ba4e2db
fix: address CodeRabbit review — 6 fixes for PR #83
Abernaughty Apr 1, 2026
b86c39a
fix(runner): per-pass dev tool call count (Fix 6)
Abernaughty Apr 1, 2026
9ebf679
fix(tests): update test_tool_binding for Fixes 2/3/4/5
Abernaughty Apr 1, 2026
3a372a7
fix(sandbox): format_validation_summary called with SandboxResult not…
Abernaughty Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .branch-placeholder

This file was deleted.

2 changes: 2 additions & 0 deletions dev-suite/src/api/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Async event bus for real-time SSE streaming to the dashboard.

Issue #35: SSE Event System -- Real-Time Task Streaming
Issue #80: Added TOOL_CALL event type for agent tool usage tracking

The EventBus is a singleton that LangGraph nodes publish events to.
Connected SSE clients each get their own asyncio.Queue for fan-out.
Expand Down Expand Up @@ -49,6 +50,7 @@ class EventType(str, Enum):
MEMORY_ADDED = "memory_added"
LOG_LINE = "log_line"
QA_ESCALATION = "qa_escalation"
TOOL_CALL = "tool_call"
Comment on lines 52 to +53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Update the dashboard SSE contract for tool_call.

This enum addition is still invisible to the browser: dashboard/src/lib/types/api.ts (Lines 172-177) excludes 'tool_call', and dashboard/src/lib/sse.ts (Lines 44-80 and 137-150) neither subscribes to nor dispatches it. The runner can publish the event, but the dashboard will drop it, so Issue #80's visibility goal stays broken. Based on learnings: Use SSE client with window event dispatch for real-time dashboard updates from backend.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@dev-suite/src/api/events.py` around lines 52 - 53, The dashboard doesn't
recognize the new TOOL_CALL event because the dashboard type union
and SSE wiring lack 'tool_call'; update the dashboard API types (add 'tool_call'
to the union in api type definitions) and add handling in the SSE client
(subscribe for 'tool_call' in the SSE handling code and dispatch it to the rest
of the app using the same window event pattern used for other events so the
dashboard receives real-time updates). Ensure you reference the same event
string "tool_call" in the type union, the SSE subscription switch/handler, and
the window.dispatchEvent/new CustomEvent dispatch so published runner events are
accepted and emitted to the UI.



class SSEEvent(BaseModel):
Expand Down
74 changes: 70 additions & 4 deletions dev-suite/src/api/runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Async task runner bridging the FastAPI API to the LangGraph orchestrator.

Issue #48: StateManager <-> Orchestrator bridge
Issue #80: Tool binding -- tools_config initialization, TOOL_CALL SSE events

Uses LangGraph's astream() to iterate node completions and emit SSE events
in real time. Runs entirely on the async event loop -- no threading needed.
Expand Down Expand Up @@ -28,6 +29,7 @@
GraphState,
WorkflowStatus,
build_graph,
init_tools_config,
MAX_RETRIES,
TOKEN_BUDGET,
)
Expand Down Expand Up @@ -92,6 +94,8 @@ class TaskRunner:

def __init__(self):
# Registry of asyncio.Task objects keyed by a string
# (presumably the task_id -- confirm against submit()).
self._tasks: dict[str, asyncio.Task] = {}
# Fix 6: Per-task index into the accumulated tool_calls_log, keyed by task_id.
# _run_task seeds the entry with 0 and removes it in its finally block;
# _handle_developer slices the log from this index and then advances it to
# len(tool_calls_log), so each developer pass counts only its own tool calls.
self._dev_tool_baselines: dict[str, int] = {}

def submit(self, task_id: str, description: str) -> None:
"""Submit a task for background execution."""
Expand Down Expand Up @@ -122,6 +126,7 @@ async def shutdown(self) -> None:
if self._tasks:
await asyncio.gather(*self._tasks.values(), return_exceptions=True)
self._tasks.clear()
self._dev_tool_baselines.clear()
logger.info("TaskRunner shutdown complete")

@property
Expand All @@ -133,6 +138,8 @@ async def _run_task(self, task_id: str, description: str) -> None:
from .state import state_manager

start_time = time.time()
# Fix 6: Initialize per-task dev tool baseline
self._dev_tool_baselines[task_id] = 0

try:
await self._emit_progress(task_id, "task_started", None, f"Task started: {description[:100]}")
Expand All @@ -142,6 +149,14 @@ async def _run_task(self, task_id: str, description: str) -> None:
graph = build_graph()
workflow = graph.compile()

# Initialize tools config (issue #80)
tools_config = init_tools_config()
n_tools = len(tools_config.get("configurable", {}).get("tools", []))
if n_tools > 0:
await self._emit_log(f"[orchestrator] {n_tools} tools loaded for agents")
else:
await self._emit_log("[orchestrator] No tools configured (single-shot mode)")

initial_state: GraphState = {
"task_description": description,
"blueprint": None,
Expand All @@ -154,13 +169,35 @@ async def _run_task(self, task_id: str, description: str) -> None:
"memory_context": [],
"trace": [],
"parsed_files": [],
"tool_calls_log": [],
}

stream_config = {
"recursion_limit": 25,
**tools_config,
}

prev_node = None
async for event in workflow.astream(initial_state, config={"recursion_limit": 25}):
prev_tool_count = 0
async for event in workflow.astream(initial_state, config=stream_config):
for node_name, node_output in event.items():
if node_name.startswith("__"):
continue

# Emit tool_call SSE events for any new tool calls (issue #80)
tool_calls_log = node_output.get("tool_calls_log", [])
if len(tool_calls_log) > prev_tool_count:
new_calls = tool_calls_log[prev_tool_count:]
for tc in new_calls:
await self._emit_tool_call(
task_id,
tc.get("agent", "unknown"),
tc.get("tool", "unknown"),
tc.get("success", True),
tc.get("result_preview", ""),
)
prev_tool_count = len(tool_calls_log)

await self._handle_node_completion(
task_id, node_name, node_output, state_manager, prev_node,
)
Expand All @@ -181,9 +218,7 @@ async def _run_task(self, task_id: str, description: str) -> None:
if task:
task.status = TaskStatus.CANCELLED
task.completed_at = datetime.now(timezone.utc)
for agent_id in ("arch", "dev", "qa"):
await state_manager.update_agent_status(agent_id, AgentStatus.IDLE)
await self._emit_complete(task_id, "cancelled", "Task cancelled by user")
await self._emit_complete(task_id, "cancelled", "Task was cancelled")

except Exception as e:
logger.error("Task %s failed with exception: %s", task_id, e, exc_info=True)
Expand All @@ -197,6 +232,10 @@ async def _run_task(self, task_id: str, description: str) -> None:
await self._emit_complete(task_id, "failed", f"Task failed: {e}")
await self._emit_log(f"[orchestrator] ERROR: {e}")

finally:
# Fix 6: Clean up per-task baseline
self._dev_tool_baselines.pop(task_id, None)

async def _handle_node_completion(self, task_id, node_name, node_output, state_manager, prev_node):
"""Process a completed node and emit appropriate SSE events."""
if prev_node and prev_node in NODE_TO_AGENT:
Expand Down Expand Up @@ -303,12 +342,23 @@ async def _handle_architect(self, task_id, output, task, state_manager):
async def _handle_developer(self, task_id, output, task, state_manager):
code = output.get("generated_code", "")
task.generated_code = code

# Fix 6: Use per-pass baseline to count only new tool calls from this dev pass
tool_calls_log = output.get("tool_calls_log", [])
baseline = self._dev_tool_baselines.get(task_id, 0)
dev_tool_calls = [tc for tc in tool_calls_log[baseline:] if tc.get("agent") == "developer"]
self._dev_tool_baselines[task_id] = len(tool_calls_log)

if code:
action = f"Code generated ({len(code):,} chars)"
if dev_tool_calls:
action += f" using {len(dev_tool_calls)} tool call(s)"
event_type = "code"
retry = task.budget.retries_used
if retry > 0:
action = f"Retry {retry}/{task.budget.max_retries} -- code regenerated ({len(code):,} chars)"
if dev_tool_calls:
action += f" using {len(dev_tool_calls)} tool call(s)"
event_type = "retry"
await self._emit_log("[sandbox:locked] E2B micro-VM started (dev-sandbox)")
else:
Expand Down Expand Up @@ -362,6 +412,22 @@ async def _emit_log(self, message):
except Exception:
logger.debug("Failed to emit log_line", exc_info=True)

async def _emit_tool_call(self, task_id, agent, tool_name, success, result_preview):
    """Publish one TOOL_CALL SSE event describing an agent tool invocation (issue #80).

    The event payload carries ``task_id``, ``agent``, ``tool`` (from
    ``tool_name``), ``success`` and ``result_preview``. The preview is cut
    to its first 100 items (characters, assuming a string is passed); a
    falsy preview is sent as an empty string.

    Emission is best-effort: any ``Exception`` raised while building or
    publishing the event is logged at debug level and not re-raised.
    """
    try:
        # Keep the preview short; falsy values (None, "") collapse to "".
        if result_preview:
            preview = result_preview[:100]
        else:
            preview = ""

        payload = {
            "task_id": task_id,
            "agent": agent,
            "tool": tool_name,
            "success": success,
            "result_preview": preview,
        }
        tool_event = SSEEvent(type=EventType.TOOL_CALL, data=payload)
        await event_bus.publish(tool_event)
    except Exception:
        logger.debug("Failed to emit tool_call", exc_info=True)


# -- Singleton --

Expand Down
Loading
Loading