import json
import logging
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional


class LocalResourceService:
    """Centralized loader for the unified fallback registry.

    The registry is a JSON document read lazily from ``resource_path`` on
    first access. Asset entries found under ``fallback_data.assets`` are
    indexed by upper-cased symbol. A missing file, invalid JSON, or an
    unexpected document shape is logged and treated as an empty registry,
    so read helpers return empty results instead of raising.
    """

    def __init__(self, resource_path: Path):
        """Remember the registry location; nothing is read until first use."""
        self.resource_path = Path(resource_path)
        # ``None`` means "not loaded yet"; an empty dict means "loaded, empty".
        self._raw_data: Optional[Dict[str, Any]] = None
        self._assets: Dict[str, Dict[str, Any]] = {}
        self._market_overview: Dict[str, Any] = {}
        self._logger = logging.getLogger(__name__)

    # --------------------------------------------------------------------- #
    # Loading helpers
    # --------------------------------------------------------------------- #
    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @classmethod
    def _price_of(cls, asset: Dict[str, Any]) -> Dict[str, Any]:
        """Return the asset's ``price`` section, or ``{}`` if absent/null/malformed."""
        return cls._as_dict(asset.get("price"))

    def _ensure_loaded(self) -> None:
        """Load and normalize the registry once; later calls are no-ops."""
        if self._raw_data is not None:
            return

        data: Any = {}
        try:
            with self.resource_path.open("r", encoding="utf-8") as handle:
                data = json.load(handle)
        except FileNotFoundError:
            self._logger.warning("Fallback registry %s not found", self.resource_path)
        except json.JSONDecodeError as exc:
            self._logger.error("Invalid fallback registry JSON: %s", exc)

        # Valid JSON is not necessarily an object (it may be a list, string,
        # number...). Treat anything else as an empty registry.
        if not isinstance(data, dict):
            self._logger.error(
                "Fallback registry %s does not contain a JSON object", self.resource_path
            )
            data = {}

        fallback_data = self._as_dict(data.get("fallback_data"))
        assets = self._as_dict(fallback_data.get("assets"))

        normalized_assets: Dict[str, Dict[str, Any]] = {}
        for key, details in assets.items():
            if not isinstance(details, dict):
                # One malformed entry must not make the whole registry unusable.
                self._logger.warning("Skipping malformed fallback asset entry %r", key)
                continue
            # Prefer the explicit symbol; fall back to the mapping key.
            symbol = str(details.get("symbol") or key).upper()
            asset_copy = deepcopy(details)
            asset_copy["symbol"] = symbol
            normalized_assets[symbol] = asset_copy

        self._raw_data = data
        self._assets = normalized_assets
        self._market_overview = deepcopy(self._as_dict(fallback_data.get("market_overview")))

    def refresh(self) -> None:
        """Force reload from disk (used in tests)."""
        self._raw_data = None
        self._assets = {}
        self._market_overview = {}
        self._ensure_loaded()

    # --------------------------------------------------------------------- #
    # Registry level helpers
    # --------------------------------------------------------------------- #
    def get_registry(self) -> Dict[str, Any]:
        """Return a deep copy of the raw registry document."""
        self._ensure_loaded()
        return deepcopy(self._raw_data or {})

    def get_supported_symbols(self) -> List[str]:
        """Return all known asset symbols (upper-case), sorted alphabetically."""
        self._ensure_loaded()
        return sorted(self._assets.keys())

    def has_fallback_data(self) -> bool:
        """Return True when at least one asset was loaded."""
        self._ensure_loaded()
        return bool(self._assets)

    # --------------------------------------------------------------------- #
    # Market data helpers
    # --------------------------------------------------------------------- #
    def _asset_to_market_record(self, asset: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten a registry asset entry into a market-list record.

        Fields missing from the asset (or from its ``price`` section) are
        reported as ``None``. ``id`` is the asset slug, falling back to the
        lower-cased symbol.
        """
        price = self._price_of(asset)
        return {
            "id": asset.get("slug") or (asset.get("symbol") or "").lower(),
            "symbol": asset.get("symbol"),
            "name": asset.get("name"),
            "current_price": price.get("current_price"),
            "market_cap": price.get("market_cap"),
            "market_cap_rank": asset.get("market_cap_rank"),
            "total_volume": price.get("total_volume"),
            "price_change_24h": price.get("price_change_24h"),
            "price_change_percentage_24h": price.get("price_change_percentage_24h"),
            "high_24h": price.get("high_24h"),
            "low_24h": price.get("low_24h"),
            "last_updated": price.get("last_updated"),
        }

    def get_top_prices(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return market records for the top ``limit`` assets.

        Assets are ordered by ``market_cap_rank`` (unranked assets sort last),
        with ties broken by descending market cap. A ``limit`` below 1 is
        clamped to 1.
        """
        self._ensure_loaded()
        if not self._assets:
            return []

        def rank_key(asset: Dict[str, Any]):
            market_cap = self._price_of(asset).get("market_cap") or 0
            return (asset.get("market_cap_rank") or 9999, -market_cap)

        selected = sorted(self._assets.values(), key=rank_key)[: max(1, limit)]
        return [self._asset_to_market_record(asset) for asset in selected]

    def get_prices_for_symbols(self, symbols: List[str]) -> List[Dict[str, Any]]:
        """Return market records for the requested symbols, in request order.

        Lookup is case-insensitive; unknown symbols are silently omitted.
        """
        self._ensure_loaded()
        if not symbols or not self._assets:
            return []

        results: List[Dict[str, Any]] = []
        for raw_symbol in symbols:
            asset = self._assets.get(str(raw_symbol or "").upper())
            if asset:
                results.append(self._asset_to_market_record(asset))
        return results

    def get_ticker_snapshot(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Return a ticker-style snapshot for ``symbol``, or None if unknown."""
        self._ensure_loaded()
        asset = self._assets.get(str(symbol or "").upper())
        if not asset:
            return None

        price = self._price_of(asset)
        return {
            "symbol": asset.get("symbol"),
            "price": price.get("current_price"),
            "price_change_24h": price.get("price_change_24h"),
            "price_change_percent_24h": price.get("price_change_percentage_24h"),
            "high_24h": price.get("high_24h"),
            "low_24h": price.get("low_24h"),
            "volume_24h": price.get("total_volume"),
            # The registry has no separate quote volume; total volume is reused.
            "quote_volume_24h": price.get("total_volume"),
        }

    def get_market_overview(self) -> Dict[str, Any]:
        """Return the market overview enriched with derived leaderboards.

        Uses the registry's stored ``market_overview`` when present; otherwise
        aggregates totals from the loaded assets. ``top_gainers``,
        ``top_losers`` and ``top_by_volume`` (five entries each) and a
        ``timestamp`` are always added. Returns ``{}`` when no assets are
        loaded.
        """
        self._ensure_loaded()
        if not self._assets:
            return {}

        assets = list(self._assets.values())
        overview = deepcopy(self._market_overview)
        if not overview:
            total_market_cap = sum(
                self._price_of(asset).get("market_cap") or 0 for asset in assets
            )
            total_volume = sum(
                self._price_of(asset).get("total_volume") or 0 for asset in assets
            )
            btc_cap = self._price_of(self._assets.get("BTC", {})).get("market_cap") or 0
            overview = {
                "total_market_cap": total_market_cap,
                "total_volume_24h": total_volume,
                "btc_dominance": (btc_cap / total_market_cap * 100) if total_market_cap else 0,
                "active_cryptocurrencies": len(self._assets),
                "markets": 500,
                "market_cap_change_percentage_24h": 0,
            }

        def change_24h(asset: Dict[str, Any]):
            return self._price_of(asset).get("price_change_percentage_24h") or 0

        def volume_24h(asset: Dict[str, Any]):
            return self._price_of(asset).get("total_volume") or 0

        # Enrich with derived leaderboards
        gainers = sorted(assets, key=change_24h, reverse=True)[:5]
        losers = sorted(assets, key=change_24h)[:5]
        volumes = sorted(assets, key=volume_24h, reverse=True)[:5]

        overview["top_gainers"] = [self._asset_to_market_record(asset) for asset in gainers]
        overview["top_losers"] = [self._asset_to_market_record(asset) for asset in losers]
        overview["top_by_volume"] = [self._asset_to_market_record(asset) for asset in volumes]
        # ``datetime.utcnow()`` is deprecated; take an aware "now" and drop the
        # tzinfo so the emitted naive-UTC ISO string keeps its original format.
        overview["timestamp"] = overview.get("timestamp") or (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        return overview

    def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]:
        """Return a copy of the most recent ``limit`` candles for ``symbol``.

        When the requested ``interval`` has no data, the ``"1h"`` series is
        used instead. A ``limit`` of 0, ``None`` or a negative number returns
        the whole series. Unknown symbols yield ``[]``.
        """
        self._ensure_loaded()
        asset = self._assets.get(str(symbol or "").upper())
        if not asset:
            return []

        series_by_interval = self._as_dict(asset.get("ohlcv"))
        ohlcv = series_by_interval.get(interval) or []
        if not ohlcv and interval != "1h":
            # Provide 1h data for other intervals when nothing else is present
            ohlcv = series_by_interval.get("1h") or []

        # Only a positive limit truncates: a negative one would turn
        # ``ohlcv[-limit:]`` into ``ohlcv[abs(limit):]`` and drop the oldest
        # candles instead of limiting the result.
        if limit and limit > 0:
            ohlcv = ohlcv[-limit:]
        return deepcopy(ohlcv)

    # --------------------------------------------------------------------- #
    # Convenience helpers for testing / diagnostics
    # --------------------------------------------------------------------- #
    def describe(self) -> Dict[str, Any]:
        """Simple snapshot used in diagnostics/tests."""
        self._ensure_loaded()
        return {
            "resource_path": str(self.resource_path),
            "assets": len(self._assets),
            "supported_symbols": self.get_supported_symbols(),
        }