"""E2E integration test -- full pipeline with real LLMs and MCP tools.

Issue #86: Proves the complete orchestrator pipeline works end-to-end:
    Architect (Gemini) -> Developer (Claude + tools) -> apply_code ->
    sandbox_validate (E2B) -> QA (Claude + tools) -> flush_memory

Prerequisites:
    - GOOGLE_API_KEY in dev-suite/.env (Gemini -- Architect)
    - ANTHROPIC_API_KEY in dev-suite/.env (Claude -- Dev + QA)
    - Node.js + npx on PATH (Filesystem MCP server)
    - E2B_API_KEY in dev-suite/.env (optional -- sandbox skipped without it)

Usage:
    cd dev-suite
    uv run --group dev --group api pytest tests/test_e2e_integration.py -v -s -m integration
"""

from __future__ import annotations

import json
import os
import shutil
import time
from pathlib import Path

import pytest

# Module import (not just names) so tests can monkeypatch module-level
# globals such as TOKEN_BUDGET / MAX_RETRIES that are evaluated at import time.
import src.orchestrator as orchestrator
from src.orchestrator import AgentState, WorkflowStatus, run_task


# ============================================================
# Helpers
# ============================================================


def _has_node_in_trace(trace: list[str], node_name: str) -> bool:
    """Check if a graph node appears in the execution trace.

    Uses substring matching because trace entries may carry extra
    detail around the node name (e.g. timing or status suffixes).

    Args:
        trace: Ordered list of trace entries from AgentState.trace.
        node_name: Graph node identifier to look for (e.g. "architect").

    Returns:
        True if any trace entry contains node_name.
    """
    return any(node_name in entry for entry in trace)


def _print_result(result: AgentState, elapsed: float) -> None:
    """Diagnostic dump on test completion or failure.

    Prints every populated section of the final AgentState -- blueprint,
    generated code, parsed files, tool calls, sandbox result, QA report,
    memory writes, trace, and error message -- so a failing run can be
    diagnosed from the pytest -s output alone. Long sections are truncated
    (first 15 code lines, 10 tool calls, 5 memory writes) to keep the dump
    readable. Purely informational: performs no assertions.

    Args:
        result: Final pipeline state returned by run_task().
        elapsed: Wall-clock duration of the run, in seconds.
    """
    print("\n" + "=" * 70)
    print(" E2E INTEGRATION TEST RESULT")
    print("=" * 70)
    print(f" Status: {result.status.value}")
    print(f" Tokens used: {result.tokens_used:,}")
    print(f" Retries: {result.retry_count}")
    print(f" Elapsed: {elapsed:.1f}s")
    print()

    if result.blueprint:
        print(" BLUEPRINT:")
        print(f" Task ID: {result.blueprint.task_id}")
        print(f" Target files: {', '.join(result.blueprint.target_files)}")
        print(f" Criteria: {len(result.blueprint.acceptance_criteria)}")
        print()

    if result.generated_code:
        lines = result.generated_code.strip().split("\n")
        print(f" GENERATED CODE ({len(lines)} lines):")
        # Show only a prefix -- generated files can be long.
        for line in lines[:15]:
            print(f" {line}")
        if len(lines) > 15:
            print(f" ... ({len(lines) - 15} more lines)")
        print()

    if result.parsed_files:
        print(f" PARSED FILES ({len(result.parsed_files)}):")
        for pf in result.parsed_files:
            print(f" {pf['path']} ({len(pf['content'])} chars)")
        print()

    if result.tool_calls_log:
        print(f" TOOL CALLS ({len(result.tool_calls_log)}):")
        for tc in result.tool_calls_log[:10]:
            # .get() throughout: log entries are plain dicts with no
            # guaranteed schema, so missing keys fall back to "?".
            icon = "ok" if tc.get("success") else "FAIL"
            print(f" [{icon}] {tc.get('agent', '?')}: {tc.get('tool', '?')}")
        if len(result.tool_calls_log) > 10:
            print(f" ... ({len(result.tool_calls_log) - 10} more)")
        print()

    if result.sandbox_result:
        sr = result.sandbox_result
        print(" SANDBOX RESULT:")
        print(f" Exit code: {sr.exit_code}")
        print(f" Tests passed: {sr.tests_passed}")
        print(f" Tests failed: {sr.tests_failed}")
        if sr.errors:
            print(f" Errors: {', '.join(sr.errors[:3])}")
        print()

    if result.failure_report:
        fr = result.failure_report
        print(" QA REPORT:")
        print(f" Verdict: {fr.status}")
        print(f" Passed: {fr.tests_passed}")
        print(f" Failed: {fr.tests_failed}")
        if fr.errors:
            print(f" Errors: {', '.join(fr.errors[:3])}")
        print()

    if result.memory_writes:
        print(f" MEMORY WRITES ({len(result.memory_writes)}):")
        for mw in result.memory_writes[:5]:
            # Truncate content to 80 chars -- memory entries can be verbose.
            print(f" [{mw.get('tier', '?')}] {mw.get('content', '')[:80]}")
        print()

    if result.trace:
        print(f" TRACE ({len(result.trace)} entries):")
        for t in result.trace:
            print(f" -> {t}")
        print()

    if result.error_message:
        print(f" ERROR: {result.error_message}")
        print()

    print("=" * 70)


# ============================================================
# Fixtures
# ============================================================
@pytest.fixture
def workspace(tmp_path) -> Path:
    """Seed a minimal Python project for the agents to work on.

    Creates:
        - utils.py: stub file (agent must add the greet function)
        - test_utils.py: test expecting greet('World') == 'Hello, World!'

    Returns:
        The tmp_path directory now containing the seed files.
    """
    utils = tmp_path / "utils.py"
    utils.write_text('"""Utility functions."""\n', encoding="utf-8")

    test_utils = tmp_path / "test_utils.py"
    test_utils.write_text(
        '"""Tests for utils module."""\n'
        "from utils import greet\n"
        "\n\n"
        "def test_greet_basic():\n"
        '    assert greet("World") == "Hello, World!"\n'
        "\n\n"
        "def test_greet_name():\n"
        '    assert greet("Alice") == "Hello, Alice!"\n',
        encoding="utf-8",
    )

    return tmp_path


@pytest.fixture
def mcp_config(workspace) -> Path:
    """Seed mcp-config.json pointing Filesystem MCP at the workspace.

    Enables tool binding for Dev (read/write) and QA (read-only).
    Requires Node.js + npx on PATH.

    Returns:
        Path to the written mcp-config.json inside the workspace.
    """
    has_npx = shutil.which("npx") is not None
    if not has_npx:
        # No npx -- write a config with no servers so tools are empty
        # but the file exists (init_tools_config won't bail on missing file)
        config = {"servers": {}}
    else:
        # Version is pinned so the npx download is reproducible and the
        # "package" field matches the "args" spec exactly.
        config = {
            "servers": {
                "filesystem": {
                    "package": "@modelcontextprotocol/server-filesystem",
                    "version": "2026.1.14",
                    "command": "npx",
                    "args": [
                        "-y",
                        "@modelcontextprotocol/server-filesystem@2026.1.14",
                        str(workspace),
                    ],
                    "env": {},
                }
            }
        }

    config_path = workspace / "mcp-config.json"
    config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
    return config_path


# ============================================================
# Test class
# ============================================================


@pytest.mark.integration
class TestE2EIntegration:
    """Full pipeline integration test with real LLMs.

    Gating: @pytest.mark.integration -- excluded from CI via -m "not integration".
    Keys loaded from dev-suite/.env by conftest.py's load_dotenv().
    """

    @pytest.fixture(autouse=True)
    def _check_keys(self) -> None:
        """Skip if required API keys are missing."""
        if not os.getenv("ANTHROPIC_API_KEY"):
            pytest.skip("ANTHROPIC_API_KEY not set -- set in dev-suite/.env")
        if not os.getenv("GOOGLE_API_KEY"):
            pytest.skip("GOOGLE_API_KEY not set -- set in dev-suite/.env")

    @pytest.fixture(autouse=True)
    def _configure_env(self, workspace, mcp_config, monkeypatch) -> None:
        """Point the orchestrator at the temp workspace with generous budgets.

        Note: TOKEN_BUDGET and MAX_RETRIES are module-level globals in
        src.orchestrator, evaluated at import time via _safe_int(). Setting
        os.environ alone won't affect them -- we must patch the module
        globals directly.

        Requesting the mcp_config fixture here (even though its value is
        unused) guarantees mcp-config.json exists before any test runs.
        """
        monkeypatch.setenv("WORKSPACE_ROOT", str(workspace))
        # Env vars kept in sync with the patched globals in case any code
        # path re-reads the environment at runtime.
        monkeypatch.setenv("TOKEN_BUDGET", "100000")
        monkeypatch.setenv("MAX_RETRIES", "3")
        # Patch module globals directly (evaluated at import time)
        monkeypatch.setattr(orchestrator, "TOKEN_BUDGET", 100000)
        monkeypatch.setattr(orchestrator, "MAX_RETRIES", 3)
        # Store workspace on self for assertions
        self._workspace = workspace
        # Capability flags drive the conditional assertions below.
        self._has_e2b = bool(os.getenv("E2B_API_KEY"))
        self._has_npx = shutil.which("npx") is not None

    def test_workspace_seeded_correctly(self, workspace) -> None:
        """Sanity check: fixture creates the expected seed files."""
        assert (workspace / "utils.py").exists()
        assert (workspace / "test_utils.py").exists()
        # Written by the mcp_config fixture (pulled in via _configure_env).
        assert (workspace / "mcp-config.json").exists()

        test_content = (workspace / "test_utils.py").read_text()
        assert "greet" in test_content
        assert "Hello, World!" in test_content

    def test_full_pipeline(self, workspace) -> None:
        """Run the complete orchestrator pipeline with real LLMs.

        Asserts structural completion -- every graph node executed and
        produced output. Does NOT assert on code quality or LLM content,
        since model output is non-deterministic.
        """
        task = (
            "Add a greet(name) function to utils.py that takes a name "
            "parameter (str) and returns the string 'Hello, {name}!'. "
            "For example, greet('World') should return 'Hello, World!'. "
            "Include a type hint and a docstring."
        )

        print(f"\n Task: {task}")
        print(f" Workspace: {workspace}")
        print(f" E2B available: {self._has_e2b}")
        print(f" npx available: {self._has_npx}")
        print(" Running pipeline...\n")

        start = time.time()
        # Tracing only when Langfuse is configured -- avoids noisy warnings
        # on machines without observability keys.
        result = run_task(
            task,
            enable_tracing=bool(os.getenv("LANGFUSE_PUBLIC_KEY")),
        )
        elapsed = time.time() - start

        # Always print diagnostics
        _print_result(result, elapsed)

        # -- Hard assertions: pipeline completed without crash --

        assert isinstance(result, AgentState), "run_task must return AgentState"
        assert result.tokens_used > 0, "Real API calls should consume tokens"

        # -- Architect produced a Blueprint --

        assert result.blueprint is not None, "Architect should produce a Blueprint"
        assert len(result.blueprint.target_files) > 0, "Blueprint should have target files"
        assert len(result.blueprint.instructions) > 0, "Blueprint should have instructions"

        # -- Developer produced code --

        # NOTE(review): assumes generated_code defaults to "" rather than
        # None -- len(None) would raise TypeError here; confirm AgentState.
        assert len(result.generated_code) > 0, "Developer should generate code"

        # -- QA produced a report --

        assert result.failure_report is not None, "QA should produce a failure report"

        # -- Trace covers all graph nodes --

        trace = result.trace
        assert _has_node_in_trace(trace, "architect"), "Trace should show architect ran"
        assert _has_node_in_trace(trace, "developer"), "Trace should show developer ran"
        assert _has_node_in_trace(trace, "apply_code"), "Trace should show apply_code ran"
        assert _has_node_in_trace(trace, "sandbox_validate"), "Trace should show sandbox_validate ran"
        assert _has_node_in_trace(trace, "qa"), "Trace should show qa ran"
        assert _has_node_in_trace(trace, "flush_memory"), "Trace should show flush_memory ran"

        # -- Memory layer engaged --

        assert len(result.memory_writes) > 0, "Pipeline should produce memory writes"

        # -- Tool binding (conditional on npx) --

        if self._has_npx:
            assert len(result.tool_calls_log) > 0, (
                "With npx available, agents should have used filesystem tools"
            )
            # Verify Dev used tools
            dev_calls = [
                tc for tc in result.tool_calls_log
                if tc.get("agent") == "developer"
            ]
            assert len(dev_calls) > 0, "Developer should have made tool calls"
            # Set intersection: any one filesystem tool suffices -- which
            # tools the model picks is non-deterministic.
            dev_tool_names = {tc.get("tool") for tc in dev_calls}
            assert dev_tool_names & {
                "filesystem_read", "filesystem_write", "filesystem_list"
            }, (
                f"Developer should use filesystem tools, got: {dev_tool_names}"
            )

        # -- Files written to workspace by apply_code --

        assert result.parsed_files, "apply_code should parse at least one file"
        workspace_root = workspace.resolve()
        for pf in result.parsed_files:
            # resolve() so symlink tricks can't defeat the containment check.
            file_path = (workspace / pf["path"]).resolve()
            # Verify file stays within workspace (no path traversal)
            assert file_path == workspace_root or workspace_root in file_path.parents, (
                f"Parsed file escaped workspace: {pf['path']}"
            )
            assert file_path.exists(), (
                f"Parsed file should exist on disk: {pf['path']}"
            )
            # Verify on-disk content matches what apply_code wrote
            disk_content = file_path.read_text(encoding="utf-8").strip()
            parsed_content = pf["content"].strip()
            assert disk_content == parsed_content, (
                f"Disk content should match parsed content for {pf['path']}"
            )

        # -- Sandbox validation (conditional on E2B key) --

        if self._has_e2b:
            assert result.sandbox_result is not None, (
                "With E2B_API_KEY set, sandbox_validate should produce a result"
            )
        else:
            # Sandbox skipped -- trace should indicate it
            # (NOTE(review): duplicates the unconditional trace assertion above,
            # kept for explicitness of the skip path.)
            assert _has_node_in_trace(trace, "sandbox_validate"), (
                "sandbox_validate node should still appear in trace even when skipped"
            )

        # -- Terminal state --

        assert result.status in (
            WorkflowStatus.PASSED,
            WorkflowStatus.FAILED,
            WorkflowStatus.ESCALATED,
        ), f"Pipeline should reach a terminal state, got: {result.status}"

        # -- Log outcome --

        if result.status == WorkflowStatus.PASSED:
            print(
                f"\n >>> PIPELINE PASSED"
                f" ({elapsed:.1f}s, {result.tokens_used:,} tokens)"
            )
        else:
            print(f"\n >>> PIPELINE ended: {result.status.value} ({elapsed:.1f}s)")
            print(
                f" >>> Retries: {result.retry_count},"
                f" Error: {result.error_message or 'none'}"
            )