Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions dev-suite/src/api/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
"qa": ("qa", AgentStatus.REVIEWING),
}

# Infrastructure nodes that get SSE events but aren't mapped to agents
INFRA_NODES = {"apply_code", "sandbox_validate", "flush_memory"}

# Map orchestrator WorkflowStatus to API TaskStatus
WORKFLOW_TO_TASK_STATUS = {
WorkflowStatus.PLANNING: TaskStatus.PLANNING,
Expand Down Expand Up @@ -150,6 +153,7 @@ async def _run_task(self, task_id: str, description: str) -> None:
"error_message": "",
"memory_context": [],
"trace": [],
"parsed_files": [],
}

prev_node = None
Expand Down Expand Up @@ -199,6 +203,11 @@ async def _handle_node_completion(self, task_id, node_name, node_output, state_m
prev_agent_id, _ = NODE_TO_AGENT[prev_node]
await state_manager.update_agent_status(prev_agent_id, AgentStatus.IDLE)

# Handle infrastructure nodes (apply_code, sandbox_validate, flush_memory)
if node_name in INFRA_NODES:
await self._handle_infra_node(task_id, node_name, node_output, state_manager)
return

if node_name not in NODE_TO_AGENT:
return

Expand Down Expand Up @@ -229,6 +238,51 @@ async def _handle_node_completion(self, task_id, node_name, node_output, state_m
elif node_name == "qa":
await self._handle_qa(task_id, node_output, task, state_manager)

async def _handle_infra_node(self, task_id, node_name, node_output, state_manager):
    """Handle infrastructure nodes that aren't tied to specific agents.

    Emits timeline events, SSE progress events, and log lines for the
    ``apply_code`` and ``sandbox_validate`` nodes. ``flush_memory`` is
    internal bookkeeping and intentionally produces no events.

    Args:
        task_id: Identifier of the task being run.
        node_name: Name of the infrastructure node that completed.
        node_output: State dict returned by the node.
        state_manager: Provides task lookup and status updates.
    """
    task = state_manager.get_task(task_id)
    if not task:
        return

    if node_name == "apply_code":
        parsed_files = node_output.get("parsed_files", [])
        if parsed_files:
            n_files = len(parsed_files)
            total_chars = sum(len(f.get("content", "")) for f in parsed_files)
            action = f"Applied {n_files} file{'s' if n_files != 1 else ''} to workspace ({total_chars:,} chars)"
            task.timeline.append(TimelineEvent(
                time=_now_str(), agent="dev", action=action, type="exec",
            ))
            await self._emit_progress(task_id, "code_applied", "dev", action)
            await self._emit_log(f"[apply_code] Writing {n_files} files to workspace...")
            for pf in parsed_files:
                await self._emit_log(f"[apply_code] + {pf.get('path', '?')}")
        else:
            await self._emit_log("[apply_code] No files to apply (skipped)")

    elif node_name == "sandbox_validate":
        sandbox_result = node_output.get("sandbox_result")
        if sandbox_result is not None:
            passed = sandbox_result.tests_passed
            failed = sandbox_result.tests_failed
            exit_code = sandbox_result.exit_code
            # BUG FIX: the previous f-strings used `passed or '?'` /
            # `failed or '?'`, which rendered a legitimate count of 0 as
            # "?" (unknown). Distinguish None (count unavailable) from 0
            # explicitly so "0 passed" is reported accurately.
            passed_str = "?" if passed is None else passed
            failed_str = "?" if failed is None else failed
            if exit_code == 0 and (failed is None or failed == 0):
                action = f"Sandbox validation passed (exit code {exit_code})"
                if passed is not None:
                    action = f"Sandbox: {passed} tests passed"
                await self._emit_log(f"[sandbox:locked] Validation passed (exit={exit_code})")
            else:
                action = f"Sandbox: {failed_str} test(s) failed"
                await self._emit_log(f"[sandbox:locked] Validation: {passed_str} passed, {failed_str} failed")
            task.timeline.append(TimelineEvent(
                time=_now_str(), agent="qa", action=action, type="exec",
            ))
            await self._emit_progress(task_id, "sandbox_validated", "qa", action)
        else:
            await self._emit_log("[sandbox] Validation skipped (no E2B key or no code files)")

    # flush_memory doesn't need SSE events -- it's internal bookkeeping

async def _handle_architect(self, task_id, output, task, state_manager):
blueprint = output.get("blueprint")
if blueprint and isinstance(blueprint, Blueprint):
Expand Down
138 changes: 130 additions & 8 deletions dev-suite/src/orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
"""LangGraph orchestrator -- Architect -> Lead Dev -> QA loop.
"""LangGraph orchestrator -- Architect -> Lead Dev -> apply_code -> sandbox -> QA loop.

This is the main entry point for the agent workflow.
Implements the state machine with retry logic, token budgets,
structured Blueprint passing, human escalation, and memory
write-back (flush_memory node with mini-summarizer).
structured Blueprint passing, human escalation, code application,
and memory write-back (flush_memory node with mini-summarizer).
"""

import json
import logging
import os
from enum import Enum
from pathlib import Path
from typing import Any, Literal, TypedDict

from dotenv import load_dotenv
Expand All @@ -30,6 +31,11 @@
format_validation_summary,
get_validation_plan,
)
from .tools.code_parser import (
CodeParserError,
parse_generated_code,
validate_paths_for_workspace,
)
from .tracing import add_trace_event, create_trace_config

load_dotenv()
Expand All @@ -55,6 +61,17 @@ def _safe_int(env_key: str, default: int) -> int:
TOKEN_BUDGET = _safe_int("TOKEN_BUDGET", 50000)


# -- Workspace --

def _get_workspace_root() -> Path:
"""Get the workspace root directory.

Reads WORKSPACE_ROOT env var, falling back to current working directory.
"""
raw = os.getenv("WORKSPACE_ROOT", ".")
return Path(raw).resolve()


# -- Workflow State --

class WorkflowStatus(str, Enum):
Expand Down Expand Up @@ -86,6 +103,7 @@ class GraphState(TypedDict, total=False):
memory_writes: list[dict]
trace: list[str]
sandbox_result: SandboxResult | None
parsed_files: list[dict]


class AgentState(BaseModel):
Expand All @@ -105,6 +123,7 @@ class AgentState(BaseModel):
memory_writes: list[dict] = []
trace: list[str] = []
sandbox_result: SandboxResult | None = None
parsed_files: list[dict] = []


# -- LLM Initialization --
Expand Down Expand Up @@ -385,6 +404,92 @@ def developer_node(state: GraphState) -> dict:
}


# -- Code Application --

def apply_code_node(state: GraphState) -> dict:
    """Write the Dev agent's generated code out to the workspace.

    Bridges the gap between the Dev agent's raw text output and the
    filesystem:

    1. Splits ``generated_code`` on ``# --- FILE: path ---`` markers.
    2. Rejects any path that would escape the workspace root (security).
    3. Writes each surviving file under the workspace directory.
    4. Records the parsed file map in state so ``sandbox_validate`` can
       load the same files into the sandbox.

    Never changes workflow status -- it is a pass-through node that only
    enriches state with ``parsed_files``.
    """
    trace = list(state.get("trace", []))
    trace.append("apply_code: starting")

    def _bail(reason: str) -> dict:
        # Uniform early exit: record why nothing was applied.
        trace.append(reason)
        return {"parsed_files": [], "trace": trace}

    generated_code = state.get("generated_code", "")
    if not generated_code:
        return _bail("apply_code: no generated_code -- skipping")
    if not state.get("blueprint"):
        return _bail("apply_code: no blueprint -- skipping")

    # Step 1: split the raw text into individual file entries.
    try:
        file_entries = parse_generated_code(generated_code)
    except CodeParserError as e:
        logger.warning("[APPLY_CODE] Parse error: %s", e)
        return _bail(f"apply_code: parse error -- {e}")

    if not file_entries:
        return _bail("apply_code: parser returned no files")

    # Step 2: drop paths that resolve outside the workspace root.
    workspace_root = _get_workspace_root()
    safe_files = validate_paths_for_workspace(file_entries, workspace_root)
    skipped = len(file_entries) - len(safe_files)
    if skipped:
        trace.append(
            f"apply_code: WARNING -- {skipped} file(s) skipped "
            f"due to path validation"
        )

    # Step 3: write each validated file, best-effort per file -- one bad
    # write must not abort the whole batch.
    total_chars = 0
    written_count = 0
    for entry in safe_files:
        try:
            destination = workspace_root / entry.path
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_text(entry.content, encoding="utf-8")
        except Exception as e:
            logger.warning(
                "[APPLY_CODE] Failed to write %s: %s", entry.path, e
            )
            trace.append(f"apply_code: failed to write {entry.path} -- {e}")
        else:
            written_count += 1
            total_chars += len(entry.content)

    trace.append(
        f"apply_code: wrote {written_count} files "
        f"({total_chars:,} chars total) to workspace"
    )
    logger.info(
        "[APPLY_CODE] Wrote %d files (%d chars) to %s",
        written_count, total_chars, workspace_root,
    )

    # Step 4: hand the parsed file map to downstream nodes via state.
    return {
        "parsed_files": [
            {"path": entry.path, "content": entry.content}
            for entry in safe_files
        ],
        "trace": trace,
    }


def qa_node(state: GraphState) -> dict:
"""QA: reviews the generated code and produces a structured FailureReport."""
trace = list(state.get("trace", []))
Expand Down Expand Up @@ -538,6 +643,7 @@ def _run_sandbox_validation(
commands: list[str],
template: str | None,
generated_code: str,
parsed_files: list[dict] | None = None,
timeout: int = 120,
) -> SandboxResult | None:
"""Execute validation commands in an E2B sandbox.
Expand All @@ -551,14 +657,17 @@ def _run_sandbox_validation(

runner = E2BRunner(api_key=api_key, default_timeout=timeout)

# Convert parsed_files to the project_files dict format expected by run_tests
project_files = None
if parsed_files:
project_files = {pf["path"]: pf["content"] for pf in parsed_files}

# Build a compound command that runs all validations sequentially
# and captures all output. We join with && so early failures are visible
# but use || true in the individual commands (already present in
# PYTHON_COMMANDS / FRONTEND_COMMANDS) so we get all output.
compound_cmd = " && ".join(commands)

return runner.run_tests(
test_command=compound_cmd,
project_files=project_files,
timeout=timeout,
template=template,
)
Expand All @@ -570,6 +679,9 @@ def sandbox_validate_node(state: GraphState) -> dict:
Selects the appropriate template and validation commands based on
the Blueprint's target_files, then executes them in an E2B sandbox.

Now loads parsed_files into the sandbox before running commands,
so validation runs against real code instead of an empty project.

Behavior:
- Optional: if E2B_API_KEY is not set, logs a warning and skips.
- Errors are caught and logged, never crash the workflow.
Expand Down Expand Up @@ -598,12 +710,19 @@ def sandbox_validate_node(state: GraphState) -> dict:
)

generated_code = state.get("generated_code", "")
parsed_files = state.get("parsed_files", [])

if parsed_files:
trace.append(
f"sandbox_validate: loading {len(parsed_files)} files into sandbox"
)

try:
result = _run_sandbox_validation(
commands=plan.commands,
template=plan.template,
generated_code=generated_code,
parsed_files=parsed_files if parsed_files else None,
)

if result is None:
Expand Down Expand Up @@ -739,7 +858,7 @@ def build_graph() -> StateGraph:
"""Build the LangGraph state machine.

Flow:
START -> architect -> developer -> sandbox_validate -> qa -> (conditional)
START -> architect -> developer -> apply_code -> sandbox_validate -> qa -> (conditional)
-> pass: flush_memory -> END
-> fail: developer (retry)
-> escalate: architect (re-plan)
Expand All @@ -749,13 +868,15 @@ def build_graph() -> StateGraph:

graph.add_node("architect", architect_node)
graph.add_node("developer", developer_node)
graph.add_node("apply_code", apply_code_node)
graph.add_node("sandbox_validate", sandbox_validate_node)
graph.add_node("qa", qa_node)
graph.add_node("flush_memory", flush_memory_node)

graph.add_edge(START, "architect")
graph.add_edge("architect", "developer")
graph.add_edge("developer", "sandbox_validate")
graph.add_edge("developer", "apply_code")
graph.add_edge("apply_code", "sandbox_validate")
graph.add_edge("sandbox_validate", "qa")
graph.add_conditional_edges("qa", route_after_qa)
graph.add_edge("flush_memory", END)
Expand Down Expand Up @@ -811,6 +932,7 @@ def run_task(
"memory_writes": [],
"trace": [],
"sandbox_result": None,
"parsed_files": [],
}

invoke_config = {
Expand Down
Loading
Loading