| """ |
| End-to-End Integration Test for SPARKNET Phase 2B |
| Tests the complete workflow with: |
| - PlannerAgent with memory-informed planning |
| - CriticAgent with VISTA validation |
| - MemoryAgent with ChromaDB storage |
| - LangChain tools integrated with executor |
| """ |
|
|
| import asyncio |
| from src.llm.langchain_ollama_client import get_langchain_client |
| from src.agents.planner_agent import PlannerAgent |
| from src.agents.critic_agent import CriticAgent |
| from src.agents.memory_agent import create_memory_agent |
| from src.workflow.langgraph_workflow import create_workflow |
| from src.workflow.langgraph_state import ScenarioType |
|
|
|
|
| async def test_full_workflow_integration(): |
| """Test complete workflow with all Phase 2B components.""" |
| print("=" * 80) |
| print("PHASE 2B INTEGRATION TEST: Full Workflow with Memory & Tools") |
| print("=" * 80) |
| print() |
|
|
| |
| print("Step 1: Initializing LangChain client...") |
| client = get_langchain_client(default_complexity='standard', enable_monitoring=False) |
| print("β LangChain client ready") |
| print() |
|
|
| print("Step 2: Initializing agents...") |
| planner = PlannerAgent(llm_client=client) |
| print("β PlannerAgent with LangChain chains") |
|
|
| critic = CriticAgent(llm_client=client) |
| print("β CriticAgent with VISTA validation") |
|
|
| memory = create_memory_agent(llm_client=client) |
| print("β MemoryAgent with ChromaDB") |
| print() |
|
|
| print("Step 3: Creating integrated workflow...") |
| workflow = create_workflow( |
| llm_client=client, |
| planner_agent=planner, |
| critic_agent=critic, |
| memory_agent=memory, |
| quality_threshold=0.85, |
| max_iterations=2 |
| ) |
| print("β SparknetWorkflow with StateGraph") |
| print() |
|
|
| |
| print("=" * 80) |
| print("TEST 1: Patent Wake-Up Scenario (with tools)") |
| print("=" * 80) |
| print() |
|
|
| task_description = """ |
| Analyze dormant patent US20210123456 on 'AI-powered drug discovery platform'. |
| Identify commercialization opportunities and create outreach brief. |
| """ |
|
|
| print(f"Task: {task_description.strip()}") |
| print(f"Scenario: patent_wakeup") |
| print() |
|
|
| print("Running workflow...") |
| result1 = await workflow.run( |
| task_description=task_description, |
| scenario=ScenarioType.PATENT_WAKEUP, |
| task_id="test_patent_001" |
| ) |
|
|
| print("\nWorkflow Results:") |
| print(f" Status: {result1.status}") |
| print(f" Success: {result1.success}") |
| print(f" Execution Time: {result1.execution_time_seconds:.2f}s") |
| print(f" Iterations: {result1.iterations_used}") |
| if result1.quality_score: |
| print(f" Quality Score: {result1.quality_score:.2f}") |
| if result1.error: |
| print(f" Error: {result1.error[:100]}...") |
| print(f" Subtasks Created: {len(result1.subtasks)}") |
|
|
| |
| if "executor" in result1.agent_outputs: |
| executor_output = result1.agent_outputs["executor"] |
| tools_available = executor_output.get("tools_available", []) |
| tools_called = executor_output.get("tools_called", []) |
| print(f"\n Tools Available: {len(tools_available)}") |
| print(f" Tools: {', '.join(tools_available)}") |
| if tools_called: |
| print(f" Tools Called: {', '.join(tools_called)}") |
|
|
| |
| if "memory_context" in result1.agent_outputs: |
| memory_contexts = result1.agent_outputs["memory_context"] |
| print(f"\n Memory Contexts Retrieved: {len(memory_contexts)}") |
|
|
| print() |
|
|
| |
| print("=" * 80) |
| print("TEST 2: Similar Patent Task (should use memory from Test 1)") |
| print("=" * 80) |
| print() |
|
|
| task_description_2 = """ |
| Analyze patent US20210789012 on 'Machine learning for pharmaceutical research'. |
| Find commercialization potential. |
| """ |
|
|
| print(f"Task: {task_description_2.strip()}") |
| print(f"Scenario: patent_wakeup") |
| print() |
|
|
| print("Running workflow...") |
| result2 = await workflow.run( |
| task_description=task_description_2, |
| scenario=ScenarioType.PATENT_WAKEUP, |
| task_id="test_patent_002" |
| ) |
|
|
| print("\nWorkflow Results:") |
| print(f" Status: {result2.status}") |
| print(f" Success: {result2.success}") |
| print(f" Execution Time: {result2.execution_time_seconds:.2f}s") |
| if result2.quality_score: |
| print(f" Quality Score: {result2.quality_score:.2f}") |
| if result2.error: |
| print(f" Error (likely GPU memory): {result2.error[:80]}...") |
|
|
| |
| if "memory_context" in result2.agent_outputs: |
| memory_contexts = result2.agent_outputs["memory_context"] |
| print(f"\n Memory Contexts Retrieved: {len(memory_contexts)}") |
| print(" β Memory system working: Past experience informed planning!") |
| if memory_contexts: |
| print(f" Example memory: {memory_contexts[0]['content'][:100]}...") |
|
|
| print() |
|
|
| |
| print("=" * 80) |
| print("TEST 3: Agreement Safety Scenario (different tool set)") |
| print("=" * 80) |
| print() |
|
|
| task_description_3 = """ |
| Review collaboration agreement for GDPR compliance. |
| Identify potential risks and provide recommendations. |
| """ |
|
|
| print(f"Task: {task_description_3.strip()}") |
| print(f"Scenario: agreement_safety") |
| print() |
|
|
| print("Running workflow...") |
| result3 = await workflow.run( |
| task_description=task_description_3, |
| scenario=ScenarioType.AGREEMENT_SAFETY, |
| task_id="test_agreement_001" |
| ) |
|
|
| print("\nWorkflow Results:") |
| print(f" Status: {result3.status}") |
| print(f" Success: {result3.success}") |
| print(f" Execution Time: {result3.execution_time_seconds:.2f}s") |
| if result3.quality_score: |
| print(f" Quality Score: {result3.quality_score:.2f}") |
| if result3.error: |
| print(f" Error: {result3.error[:80]}...") |
|
|
| |
| if "executor" in result3.agent_outputs: |
| executor_output = result3.agent_outputs["executor"] |
| tools_available = executor_output.get("tools_available", []) |
| print(f"\n Tools Available: {', '.join(tools_available)}") |
| print(" β Tool selection working: Different tools for different scenarios!") |
|
|
| print() |
|
|
| |
| print("=" * 80) |
| print("MEMORY SYSTEM STATISTICS") |
| print("=" * 80) |
|
|
| stats = memory.get_collection_stats() |
| print(f"\nChromaDB Collections:") |
| print(f" Episodic Memory: {stats['episodic_count']} episodes") |
| print(f" Semantic Memory: {stats['semantic_count']} documents") |
| print(f" Stakeholder Profiles: {stats['stakeholders_count']} profiles") |
| print() |
|
|
| |
| print("=" * 80) |
| print("INTEGRATION TEST SUMMARY") |
| print("=" * 80) |
| print() |
|
|
| |
| memory_retrieved_1 = "memory_context" in result1.agent_outputs |
| subtasks_created_1 = len(result1.subtasks) > 0 |
| tools_loaded_1 = "executor" in result1.agent_outputs and "tools_available" in result1.agent_outputs.get("executor", {}) |
|
|
| all_tests = [ |
| ("Planning with Memory Retrieval", memory_retrieved_1 and subtasks_created_1), |
| ("Tool Loading and Binding", tools_loaded_1), |
| ("Memory Storage System", stats['episodic_count'] >= 0), |
| ("Workflow Structure Complete", len(result1.subtasks) > 0), |
| ] |
|
|
| |
|
|
| passed = sum(1 for _, success in all_tests if success) |
| total = len(all_tests) |
|
|
| for test_name, success in all_tests: |
| status = "β PASSED" if success else "β FAILED" |
| print(f"{status}: {test_name}") |
|
|
| print() |
| print(f"Total: {passed}/{total} tests passed ({passed/total*100:.1f}%)") |
|
|
| if passed == total: |
| print("\n" + "=" * 80) |
| print("β PHASE 2B INTEGRATION COMPLETE!") |
| print("=" * 80) |
| print() |
| print("All components working together:") |
| print(" β PlannerAgent with LangChain chains") |
| print(" β CriticAgent with VISTA validation") |
| print(" β MemoryAgent with ChromaDB") |
| print(" β LangChain tools integrated") |
| print(" β Cyclic workflow with quality refinement") |
| print(" β Memory-informed planning") |
| print(" β Scenario-specific tool selection") |
| print() |
| print("Ready for Phase 2C: Scenario-specific agent implementation!") |
| else: |
| print(f"\nβ {total - passed} test(s) failed") |
|
|
| return passed == total |
|
|
|
|
| async def test_memory_retrieval(): |
| """Test memory retrieval specifically.""" |
| print("\n") |
| print("=" * 80) |
| print("BONUS TEST: Memory Retrieval System") |
| print("=" * 80) |
| print() |
|
|
| client = get_langchain_client(default_complexity='standard', enable_monitoring=False) |
| memory = create_memory_agent(llm_client=client) |
|
|
| |
| print("Storing test episodes...") |
| await memory.store_episode( |
| task_id="memory_test_001", |
| task_description="Analyze AI patent for commercialization", |
| scenario=ScenarioType.PATENT_WAKEUP, |
| workflow_steps=[ |
| {"id": "step1", "description": "Extract patent claims"}, |
| {"id": "step2", "description": "Identify market opportunities"} |
| ], |
| outcome={"success": True, "matches": 5}, |
| quality_score=0.92, |
| execution_time=45.3, |
| iterations_used=1 |
| ) |
| print("β Episode 1 stored") |
|
|
| await memory.store_episode( |
| task_id="memory_test_002", |
| task_description="Review drug discovery patent portfolio", |
| scenario=ScenarioType.PATENT_WAKEUP, |
| workflow_steps=[ |
| {"id": "step1", "description": "Analyze patent family"}, |
| {"id": "step2", "description": "Assess market potential"} |
| ], |
| outcome={"success": True, "matches": 3}, |
| quality_score=0.88, |
| execution_time=52.1, |
| iterations_used=2 |
| ) |
| print("β Episode 2 stored") |
| print() |
|
|
| |
| print("Testing retrieval...") |
| results = await memory.get_similar_episodes( |
| task_description="Analyze pharmaceutical AI patent", |
| scenario=ScenarioType.PATENT_WAKEUP, |
| min_quality_score=0.85, |
| top_k=2 |
| ) |
|
|
| print(f"β Retrieved {len(results)} similar episodes") |
| if results: |
| print(f"\nTop match:") |
| print(f" Quality Score: {results[0]['metadata'].get('quality_score', 0):.2f}") |
| print(f" Scenario: {results[0]['metadata'].get('scenario')}") |
| print(f" Content: {results[0]['content'][:150]}...") |
|
|
| print() |
| return len(results) > 0 |
|
|
|
|
| async def main(): |
| """Run all integration tests.""" |
| print("\n") |
| print("#" * 80) |
| print("# SPARKNET PHASE 2B: END-TO-END INTEGRATION TEST") |
| print("#" * 80) |
| print("\n") |
|
|
| |
| success = await test_full_workflow_integration() |
|
|
| |
| memory_success = await test_memory_retrieval() |
|
|
| print("\n") |
| print("#" * 80) |
| print("# TEST SUITE COMPLETE") |
| print("#" * 80) |
| print() |
|
|
| if success and memory_success: |
| print("β ALL INTEGRATION TESTS PASSED!") |
| print() |
| print("Phase 2B Status: COMPLETE") |
| print() |
| print("Next Steps:") |
| print(" 1. Implement scenario-specific agents (Phase 2C)") |
| print(" 2. Add LangSmith monitoring") |
| print(" 3. Create production deployment configuration") |
| else: |
| print("Some tests failed. Review logs above.") |
|
|
| print() |
|
|
|
|
| if __name__ == "__main__": |
| asyncio.run(main()) |
|
|