# MCP Server Implementation Guide
## Overview
RewardPilot implements a multi-agent MCP (Model Context Protocol) architecture with four independent microservices (Orchestrator, Smart Wallet, Rewards RAG, and Spend Forecast) that work together to provide intelligent credit card recommendations.
## Architecture Diagram
```
                 User Interface (Gradio 6.0 App)
                              │
                              ▼
              Orchestrator Agent (Claude 3.5 Sonnet)
                Phase 1: Planning
                  - Analyze transaction context
                  - Determine required MCP servers
                  - Create execution strategy
                              │
          ┌───────────────────┼───────────────────┐
          ▼                   ▼                   ▼
   Smart Wallet              RAG               Forecast
    MCP Server               MCP              MCP Server
          │                   │                   │
          └───────────────────┼───────────────────┘
                              ▼
            Gemini 2.0 Flash (Reasoning & Synthesis)
                              │
                              ▼
                       Final Response
```
---
## MCP Server 1: Orchestrator
### Purpose
Coordinates all MCP servers and manages the agent workflow.
### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space
- **Stack:** FastAPI + Claude 3.5 Sonnet
- **Hosting:** Hugging Face Spaces
### API Endpoints
#### POST `/recommend`
Get card recommendation for a transaction.
**Request:**
```json
{
  "user_id": "u_alice",
  "merchant": "Whole Foods",
  "mcc": "5411",
  "amount_usd": 127.50,
  "category": "Groceries"
}
```

**Response:**

```json
{
  "recommended_card": {
    "card_id": "c_amex_gold",
    "card_name": "American Express Gold",
    "issuer": "American Express"
  },
  "rewards": {
    "points_earned": 510,
    "cash_value": 5.10,
    "earn_rate": "4x points"
  },
  "reasoning": "Amex Gold offers 4x points on U.S. supermarkets...",
  "confidence": 0.95,
  "alternatives": [
    {
      "card_name": "Citi Custom Cash",
      "rewards": 3.82,
      "reason": "5% but monthly cap already hit"
    }
  ],
  "warnings": [
    "You're at $450/$1500 monthly cap. 3 more grocery trips available."
  ]
}
```
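For orientation, the reward math in the example above is simply 127.50 × 4 = 510 Membership Rewards points, valued at roughly 1¢ per point ($5.10 cash value). A minimal client-side call to this endpoint might look like the sketch below; it assumes the deployed URL from the Deployment section and omits retries and error handling:

```python
import httpx

ORCHESTRATOR_URL = "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space"

def get_recommendation(txn: dict) -> dict:
    """Send a transaction to the orchestrator and return its recommendation."""
    response = httpx.post(f"{ORCHESTRATOR_URL}/recommend", json=txn, timeout=30.0)
    response.raise_for_status()
    return response.json()

recommendation = get_recommendation({
    "user_id": "u_alice",
    "merchant": "Whole Foods",
    "mcc": "5411",
    "amount_usd": 127.50,
    "category": "Groceries",
})
print(recommendation["recommended_card"]["card_name"])
```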
### Implementation

```python
# orchestrator_server.py
import os
import json
import asyncio

import httpx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from anthropic import Anthropic

app = FastAPI(title="RewardPilot Orchestrator")
anthropic = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

class TransactionRequest(BaseModel):
    user_id: str
    merchant: str
    mcc: str
    amount_usd: float
    category: Optional[str] = None

@app.post("/recommend")
async def recommend_card(request: TransactionRequest):
    # Phase 1: Planning with Claude
    plan = await create_execution_plan(request)

    # Phase 2: Parallel MCP calls
    mcp_results = await execute_mcp_calls(plan, request)

    # Phase 3: Reasoning with Gemini
    explanation = await synthesize_reasoning(request, mcp_results)

    # Phase 4: Format response
    return format_recommendation(mcp_results, explanation)

async def create_execution_plan(request: TransactionRequest):
    """Claude analyzes the transaction and plans which MCP servers to call."""
    prompt = f"""
    Analyze this transaction and determine which MCP servers to call:

    Transaction:
    - Merchant: {request.merchant}
    - Category: {request.category}
    - Amount: ${request.amount_usd}

    Available MCP servers:
    1. smart_wallet - Card recommendations and reward calculations
    2. rewards_rag - Semantic search of card benefits
    3. spend_forecast - Spending predictions and cap warnings

    Return a JSON plan with:
    - strategy: optimization approach
    - mcp_calls: list of servers to call (priority order)
    - confidence_threshold: minimum confidence for recommendation
    """

    response = anthropic.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )

    return json.loads(response.content[0].text)

async def execute_mcp_calls(plan: dict, request: TransactionRequest):
    """Call the MCP servers selected by the plan in parallel."""
    tasks = []
    for mcp_call in plan["mcp_calls"]:
        if mcp_call["service"] == "smart_wallet":
            tasks.append(call_smart_wallet(request))
        elif mcp_call["service"] == "rewards_rag":
            tasks.append(call_rewards_rag(request))
        elif mcp_call["service"] == "spend_forecast":
            tasks.append(call_forecast(request))

    results = await asyncio.gather(*tasks)
    return dict(zip([c["service"] for c in plan["mcp_calls"]], results))
```
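Phase 3 (`synthesize_reasoning`) is referenced above but not shown. Continuing `orchestrator_server.py`, a minimal sketch using the `google-generativeai` client follows; the model identifier and prompt format are assumptions, not the project's exact implementation:

```python
import os

import google.generativeai as genai

genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
gemini = genai.GenerativeModel("gemini-2.0-flash")  # assumed model identifier

async def synthesize_reasoning(request: TransactionRequest, mcp_results: dict) -> str:
    """Ask Gemini to turn raw MCP outputs into a short, user-facing explanation."""
    prompt = (
        f"Transaction: {request.merchant}, ${request.amount_usd} ({request.category}).\n"
        f"Smart wallet result: {mcp_results.get('smart_wallet')}\n"
        f"Benefit details: {mcp_results.get('rewards_rag')}\n"
        f"Forecast warnings: {mcp_results.get('spend_forecast')}\n"
        "Explain in 2-3 sentences which card to use and why."
    )
    # Async variant so the FastAPI event loop is not blocked
    response = await gemini.generate_content_async(prompt)
    return response.text
```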
## MCP Server 2: Smart Wallet

### Purpose
Analyzes the user's credit cards and calculates optimal rewards.

### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space
- **Stack:** FastAPI + Python + PostgreSQL
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/analyze`
Analyze a transaction against the user's wallet.

**Request:**

```json
{
  "user_id": "u_alice",
  "merchant": "Whole Foods",
  "mcc": "5411",
  "amount_usd": 127.50
}
```

**Response:**

```json
{
  "recommended_card": {
    "card_id": "c_amex_gold",
    "card_name": "American Express Gold",
    "rewards_earned": 5.10,
    "earn_rate": "4x points",
    "points_earned": 510
  },
  "all_cards_comparison": [
    {
      "card_name": "Amex Gold",
      "rewards": 5.10,
      "rank": 1
    },
    {
      "card_name": "Citi Custom Cash",
      "rewards": 3.82,
      "rank": 2,
      "note": "Cap already hit this month"
    }
  ]
}
```
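Each MCP server can also be exercised in isolation, which is handy when debugging a single service. A minimal direct call (the URL comes from the Deployment section above; error handling omitted):

```python
import httpx

SMART_WALLET_URL = "https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space"

result = httpx.post(
    f"{SMART_WALLET_URL}/analyze",
    json={"user_id": "u_alice", "merchant": "Whole Foods", "mcc": "5411", "amount_usd": 127.50},
    timeout=15.0,
).json()
print(result["recommended_card"]["card_name"], result["recommended_card"]["rewards_earned"])
```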
### Implementation

```python
# smart_wallet_server.py
from fastapi import FastAPI
from sqlalchemy import create_engine
from typing import List

app = FastAPI(title="Smart Wallet MCP")

class CardAnalyzer:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.cards = self.load_user_cards()  # card profiles loaded from PostgreSQL

    def analyze_transaction(self, merchant: str, mcc: str, amount: float):
        """Calculate rewards for all cards and return the best one."""
        results = []

        for card in self.cards:
            # Get reward rate for this MCC
            reward_rate = self.get_reward_rate(card, mcc)

            # Check spending caps
            current_spending = self.get_monthly_spending(card, mcc)
            cap_remaining = card.monthly_cap - current_spending

            # Calculate rewards
            if cap_remaining >= amount:
                rewards = amount * reward_rate
            else:
                # Partial cap scenario: bonus rate up to the cap, base rate beyond it
                rewards = (cap_remaining * reward_rate) + \
                          ((amount - cap_remaining) * card.base_rate)

            results.append({
                "card": card,
                "rewards": rewards,
                "effective_rate": rewards / amount,
                "cap_status": {
                    "current": current_spending,
                    "limit": card.monthly_cap,
                    "remaining": cap_remaining
                }
            })

        # Sort by rewards (descending)
        results.sort(key=lambda x: x["rewards"], reverse=True)

        return results[0]  # Return best card
```
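The route wiring is not shown above. Continuing `smart_wallet_server.py`, a sketch of how the `/analyze` endpoint might invoke `CardAnalyzer`; the request model mirrors the JSON schema earlier in this section and the card attribute names are assumptions:

```python
from pydantic import BaseModel

class AnalyzeRequest(BaseModel):
    user_id: str
    merchant: str
    mcc: str
    amount_usd: float

@app.post("/analyze")
async def analyze(request: AnalyzeRequest):
    """Run the wallet analysis for a single transaction."""
    analyzer = CardAnalyzer(request.user_id)
    best = analyzer.analyze_transaction(request.merchant, request.mcc, request.amount_usd)
    return {
        "recommended_card": {
            "card_name": best["card"].name,  # attribute name is an assumption
            "rewards_earned": round(best["rewards"], 2),
            "effective_rate": best["effective_rate"],
        },
        "cap_status": best["cap_status"],
    }
```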
## MCP Server 3: Rewards RAG

### Purpose
Semantic search across credit card benefit documents.

### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space
- **Stack:** FastAPI + LlamaIndex + ChromaDB
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/query`
Search card benefits with natural language.

**Request:**

```json
{
  "query": "Does Amex Gold work at Costco for groceries?",
  "card_name": "American Express Gold",
  "top_k": 3
}
```

**Response:**

```json
{
  "answer": "No, American Express cards are not accepted at Costco warehouse locations due to Costco's exclusive Visa agreement. However, Amex Gold works at Costco.com for online orders.",
  "sources": [
    {
      "card_name": "American Express Gold",
      "content": "Merchant acceptance: Not accepted at Costco warehouses...",
      "relevance_score": 0.92
    }
  ]
}
```
### Implementation
See `docs/llamaindex_setup.md` for the detailed RAG implementation.
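For orientation only, a stripped-down `/query` handler built on LlamaIndex with a Chroma vector store might look like the sketch below. The collection name, embedding/LLM defaults, and request schema are assumptions; the authoritative version is in `docs/llamaindex_setup.md`:

```python
import chromadb
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.chroma import ChromaVectorStore

app = FastAPI(title="Rewards RAG MCP")

class QueryRequest(BaseModel):
    query: str
    card_name: Optional[str] = None
    top_k: int = 3

# The benefits corpus is assumed to be indexed ahead of time into this Chroma collection
chroma = chromadb.PersistentClient(path="./chroma_db")
collection = chroma.get_or_create_collection("card_benefits")
index = VectorStoreIndex.from_vector_store(ChromaVectorStore(chroma_collection=collection))

@app.post("/query")
async def query_benefits(request: QueryRequest):
    """Answer a natural-language question about card benefits."""
    query_engine = index.as_query_engine(similarity_top_k=request.top_k)
    result = query_engine.query(request.query)
    return {
        "answer": str(result),
        "sources": [
            {"content": node.get_content()[:200], "relevance_score": node.score}
            for node in result.source_nodes
        ],
    }
```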
## MCP Server 4: Spend Forecast

### Purpose
ML-based spending predictions and cap warnings.

### Deployment
- **URL:** https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space
- **Stack:** FastAPI + Scikit-learn + Redis
- **Hosting:** Hugging Face Spaces

### API Endpoints

#### POST `/predict`
Predict spending for the next period.

**Request:**

```json
{
  "user_id": "u_alice",
  "card_id": "c_amex_gold",
  "category": "Groceries",
  "horizon_days": 30
}
```

**Response:**

```json
{
  "predicted_spending": 520.50,
  "confidence_interval": [480.00, 560.00],
  "warnings": [
    {
      "type": "cap_warning",
      "message": "Likely to exceed $500 monthly cap",
      "probability": 0.78,
      "suggested_action": "Switch to Citi Custom Cash after $500"
    }
  ]
}
```
### Implementation

```python
# forecast_server.py
from fastapi import FastAPI
from sklearn.ensemble import RandomForestRegressor
import numpy as np

app = FastAPI(title="Spend Forecast MCP")

class SpendingForecaster:
    def __init__(self):
        # Model is trained offline on historical transactions and loaded at startup
        self.model = RandomForestRegressor(n_estimators=100)

    def predict(self, user_id: str, category: str, horizon_days: int):
        """Predict spending for the next N days."""
        # Load historical data
        history = self.load_user_history(user_id, category)

        # Feature engineering: a single feature row (2-D array) built from recent history
        features = self.extract_features(history)

        # Point prediction
        prediction = self.model.predict(features)

        # Confidence interval from the spread of per-tree predictions
        predictions = [tree.predict(features)[0] for tree in self.model.estimators_]
        lower = np.percentile(predictions, 5)
        upper = np.percentile(predictions, 95)

        return {
            "predicted_spending": float(prediction[0]),
            "confidence_interval": [float(lower), float(upper)]
        }
```
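As with the other servers, the FastAPI route itself is omitted above. Continuing `forecast_server.py`, a sketch of the wiring, assuming the request model mirrors the JSON schema above and that cap warnings come from a separate helper (`build_cap_warnings` is hypothetical):

```python
from pydantic import BaseModel

class PredictRequest(BaseModel):
    user_id: str
    card_id: str
    category: str
    horizon_days: int = 30

forecaster = SpendingForecaster()

@app.post("/predict")
async def predict_spending(request: PredictRequest):
    """Forecast category spending and attach any cap warnings."""
    forecast = forecaster.predict(request.user_id, request.category, request.horizon_days)
    # Compare the forecast to the card's bonus cap to produce warnings
    forecast["warnings"] = build_cap_warnings(request.card_id, forecast)  # hypothetical helper
    return forecast
```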
## Communication Flow

### Sequence Diagram

```
User -> Gradio: Enter transaction
Gradio -> Orchestrator: POST /recommend
Orchestrator -> Claude: Create execution plan
Claude -> Orchestrator: {plan: call all 3 MCPs}
Orchestrator -> Smart Wallet: POST /analyze
Orchestrator -> RAG: POST /query
Orchestrator -> Forecast: POST /predict
Smart Wallet -> Orchestrator: {best_card: Amex Gold, rewards: 5.10}
RAG -> Orchestrator: {benefits: "4x on groceries..."}
Forecast -> Orchestrator: {warning: "Near cap"}
Orchestrator -> Gemini: Synthesize results
Gemini -> Orchestrator: {explanation: "Use Amex Gold because..."}
Orchestrator -> Gradio: Final recommendation
Gradio -> User: Display result
```
## Deployment Instructions

### 1. Deploy Each MCP Server to Hugging Face

```bash
# Create the Space on HF
huggingface-cli repo create rewardpilot-orchestrator --type space --space_sdk gradio

# Clone it locally
git clone https://huggingface.co/spaces/YOUR_USERNAME/rewardpilot-orchestrator
cd rewardpilot-orchestrator

# Add files
cp ../orchestrator_server.py app.py
cp ../requirements.txt .

# Push
git add .
git commit -m "Deploy orchestrator"
git push
```
### 2. Set Environment Variables

In each Space's settings, add:

```
ANTHROPIC_API_KEY=sk-ant-xxxxx
GEMINI_API_KEY=AIzaSyxxxxx
OPENAI_API_KEY=sk-xxxxx
```
### 3. Configure Endpoints

In the main `app.py`:

```python
MCP_ENDPOINTS = {
    "orchestrator": "https://mcp-1st-birthday-rewardpilot-orchestrator.hf.space",
    "smart_wallet": "https://mcp-1st-birthday-rewardpilot-smart-wallet.hf.space",
    "rewards_rag": "https://mcp-1st-birthday-rewardpilot-rewards-rag.hf.space",
    "forecast": "https://mcp-1st-birthday-rewardpilot-spend-forecast.hf.space"
}
```
## Error Handling

### Graceful Degradation

```python
import logging

import httpx

logger = logging.getLogger("rewardpilot.orchestrator")

async def call_mcp_with_fallback(service_name: str, request_data: dict):
    """Call an MCP server with a timeout, falling back to a canned response on failure."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                MCP_ENDPOINTS[service_name],
                json=request_data
            )
            response.raise_for_status()
            return response.json()
    except httpx.TimeoutException:
        logger.error(f"{service_name} timeout")
        return get_fallback_response(service_name)
    except httpx.HTTPError as e:
        logger.error(f"{service_name} error: {e}")
        return get_fallback_response(service_name)
```
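`get_fallback_response` is referenced but not defined here; a minimal sketch of what a degraded response could look like (the exact fallback payloads are assumptions):

```python
def get_fallback_response(service_name: str) -> dict:
    """Return a neutral payload so the orchestrator can still answer when a server is down."""
    fallbacks = {
        "smart_wallet": {"recommended_card": None, "note": "wallet analysis unavailable"},
        "rewards_rag": {"answer": "", "sources": []},
        "forecast": {"predicted_spending": None, "warnings": []},
    }
    return fallbacks.get(service_name, {})
```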
## Monitoring

### Health Checks

```python
@app.get("/health")
async def health_check():
    """Check status of all MCP servers"""
    statuses = {}

    for service, url in MCP_ENDPOINTS.items():
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{url}/health")
                statuses[service] = {
                    "status": "healthy" if response.status_code == 200 else "unhealthy",
                    "latency_ms": response.elapsed.total_seconds() * 1000
                }
        except Exception as e:
            statuses[service] = {"status": "down", "error": str(e)}

    return statuses
```
## Performance Optimization

### Caching Strategy

```python
import json

import redis

# Redis cache for frequent queries (1-hour TTL)
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_card_benefits(card_name: str):
    """Cache card benefits for 1 hour."""
    cache_key = f"benefits:{card_name}"

    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Fetch from RAG
    result = call_rewards_rag({"query": f"Get all benefits for {card_name}"})

    # Cache for 1 hour
    redis_client.setex(cache_key, 3600, json.dumps(result))
    return result
```
## Testing

### Integration Tests

```python
import pytest
import httpx

@pytest.mark.asyncio
async def test_orchestrator_end_to_end():
    """Test full recommendation flow"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{MCP_ENDPOINTS['orchestrator']}/recommend",
            json={
                "user_id": "test_user",
                "merchant": "Whole Foods",
                "mcc": "5411",
                "amount_usd": 100.00
            }
        )

    assert response.status_code == 200
    data = response.json()
    assert "recommended_card" in data
    assert "rewards" in data
    assert "reasoning" in data
```
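A lighter smoke test against the `/health` endpoints described under Monitoring can also sit in the same test module; a sketch:

```python
@pytest.mark.asyncio
async def test_all_mcp_servers_respond():
    """Smoke test: every deployed MCP server should answer its /health endpoint."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        for service, url in MCP_ENDPOINTS.items():
            response = await client.get(f"{url}/health")
            assert response.status_code == 200, f"{service} is not healthy"
```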
## Next Steps

- **Scale MCP servers** - Add load balancing
- **Add authentication** - JWT tokens for API access
- **Implement webhooks** - Real-time transaction notifications
- **Add more MCP servers** - Travel optimization, business expenses, etc.
**Related Documentation:**
- `docs/llamaindex_setup.md` - Detailed RAG implementation
---