```markdown # MCP Server Implementation Guide ## Overview RewardPilot implements a multi-agent MCP (Model Context Protocol) architecture with 4 independent microservices that work together to provide intelligent credit card recommendations. ## Architecture Diagram ``` ┌─────────────────────────────────────────────────────────────────┐ │ User Interface │ │ (Gradio 6.0 App) │ └────────────────────────────┬────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Orchestrator Agent │ │ (Claude 3.5 Sonnet) │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ Phase 1: Planning │ │ │ │ - Analyze transaction context │ │ │ │ - Determine required MCP servers │ │ │ │ - Create execution strategy │ │ │ └──────────────────────────────────────────────────────────┘ │ └────────────────────────────┬────────────────────────────────────┘ │ ┌────────────┼────────────┐ ▼ ▼ ▼ ┌───────────────┐ ┌──────────┐ ┌────────────┐ │ Smart Wallet │ │ RAG │ │ Forecast │ │ MCP Server │ │ MCP │ │ MCP Server │ └───────┬───────┘ └────┬─────┘ └─────┬──────┘ │ │ │ ▼ ▼ ▼ ┌──────────────────────────────────────────┐ │ Gemini 2.0 Flash │ │ (Reasoning & Synthesis) │ └──────────────┬───────────────────────────┘ │ ▼ ┌───────────────┐ │ Final Response│ └───────────────┘ ``` --- ## MCP Server 1: Orchestrator ### Purpose Coordinates all MCP servers and manages the agent workflow. ### Deployment - **URL:** https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space - **Stack:** FastAPI + Claude 3.5 Sonnet - **Hosting:** Hugging Face Spaces ### API Endpoints #### POST `/recommend` Get card recommendation for a transaction. **Request:** ```json { "user_id": "u_alice", "merchant": "Whole Foods", "mcc": "5411", "amount_usd": 127.50, "category": "Groceries" } ``` **Response:** ```json { "recommended_card": { "card_id": "c_amex_gold", "card_name": "American Express Gold", "issuer": "American Express" }, "rewards": { "points_earned": 510, "cash_value": 5.10, "earn_rate": "4x points" }, "reasoning": "Amex Gold offers 4x points on U.S. supermarkets...", "confidence": 0.95, "alternatives": [ { "card_name": "Citi Custom Cash", "rewards": 3.82, "reason": "5% but monthly cap already hit" } ], "warnings": [ "You're at $450/$1500 monthly cap. 3 more grocery trips available." ] } ``` ### Implementation ```python # orchestrator_server.py from fastapi import FastAPI, HTTPException from anthropic import Anthropic import httpx import asyncio app = FastAPI(title="RewardPilot Orchestrator") anthropic = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) @app.post("/recommend") async def recommend_card(request: TransactionRequest): # Phase 1: Planning with Claude plan = await create_execution_plan(request) # Phase 2: Parallel MCP calls mcp_results = await execute_mcp_calls(plan) # Phase 3: Reasoning with Gemini explanation = await synthesize_reasoning(request, mcp_results) # Phase 4: Format response return format_recommendation(mcp_results, explanation) async def create_execution_plan(request: TransactionRequest): """Claude analyzes transaction and plans MCP calls""" prompt = f""" Analyze this transaction and determine which MCP servers to call: Transaction: - Merchant: {request.merchant} - Category: {request.category} - Amount: ${request.amount_usd} Available MCP servers: 1. smart_wallet - Card recommendations and reward calculations 2. rewards_rag - Semantic search of card benefits 3. spend_forecast - Spending predictions and cap warnings Return a JSON plan with: - strategy: optimization approach - mcp_calls: list of servers to call (priority order) - confidence_threshold: minimum confidence for recommendation """ response = anthropic.messages.create( model="claude-3-5-sonnet-20241022", max_tokens=1024, messages=[{"role": "user", "content": prompt}] ) return json.loads(response.content[0].text) async def execute_mcp_calls(plan: dict): """Call MCP servers in parallel""" tasks = [] for mcp_call in plan["mcp_calls"]: if mcp_call["service"] == "smart_wallet": tasks.append(call_smart_wallet(request)) elif mcp_call["service"] == "rewards_rag": tasks.append(call_rewards_rag(request)) elif mcp_call["service"] == "spend_forecast": tasks.append(call_forecast(request)) results = await asyncio.gather(*tasks) return dict(zip([c["service"] for c in plan["mcp_calls"]], results)) ``` --- ## MCP Server 2: Smart Wallet ### Purpose Analyzes user's credit cards and calculates optimal rewards. ### Deployment - **URL:** https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space - **Stack:** FastAPI + Python + PostgreSQL - **Hosting:** Hugging Face Spaces ### API Endpoints #### POST `/analyze` Analyze transaction against user's wallet. **Request:** ```json { "user_id": "u_alice", "merchant": "Whole Foods", "mcc": "5411", "amount_usd": 127.50 } ``` **Response:** ```json { "recommended_card": { "card_id": "c_amex_gold", "card_name": "American Express Gold", "rewards_earned": 5.10, "earn_rate": "4x points", "points_earned": 510 }, "all_cards_comparison": [ { "card_name": "Amex Gold", "rewards": 5.10, "rank": 1 }, { "card_name": "Citi Custom Cash", "rewards": 3.82, "rank": 2, "note": "Cap already hit this month" } ] } ``` ### Implementation ```python # smart_wallet_server.py from fastapi import FastAPI from sqlalchemy import create_engine from typing import List app = FastAPI(title="Smart Wallet MCP") class CardAnalyzer: def __init__(self, user_id: str): self.user_id = user_id self.cards = self.load_user_cards() def analyze_transaction(self, merchant: str, mcc: str, amount: float): """Calculate rewards for all cards""" results = [] for card in self.cards: # Get reward rate for this MCC reward_rate = self.get_reward_rate(card, mcc) # Check spending caps current_spending = self.get_monthly_spending(card, mcc) cap_remaining = card.monthly_cap - current_spending # Calculate rewards if cap_remaining >= amount: rewards = amount * reward_rate else: # Partial cap scenario rewards = (cap_remaining * reward_rate) + ((amount - cap_remaining) * card.base_rate) results.append({ "card": card, "rewards": rewards, "effective_rate": rewards / amount, "cap_status": { "current": current_spending, "limit": card.monthly_cap, "remaining": cap_remaining } }) # Sort by rewards (descending) results.sort(key=lambda x: x["rewards"], reverse=True) return results[0] # Return best card ``` --- ## MCP Server 3: Rewards RAG ### Purpose Semantic search across credit card benefit documents. ### Deployment - **URL:** https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space - **Stack:** FastAPI + LlamaIndex + ChromaDB - **Hosting:** Hugging Face Spaces ### API Endpoints #### POST `/query` Search card benefits with natural language. **Request:** ```json { "query": "Does Amex Gold work at Costco for groceries?", "card_name": "American Express Gold", "top_k": 3 } ``` **Response:** ```json { "answer": "No, American Express cards are not accepted at Costco warehouse locations due to Costco's exclusive Visa agreement. However, Amex Gold works at Costco.com for online orders.", "sources": [ { "card_name": "American Express Gold", "content": "Merchant acceptance: Not accepted at Costco warehouses...", "relevance_score": 0.92 } ] } ``` ### Implementation See `docs/llamaindex_setup.md` for detailed RAG implementation. --- ## MCP Server 4: Spend Forecast ### Purpose ML-based spending predictions and cap warnings. ### Deployment - **URL:** https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space - **Stack:** FastAPI + Scikit-learn + Redis - **Hosting:** Hugging Face Spaces ### API Endpoints #### POST `/predict` Predict spending for next period. **Request:** ```json { "user_id": "u_alice", "card_id": "c_amex_gold", "category": "Groceries", "horizon_days": 30 } ``` **Response:** ```json { "predicted_spending": 520.50, "confidence_interval": [480.00, 560.00], "warnings": [ { "type": "cap_warning", "message": "Likely to exceed $500 monthly cap", "probability": 0.78, "suggested_action": "Switch to Citi Custom Cash after $500" } ] } ``` ### Implementation ```python # forecast_server.py from fastapi import FastAPI from sklearn.ensemble import RandomForestRegressor import numpy as np app = FastAPI(title="Spend Forecast MCP") class SpendingForecaster: def __init__(self): self.model = RandomForestRegressor(n_estimators=100) def predict(self, user_id: str, category: str, horizon_days: int): """Predict spending for next N days""" # Load historical data history = self.load_user_history(user_id, category) # Feature engineering features = self.extract_features(history) # Predict prediction = self.model.predict(features) # Calculate confidence interval predictions = [tree.predict(features) for tree in self.model.estimators_] lower = np.percentile(predictions, 5) upper = np.percentile(predictions, 95) return { "predicted_spending": float(prediction[0]), "confidence_interval": [float(lower), float(upper)] } ``` --- ## Communication Flow ### Sequence Diagram ``` User -> Gradio: Enter transaction Gradio -> Orchestrator: POST /recommend Orchestrator -> Claude: Create execution plan Claude -> Orchestrator: {plan: call all 3 MCPs} Orchestrator -> Smart Wallet: POST /analyze Orchestrator -> RAG: POST /query Orchestrator -> Forecast: POST /predict Smart Wallet -> Orchestrator: {best_card: Amex Gold, rewards: 5.10} RAG -> Orchestrator: {benefits: "4x on groceries..."} Forecast -> Orchestrator: {warning: "Near cap"} Orchestrator -> Gemini: Synthesize results Gemini -> Orchestrator: {explanation: "Use Amex Gold because..."} Orchestrator -> Gradio: Final recommendation Gradio -> User: Display result ``` --- ## Deployment Instructions ### 1. Deploy Each MCP Server to Hugging Face ```bash # Clone template git clone https://huggingface.co/spaces/YOUR_USERNAME/rewardpilot-orchestrator # Add files cp orchestrator_server.py app.py cp requirements.txt . # Create Space on HF huggingface-cli repo create rewardpilot-orchestrator --type space --space_sdk gradio # Push git add . git commit -m "Deploy orchestrator" git push ``` ### 2. Set Environment Variables In each Space's settings, add: ```bash ANTHROPIC_API_KEY=sk-ant-xxxxx GEMINI_API_KEY=AIzaSyxxxxx OPENAI_API_KEY=sk-xxxxx ``` ### 3. Configure Endpoints In main `app.py`: ```python MCP_ENDPOINTS = { "orchestrator": "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space", "smart_wallet": "https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space", "rewards_rag": "https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space", "forecast": "https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space" } ``` --- ## Error Handling ### Graceful Degradation ```python async def call_mcp_with_fallback(service_name: str, request_data: dict): """Call MCP server with timeout and fallback""" try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post( MCP_ENDPOINTS[service_name], json=request_data ) response.raise_for_status() return response.json() except httpx.TimeoutException: logger.error(f"{service_name} timeout") return get_fallback_response(service_name) except httpx.HTTPError as e: logger.error(f"{service_name} error: {e}") return get_fallback_response(service_name) ``` --- ## Monitoring ### Health Checks ```python @app.get("/health") async def health_check(): """Check status of all MCP servers""" statuses = {} for service, url in MCP_ENDPOINTS.items(): try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get(f"{url}/health") statuses[service] = { "status": "healthy" if response.status_code == 200 else "unhealthy", "latency_ms": response.elapsed.total_seconds() * 1000 } except Exception as e: statuses[service] = {"status": "down", "error": str(e)} return statuses ``` --- ## Performance Optimization ### Caching Strategy ```python from functools import lru_cache import redis # Redis cache for frequent queries redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True) @lru_cache(maxsize=1000) def get_card_benefits(card_name: str): """Cache card benefits for 1 hour""" cache_key = f"benefits:{card_name}" # Check cache cached = redis_client.get(cache_key) if cached: return json.loads(cached) # Fetch from RAG result = call_rewards_rag({"query": f"Get all benefits for {card_name}"}) # Cache for 1 hour redis_client.setex(cache_key, 3600, json.dumps(result)) return result ``` --- ## Testing ### Integration Tests ```python import pytest import httpx @pytest.mark.asyncio async def test_orchestrator_end_to_end(): """Test full recommendation flow""" async with httpx.AsyncClient() as client: response = await client.post( f"{MCP_ENDPOINTS['orchestrator']}/recommend", json={ "user_id": "test_user", "merchant": "Whole Foods", "amount_usd": 100.00 } ) assert response.status_code == 200 data = response.json() assert "recommended_card" in data assert "rewards" in data assert "reasoning" in data ``` --- ## Next Steps 1. **Scale MCP servers** - Add load balancing 2. **Add authentication** - JWT tokens for API access 3. **Implement webhooks** - Real-time transaction notifications 4. **Add more MCP servers** - Travel optimization, business expenses, etc. --- **Related Documentation:** - [Modal Deployment Guide](./modal_deployment.md) - [LlamaIndex RAG Setup](./llamaindex_setup.md) - [Agent Reasoning Flow](./agent_reasoning.md) ``` ---