"""LangGraph agent graph construction for the Code Chatbot."""
from typing import TypedDict, Annotated, Sequence
import operator
from langchain_core.messages import BaseMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from code_chatbot.core.rate_limiter import get_rate_limiter
# Graph state shared across nodes.
class AgentState(TypedDict):
    """State carried through the graph: the running message transcript.

    ``operator.add`` tells LangGraph to append each node's returned
    messages to the existing sequence instead of replacing it.
    """
    messages: Annotated[Sequence[BaseMessage], operator.add]
def create_agent_graph(llm, retriever, repo_name: str = "Codebase", repo_dir: str = ".", provider: str = "gemini", code_analyzer=None):
    """
    Creates a LangGraph for the Code Chatbot.
    Enables: Search -> Read File -> Reason -> Search -> Answer.
    Uses adaptive rate limiting to maximize usage within free tier.

    Args:
        llm: Chat model supporting ``bind_tools`` (Gemini / Groq-Llama3 via LangChain).
        retriever: LangChain retriever over the indexed codebase.
        repo_name: Display name of the repository (kept for interface compatibility).
        repo_dir: Root directory handed to the filesystem tools.
        provider: Provider key used to select the adaptive rate limiter.
        code_analyzer: Optional analyzer; when given, call-graph tools are enabled.

    Returns:
        The compiled LangGraph workflow (agent <-> tools loop).
    """
    # Hoisted once here instead of re-importing inside loops/nodes.
    import logging
    import os
    import time

    from pydantic import BaseModel, Field

    logger = logging.getLogger(__name__)

    class SearchInput(BaseModel):
        query: str = Field(description="The query string to search for in the codebase.")

    # 1. Wrap Retriever as a Tool
    @tool("search_codebase", args_schema=SearchInput)
    def search_codebase(query: str):
        """
        Search the codebase for code snippets relevant to the query.
        Returns top 5 most relevant code sections with file paths.
        Use this when you need to find specific functions, classes, or implementations.
        You can call this multiple times with different queries to gather comprehensive information.
        """
        docs = retriever.invoke(query)
        # 5 results * 2000 chars = ~10000 chars (~2500 tokens) of context.
        sections = []
        for i, doc in enumerate(docs[:5]):
            fp = doc.metadata.get('file_path', 'unknown')
            # Show just the basename for cleaner display.
            display_path = os.path.basename(fp) if fp != 'unknown' else 'unknown'
            content = doc.page_content[:2000]
            sections.append(f"--- Result {i+1}: {display_path} ---\n{content}\n\n")
        if not sections:
            return "No relevant code found. Try a different search query or use list_files to explore the codebase structure."
        # Single join instead of repeated string concatenation.
        return "".join(sections)

    # 2. Import File System Tools (deferred import, matching original module layout)
    from code_chatbot.agents.tools import get_filesystem_tools, get_call_graph_tools

    # 3. Combine Tools
    fs_tools = get_filesystem_tools(repo_dir)
    call_graph_tools = get_call_graph_tools(code_analyzer) if code_analyzer else []
    tools = fs_tools + [search_codebase] + call_graph_tools

    # 4. Bind to LLM
    # Note: Not all LLMs support bind_tools cleanly, but Gemini/Groq(Llama3) do via LangChain
    model_with_tools = llm.bind_tools(tools)

    # 5. Define Nodes
    # Get rate limiter for this provider
    rate_limiter = get_rate_limiter(provider)

    def agent(state):
        """Single agent step: invoke the tool-bound LLM, with retry on rate limits."""
        messages = state["messages"]
        # Smart adaptive delay - only waits when approaching rate limit
        rate_limiter.wait_if_needed()
        # Retry loop for 429 errors.
        # FAIL FAST: one retry after a 5s cooldown. If the second attempt also
        # fails, re-raise IMMEDIATELY (the original slept a further 10s before
        # raising) so rag.py can trigger the Linear RAG fallback promptly.
        attempts = 2
        for i in range(attempts):
            try:
                response = model_with_tools.invoke(messages)
            except Exception as e:
                # Catch both Gemini 429 and Groq Overloaded errors
                if not any(err in str(e) for err in ["429", "RESOURCE_EXHAUSTED", "rate_limit_exceeded"]):
                    # Bare raise preserves the original traceback (raise e would not).
                    raise
                if i == attempts - 1:
                    # Last attempt failed: bubble up without sleeping first.
                    raise
                wait = 5 * (2 ** i)  # 5s before the single retry
                logger.warning(f"⚠️ Rate limit hit. Cooling down for {wait}s...")
                time.sleep(wait)
                continue
            # Track usage for statistics (if available in response metadata).
            # Best-effort: never fail a successful request over bookkeeping.
            try:
                usage = getattr(response, 'usage_metadata', None)
                if usage:
                    # LangChain exposes usage_metadata as a dict (TypedDict);
                    # getattr() on a dict always misses, so read keys for dicts
                    # and fall back to attributes for object-style metadata.
                    if isinstance(usage, dict):
                        in_tokens = usage.get('input_tokens', 0)
                        out_tokens = usage.get('output_tokens', 0)
                    else:
                        in_tokens = getattr(usage, 'input_tokens', 0)
                        out_tokens = getattr(usage, 'output_tokens', 0)
                    rate_limiter.record_usage(
                        input_tokens=in_tokens,
                        output_tokens=out_tokens
                    )
            except Exception:
                pass
            return {"messages": [response]}
        return {"messages": []}  # Unreachable: the loop always returns or raises

    tool_node = ToolNode(tools)

    # 6. Define Limits (Graph recursion limit is set in .compile(), but we can add logic here)
    # 7. Build Graph
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent)
    workflow.add_node("tools", tool_node)
    workflow.set_entry_point("agent")

    # Conditional Edge
    def should_continue(state):
        """Route to the tools node while the last message requests tool calls."""
        last_message = state["messages"][-1]
        # If there is no tool call, then we finish
        if not last_message.tool_calls:
            return END
        # Otherwise context switch to tools
        return "tools"

    workflow.add_conditional_edges(
        "agent",
        should_continue,
    )
    workflow.add_edge("tools", "agent")

    return workflow.compile()