"""
LangGraph Multi-Agent System Implementation

This module implements a multi-agent system using LangGraph with the following components:
- LeadAgent: Orchestrates the workflow and makes decisions
- ResearchAgent: Handles information gathering and research tasks
- CodeAgent: Handles computational and code execution tasks
- AnswerFormatter: Formats final answers according to GAIA requirements
- Memory: Persistent storage for context and learning
"""
|
|
import operator
import os
from typing import Annotated, Any, Dict, List, Literal, TypedDict

from dotenv import load_dotenv
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command

from observability import (
    flush_traces,
    get_callback_handler,
    shutdown_observability,
    start_root_span,
)

# Load environment variables (API keys, Langfuse config) before anything
# else reads them.
load_dotenv("env.local")
|
|
class AgentState(TypedDict):
    """
    State schema for the multi-agent system following LangGraph best practices.

    Treats every agent node as a pure function AgentState -> Command.
    Fields annotated with ``operator.add`` are accumulated (concatenated)
    across node updates instead of being overwritten.
    """

    # Conversation history; appended to by each agent node.
    messages: Annotated[List[BaseMessage], operator.add]

    # Working artifacts produced during the loop.
    draft_answer: str
    # Accumulated research output (string-concatenated across iterations).
    research_notes: Annotated[str, operator.add]
    # Accumulated code-execution output (string-concatenated across iterations).
    code_outputs: Annotated[str, operator.add]

    # Loop-budget bookkeeping used by the lead agent's router.
    loop_counter: int
    max_iterations: int

    # Routing decision produced by the lead agent.
    next: Literal["research", "code", "formatter", "__end__"]

    # Formatted final answer; empty string until produced.
    final_answer: str

    # Identifiers forwarded to the observability root span for tracing.
    user_id: str
    session_id: str
|
|
|
|
| |
|
|
|
|
def create_agent_graph():
    """
    Build the LangGraph workflow:

        START -> lead -> (research | code) -> lead (loop) -> formatter -> END

    Returns:
        StateGraph: the uncompiled workflow; call ``.compile()`` before use.
    """
    # Imported inside the function rather than at module top — presumably
    # to avoid a circular import with the agents package (TODO confirm).
    from agents.lead_agent import lead_agent
    from agents.research_agent import research_agent
    from agents.code_agent import code_agent
    from agents.answer_formatter import answer_formatter

    workflow = StateGraph(AgentState)

    workflow.add_node("lead", lead_agent)
    workflow.add_node("research", research_agent)
    workflow.add_node("code", code_agent)
    workflow.add_node("formatter", answer_formatter)

    workflow.add_edge(START, "lead")

    def route_from_lead(state: AgentState) -> str:
        """Route from the lead agent based on the 'next' state field.

        Ends the run early once the loop budget is spent or a final
        answer already exists.
        """
        budget_spent = state.get("loop_counter", 0) >= state.get("max_iterations", 3)
        if budget_spent or state.get("final_answer"):
            return "__end__"
        # Default to research when the lead agent set no explicit route.
        return state.get("next", "research")

    workflow.add_conditional_edges(
        "lead",
        route_from_lead,
        {
            "research": "research",
            "code": "code",
            "formatter": "formatter",
            "__end__": END,
        },
    )

    # Research and code always report back to the lead agent; the
    # formatter is terminal.
    workflow.add_edge("research", "lead")
    workflow.add_edge("code", "lead")
    workflow.add_edge("formatter", END)

    return workflow
|
|
|
|
async def run_agent_system(
    query: str,
    user_id: str = "default_user",
    session_id: str = "default_session",
    max_iterations: int = 3,
) -> str:
    """
    Main entry point for the agent system.

    Compiles the agent graph, runs it on *query* inside an observability
    root span, and returns the formatted final answer.

    Args:
        query: User question to answer
        user_id: User identifier for tracing
        session_id: Session identifier for tracing
        max_iterations: Maximum number of research/code loops

    Returns:
        Final formatted answer, or an apology message if an exception
        escaped the workflow — this function never raises.
    """
    try:
        callback_handler = get_callback_handler()

        with start_root_span(
            name="user-request",
            user_id=user_id,
            session_id=session_id,
            metadata={"query": query, "max_iterations": max_iterations},
        ) as root_span:
            app = create_agent_graph().compile()

            # Seed every field of AgentState so downstream .get() calls
            # and string accumulation start from well-defined values.
            initial_state: AgentState = {
                "messages": [HumanMessage(content=query)],
                "draft_answer": "",
                "research_notes": "",
                "code_outputs": "",
                "loop_counter": 0,
                "max_iterations": max_iterations,
                "next": "research",
                "final_answer": "",
                "user_id": user_id,
                "session_id": session_id,
            }

            # Attach the tracing callback only when the handler exists;
            # otherwise run untraced (with a warning) instead of failing.
            if callback_handler:
                final_state = await app.ainvoke(
                    initial_state,
                    config={"callbacks": [callback_handler]},
                )
            else:
                print("Warning: Running without Langfuse tracing")
                final_state = await app.ainvoke(initial_state)

            # Record the outcome on the root trace when a span was opened.
            if root_span:
                root_span.update_trace(output={"final_answer": final_state["final_answer"]})

            return final_state["final_answer"]

    except Exception as e:
        # Top-level boundary: report and degrade gracefully rather than
        # propagate the exception to the caller.
        print(f"Error in agent system: {e}")
        return f"I apologize, but I encountered an error while processing your query: {str(e)}"

    finally:
        # Always flush traces, success or failure; background=True keeps
        # the flush from blocking the caller.
        flush_traces(background=True)
|
|
|
|
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        """Smoke-test the agent system against one sample question."""
        answer = await run_agent_system(
            "What is the capital of Maharashtra?",
            user_id="test_user",
            session_id="test_session",
        )
        print(f"Final Answer: {answer}")

    asyncio.run(_demo())