|
|
|
|
|
import chainlit as cl |
|
|
from agent import make_graph |
|
|
|
|
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
from langchain_core.messages import AIMessageChunk, HumanMessage |
|
|
|
|
|
from chainlit.input_widget import Select, Slider |
|
|
|
|
|
from typing import Optional |
|
|
import os, uuid, base64 |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
_ : bool = load_dotenv() |
|
|
|
|
|
|
|
|
async def process_image(image: cl.Image): |
|
|
""" |
|
|
Processes an image file, reads its data, and converts it to a base64 encoded string. |
|
|
""" |
|
|
try: |
|
|
with open(image.path, "rb") as image_file: |
|
|
image_data = image_file.read() |
|
|
base64_image = base64.b64encode(image_data).decode("utf-8") |
|
|
return { |
|
|
"type": "image_url", |
|
|
"image_url": { |
|
|
"url": f"data:image/{image.mime.split('/')[-1]};base64,{base64_image}" |
|
|
} |
|
|
} |
|
|
except Exception as e: |
|
|
print(f"Error reading image file: {e}") |
|
|
return {"type": "text", "text": f"Error processing image {image.name}."} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cl.oauth_callback |
|
|
def oauth_callback( |
|
|
provider_id: str, |
|
|
token: str, |
|
|
raw_user_data: dict[str, str], |
|
|
default_user: cl.User, |
|
|
) -> Optional[cl.User]: |
|
|
|
|
|
return default_user |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cl.set_starters |
|
|
async def set_starters(): |
|
|
return [ |
|
|
cl.Starter( |
|
|
label="LangGraph Agent Creation", |
|
|
message="Create an Agent in LangGraph which can search the web using Tavily.", |
|
|
icon="/public/msg_icons/chatbot.png", |
|
|
), |
|
|
|
|
|
cl.Starter( |
|
|
label="Explain MCP", |
|
|
message="Explain Model Context Protocol (MCP) to a non-tech person.", |
|
|
icon="/public/msg_icons/usb.png", |
|
|
), |
|
|
cl.Starter( |
|
|
label="Composio Tools Integration", |
|
|
message="How can I connect Composio tools to my agent?", |
|
|
icon="/public/msg_icons/tools.png", |
|
|
), |
|
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cl.set_chat_profiles |
|
|
async def chat_profile(): |
|
|
return [ |
|
|
cl.ChatProfile( |
|
|
name="Agent Mode", |
|
|
markdown_description= "Ideal for complex tasks like brainstorming, code generation, and web apps creation." |
|
|
|
|
|
), |
|
|
cl.ChatProfile( |
|
|
name="Chat Mode", |
|
|
markdown_description="Suited for quick information retrieval and answering questions from the provided documentations." |
|
|
|
|
|
), |
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cl.on_chat_start |
|
|
async def on_chat_start(): |
|
|
thread_id = f"thread-{uuid.uuid4()}" |
|
|
|
|
|
cl.user_session.set("thread_id", thread_id) |
|
|
|
|
|
|
|
|
settings = await cl.ChatSettings( |
|
|
[ |
|
|
Select( |
|
|
id="model", |
|
|
label="Gemini - Model", |
|
|
values=[ |
|
|
"gemini-2.5-flash", |
|
|
"gemini-2.5-pro", |
|
|
"gemini-2.5-flash-lite" |
|
|
], |
|
|
initial_index=0, |
|
|
), |
|
|
Slider( |
|
|
id="temperature", |
|
|
label="Temperature", |
|
|
initial=1, |
|
|
min=0, |
|
|
max=2, |
|
|
step=0.1, |
|
|
), |
|
|
] |
|
|
).send() |
|
|
|
|
|
|
|
|
model = ChatGoogleGenerativeAI( |
|
|
model=settings["model"], |
|
|
api_key=os.getenv("GOOGLE_API_KEY"), |
|
|
temperature=settings["temperature"] |
|
|
) |
|
|
|
|
|
|
|
|
cl.user_session.set("model", model) |
|
|
cl.user_session.set("temperature", settings["temperature"]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cl.on_settings_update |
|
|
async def on_settings_update(settings: dict): |
|
|
|
|
|
cl.user_session.set("model", settings.get("model")) |
|
|
cl.user_session.set("temperature", settings.get("temperature")) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cl.on_message |
|
|
async def on_message(message: cl.Message): |
|
|
thread_id = cl.user_session.get("thread_id") |
|
|
config = {"configurable": {"thread_id": thread_id}} |
|
|
|
|
|
|
|
|
model = cl.user_session.get("model") |
|
|
answer_mode = cl.user_session.get("chat_profile", "Agent Mode") |
|
|
|
|
|
|
|
|
content = [] |
|
|
|
|
|
|
|
|
if message.content: |
|
|
content.append({"type": "text", "text": message.content}) |
|
|
|
|
|
|
|
|
image_elements = [element for element in message.elements if "image" in element.mime] |
|
|
for image in image_elements: |
|
|
if image.path: |
|
|
content.append(await process_image(image)) |
|
|
else: |
|
|
print(f"Image {image.name} has no content and no path.") |
|
|
content.append({"type": "text", "text": f"Image {image.name} could not be processed."}) |
|
|
|
|
|
msg = cl.Message(content="") |
|
|
|
|
|
try: |
|
|
async with make_graph(model, answer_mode) as agent: |
|
|
async for stream, _ in agent.astream( |
|
|
{"messages": HumanMessage(content=content)}, |
|
|
config=config, |
|
|
stream_mode="messages" |
|
|
): |
|
|
|
|
|
if isinstance(stream, AIMessageChunk) and stream.content: |
|
|
await msg.stream_token(stream.content.replace("```", "\n```")) |
|
|
await msg.send() |
|
|
|
|
|
except Exception as e: |
|
|
await cl.Message(content=f"Error during agent invocation: {e}").send() |
|
|
|