Spaces:
Runtime error
Runtime error
| # app.py — MCP POC using OpenRouter for LLM (replaces local HF model) | |
| # Place this file next to config.py. Do NOT store secrets here. | |
| from mcp.server.fastmcp import FastMCP | |
| from typing import Optional, List, Tuple, Any, Dict | |
| import requests | |
| import os | |
| import gradio as gr | |
| import json | |
| import time | |
| import traceback | |
| import inspect | |
| import re | |
| # ---------------------------- | |
| # Load config | |
| # ---------------------------- | |
| try: | |
| from config import ( | |
| CLIENT_ID, | |
| CLIENT_SECRET, | |
| REFRESH_TOKEN, | |
| API_BASE, | |
| OPENROUTER_API_KEY, # your OpenRouter API key (put this in config.py) | |
| OPENROUTER_MODEL # e.g. "gpt-4o-mini" or any model name routed by OpenRouter | |
| ) | |
| except Exception: | |
| raise SystemExit( | |
| "Make sure config.py exists with CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE, " | |
| "OPENROUTER_API_KEY and OPENROUTER_MODEL (or set OPENROUTER_MODEL to your preferred model)." | |
| ) | |
| # OpenRouter endpoint (public OpenRouter cloud endpoint) | |
| OPENROUTER_BASE_URL = "https://api.openrouter.ai/v1" | |
| # ---------------------------- | |
| # Initialize FastMCP | |
| # ---------------------------- | |
| mcp = FastMCP("ZohoCRMAgent") | |
| # ---------------------------- | |
| # Analytics / KPI logging (simple local JSON file) | |
| # ---------------------------- | |
| ANALYTICS_PATH = "mcp_analytics.json" | |
| def _init_analytics(): | |
| if not os.path.exists(ANALYTICS_PATH): | |
| base = { | |
| "tool_calls": {}, | |
| "llm_calls": 0, | |
| "last_llm_confidence": None, | |
| "created_at": time.time() | |
| } | |
| with open(ANALYTICS_PATH, "w") as f: | |
| json.dump(base, f, indent=2) | |
| def _log_tool_call(tool_name: str, success: bool = True): | |
| try: | |
| with open(ANALYTICS_PATH, "r") as f: | |
| data = json.load(f) | |
| except Exception: | |
| data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None} | |
| data["tool_calls"].setdefault(tool_name, {"count": 0, "success": 0, "fail": 0}) | |
| data["tool_calls"][tool_name]["count"] += 1 | |
| if success: | |
| data["tool_calls"][tool_name]["success"] += 1 | |
| else: | |
| data["tool_calls"][tool_name]["fail"] += 1 | |
| with open(ANALYTICS_PATH, "w") as f: | |
| json.dump(data, f, indent=2) | |
| def _log_llm_call(confidence: Optional[float] = None): | |
| try: | |
| with open(ANALYTICS_PATH, "r") as f: | |
| data = json.load(f) | |
| except Exception: | |
| data = {"tool_calls": {}, "llm_calls": 0, "last_llm_confidence": None} | |
| data["llm_calls"] = data.get("llm_calls", 0) + 1 | |
| if confidence is not None: | |
| data["last_llm_confidence"] = confidence | |
| with open(ANALYTICS_PATH, "w") as f: | |
| json.dump(data, f, indent=2) | |
| _init_analytics() | |
| # ---------------------------- | |
| # OpenRouter wrapper | |
| # ---------------------------- | |
| def _openrouter_headers(): | |
| return {"Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json"} | |
| def openrouter_generate(system_prompt: str, user_prompt: str, history: Optional[List[Tuple[str,str]]] = None, max_tokens: int = 512) -> Dict[str, Any]: | |
| """ | |
| Call OpenRouter chat completions endpoint with messages built from system + history + user prompt. | |
| Returns dict: {'text': <str>, 'raw': <resp_json>, 'confidence': Optional[float]} | |
| """ | |
| messages = [] | |
| # system | |
| if system_prompt: | |
| messages.append({"role": "system", "content": system_prompt}) | |
| # history (list of (user,assistant)) | |
| history = history or [] | |
| for pair in history: | |
| try: | |
| u, a = pair[0], pair[1] | |
| if u: | |
| messages.append({"role": "user", "content": u}) | |
| if a: | |
| messages.append({"role": "assistant", "content": a}) | |
| except Exception: | |
| continue | |
| # current user | |
| messages.append({"role": "user", "content": user_prompt}) | |
| body = { | |
| "model": OPENROUTER_MODEL, | |
| "messages": messages, | |
| "max_tokens": max_tokens, | |
| # "temperature": 0.0, # set if you want deterministic responses | |
| } | |
| url = f"{OPENROUTER_BASE_URL}/chat/completions" | |
| try: | |
| r = requests.post(url, headers=_openrouter_headers(), json=body, timeout=60) | |
| except Exception as e: | |
| raise RuntimeError(f"OpenRouter request failed: {e}") | |
| if r.status_code not in (200, 201): | |
| # surface the error for debugging | |
| raise RuntimeError(f"OpenRouter API error {r.status_code}: {r.text}") | |
| resp_json = r.json() | |
| # Parse response for text; different routers may vary slightly | |
| text = "" | |
| confidence = None | |
| try: | |
| # typical shape: choices[0].message.content or choices[0].message | |
| choice = resp_json.get("choices", [{}])[0] | |
| message = choice.get("message", {}) or {} | |
| if isinstance(message, dict): | |
| text = message.get("content") or message.get("content", "") | |
| # sometimes content is a dict mapping types -> text | |
| if isinstance(text, dict): | |
| # join possible parts | |
| text = text.get("text") or next(iter(text.values()), "") | |
| else: | |
| text = str(message) | |
| # some providers include scores | |
| confidence = choice.get("finish_reason_score") or choice.get("score") or None | |
| except Exception: | |
| text = json.dumps(resp_json) | |
| _log_llm_call(confidence) | |
| return {"text": text, "raw": resp_json, "confidence": confidence} | |
| # ---------------------------- | |
| # Zoho token refresh & headers | |
| # ---------------------------- | |
| def _get_valid_token_headers() -> dict: | |
| token_url = "https://accounts.zoho.in/oauth/v2/token" | |
| params = { | |
| "refresh_token": REFRESH_TOKEN, | |
| "client_id": CLIENT_ID, | |
| "client_secret": CLIENT_SECRET, | |
| "grant_type": "refresh_token" | |
| } | |
| r = requests.post(token_url, params=params, timeout=20) | |
| if r.status_code == 200: | |
| t = r.json().get("access_token") | |
| return {"Authorization": f"Zoho-oauthtoken {t}"} | |
| else: | |
| raise RuntimeError(f"Failed to refresh Zoho token: {r.status_code} {r.text}") | |
| # ---------------------------- | |
| # MCP tools: Zoho CRUD & process_document (unchanged) | |
| # ---------------------------- | |
| def authenticate_zoho() -> str: | |
| try: | |
| _ = _get_valid_token_headers() | |
| _log_tool_call("authenticate_zoho", True) | |
| return "Zoho token refreshed (ok)." | |
| except Exception as e: | |
| _log_tool_call("authenticate_zoho", False) | |
| return f"Failed to authenticate: {e}" | |
| def create_record(module_name: str, record_data: dict) -> str: | |
| try: | |
| headers = _get_valid_token_headers() | |
| url = f"{API_BASE}/{module_name}" | |
| payload = {"data": [record_data]} | |
| r = requests.post(url, headers=headers, json=payload, timeout=20) | |
| if r.status_code in (200, 201): | |
| _log_tool_call("create_record", True) | |
| return json.dumps(r.json(), ensure_ascii=False) | |
| _log_tool_call("create_record", False) | |
| return f"Error creating record: {r.status_code} {r.text}" | |
| except Exception as e: | |
| _log_tool_call("create_record", False) | |
| return f"Exception: {e}" | |
| def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list: | |
| try: | |
| headers = _get_valid_token_headers() | |
| url = f"{API_BASE}/{module_name}" | |
| r = requests.get(url, headers=headers, params={"page": page, "per_page": per_page}, timeout=20) | |
| if r.status_code == 200: | |
| _log_tool_call("get_records", True) | |
| return r.json().get("data", []) | |
| _log_tool_call("get_records", False) | |
| return [f"Error retrieving {module_name}: {r.status_code} {r.text}"] | |
| except Exception as e: | |
| _log_tool_call("get_records", False) | |
| return [f"Exception: {e}"] | |
| def update_record(module_name: str, record_id: str, data: dict) -> str: | |
| try: | |
| headers = _get_valid_token_headers() | |
| url = f"{API_BASE}/{module_name}/{record_id}" | |
| payload = {"data": [data]} | |
| r = requests.put(url, headers=headers, json=payload, timeout=20) | |
| if r.status_code == 200: | |
| _log_tool_call("update_record", True) | |
| return json.dumps(r.json(), ensure_ascii=False) | |
| _log_tool_call("update_record", False) | |
| return f"Error updating: {r.status_code} {r.text}" | |
| except Exception as e: | |
| _log_tool_call("update_record", False) | |
| return f"Exception: {e}" | |
| def delete_record(module_name: str, record_id: str) -> str: | |
| try: | |
| headers = _get_valid_token_headers() | |
| url = f"{API_BASE}/{module_name}/{record_id}" | |
| r = requests.delete(url, headers=headers, timeout=20) | |
| if r.status_code == 200: | |
| _log_tool_call("delete_record", True) | |
| return json.dumps(r.json(), ensure_ascii=False) | |
| _log_tool_call("delete_record", False) | |
| return f"Error deleting: {r.status_code} {r.text}" | |
| except Exception as e: | |
| _log_tool_call("delete_record", False) | |
| return f"Exception: {e}" | |
| def create_invoice(data: dict) -> str: | |
| try: | |
| headers = _get_valid_token_headers() | |
| url = f"{API_BASE}/invoices" | |
| r = requests.post(url, headers=headers, json={"data": [data]}, timeout=20) | |
| if r.status_code in (200, 201): | |
| _log_tool_call("create_invoice", True) | |
| return json.dumps(r.json(), ensure_ascii=False) | |
| _log_tool_call("create_invoice", False) | |
| return f"Error creating invoice: {r.status_code} {r.text}" | |
| except Exception as e: | |
| _log_tool_call("create_invoice", False) | |
| return f"Exception: {e}" | |
| def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict: | |
| """ | |
| Process an uploaded file path (local path or URL). Per developer instruction, | |
| we accept local paths like '/mnt/data/script_zoho_mcp' and return a file:// URL. | |
| Replace the placeholder OCR block with your real OCR pipeline when ready. | |
| """ | |
| try: | |
| if os.path.exists(file_path): | |
| file_url = f"file://{file_path}" | |
| extracted = { | |
| "Name": "ACME Corp (simulated)", | |
| "Email": "[email protected]", | |
| "Phone": "+91-99999-00000", | |
| "Total": "1234.00", | |
| "Confidence": 0.88 | |
| } | |
| _log_tool_call("process_document", True) | |
| return { | |
| "status": "success", | |
| "file": os.path.basename(file_path), | |
| "file_url": file_url, | |
| "target_module": target_module, | |
| "extracted_data": extracted | |
| } | |
| else: | |
| _log_tool_call("process_document", False) | |
| return {"status": "error", "error": "file not found", "file_path": file_path} | |
| except Exception as e: | |
| _log_tool_call("process_document", False) | |
| return {"status": "error", "error": str(e)} | |
| # ---------------------------- | |
| # Simple local command parser | |
| # ---------------------------- | |
| def try_parse_and_invoke_command(text: str): | |
| text = text.strip() | |
| # create_record | |
| m = re.match(r"^create_record\s+(\w+)\s+(.+)$", text, re.I) | |
| if m: | |
| module = m.group(1) | |
| body = m.group(2) | |
| try: | |
| record_data = json.loads(body) | |
| except Exception: | |
| return "Invalid JSON for record_data" | |
| return create_record(module, record_data) | |
| # create_invoice | |
| m = re.match(r"^create_invoice\s+(.+)$", text, re.I) | |
| if m: | |
| body = m.group(1) | |
| try: | |
| invoice_data = json.loads(body) | |
| except Exception: | |
| return "Invalid JSON for invoice_data" | |
| return create_invoice(invoice_data) | |
| # process_document via local path | |
| m = re.match(r"^(\/mnt\/data\/\S+)$", text) | |
| if m: | |
| path = m.group(1) | |
| return process_document(path) | |
| return None | |
| # ---------------------------- | |
| # OpenRouter-based chat handler | |
| # ---------------------------- | |
| def openrouter_response(message: str, history: Optional[List[Tuple[str,str]]] = None) -> str: | |
| history = history or [] | |
| system_prompt = ( | |
| "You are Zoho Assistant. Keep responses concise. When appropriate, output a JSON object with keys 'tool' and 'args' " | |
| "so the server can automatically call the corresponding MCP tool. Example:\n" | |
| '{"tool":"create_record","args":{"module_name":"Contacts","record_data":{"Last_Name":"Doe","Email":"[email protected]"}}}\n' | |
| "If not invoking tools, answer conversationally." | |
| ) | |
| try: | |
| resp = openrouter_generate(system_prompt, message, history) | |
| text = resp.get("text", "") | |
| # If LLM returned JSON indicating a tool invocation, attempt to parse & run | |
| parsed = None | |
| payload = text.strip() | |
| if payload.startswith("{") or payload.startswith("["): | |
| try: | |
| parsed = json.loads(payload) | |
| except Exception: | |
| parsed = None | |
| if isinstance(parsed, dict) and "tool" in parsed: | |
| tool_name = parsed.get("tool") | |
| args = parsed.get("args", {}) | |
| # Try call local tool by name if exists | |
| if tool_name in globals() and callable(globals()[tool_name]): | |
| try: | |
| result = globals()[tool_name](**args) if isinstance(args, dict) else globals()[tool_name](args) | |
| return f"Invoked tool '{tool_name}'. Result:\n{result}" | |
| except Exception as e: | |
| return f"Tool invocation error: {e}" | |
| else: | |
| return f"Requested tool '{tool_name}' not found locally." | |
| return text | |
| except Exception as e: | |
| return f"(OpenRouter error) {e}" | |
| # ---------------------------- | |
| # Gradio chat handler | |
| # ---------------------------- | |
| def chat_handler(message, history): | |
| history = history or [] | |
| trimmed = (message or "").strip() | |
| # Explicit POC commands | |
| cmd = try_parse_and_invoke_command(trimmed) | |
| if cmd is not None: | |
| return cmd | |
| # Developer convenience: local path handling (send unchanged) | |
| if trimmed.startswith("/mnt/data/"): | |
| try: | |
| doc = process_document(trimmed) | |
| return f"Processed file {doc.get('file')}. Extracted: {json.dumps(doc.get('extracted_data'), ensure_ascii=False)}" | |
| except Exception as e: | |
| return f"Error processing document: {e}" | |
| # Otherwise, call OpenRouter | |
| return openrouter_response(trimmed, history) | |
| # ---------------------------- | |
| # Gradio UI | |
| # ---------------------------- | |
| def chat_interface(): | |
| return gr.ChatInterface( | |
| fn=chat_handler, | |
| textbox=gr.Textbox(placeholder="Ask me to create contacts, invoices, upload docs (or paste /mnt/data/... for dev).") | |
| ) | |
| # ---------------------------- | |
| # Entrypoint | |
| # ---------------------------- | |
| if __name__ == "__main__": | |
| print("[startup] Launching Gradio UI + FastMCP server (OpenRouter mode).") | |
| demo = chat_interface() | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |