|
|
|
|
|
"""
Gap Filling Service - intelligently fills missing data.

Uses AI models first, then falls back to external providers.
Priority: HF Models → HF Space API → External Providers
"""
|
|
|
|
|
|
import asyncio
|
|
|
import time
|
|
|
from typing import Dict, List, Optional, Any
|
|
|
from enum import Enum
|
|
|
from datetime import datetime
|
|
|
import logging
|
|
|
|
|
|
# Module-level logger; messages are namespaced under this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class GapType(Enum):
    """Types of data gaps that can be detected and filled.

    The ``.value`` strings (not the members) are what GapFillerService stores
    in gap dicts under the ``"gap_type"`` key. Declaration order defines
    iteration order, so do not reorder members.
    """

    # Fields whose name contains "ohlc"/"price"/"candle", timestamp holes,
    # and candles missing open/high/low/close.
    MISSING_OHLC = "missing_ohlc"

    # Fields whose name contains "depth" or "orderbook".
    MISSING_DEPTH = "missing_depth"

    # Fields whose name contains "whale" or "large_transfer".
    MISSING_WHALE_DATA = "missing_whale_data"

    # Fields whose name contains "sentiment".
    MISSING_SENTIMENT = "missing_sentiment"

    # Catch-all for field names that match none of the other keywords.
    INCOMPLETE_METADATA = "incomplete_metadata"

    # Fields whose name contains "transaction".
    MISSING_TRANSACTIONS = "missing_transactions"

    # Fields whose name contains "balance".
    MISSING_BALANCE = "missing_balance"
|
|
|
|
|
|
|
|
|
class GapFillStrategy(Enum):
    """Strategies for filling gaps.

    The ``.value`` strings are stored in gap dicts under
    ``"recommended_strategy"`` and in fill results under ``"strategy_used"``.
    Declaration order defines iteration order, so do not reorder members.
    """

    # Ask an AI model (sentiment ensemble / OHLC gap filler) to synthesize data.
    AI_MODEL_SYNTHESIS = "ai_model_synthesis"

    # Interpolate missing points from the existing price series.
    INTERPOLATION = "interpolation"

    # Fall back to an external data provider.
    EXTERNAL_PROVIDER = "external_provider"

    # NOTE(review): not referenced by GapFillerService in this file.
    HYBRID = "hybrid"

    # Statistical estimate (used for missing order-book depth).
    STATISTICAL_ESTIMATION = "statistical_estimation"
|
|
|
|
|
|
|
|
|
class GapFillerService:
    """Main orchestrator for gap filling operations.

    Detects missing or incomplete values in a data dict and tries to fill each
    gap with the strategy recommended at detection time. When that fails, it
    falls back to the provider manager, if one is configured. Every fill
    attempt is appended to an in-memory audit log capped at
    ``_MAX_AUDIT_LOG_ENTRIES`` entries.
    """

    # Maximum number of entries retained in ``self.audit_log``.
    _MAX_AUDIT_LOG_ENTRIES = 1000

    def __init__(self, model_registry=None, provider_manager=None, database=None):
        """
        Initialize gap filler service.

        Args:
            model_registry: AI model registry. AI synthesis is only attempted
                when this is set (truthy).
            provider_manager: Provider manager for the external API fallback;
                must expose ``get_provider(provider_id)``.
            database: Database instance intended for audit-log persistence
                (persistence is not implemented yet, see _log_gap_fill_attempt).
        """
        self.models = model_registry
        self.providers = provider_manager
        self.db = database
        # NOTE(review): not read anywhere in this class.
        self.gap_fill_cache = {}
        self.audit_log = []

        logger.info("GapFillerService initialized")

    @staticmethod
    def _context_value(
        context: Optional[Dict[str, Any]], key: str, default: Any = None
    ) -> Any:
        """Return ``context[key]``, or *default* if context is empty/None or the value is missing/None."""
        if not context:
            return default
        value = context.get(key)
        return default if value is None else value

    async def detect_gaps(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Detect all missing/incomplete data in provided dataset.

        Args:
            data: Dataset to analyze for gaps.
            required_fields: List of required field names.
            context: Additional context. It is copied into each missing-field
                gap and passed to timestamp gap detection.

        Returns:
            List of detected gaps with recommended strategies.
        """
        gaps = []

        # 1. Required fields that are absent or explicitly None.
        for field in required_fields:
            if data.get(field) is None:
                gaps.append({
                    "gap_type": self._infer_gap_type(field),
                    "field": field,
                    "severity": "high",
                    "recommended_strategy": self._recommend_strategy(field, data),
                    "context": context or {}
                })

        # 2. Holes in a regularly sampled timestamp series.
        timestamps = data.get("timestamps")
        if isinstance(timestamps, list):
            missing_timestamps = self._detect_missing_timestamps(timestamps, context)
            if missing_timestamps:
                gaps.append({
                    "gap_type": GapType.MISSING_OHLC.value,
                    "field": "ohlc_data",
                    "missing_count": len(missing_timestamps),
                    "missing_timestamps": missing_timestamps,
                    "severity": "medium",
                    "recommended_strategy": GapFillStrategy.INTERPOLATION.value
                })

        # 3. Candles with missing OHLC fields. A None (or otherwise
        # non-sequence) "prices" value cannot be iterated. When it is a
        # required field, step 1 has already reported it.
        prices = data.get("prices")
        if isinstance(prices, (list, tuple)):
            gaps.extend(self._detect_price_gaps(prices))

        logger.info("Detected %d gaps in data", len(gaps))
        return gaps

    def _infer_gap_type(self, field: str) -> str:
        """Infer gap type (a GapType value) from keywords in the field name; first match wins."""
        name = field.lower()
        if "ohlc" in name or "price" in name or "candle" in name:
            return GapType.MISSING_OHLC.value
        if "depth" in name or "orderbook" in name:
            return GapType.MISSING_DEPTH.value
        if "whale" in name or "large_transfer" in name:
            return GapType.MISSING_WHALE_DATA.value
        if "sentiment" in name:
            return GapType.MISSING_SENTIMENT.value
        if "transaction" in name:
            return GapType.MISSING_TRANSACTIONS.value
        if "balance" in name:
            return GapType.MISSING_BALANCE.value
        return GapType.INCOMPLETE_METADATA.value

    def _recommend_strategy(self, field: str, data: Dict[str, Any]) -> str:
        """Recommend the best strategy (a GapFillStrategy value) for filling this gap."""
        gap_type = self._infer_gap_type(field)

        if gap_type == GapType.MISSING_OHLC.value:
            # Interpolation needs a few existing price points to work from;
            # "prices" may be absent or None.
            if len(data.get("prices") or []) > 2:
                return GapFillStrategy.INTERPOLATION.value
            return GapFillStrategy.EXTERNAL_PROVIDER.value

        if gap_type == GapType.MISSING_SENTIMENT.value:
            return GapFillStrategy.AI_MODEL_SYNTHESIS.value

        if gap_type == GapType.MISSING_DEPTH.value:
            return GapFillStrategy.STATISTICAL_ESTIMATION.value

        return GapFillStrategy.EXTERNAL_PROVIDER.value

    def _detect_missing_timestamps(
        self,
        timestamps: List[int],
        context: Optional[Dict[str, Any]],
        max_missing: int = 100
    ) -> List[int]:
        """Detect missing timestamps in a time series.

        The expected sampling interval is the smallest distance between two
        distinct timestamps. Any step larger than 1.5x that interval is filled
        with evenly spaced synthetic timestamps.

        Args:
            timestamps: Timestamps in any order; duplicates are ignored.
            context: Accepted for interface compatibility; currently unused.
            max_missing: Maximum number of missing timestamps to return.

        Returns:
            Up to ``max_missing`` missing timestamps, in ascending order.
        """
        # Duplicates would make the smallest interval 0 (division by zero).
        ordered = sorted(set(timestamps)) if timestamps else []
        if len(ordered) < 2 or max_missing <= 0:
            return []

        steps = list(zip(ordered, ordered[1:]))
        expected_interval = min(nxt - cur for cur, nxt in steps)

        missing = []
        for current, next_ts in steps:
            diff = next_ts - current
            if diff <= expected_interval * 1.5:
                continue
            # round() rather than int(): a step flagged as > 1.5 intervals
            # must yield at least one missing point. Every generated point
            # stays strictly below next_ts.
            num_missing = round(diff / expected_interval) - 1
            for j in range(1, num_missing + 1):
                missing.append(current + j * expected_interval)
                if len(missing) >= max_missing:
                    return missing

        return missing

    def _detect_price_gaps(self, prices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Detect gaps in price data (e.g., missing OHLC fields).

        Dict entries are checked for None/absent open/high/low/close values.
        A None entry counts as a candle missing all four fields. Other entry
        types (e.g. bare numbers) carry no named OHLC fields and are skipped.
        """
        required_ohlc_fields = ("open", "high", "low", "close")
        gaps = []

        for index, candle in enumerate(prices):
            if candle is None:
                missing_fields = list(required_ohlc_fields)
            elif isinstance(candle, dict):
                missing_fields = [f for f in required_ohlc_fields if candle.get(f) is None]
            else:
                continue

            if missing_fields:
                gaps.append({
                    "gap_type": GapType.MISSING_OHLC.value,
                    "index": index,
                    "missing_fields": missing_fields,
                    "severity": "medium",
                    "recommended_strategy": GapFillStrategy.INTERPOLATION.value
                })

        return gaps

    @staticmethod
    def _apply_attempt(
        result: Dict[str, Any],
        attempt: Dict[str, Any],
        strategy_value: str,
        default_confidence: float
    ) -> None:
        """Record *attempt* in ``result["attempts"]``; on success, mark *result* as filled by *strategy_value*."""
        result["attempts"].append(attempt)
        if attempt.get("success"):
            result["filled"] = True
            result["strategy_used"] = strategy_value
            result["confidence"] = attempt.get("confidence", default_confidence)
            result["filled_data"] = attempt["data"]

    async def fill_gap(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Fill a single gap using the best available strategy.

        The gap's recommended strategy is tried first:
        - AI synthesis, only when a model registry is configured;
        - interpolation;
        - statistical estimation.
        If it does not fill the gap, the external provider is tried, when a
        provider manager is configured.

        Args:
            gap: Gap definition from detect_gaps().
            data: Original data containing the gap.
            context: Additional context for gap filling.

        Returns:
            Dict with keys gap, filled, strategy_used, confidence, filled_data,
            attempts, execution_time_ms and error. Exceptions are caught and
            reported in "error" rather than raised.
        """
        start_time = time.time()
        strategy = gap.get("recommended_strategy")

        result = {
            "gap": gap,
            "filled": False,
            "strategy_used": None,
            "confidence": 0.0,
            "filled_data": None,
            "attempts": [],
            "execution_time_ms": 0,
            "error": None
        }

        try:
            # Only one primary strategy can match, since it is a single value.
            if strategy == GapFillStrategy.AI_MODEL_SYNTHESIS.value and self.models:
                attempt = await self._fill_with_ai_model(gap, data, context)
                self._apply_attempt(result, attempt, GapFillStrategy.AI_MODEL_SYNTHESIS.value, 0.7)
            elif strategy == GapFillStrategy.INTERPOLATION.value:
                attempt = await self._fill_with_interpolation(gap, data, context)
                self._apply_attempt(result, attempt, GapFillStrategy.INTERPOLATION.value, 0.8)
            elif strategy == GapFillStrategy.STATISTICAL_ESTIMATION.value:
                attempt = await self._fill_with_statistics(gap, data, context)
                self._apply_attempt(result, attempt, GapFillStrategy.STATISTICAL_ESTIMATION.value, 0.65)

            # External providers are the fallback for every strategy.
            if not result["filled"] and self.providers:
                attempt = await self._fill_with_external_provider(gap, data, context)
                self._apply_attempt(result, attempt, GapFillStrategy.EXTERNAL_PROVIDER.value, 0.9)

        except Exception as e:
            logger.exception("Error filling gap: %s", e)
            result["error"] = str(e)

        result["execution_time_ms"] = int((time.time() - start_time) * 1000)

        await self._log_gap_fill_attempt(result)

        return result

    async def _fill_with_ai_model(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill a sentiment or OHLC gap using the project's AI models.

        Returns an attempt dict with "success" set. On success it also has
        "data", "confidence" and "method"; on failure it has "error".
        """
        try:
            from ai_models import get_gap_filler
            gap_filler = get_gap_filler()

            gap_type = gap.get("gap_type")

            if gap_type == GapType.MISSING_SENTIMENT.value:
                # Prefer text from the context, then from the data itself.
                text = self._context_value(context, "text") or data.get("text")

                if text:
                    from ai_models import ensemble_crypto_sentiment
                    sentiment = ensemble_crypto_sentiment(text)

                    return {
                        "success": True,
                        "data": sentiment,
                        "confidence": sentiment.get("confidence", 0.7),
                        "method": "ai_sentiment_model"
                    }

            elif gap_type == GapType.MISSING_OHLC.value:
                symbol = self._context_value(context, "symbol", "BTC")
                existing_data = data.get("prices", [])
                missing_timestamps = gap.get("missing_timestamps", [])

                if existing_data and missing_timestamps:
                    result = await gap_filler.fill_missing_ohlc(symbol, existing_data, missing_timestamps)
                    if result["status"] == "success":
                        return {
                            "success": True,
                            "data": result["filled_data"],
                            "confidence": result["average_confidence"],
                            "method": "ai_ohlc_interpolation"
                        }

            return {"success": False, "error": "No suitable AI model found"}

        except Exception as e:
            logger.warning(f"AI model fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def _fill_with_interpolation(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill missing OHLC timestamps with the gap filler's ``fill_missing_ohlc``.

        Returns an attempt dict with "success" set. On success it also has
        "data", "confidence" and "method"; on failure it has "error".
        """
        try:
            from ai_models import get_gap_filler
            gap_filler = get_gap_filler()

            symbol = self._context_value(context, "symbol", "UNKNOWN")
            existing_data = data.get("prices", [])
            missing_timestamps = gap.get("missing_timestamps", [])

            if not existing_data or not missing_timestamps:
                return {"success": False, "error": "Insufficient data for interpolation"}

            result = await gap_filler.fill_missing_ohlc(symbol, existing_data, missing_timestamps)

            if result["status"] == "success":
                return {
                    "success": True,
                    "data": result["filled_data"],
                    "confidence": result["average_confidence"],
                    "method": "linear_interpolation"
                }

            return {"success": False, "error": result.get("message", "Interpolation failed")}

        except Exception as e:
            logger.warning(f"Interpolation fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def _fill_with_statistics(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill a missing order-book depth gap with a statistical estimate.

        Returns an attempt dict with "success" set. On success it also has
        "data", "confidence" and "method"; on failure it has "error".
        """
        try:
            from ai_models import get_gap_filler
            gap_filler = get_gap_filler()

            gap_type = gap.get("gap_type")

            if gap_type == GapType.MISSING_DEPTH.value:
                symbol = self._context_value(context, "symbol", "BTCUSDT")
                # Price from the data first, then from the context. 50000 is
                # the last-resort placeholder mid price.
                mid_price = (
                    data.get("price")
                    or self._context_value(context, "price")
                    or 50000
                )

                result = await gap_filler.estimate_orderbook_depth(symbol, mid_price)

                if result["status"] == "success":
                    return {
                        "success": True,
                        "data": result,
                        "confidence": result["confidence"],
                        "method": "statistical_orderbook_estimation"
                    }

            return {"success": False, "error": "No suitable statistical method found"}

        except Exception as e:
            logger.warning(f"Statistical fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def _fill_with_external_provider(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill a gap using an external provider API.

        NOTE(review): no data is fetched here. "success" only means a suitable
        provider reports ``is_available``. The returned "data" is a marker dict
        ({"source": ..., "provider_used": True}), which fill_all_gaps then
        writes into the gap's field.
        """
        try:
            if not self.providers:
                return {"success": False, "error": "No provider manager available"}

            gap_type = gap.get("gap_type")

            if gap_type in (GapType.MISSING_OHLC.value, GapType.INCOMPLETE_METADATA.value):
                provider = self.providers.get_provider("coinmarketcap")
                if provider and provider.is_available:
                    return {
                        "success": True,
                        "data": {"source": "coinmarketcap", "provider_used": True},
                        "confidence": 0.9,
                        "method": "external_coinmarketcap"
                    }

            elif gap_type == GapType.MISSING_TRANSACTIONS.value:
                # Map chain name -> block-explorer provider id.
                explorer_ids = {"ethereum": "etherscan", "bsc": "bscscan", "tron": "tronscan"}
                chain = self._context_value(context, "chain", "ethereum")
                provider_id = explorer_ids.get(chain)
                provider = self.providers.get_provider(provider_id) if provider_id else None

                if provider and provider.is_available:
                    return {
                        "success": True,
                        "data": {"source": provider.name, "provider_used": True},
                        "confidence": 0.9,
                        "method": f"external_{provider.provider_id}"
                    }

            return {"success": False, "error": "No suitable provider found"}

        except Exception as e:
            logger.warning(f"External provider fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def fill_all_gaps(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Detect and fill all gaps in one operation.

        *data* is updated in place: each filled gap that names a "field" has
        that key overwritten with the filled data.

        Returns:
            Dict with:
            - "original_data": a shallow copy of *data* taken before filling;
            - "enriched_data": *data* itself, after filling;
            - fill statistics. "average_confidence" is averaged over all
              detected gaps, with unfilled gaps counting as 0.
        """
        start_time = time.time()

        # Snapshot before filling mutates `data`. A shallow copy suffices
        # because only top-level keys are reassigned below.
        original_data = dict(data)

        gaps = await self.detect_gaps(data, required_fields, context)

        fill_results = []
        for gap in gaps:
            result = await self.fill_gap(gap, data, context)
            fill_results.append(result)

            if result["filled"] and result["filled_data"]:
                field = gap.get("field")
                if field:
                    data[field] = result["filled_data"]

        execution_time = int((time.time() - start_time) * 1000)

        gaps_detected = len(gaps)
        gaps_filled = sum(1 for r in fill_results if r["filled"])
        avg_confidence = (
            sum(r["confidence"] for r in fill_results) / gaps_detected if gaps_detected > 0 else 0
        )

        return {
            "status": "success",
            "original_data": original_data,
            "enriched_data": data,
            "gaps_detected": gaps_detected,
            "gaps_filled": gaps_filled,
            "fill_rate": gaps_filled / gaps_detected if gaps_detected > 0 else 0,
            "fill_results": fill_results,
            "average_confidence": avg_confidence,
            "execution_time_ms": execution_time,
            "metadata": {
                # Sorted so the output is deterministic.
                "strategies_used": sorted({r["strategy_used"] for r in fill_results if r["strategy_used"]}),
                "timestamp": datetime.utcnow().isoformat()
            }
        }

    async def _log_gap_fill_attempt(self, result: Dict[str, Any]):
        """Append a summary of a fill_gap() result to the in-memory audit trail."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "gap_type": result["gap"].get("gap_type"),
            "field": result["gap"].get("field"),
            "filled": result["filled"],
            "strategy_used": result["strategy_used"],
            "confidence": result["confidence"],
            "execution_time_ms": result["execution_time_ms"],
            "attempts_count": len(result["attempts"])
        }

        self.audit_log.append(log_entry)

        # Keep only the most recent entries so memory use stays bounded.
        if len(self.audit_log) > self._MAX_AUDIT_LOG_ENTRIES:
            del self.audit_log[:-self._MAX_AUDIT_LOG_ENTRIES]

        if self.db:
            # TODO: persist log_entry to self.db. Until then, entries live
            # only in memory.
            pass

    def get_audit_log(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to *limit* most recent audit entries (an empty list when limit <= 0)."""
        # Guard needed because audit_log[-0:] would return the whole log.
        if limit <= 0:
            return []
        return self.audit_log[-limit:]

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregate gap filling statistics over the in-memory audit log."""
        if not self.audit_log:
            return {
                "total_attempts": 0,
                "successful_fills": 0,
                "success_rate": 0,
                "average_confidence": 0,
                "average_execution_time_ms": 0,
                "strategies_used": {}
            }

        total = len(self.audit_log)
        successful = sum(1 for log in self.audit_log if log["filled"])
        avg_confidence = sum(log["confidence"] for log in self.audit_log) / total
        avg_time = sum(log["execution_time_ms"] for log in self.audit_log) / total

        # Number of successful fills per strategy.
        strategy_counts = {}
        for log in self.audit_log:
            strategy = log.get("strategy_used")
            if strategy:
                strategy_counts[strategy] = strategy_counts.get(strategy, 0) + 1

        return {
            "total_attempts": total,
            "successful_fills": successful,
            "success_rate": successful / total,
            "average_confidence": avg_confidence,
            "average_execution_time_ms": avg_time,
            "strategies_used": strategy_counts
        }
|
|
|
|