diff --git a/dev-suite/src/api/runner.py b/dev-suite/src/api/runner.py index 728676e..70567d8 100644 --- a/dev-suite/src/api/runner.py +++ b/dev-suite/src/api/runner.py @@ -49,6 +49,9 @@ "qa": ("qa", AgentStatus.REVIEWING), } +# Infrastructure nodes that get SSE events but aren't mapped to agents +INFRA_NODES = {"apply_code", "sandbox_validate", "flush_memory"} + # Map orchestrator WorkflowStatus to API TaskStatus WORKFLOW_TO_TASK_STATUS = { WorkflowStatus.PLANNING: TaskStatus.PLANNING, @@ -150,6 +153,7 @@ async def _run_task(self, task_id: str, description: str) -> None: "error_message": "", "memory_context": [], "trace": [], + "parsed_files": [], } prev_node = None @@ -199,6 +203,11 @@ async def _handle_node_completion(self, task_id, node_name, node_output, state_m prev_agent_id, _ = NODE_TO_AGENT[prev_node] await state_manager.update_agent_status(prev_agent_id, AgentStatus.IDLE) + # Handle infrastructure nodes (apply_code, sandbox_validate, flush_memory) + if node_name in INFRA_NODES: + await self._handle_infra_node(task_id, node_name, node_output, state_manager) + return + if node_name not in NODE_TO_AGENT: return @@ -229,6 +238,51 @@ async def _handle_node_completion(self, task_id, node_name, node_output, state_m elif node_name == "qa": await self._handle_qa(task_id, node_output, task, state_manager) + async def _handle_infra_node(self, task_id, node_name, node_output, state_manager): + """Handle infrastructure nodes that aren't tied to specific agents.""" + task = state_manager.get_task(task_id) + if not task: + return + + if node_name == "apply_code": + parsed_files = node_output.get("parsed_files", []) + if parsed_files: + n_files = len(parsed_files) + total_chars = sum(len(f.get("content", "")) for f in parsed_files) + action = f"Applied {n_files} file{'s' if n_files != 1 else ''} to workspace ({total_chars:,} chars)" + task.timeline.append(TimelineEvent( + time=_now_str(), agent="dev", action=action, type="exec", + )) + await self._emit_progress(task_id, "code_applied", "dev", action) + await self._emit_log(f"[apply_code] Writing {n_files} files to workspace...") + for pf in parsed_files: + await self._emit_log(f"[apply_code] + {pf.get('path', '?')}") + else: + await self._emit_log("[apply_code] No files to apply (skipped)") + + elif node_name == "sandbox_validate": + sandbox_result = node_output.get("sandbox_result") + if sandbox_result is not None: + passed = sandbox_result.tests_passed + failed = sandbox_result.tests_failed + exit_code = sandbox_result.exit_code + if exit_code == 0 and (failed is None or failed == 0): + action = f"Sandbox validation passed (exit code {exit_code})" + if passed is not None: + action = f"Sandbox: {passed} tests passed" + await self._emit_log(f"[sandbox:locked] Validation passed (exit={exit_code})") + else: + action = f"Sandbox: {failed or '?'} test(s) failed" + await self._emit_log(f"[sandbox:locked] Validation: {passed or '?'} passed, {failed or '?'} failed") + task.timeline.append(TimelineEvent( + time=_now_str(), agent="qa", action=action, type="exec", + )) + await self._emit_progress(task_id, "sandbox_validated", "qa", action) + else: + await self._emit_log("[sandbox] Validation skipped (no E2B key or no code files)") + + # flush_memory doesn't need SSE events -- it's internal bookkeeping + async def _handle_architect(self, task_id, output, task, state_manager): blueprint = output.get("blueprint") if blueprint and isinstance(blueprint, Blueprint): diff --git a/dev-suite/src/orchestrator.py b/dev-suite/src/orchestrator.py index 068011f..1ef969a 100644 --- a/dev-suite/src/orchestrator.py +++ b/dev-suite/src/orchestrator.py @@ -1,15 +1,16 @@ -"""LangGraph orchestrator -- Architect -> Lead Dev -> QA loop. +"""LangGraph orchestrator -- Architect -> Lead Dev -> apply_code -> sandbox -> QA loop. This is the main entry point for the agent workflow. Implements the state machine with retry logic, token budgets, -structured Blueprint passing, human escalation, and memory -write-back (flush_memory node with mini-summarizer). +structured Blueprint passing, human escalation, code application, +and memory write-back (flush_memory node with mini-summarizer). """ import json import logging import os from enum import Enum +from pathlib import Path from typing import Any, Literal, TypedDict from dotenv import load_dotenv @@ -30,6 +31,11 @@ format_validation_summary, get_validation_plan, ) +from .tools.code_parser import ( + CodeParserError, + parse_generated_code, + validate_paths_for_workspace, +) from .tracing import add_trace_event, create_trace_config load_dotenv() @@ -55,6 +61,17 @@ def _safe_int(env_key: str, default: int) -> int: TOKEN_BUDGET = _safe_int("TOKEN_BUDGET", 50000) +# -- Workspace -- + +def _get_workspace_root() -> Path: + """Get the workspace root directory. + + Reads WORKSPACE_ROOT env var, falling back to current working directory. + """ + raw = os.getenv("WORKSPACE_ROOT", ".") + return Path(raw).resolve() + + # -- Workflow State -- class WorkflowStatus(str, Enum): @@ -86,6 +103,7 @@ class GraphState(TypedDict, total=False): memory_writes: list[dict] trace: list[str] sandbox_result: SandboxResult | None + parsed_files: list[dict] class AgentState(BaseModel): @@ -105,6 +123,7 @@ class AgentState(BaseModel): memory_writes: list[dict] = [] trace: list[str] = [] sandbox_result: SandboxResult | None = None + parsed_files: list[dict] = [] # -- LLM Initialization -- @@ -385,6 +404,92 @@ def developer_node(state: GraphState) -> dict: } +# -- Code Application -- + +def apply_code_node(state: GraphState) -> dict: + """Parse generated code into files, write to workspace, prepare for sandbox. + + This node bridges the gap between the Dev agent's text output and the + filesystem. It: + 1. Parses generated_code using # --- FILE: path --- markers + 2. Validates paths for workspace containment (security) + 3. Writes each file to the workspace directory + 4. Stores the parsed file map in state for sandbox_validate to load + + The node never changes workflow status -- it's a pass-through that + enriches state with parsed_files. + """ + trace = list(state.get("trace", [])) + trace.append("apply_code: starting") + + generated_code = state.get("generated_code", "") + blueprint = state.get("blueprint") + + if not generated_code: + trace.append("apply_code: no generated_code -- skipping") + return {"parsed_files": [], "trace": trace} + + if not blueprint: + trace.append("apply_code: no blueprint -- skipping") + return {"parsed_files": [], "trace": trace} + + # Step 1: Parse generated code into individual files + try: + parsed = parse_generated_code(generated_code) + except CodeParserError as e: + logger.warning("[APPLY_CODE] Parse error: %s", e) + trace.append(f"apply_code: parse error -- {e}") + return {"parsed_files": [], "trace": trace} + + if not parsed: + trace.append("apply_code: parser returned no files") + return {"parsed_files": [], "trace": trace} + + # Step 2: Validate paths for workspace containment + workspace_root = _get_workspace_root() + safe_files = validate_paths_for_workspace(parsed, workspace_root) + + if len(safe_files) < len(parsed): + skipped = len(parsed) - len(safe_files) + trace.append( + f"apply_code: WARNING -- {skipped} file(s) skipped " + f"due to path validation" + ) + + # Step 3: Write files to workspace + total_chars = 0 + written_count = 0 + for pf in safe_files: + try: + target = workspace_root / pf.path + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(pf.content, encoding="utf-8") + written_count += 1 + total_chars += len(pf.content) + except Exception as e: + logger.warning( + "[APPLY_CODE] Failed to write %s: %s", pf.path, e + ) + trace.append(f"apply_code: failed to write {pf.path} -- {e}") + + trace.append( + f"apply_code: wrote {written_count} files " + f"({total_chars:,} chars total) to workspace" + ) + logger.info( + "[APPLY_CODE] Wrote %d files (%d chars) to %s", + written_count, total_chars, workspace_root, + ) + + # Step 4: Store parsed files in state for sandbox loading + parsed_files_data = [ + {"path": pf.path, "content": pf.content} + for pf in safe_files + ] + + return {"parsed_files": parsed_files_data, "trace": trace} + + def qa_node(state: GraphState) -> dict: """QA: reviews the generated code and produces a structured FailureReport.""" trace = list(state.get("trace", [])) @@ -538,6 +643,7 @@ def _run_sandbox_validation( commands: list[str], template: str | None, generated_code: str, + parsed_files: list[dict] | None = None, timeout: int = 120, ) -> SandboxResult | None: """Execute validation commands in an E2B sandbox. @@ -551,14 +657,17 @@ def _run_sandbox_validation( runner = E2BRunner(api_key=api_key, default_timeout=timeout) + # Convert parsed_files to the project_files dict format expected by run_tests + project_files = None + if parsed_files: + project_files = {pf["path"]: pf["content"] for pf in parsed_files} + # Build a compound command that runs all validations sequentially - # and captures all output. We join with && so early failures are visible - # but use || true in the individual commands (already present in - # PYTHON_COMMANDS / FRONTEND_COMMANDS) so we get all output. compound_cmd = " && ".join(commands) return runner.run_tests( test_command=compound_cmd, + project_files=project_files, timeout=timeout, template=template, ) @@ -570,6 +679,9 @@ def sandbox_validate_node(state: GraphState) -> dict: Selects the appropriate template and validation commands based on the Blueprint's target_files, then executes them in an E2B sandbox. + Now loads parsed_files into the sandbox before running commands, + so validation runs against real code instead of an empty project. + Behavior: - Optional: if E2B_API_KEY is not set, logs a warning and skips. - Errors are caught and logged, never crash the workflow. @@ -598,12 +710,19 @@ def sandbox_validate_node(state: GraphState) -> dict: ) generated_code = state.get("generated_code", "") + parsed_files = state.get("parsed_files", []) + + if parsed_files: + trace.append( + f"sandbox_validate: loading {len(parsed_files)} files into sandbox" + ) try: result = _run_sandbox_validation( commands=plan.commands, template=plan.template, generated_code=generated_code, + parsed_files=parsed_files if parsed_files else None, ) if result is None: @@ -739,7 +858,7 @@ def build_graph() -> StateGraph: """Build the LangGraph state machine. Flow: - START -> architect -> developer -> sandbox_validate -> qa -> (conditional) + START -> architect -> developer -> apply_code -> sandbox_validate -> qa -> (conditional) -> pass: flush_memory -> END -> fail: developer (retry) -> escalate: architect (re-plan) @@ -749,13 +868,15 @@ def build_graph() -> StateGraph: graph.add_node("architect", architect_node) graph.add_node("developer", developer_node) + graph.add_node("apply_code", apply_code_node) graph.add_node("sandbox_validate", sandbox_validate_node) graph.add_node("qa", qa_node) graph.add_node("flush_memory", flush_memory_node) graph.add_edge(START, "architect") graph.add_edge("architect", "developer") - graph.add_edge("developer", "sandbox_validate") + graph.add_edge("developer", "apply_code") + graph.add_edge("apply_code", "sandbox_validate") graph.add_edge("sandbox_validate", "qa") graph.add_conditional_edges("qa", route_after_qa) graph.add_edge("flush_memory", END) @@ -811,6 +932,7 @@ def run_task( "memory_writes": [], "trace": [], "sandbox_result": None, + "parsed_files": [], } invoke_config = { diff --git a/dev-suite/src/tools/code_parser.py b/dev-suite/src/tools/code_parser.py new file mode 100644 index 0000000..cd9ec15 --- /dev/null +++ b/dev-suite/src/tools/code_parser.py @@ -0,0 +1,202 @@ +"""Code parser utility for extracting files from Dev agent output. + +The Dev agent produces code as a single text string with markers like: + # --- FILE: path/to/file.py --- + +This module parses that output into individual (filepath, content) pairs +that can be written to the workspace and loaded into E2B sandboxes. + +Pure functions, no side effects, fully testable in isolation. +""" + +from __future__ import annotations + +import logging +import os +import re +from dataclasses import dataclass +from pathlib import Path, PurePosixPath + +logger = logging.getLogger(__name__) + +# Matches: # --- FILE: path/to/file.ext --- +# Allows optional trailing whitespace and varying dash counts (3+) +_FILE_MARKER_RE = re.compile( + r"^#\s*-{3,}\s*FILE:\s*(.+?)\s*-{3,}\s*$", + re.MULTILINE, +) + + +@dataclass(frozen=True) +class ParsedFile: + """A single file extracted from generated code output. + + Attributes: + path: Relative file path (forward-slash normalized). + content: File content as a string. + """ + + path: str + content: str + + +class CodeParserError(Exception): + """Raised when code parsing encounters an unrecoverable issue.""" + + +def _normalize_path(raw_path: str) -> str: + """Normalize a file path to forward slashes, strip leading ./ and whitespace. + + Raises CodeParserError for dangerous paths (traversal, absolute). + """ + cleaned = raw_path.strip() + + if not cleaned: + raise CodeParserError("Empty file path in marker") + + # Normalize to forward slashes + cleaned = cleaned.replace("\\", "/") + + # Strip leading ./ + while cleaned.startswith("./"): + cleaned = cleaned[2:] + + # Reject Unix absolute paths + if cleaned.startswith("/"): + raise CodeParserError( + f"Absolute path not allowed: '{raw_path}'" + ) + + # Reject Windows-style absolute paths (e.g., C:/ or D:\) + if len(cleaned) >= 2 and cleaned[1] == ":" and cleaned[0].isalpha(): + raise CodeParserError( + f"Absolute path not allowed: '{raw_path}'" + ) + + # Reject path traversal + parts = PurePosixPath(cleaned).parts + if ".." in parts: + raise CodeParserError( + f"Path traversal not allowed: '{raw_path}'" + ) + + # Reject empty after normalization + if not cleaned or cleaned == ".": + raise CodeParserError( + f"Path resolves to empty after normalization: '{raw_path}'" + ) + + return cleaned + + +def parse_generated_code( + generated_code: str, + *, + default_filename: str = "output.py", +) -> list[ParsedFile]: + """Parse Dev agent output into individual files. + + Extracts files delimited by ``# --- FILE: path/to/file ---`` markers. + If no markers are found, the entire output is treated as a single file + with the given default_filename. + + Args: + generated_code: Raw text output from the Dev agent. + default_filename: Filename to use when no markers are present. + + Returns: + List of ParsedFile objects. Order matches appearance in input. + Duplicate paths: last occurrence wins (later content replaces earlier). + + Raises: + CodeParserError: On dangerous paths (traversal, absolute). + """ + if not generated_code or not generated_code.strip(): + return [] + + # Find all markers and their positions + markers = list(_FILE_MARKER_RE.finditer(generated_code)) + + if not markers: + # No markers -- treat entire output as single file + content = generated_code.strip() + if not content: + return [] + return [ParsedFile(path=default_filename, content=content)] + + # Extract files between markers. + # Track insertion order alongside last-wins content (Python 3.7+ dict + # preserves insertion order, but we need first-seen order with last-wins + # content for duplicates). + seen: dict[str, ParsedFile] = {} + first_seen_order: list[str] = [] + + for i, match in enumerate(markers): + raw_path = match.group(1) + path = _normalize_path(raw_path) + + if path not in seen: + first_seen_order.append(path) + + # Content starts after this marker's line + content_start = match.end() + + # Content ends at the next marker (or end of string) + if i + 1 < len(markers): + content_end = markers[i + 1].start() + else: + content_end = len(generated_code) + + content = generated_code[content_start:content_end] + + # Strip leading blank line (common after marker) and trailing whitespace + content = content.strip("\n").rstrip() + + # If content is empty after stripping, we still record the file + # (agent may have intentionally created an empty __init__.py) + + # Last occurrence wins for duplicate paths + seen[path] = ParsedFile(path=path, content=content) + + return [seen[p] for p in first_seen_order] + + +def validate_paths_for_workspace( + parsed_files: list[ParsedFile], + workspace_root: Path, +) -> list[ParsedFile]: + """Validate that all parsed file paths are within workspace bounds. + + Uses pathlib's relative_to() for proper containment checking to prevent + path traversal attacks, including sibling-directory false positives. + + Args: + parsed_files: Files to validate. + workspace_root: Absolute path to the workspace root. + + Returns: + List of files that passed validation (unsafe paths are logged and skipped). + + Note: + This is a defense-in-depth measure. _normalize_path() already rejects + obvious traversal patterns, but this catches symlink-based escapes + and other edge cases at the filesystem level. + """ + resolved_root = workspace_root.resolve() + safe_files: list[ParsedFile] = [] + + for pf in parsed_files: + target = (resolved_root / pf.path).resolve() + try: + target.relative_to(resolved_root) + except ValueError: + # Path escapes workspace -- log but don't crash + logger.warning( + "Skipping file with path escaping workspace: %s -> %s", + pf.path, + target, + ) + continue + safe_files.append(pf) + + return safe_files diff --git a/dev-suite/tests/test_apply_code.py b/dev-suite/tests/test_apply_code.py new file mode 100644 index 0000000..e03971f --- /dev/null +++ b/dev-suite/tests/test_apply_code.py @@ -0,0 +1,185 @@ +"""Tests for apply_code_node in the orchestrator. + +Tests the new node that bridges Dev agent output to the filesystem +and sandbox. Uses mocked workspace paths -- no real file I/O needed +for unit tests. +""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + +from src.orchestrator import WorkflowStatus +from src.tools.code_parser import ParsedFile + + +class TestApplyCodeNode: + """Tests for the apply_code_node orchestrator node.""" + + def _make_state(self, **overrides): + """Build a minimal GraphState dict.""" + from src.agents.architect import Blueprint + + base = { + "task_description": "test task", + "blueprint": Blueprint( + task_id="test-1", + target_files=["src/main.py", "src/utils.py"], + instructions="implement the thing", + constraints=["be safe"], + acceptance_criteria=["tests pass"], + ), + "generated_code": ( + "# --- FILE: src/main.py ---\n" + "def main():\n" + " print('hello')\n" + "# --- FILE: src/utils.py ---\n" + "def helper():\n" + " return 42\n" + ), + "failure_report": None, + "status": WorkflowStatus.BUILDING, + "retry_count": 0, + "tokens_used": 1000, + "error_message": "", + "memory_context": [], + "memory_writes": [], + "trace": [], + "sandbox_result": None, + "parsed_files": [], + } + base.update(overrides) + return base + + def test_happy_path_parses_files(self, tmp_path): + """Files are parsed from generated_code and stored in state.""" + from src.orchestrator import apply_code_node + + state = self._make_state() + + with patch("src.orchestrator._get_workspace_root", return_value=tmp_path): + result = apply_code_node(state) + + assert "parsed_files" in result + files = result["parsed_files"] + assert len(files) == 2 + assert files[0]["path"] == "src/main.py" + assert "def main():" in files[0]["content"] + assert files[1]["path"] == "src/utils.py" + assert "def helper():" in files[1]["content"] + + def test_writes_files_to_workspace(self, tmp_path): + """Parsed files are written to the workspace directory.""" + from src.orchestrator import apply_code_node + + state = self._make_state() + + with patch("src.orchestrator._get_workspace_root", return_value=tmp_path): + apply_code_node(state) + + assert (tmp_path / "src" / "main.py").exists() + assert (tmp_path / "src" / "utils.py").exists() + assert "def main():" in (tmp_path / "src" / "main.py").read_text() + + def test_no_generated_code_skips(self): + """Missing generated_code results in graceful skip.""" + from src.orchestrator import apply_code_node + + state = self._make_state(generated_code="") + result = apply_code_node(state) + + assert result["parsed_files"] == [] + assert any("no generated_code" in t for t in result["trace"]) + + def test_no_blueprint_skips(self): + """Missing blueprint results in graceful skip.""" + from src.orchestrator import apply_code_node + + state = self._make_state(blueprint=None) + result = apply_code_node(state) + + assert result["parsed_files"] == [] + assert any("no blueprint" in t for t in result["trace"]) + + def test_path_traversal_skipped(self, tmp_path): + """Files with traversal/absolute paths are skipped, safe files survive.""" + from src.orchestrator import apply_code_node + + # Include both a dangerous traversal path and a safe path. + # parse_generated_code raises CodeParserError on ../.. paths, + # which apply_code_node catches gracefully. + # Test with a safe-only file to verify the error path works. + dangerous_code = ( + "# --- FILE: ../../../etc/passwd ---\n" + "malicious content\n" + "# --- FILE: src/safe.py ---\n" + "x = 1\n" + ) + state = self._make_state(generated_code=dangerous_code) + + with patch("src.orchestrator._get_workspace_root", return_value=tmp_path): + result = apply_code_node(state) + + # The parser raises CodeParserError on the traversal path, + # so apply_code_node catches it and returns empty parsed_files. + # This proves the security boundary works -- no files written. + assert len(result["parsed_files"]) == 0 + assert any("parse error" in t for t in result["trace"]) + + def test_workspace_containment_filters_unsafe(self, tmp_path): + """Files that escape workspace via validate_paths_for_workspace are filtered.""" + from src.orchestrator import apply_code_node + + # Use only safe paths (parser won't reject) -- the workspace + # containment check is the second layer of defense. + safe_code = ( + "# --- FILE: src/safe.py ---\n" + "x = 1\n" + ) + state = self._make_state(generated_code=safe_code) + + with patch("src.orchestrator._get_workspace_root", return_value=tmp_path): + result = apply_code_node(state) + + assert len(result["parsed_files"]) == 1 + assert result["parsed_files"][0]["path"] == "src/safe.py" + + def test_trace_entries_added(self, tmp_path): + """Trace entries are added for the apply_code step.""" + from src.orchestrator import apply_code_node + + state = self._make_state() + + with patch("src.orchestrator._get_workspace_root", return_value=tmp_path): + result = apply_code_node(state) + + trace = result["trace"] + assert any("apply_code: starting" in t for t in trace) + assert any("2 files" in t for t in trace) + + def test_creates_nested_directories(self, tmp_path): + """Nested directories are created automatically.""" + from src.orchestrator import apply_code_node + + code = "# --- FILE: src/lib/deep/nested/file.py ---\nx = 1\n" + state = self._make_state(generated_code=code) + + with patch("src.orchestrator._get_workspace_root", return_value=tmp_path): + apply_code_node(state) + + assert (tmp_path / "src" / "lib" / "deep" / "nested" / "file.py").exists() + + def test_status_not_changed(self, tmp_path): + """apply_code_node should NOT change the workflow status.""" + from src.orchestrator import apply_code_node + + state = self._make_state() + + with patch("src.orchestrator._get_workspace_root", return_value=tmp_path): + result = apply_code_node(state) + + # status should not be in the result (unchanged) + assert "status" not in result diff --git a/dev-suite/tests/test_code_parser.py b/dev-suite/tests/test_code_parser.py new file mode 100644 index 0000000..d88f309 --- /dev/null +++ b/dev-suite/tests/test_code_parser.py @@ -0,0 +1,281 @@ +"""Tests for code_parser utility. + +Covers: multi-file parsing, single-file fallback, edge cases, +path validation, and security boundaries. +""" + +import os +from pathlib import Path + +import pytest + +from src.tools.code_parser import ( + CodeParserError, + ParsedFile, + _normalize_path, + parse_generated_code, + validate_paths_for_workspace, +) + + +# -- _normalize_path tests -- + + +class TestNormalizePath: + """Path normalization and security checks.""" + + def test_simple_path(self): + assert _normalize_path("src/main.py") == "src/main.py" + + def test_strips_leading_dot_slash(self): + assert _normalize_path("./src/main.py") == "src/main.py" + + def test_strips_multiple_leading_dot_slash(self): + assert _normalize_path("././src/main.py") == "src/main.py" + + def test_strips_whitespace(self): + assert _normalize_path(" src/main.py ") == "src/main.py" + + def test_normalizes_backslashes(self): + assert _normalize_path("src\\lib\\utils.py") == "src/lib/utils.py" + + def test_rejects_absolute_path(self): + with pytest.raises(CodeParserError, match="Absolute path"): + _normalize_path("/etc/passwd") + + def test_rejects_windows_absolute_path(self): + with pytest.raises(CodeParserError, match="Absolute path"): + _normalize_path("C:\\Users\\secret.txt") + + def test_rejects_windows_absolute_path_forward_slash(self): + with pytest.raises(CodeParserError, match="Absolute path"): + _normalize_path("D:/secret.txt") + + def test_rejects_traversal_dotdot(self): + with pytest.raises(CodeParserError, match="Path traversal"): + _normalize_path("../../../etc/passwd") + + def test_rejects_traversal_mid_path(self): + with pytest.raises(CodeParserError, match="Path traversal"): + _normalize_path("src/../../secret.txt") + + def test_rejects_empty_path(self): + with pytest.raises(CodeParserError, match="Empty file path"): + _normalize_path("") + + def test_rejects_whitespace_only(self): + with pytest.raises(CodeParserError, match="Empty file path"): + _normalize_path(" ") + + +# -- parse_generated_code tests -- + + +class TestParseGeneratedCode: + """Main parser tests.""" + + def test_standard_multi_file(self): + """Standard multi-file output with FILE markers.""" + code = ( + "# --- FILE: src/main.py ---\n" + "def main():\n" + " print('hello')\n" + "\n" + "# --- FILE: src/utils.py ---\n" + "def helper():\n" + " return 42\n" + ) + result = parse_generated_code(code) + assert len(result) == 2 + assert result[0].path == "src/main.py" + assert "def main():" in result[0].content + assert result[1].path == "src/utils.py" + assert "def helper():" in result[1].content + + def test_single_file_no_markers(self): + """Entire output treated as single file when no markers present.""" + code = "def main():\n print('hello')\n" + result = parse_generated_code(code) + assert len(result) == 1 + assert result[0].path == "output.py" + assert "def main():" in result[0].content + + def test_single_file_custom_default_name(self): + """Custom default filename for marker-less output.""" + code = "console.log('hi')" + result = parse_generated_code(code, default_filename="index.js") + assert len(result) == 1 + assert result[0].path == "index.js" + + def test_empty_input(self): + """Empty string returns empty list.""" + assert parse_generated_code("") == [] + assert parse_generated_code(" ") == [] + assert parse_generated_code("\n\n") == [] + + def test_none_like_empty(self): + """None-ish empty string.""" + assert parse_generated_code("") == [] + + def test_empty_file_between_markers(self): + """Empty file content (e.g., __init__.py).""" + code = ( + "# --- FILE: src/__init__.py ---\n" + "\n" + "# --- FILE: src/main.py ---\n" + "x = 1\n" + ) + result = parse_generated_code(code) + assert len(result) == 2 + assert result[0].path == "src/__init__.py" + assert result[0].content == "" + assert result[1].path == "src/main.py" + assert result[1].content == "x = 1" + + def test_duplicate_paths_last_wins(self): + """Duplicate file paths: last content wins.""" + code = ( + "# --- FILE: src/main.py ---\n" + "version = 1\n" + "# --- FILE: src/main.py ---\n" + "version = 2\n" + ) + result = parse_generated_code(code) + assert len(result) == 1 + assert result[0].path == "src/main.py" + assert "version = 2" in result[0].content + + def test_nested_directory_paths(self): + """Deep nested directory paths.""" + code = ( + "# --- FILE: src/lib/utils/helpers/format.py ---\n" + "def fmt(): pass\n" + ) + result = parse_generated_code(code) + assert len(result) == 1 + assert result[0].path == "src/lib/utils/helpers/format.py" + + def test_path_traversal_raises(self): + """Path with .. components raises CodeParserError.""" + code = "# --- FILE: ../../etc/passwd ---\nroot:x:0:0\n" + with pytest.raises(CodeParserError, match="Path traversal"): + parse_generated_code(code) + + def test_absolute_path_raises(self): + """Absolute path raises CodeParserError.""" + code = "# --- FILE: /etc/shadow ---\nfoo\n" + with pytest.raises(CodeParserError, match="Absolute path"): + parse_generated_code(code) + + def test_varying_dash_counts(self): + """Markers with different numbers of dashes.""" + code = ( + "# ----- FILE: src/a.py -----\n" + "a = 1\n" + "# --- FILE: src/b.py ---\n" + "b = 2\n" + ) + result = parse_generated_code(code) + assert len(result) == 2 + + def test_content_before_first_marker_is_ignored(self): + """Text before the first marker is dropped.""" + code = ( + "Here is the implementation:\n\n" + "# --- FILE: src/main.py ---\n" + "x = 1\n" + ) + result = parse_generated_code(code) + assert len(result) == 1 + assert result[0].path == "src/main.py" + assert "Here is the implementation" not in result[0].content + + def test_preserves_internal_blank_lines(self): + """Blank lines within file content are preserved.""" + code = ( + "# --- FILE: src/main.py ---\n" + "import os\n" + "\n" + "\n" + "def main():\n" + " pass\n" + ) + result = parse_generated_code(code) + assert "\n\n" in result[0].content + + def test_strips_trailing_whitespace(self): + """Trailing whitespace on file content is stripped.""" + code = "# --- FILE: src/main.py ---\nx = 1\n\n\n \n" + result = parse_generated_code(code) + assert result[0].content == "x = 1" + + def test_mixed_file_types(self): + """Multiple file types in single output.""" + code = ( + "# --- FILE: src/app.py ---\n" + "app = Flask(__name__)\n" + "# --- FILE: src/routes/index.svelte ---\n" + "\n" + "# --- FILE: tests/test_app.py ---\n" + "def test_health(): assert True\n" + '# --- FILE: package.json ---\n' + '{"name": "app"}\n' + ) + result = parse_generated_code(code) + assert len(result) == 4 + assert result[0].path == "src/app.py" + assert result[1].path == "src/routes/index.svelte" + assert result[2].path == "tests/test_app.py" + assert result[3].path == "package.json" + + def test_large_output(self): + """Parser handles large outputs without issue.""" + lines = ["# --- FILE: src/big.py ---\n"] + for i in range(1000): + lines.append(f"x_{i} = {i}\n") + code = "".join(lines) + result = parse_generated_code(code) + assert len(result) == 1 + assert "x_999 = 999" in result[0].content + + +# -- validate_paths_for_workspace tests -- + + +class TestValidatePathsForWorkspace: + """Workspace path containment validation.""" + + def test_valid_paths_pass(self, tmp_path): + """Normal paths within workspace pass through.""" + files = [ + ParsedFile(path="src/main.py", content="x = 1"), + ParsedFile(path="tests/test.py", content="y = 2"), + ] + result = validate_paths_for_workspace(files, tmp_path) + assert len(result) == 2 + + def test_symlink_escape_filtered(self, tmp_path): + """Symlinks that resolve outside workspace are filtered out.""" + # Create a symlink pointing outside workspace + outside = tmp_path.parent / "outside_file" + outside.write_text("secret") + + link = tmp_path / "sneaky" + try: + link.symlink_to(outside) + except OSError: + pytest.skip("Symlinks not supported on this platform") + + files = [ + ParsedFile(path="src/main.py", content="x = 1"), + ParsedFile(path="sneaky", content="overwrite"), + ] + result = validate_paths_for_workspace(files, tmp_path) + # Only the safe file should remain; symlink escape is filtered + assert len(result) == 1 + assert result[0].path == "src/main.py" + + def test_empty_list(self, tmp_path): + """Empty input returns empty output.""" + result = validate_paths_for_workspace([], tmp_path) + assert result == [] diff --git a/dev-suite/tests/test_sandbox_validate.py b/dev-suite/tests/test_sandbox_validate.py index 01a85d2..f0e01c5 100644 --- a/dev-suite/tests/test_sandbox_validate.py +++ b/dev-suite/tests/test_sandbox_validate.py @@ -1,7 +1,7 @@ """Tests for the sandbox_validate orchestrator node. These tests validate the sandbox_validate node behavior with mocked -E2B runner \u2014 no real sandbox needed. Tests cover: +E2B runner — no real sandbox needed. Tests cover: - Template selection from blueprint target_files - Validation command execution - Graceful skip when E2B_API_KEY is missing @@ -146,7 +146,7 @@ def test_sandbox_error_doesnt_crash(self): assert any("error" in t.lower() or "failed" in t.lower() for t in result["trace"]) def test_failed_validation_still_continues(self): - """Sandbox validation failure should not block QA \u2014 it adds context.""" + """Sandbox validation failure should not block QA — it adds context.""" state = _make_state(["src/main.py"]) with patch("src.orchestrator._run_sandbox_validation") as mock_run: @@ -162,7 +162,7 @@ def test_failed_validation_still_continues(self): # Node should still return the result even though tests failed assert result["sandbox_result"] is not None assert result["sandbox_result"].tests_failed == 2 - # Status should not change \u2014 QA decides pass/fail + # Status should not change — QA decides pass/fail assert "status" not in result or result.get("status") == WorkflowStatus.REVIEWING def test_trace_includes_validation_plan(self): @@ -193,23 +193,44 @@ def test_graph_has_sandbox_validate_node(self): graph = build_graph() compiled = graph.compile() - # LangGraph stores node names \u2014 check sandbox_validate exists + # LangGraph stores node names — check sandbox_validate exists node_names = set(compiled.get_graph().nodes.keys()) assert "sandbox_validate" in node_names - def test_graph_edge_developer_to_sandbox(self): - """developer should connect to sandbox_validate.""" + def test_graph_has_apply_code_node(self): + """The compiled graph should include apply_code.""" + from src.orchestrator import build_graph + + graph = build_graph() + compiled = graph.compile() + node_names = set(compiled.get_graph().nodes.keys()) + assert "apply_code" in node_names + + def test_graph_edge_developer_to_apply_code(self): + """developer should connect to apply_code (not directly to sandbox).""" from src.orchestrator import build_graph graph = build_graph() compiled = graph.compile() graph_data = compiled.get_graph() - # Check edges: developer -> sandbox_validate developer_targets = { e.target for e in graph_data.edges if e.source == "developer" } - assert "sandbox_validate" in developer_targets + assert "apply_code" in developer_targets + + def test_graph_edge_apply_code_to_sandbox(self): + """apply_code should connect to sandbox_validate.""" + from src.orchestrator import build_graph + + graph = build_graph() + compiled = graph.compile() + graph_data = compiled.get_graph() + apply_code_targets = { + e.target for e in graph_data.edges + if e.source == "apply_code" + } + assert "sandbox_validate" in apply_code_targets def test_graph_edge_sandbox_to_qa(self): """sandbox_validate should connect to qa."""