# NOTE: Hugging Face upload artifact ("Really-amin's picture / Upload 325 files / b66240d verified")
# converted to a comment so the module remains importable.
"""Async collectors that power the FastAPI endpoints."""
from __future__ import annotations
import asyncio
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import httpx
from config import CACHE_TTL, COIN_SYMBOL_MAPPING, USER_AGENT, get_settings
# Module-wide logger; handlers and level are configured by the application.
logger = logging.getLogger(__name__)
# Application settings resolved once at import time via config.get_settings().
settings = get_settings()
class CollectorError(RuntimeError):
    """Raised when a provider fails to return data.

    Attributes:
        provider: identifier of the failing provider, when known.
        status_code: HTTP status that triggered the failure, when applicable.
    """

    def __init__(self, message: str, provider: Optional[str] = None, status_code: Optional[int] = None):
        super().__init__(message)
        self.status_code = status_code
        self.provider = provider
@dataclass
class CacheEntry:
    """A single record stored in :class:`TTLCache`."""

    # The cached payload; any value the collectors produce.
    value: Any
    # Absolute expiry time in time.time() seconds (set by TTLCache.set).
    expires_at: float
class TTLCache:
    """Simple in-memory TTL cache safe for async usage."""

    def __init__(self, ttl: int = CACHE_TTL) -> None:
        # Fall back to the default TTL when a falsy value (0 / None) is given.
        self.ttl = ttl or CACHE_TTL
        self._store: Dict[str, CacheEntry] = {}
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Any:
        """Return the cached value for *key*, or None when absent or expired."""
        async with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None
            if time.time() > entry.expires_at:
                # Lazily evict the stale entry on read.
                self._store.pop(key, None)
                return None
            return entry.value

    async def set(self, key: str, value: Any) -> None:
        """Store *value* under *key* with a fresh expiry timestamp."""
        async with self._lock:
            expires = time.time() + self.ttl
            self._store[key] = CacheEntry(value=value, expires_at=expires)
class ProvidersRegistry:
    """Utility that loads provider definitions from disk."""

    def __init__(self, path: Optional[Path] = None) -> None:
        self.path = Path(path or settings.providers_config_path)
        self._providers: Dict[str, Any] = {}
        self._load()

    def _load(self) -> None:
        """Populate the registry from the JSON config file, if present."""
        if not self.path.exists():
            logger.warning("Providers config not found at %s", self.path)
            self._providers = {}
            return
        with self.path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        self._providers = payload.get("providers", {})

    @property
    def providers(self) -> Dict[str, Any]:
        """Mapping of provider id to its raw configuration dict."""
        return self._providers
class MarketDataCollector:
    """Fetch market data from public providers with caching and fallbacks.

    Providers are tried in priority order (CoinGecko, then CoinCap, then
    CoinPaprika); each payload is normalised into a common dict shape before
    being cached.
    """

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        # Lower-cased ticker symbol -> CoinGecko coin id, from the static mapping.
        self._symbol_map = {symbol.lower(): coin_id for coin_id, symbol in COIN_SYMBOL_MAPPING.items()}
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0
        self._last_error_log: Dict[str, float] = {}  # last log time per provider:error key
        self._error_log_throttle = 60.0  # log identical errors at most once per 60s
        # Fix: initialise eagerly instead of the lazy hasattr() dance in _request.
        self._rate_limit_timestamps: Dict[str, List[float]] = {}

    async def _respect_rate_limit(self, provider_key: str, provider: Dict[str, Any]) -> None:
        """Sleep until the provider's requests-per-minute budget allows a call."""
        timestamps = self._rate_limit_timestamps.setdefault(provider_key, [])
        rate_limit_rpm = provider.get("rate_limit", {}).get("requests_per_minute", 30)
        if rate_limit_rpm and len(timestamps) >= rate_limit_rpm:
            oldest_time = timestamps[0]
            if time.time() - oldest_time < 60:
                # Wait until the oldest recorded request ages past the window.
                wait_time = 60 - (time.time() - oldest_time) + 1
                if self._should_log_error(provider_key, "rate_limit_wait"):
                    logger.warning(f"Rate limiting {provider_key}, waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
            # Drop timestamps older than one minute.
            cutoff = time.time() - 60
            self._rate_limit_timestamps[provider_key] = [ts for ts in timestamps if ts > cutoff]

    async def _request(self, provider_key: str, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """GET *path* from the configured provider and return the decoded JSON.

        Raises:
            CollectorError: when the provider is not configured, responds with
                HTTP 429 (rate limited), or returns any non-200 status.
        """
        provider = self.registry.providers.get(provider_key)
        if not provider:
            raise CollectorError(f"Provider {provider_key} not configured", provider=provider_key)
        url = provider["base_url"].rstrip("/") + path
        await self._respect_rate_limit(provider_key, provider)
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)
            # Record the request and keep only the last minute of timestamps.
            now = time.time()
            window = self._rate_limit_timestamps.setdefault(provider_key, [])
            window.append(now)
            self._rate_limit_timestamps[provider_key] = [ts for ts in window if ts > now - 60]
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", "60"))
                error_msg = f"{provider_key} rate limited (HTTP 429), retry after {retry_after}s"
                if self._should_log_error(provider_key, "HTTP 429"):
                    logger.warning(error_msg)
                raise CollectorError(error_msg, provider=provider_key, status_code=429)
            if response.status_code != 200:
                raise CollectorError(
                    f"{provider_key} request failed with HTTP {response.status_code}",
                    provider=provider_key,
                    status_code=response.status_code,
                )
            return response.json()

    def _should_log_error(self, provider: str, error_msg: str) -> bool:
        """Return True when this provider/error pair has not been logged recently.

        Throttles repeated log lines to one per ``self._error_log_throttle``
        seconds and prunes bookkeeping entries older than one hour.
        """
        error_key = f"{provider}:{error_msg}"
        now = time.time()
        last_log_time = self._last_error_log.get(error_key, 0)
        if now - last_log_time > self._error_log_throttle:
            self._last_error_log[error_key] = now
            # Clean up old entries (keep only the last hour).
            cutoff = now - 3600
            self._last_error_log = {k: v for k, v in self._last_error_log.items() if v > cutoff}
            return True
        return False

    async def _top_coins_coingecko(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch and normalise the top coins from CoinGecko."""
        data = await self._request(
            "coingecko",
            "/coins/markets",
            {
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": limit,
                "page": 1,
                "sparkline": "false",
                "price_change_percentage": "24h",
            },
        )
        return [
            {
                "name": item.get("name"),
                "symbol": item.get("symbol", "").upper(),
                "price": item.get("current_price"),
                "change_24h": item.get("price_change_percentage_24h"),
                "market_cap": item.get("market_cap"),
                "volume_24h": item.get("total_volume"),
                "rank": item.get("market_cap_rank"),
                "last_updated": item.get("last_updated"),
            }
            for item in data
        ]

    async def _top_coins_coincap(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch and normalise the top coins from CoinCap."""
        data = await self._request("coincap", "/assets", {"limit": limit})
        return [
            {
                "name": item.get("name"),
                "symbol": item.get("symbol", "").upper(),
                "price": float(item.get("priceUsd", 0)),
                "change_24h": float(item.get("changePercent24Hr", 0)),
                "market_cap": float(item.get("marketCapUsd", 0)),
                "volume_24h": float(item.get("volumeUsd24Hr", 0)),
                "rank": int(item.get("rank", 0)),
            }
            for item in data.get("data", [])
        ]

    async def _top_coins_coinpaprika(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch and normalise the top coins from CoinPaprika."""
        data = await self._request("coinpaprika", "/tickers", {"quotes": "USD", "limit": limit})
        return [
            {
                "name": item.get("name"),
                "symbol": item.get("symbol", "").upper(),
                "price": float(item.get("quotes", {}).get("USD", {}).get("price", 0)),
                "change_24h": float(item.get("quotes", {}).get("USD", {}).get("percent_change_24h", 0)),
                "market_cap": float(item.get("quotes", {}).get("USD", {}).get("market_cap", 0)),
                "volume_24h": float(item.get("quotes", {}).get("USD", {}).get("volume_24h", 0)),
                "rank": int(item.get("rank", 0)),
                "last_updated": item.get("last_updated"),
            }
            for item in data[:limit]
            if item.get("quotes", {}).get("USD")
        ]

    async def get_top_coins(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the top *limit* coins by market cap, trying providers in order.

        Raises:
            CollectorError: when every provider fails; the message carries the
                last observed error details.
        """
        cache_key = f"top_coins:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:  # NOTE: empty results are treated as cache misses
            return cached
        # Provider fetchers in priority order.
        fetchers = {
            "coingecko": self._top_coins_coingecko,
            "coincap": self._top_coins_coincap,
            "coinpaprika": self._top_coins_coinpaprika,
        }
        last_error: Optional[Exception] = None
        last_error_details: Optional[str] = None
        for provider, fetcher in fetchers.items():
            try:
                coins = await fetcher(limit)
                await self.cache.set(cache_key, coins)
                return coins
            except Exception as exc:  # pragma: no cover - network heavy
                last_error = exc
                error_type = type(exc).__name__
                error_msg = str(exc) or repr(exc)
                # Prefix the HTTP status when the exception carries one.
                # Fix: the original prefixed "HTTP None" for CollectorErrors
                # whose status_code attribute existed but was None.
                status_code = getattr(exc, "status_code", None)
                if status_code is not None:
                    error_msg = f"HTTP {status_code}: {error_msg}" if error_msg else f"HTTP {status_code}"
                # Ensure we always have a meaningful error message.
                if not error_msg.strip():
                    error_msg = f"{error_type} (no details available)"
                last_error_details = f"{error_type}: {error_msg}"
                # Throttle error logging to prevent spam.
                if self._should_log_error(provider, error_msg or error_type):
                    logger.warning(
                        "Provider %s failed: %s (error logged, will suppress similar errors for 60s)",
                        provider,
                        last_error_details,
                    )
        # Fix: the original passed str(last_error) as the provider name.
        raise CollectorError(
            f"Unable to fetch top coins from any provider. Last error: {last_error_details or 'Unknown'}",
            provider=getattr(last_error, "provider", None),
        )

    async def _coin_id(self, symbol: str) -> str:
        """Resolve a ticker symbol to a CoinGecko coin id.

        Falls back to the (cached) full CoinGecko symbol list when the symbol
        is not in the static mapping.

        Raises:
            CollectorError: when the symbol is unknown to CoinGecko.
        """
        symbol_lower = symbol.lower()
        if symbol_lower in self._symbol_map:
            return self._symbol_map[symbol_lower]
        cache_key = "coingecko:symbols"
        mapping = await self.cache.get(cache_key)
        if not mapping:
            data = await self._request("coingecko", "/coins/list")
            mapping = {item["symbol"].lower(): item["id"] for item in data}
            await self.cache.set(cache_key, mapping)
        if symbol_lower not in mapping:
            raise CollectorError(f"Unknown symbol: {symbol}")
        return mapping[symbol_lower]

    async def get_coin_details(self, symbol: str) -> Dict[str, Any]:
        """Return a normalised detail record for *symbol* from CoinGecko."""
        coin_id = await self._coin_id(symbol)
        cache_key = f"coin:{coin_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}",
            {"localization": "false", "tickers": "false", "market_data": "true"},
        )
        market_data = data.get("market_data", {})
        coin = {
            "id": coin_id,
            "name": data.get("name"),
            "symbol": data.get("symbol", "").upper(),
            "description": data.get("description", {}).get("en"),
            "homepage": data.get("links", {}).get("homepage", [None])[0],
            "price": market_data.get("current_price", {}).get("usd"),
            "market_cap": market_data.get("market_cap", {}).get("usd"),
            "volume_24h": market_data.get("total_volume", {}).get("usd"),
            "change_24h": market_data.get("price_change_percentage_24h"),
            "high_24h": market_data.get("high_24h", {}).get("usd"),
            "low_24h": market_data.get("low_24h", {}).get("usd"),
            "circulating_supply": market_data.get("circulating_supply"),
            "total_supply": market_data.get("total_supply"),
            "ath": market_data.get("ath", {}).get("usd"),
            "atl": market_data.get("atl", {}).get("usd"),
            "last_updated": data.get("last_updated"),
        }
        await self.cache.set(cache_key, coin)
        return coin

    async def get_market_stats(self) -> Dict[str, Any]:
        """Return global market statistics (USD) from CoinGecko."""
        cache_key = "market:stats"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        global_data = await self._request("coingecko", "/global")
        stats = global_data.get("data", {})
        market = {
            "total_market_cap": stats.get("total_market_cap", {}).get("usd"),
            "total_volume_24h": stats.get("total_volume", {}).get("usd"),
            "market_cap_change_percentage_24h": stats.get("market_cap_change_percentage_24h_usd"),
            "btc_dominance": stats.get("market_cap_percentage", {}).get("btc"),
            "eth_dominance": stats.get("market_cap_percentage", {}).get("eth"),
            "active_cryptocurrencies": stats.get("active_cryptocurrencies"),
            "markets": stats.get("markets"),
            "updated_at": stats.get("updated_at"),
        }
        await self.cache.set(cache_key, market)
        return market

    async def get_price_history(self, symbol: str, timeframe: str = "7d") -> List[Dict[str, Any]]:
        """Return USD price points for *symbol* over *timeframe* (1d/7d/30d/90d).

        Unknown timeframes fall back to 7 days.
        """
        coin_id = await self._coin_id(symbol)
        mapping = {"1d": 1, "7d": 7, "30d": 30, "90d": 90}
        days = mapping.get(timeframe, 7)
        cache_key = f"history:{coin_id}:{days}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}/market_chart",
            {"vs_currency": "usd", "days": days},
        )
        prices = [
            {
                # CoinGecko returns [epoch_ms, price] pairs.
                "timestamp": datetime.fromtimestamp(point[0] / 1000, tz=timezone.utc).isoformat(),
                "price": round(point[1], 4),
            }
            for point in data.get("prices", [])
        ]
        await self.cache.set(cache_key, prices)
        return prices

    async def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]:
        """Return OHLCV data from Binance with caching and validation.

        Raises:
            CollectorError: when no parseable candles are returned.
        """
        cache_key = f"ohlcv:{symbol.upper()}:{interval}:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        # Binance caps /klines at 1000 entries; clamp the requested limit.
        params = {"symbol": symbol.upper(), "interval": interval, "limit": min(max(limit, 1), 1000)}
        data = await self._request("binance", "/klines", params)
        candles: List[Dict[str, Any]] = []
        for item in data:
            try:
                candles.append(
                    {
                        "timestamp": datetime.fromtimestamp(item[0] / 1000, tz=timezone.utc).isoformat(),
                        "open": float(item[1]),
                        "high": float(item[2]),
                        "low": float(item[3]),
                        "close": float(item[4]),
                        "volume": float(item[5]),
                    }
                )
            except (TypeError, ValueError):  # pragma: no cover - defensive
                continue
        if not candles:
            raise CollectorError(f"No OHLCV data returned for {symbol}", provider="binance")
        await self.cache.set(cache_key, candles)
        return candles
class NewsCollector:
    """Fetch latest crypto news."""

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0

    async def get_latest_news(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* news items, served from cache when fresh.

        Raises:
            CollectorError: when the news provider returns a non-200 status.
        """
        cache_key = f"news:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        url = "https://min-api.cryptocompare.com/data/v2/news/"
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params={"lang": "EN"})
            if response.status_code != 200:
                raise CollectorError(f"News provider error: HTTP {response.status_code}")
            payload = response.json()
        items = [
            {
                "id": entry.get("id"),
                "title": entry.get("title"),
                "body": entry.get("body"),
                "url": entry.get("url"),
                "source": entry.get("source"),
                "categories": entry.get("categories"),
                # published_on is an epoch-seconds timestamp.
                "published_at": datetime.fromtimestamp(entry.get("published_on", 0), tz=timezone.utc).isoformat(),
            }
            for entry in payload.get("Data", [])[:limit]
        ]
        await self.cache.set(cache_key, items)
        return items
class ProviderStatusCollector:
    """Perform lightweight health checks against configured providers."""

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        # Status changes slowly; keep results for at least ten minutes.
        self.cache = TTLCache(max(settings.cache_ttl, 600))
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 8.0

    async def _check_provider(self, client: httpx.AsyncClient, provider_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Probe one provider and return a status report dict."""
        target = data.get("health_check") or data.get("base_url")
        started = time.perf_counter()
        report: Dict[str, Any] = {
            "provider_id": provider_id,
            "name": data.get("name", provider_id),
            "category": data.get("category"),
        }
        try:
            response = await client.get(target, timeout=self.timeout)
        except Exception as exc:  # pragma: no cover - network heavy
            logger.warning("Provider %s health check failed: %s: %s", provider_id, type(exc).__name__, str(exc))
            report.update({"status": "offline", "status_code": None, "latency_ms": None, "error": str(exc)})
            return report
        elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
        report.update(
            {
                "status": "online" if response.status_code < 400 else "degraded",
                "status_code": response.status_code,
                "latency_ms": elapsed_ms,
            }
        )
        return report

    async def get_providers_status(self) -> List[Dict[str, Any]]:
        """Check every configured provider concurrently and return the reports."""
        cached = await self.cache.get("providers_status")
        if cached:
            return cached
        providers = self.registry.providers
        if not providers:
            return []
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            pending = [self._check_provider(client, pid, cfg) for pid, cfg in providers.items()]
            # Collect in completion order, not submission order.
            results = [await finished for finished in asyncio.as_completed(pending)]
        await self.cache.set("providers_status", results)
        return results
__all__ = [
"CollectorError",
"MarketDataCollector",
"NewsCollector",
"ProviderStatusCollector",
]