Really-amin's picture
Upload 577 files
b190b45 verified
#!/usr/bin/env python3
"""
Gap Filling Service - Intelligently fills missing data
Uses AI models first, then falls back to external providers
Priority: HF Models → HF Space API → External Providers
"""
import asyncio
import time
from typing import Dict, List, Optional, Any
from enum import Enum
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class GapType(Enum):
    """Categories of missing or incomplete data the service can report.

    Each member's value is the string stored under the ``"gap_type"`` key of
    a detected-gap dictionary.
    """

    # Market data
    MISSING_OHLC = "missing_ohlc"
    MISSING_DEPTH = "missing_depth"

    # On-chain / behavioural signals
    MISSING_WHALE_DATA = "missing_whale_data"
    MISSING_SENTIMENT = "missing_sentiment"

    # Fallback category for fields that match no other type
    INCOMPLETE_METADATA = "incomplete_metadata"

    # Account-level data
    MISSING_TRANSACTIONS = "missing_transactions"
    MISSING_BALANCE = "missing_balance"
class GapFillStrategy(Enum):
    """Available approaches for filling a detected gap.

    Each member's value is the string stored under the
    ``"recommended_strategy"`` / ``"strategy_used"`` keys of gap and result
    dictionaries.
    """

    # Model-driven
    AI_MODEL_SYNTHESIS = "ai_model_synthesis"

    # Derived from data that is already present
    INTERPOLATION = "interpolation"

    # Fetched from a third-party API
    EXTERNAL_PROVIDER = "external_provider"

    # Combination of approaches
    HYBRID = "hybrid"

    # Estimated from statistical assumptions
    STATISTICAL_ESTIMATION = "statistical_estimation"
class GapFillerService:
    """Main orchestrator for gap filling operations.

    Detects missing or incomplete fields in a data dictionary and tries to
    fill them using, in order: AI models, interpolation, statistical
    estimation and finally an external provider. Every fill attempt is
    recorded in an in-memory audit log.
    """

    # Upper bound on the number of synthesized timestamps reported per scan.
    _MAX_MISSING_TIMESTAMPS = 100
    # Number of audit entries retained in memory.
    _MAX_AUDIT_ENTRIES = 1000
    # Mid price used for depth estimation when no price is supplied at all.
    _DEFAULT_MID_PRICE = 50000
    # Fields every price entry is expected to carry.
    _OHLC_FIELDS = ("open", "high", "low", "close")

    def __init__(self, model_registry=None, provider_manager=None, database=None):
        """
        Initialize gap filler service

        Args:
            model_registry: AI model registry for ML-based gap filling
            provider_manager: Provider manager for external API fallback
            database: Database instance for storing gap filling audit logs
        """
        self.models = model_registry
        self.providers = provider_manager
        self.db = database
        self.gap_fill_cache = {}
        self.audit_log = []
        logger.info("GapFillerService initialized")

    @staticmethod
    def _context_value(context: Optional[Dict[str, Any]], key: str, default: Any = None) -> Any:
        """Return ``context[key]``, or ``default`` when it is unavailable.

        ``default`` is used when ``context`` is None/empty, when the key is
        absent, and when the stored value is None, so callers always get a
        usable value.
        """
        if not context:
            return default
        value = context.get(key)
        return default if value is None else value

    @classmethod
    def _resolve_mid_price(cls, data: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Any:
        """Pick the mid price: ``data`` first, then ``context``, then the default."""
        return (
            data.get("price")
            or cls._context_value(context, "price")
            or cls._DEFAULT_MID_PRICE
        )

    async def detect_gaps(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Detect all missing/incomplete data in provided dataset

        Args:
            data: Dataset to analyze for gaps
            required_fields: List of required field names
            context: Additional context for gap detection (e.g., expected data range)

        Returns:
            List of detected gaps with recommended strategies
        """
        gaps = []

        # Required fields that are absent or explicitly None.
        for field in required_fields:
            if data.get(field) is None:
                gaps.append({
                    "gap_type": self._infer_gap_type(field),
                    "field": field,
                    "severity": "high",
                    "recommended_strategy": self._recommend_strategy(field, data),
                    "context": context or {}
                })

        # Holes in the time series.
        timestamps = data.get("timestamps")
        if isinstance(timestamps, list):
            missing_timestamps = self._detect_missing_timestamps(timestamps, context)
            if missing_timestamps:
                gaps.append({
                    "gap_type": GapType.MISSING_OHLC.value,
                    "field": "ohlc_data",
                    "missing_count": len(missing_timestamps),
                    "missing_timestamps": missing_timestamps,
                    "severity": "medium",
                    "recommended_strategy": GapFillStrategy.INTERPOLATION.value
                })

        # Price entries lacking OHLC fields. A None/non-sequence "prices" value
        # is already reported by the required-field check (if required), so it
        # is skipped here rather than crashing the scan.
        prices = data.get("prices")
        if isinstance(prices, (list, tuple)):
            gaps.extend(self._detect_price_gaps(prices))

        logger.info(f"Detected {len(gaps)} gaps in data")
        return gaps

    def _infer_gap_type(self, field: str) -> str:
        """Infer gap type from field name (case-insensitive keyword match)."""
        name = field.lower()
        # Order matters: the first group with a matching keyword wins.
        keyword_groups = (
            (("ohlc", "price", "candle"), GapType.MISSING_OHLC),
            (("depth", "orderbook"), GapType.MISSING_DEPTH),
            (("whale", "large_transfer"), GapType.MISSING_WHALE_DATA),
            (("sentiment",), GapType.MISSING_SENTIMENT),
            (("transaction",), GapType.MISSING_TRANSACTIONS),
            (("balance",), GapType.MISSING_BALANCE),
        )
        for keywords, gap_type in keyword_groups:
            if any(keyword in name for keyword in keywords):
                return gap_type.value
        return GapType.INCOMPLETE_METADATA.value

    def _recommend_strategy(self, field: str, data: Dict[str, Any]) -> str:
        """Recommend best strategy for filling this gap"""
        gap_type = self._infer_gap_type(field)

        if gap_type == GapType.MISSING_OHLC.value:
            # Interpolate only when there is surrounding price data. "prices"
            # may be present but None (that is itself a gap), so guard len().
            prices = data.get("prices")
            if hasattr(prices, "__len__") and len(prices) > 2:
                return GapFillStrategy.INTERPOLATION.value
            return GapFillStrategy.EXTERNAL_PROVIDER.value

        if gap_type == GapType.MISSING_SENTIMENT.value:
            return GapFillStrategy.AI_MODEL_SYNTHESIS.value

        if gap_type == GapType.MISSING_DEPTH.value:
            return GapFillStrategy.STATISTICAL_ESTIMATION.value

        # Everything else falls back to an external provider.
        return GapFillStrategy.EXTERNAL_PROVIDER.value

    def _detect_missing_timestamps(
        self,
        timestamps: List[int],
        context: Optional[Dict[str, Any]]
    ) -> List[int]:
        """Detect missing timestamps in a time series.

        The expected interval is the smallest positive spacing between
        consecutive (sorted) timestamps; any spacing more than 1.5x that
        interval is treated as a hole and the timestamps inside it are
        synthesized.

        Args:
            timestamps: Observed timestamps (any order, duplicates allowed)
            context: Currently unused; kept for interface compatibility

        Returns:
            At most 100 missing timestamps, in ascending order
        """
        if not timestamps or len(timestamps) < 2:
            return []

        ordered = sorted(timestamps)
        pairs = list(zip(ordered, ordered[1:]))

        # Duplicates produce zero-length intervals; ignoring them avoids a
        # division by zero below.
        positive_intervals = [nxt - cur for cur, nxt in pairs if nxt > cur]
        if not positive_intervals:
            return []
        expected_interval = min(positive_intervals)

        limit = self._MAX_MISSING_TIMESTAMPS
        missing = []
        for current, next_ts in pairs:
            diff = next_ts - current
            if diff <= expected_interval * 1.5:  # Allow 50% tolerance
                continue
            num_missing = int(diff // expected_interval) - 1
            for step in range(1, num_missing + 1):
                missing.append(current + step * expected_interval)
                # Stop as soon as the cap is hit instead of materializing a
                # potentially enormous list and truncating it afterwards.
                if len(missing) >= limit:
                    return missing
        return missing

    def _detect_price_gaps(self, prices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Detect gaps in price data (e.g., missing OHLC fields).

        Entries that are not dictionaries (None, bare numbers, ...) cannot
        carry named OHLC fields and are reported with all fields missing.
        """
        gaps = []
        for index, price_data in enumerate(prices):
            if isinstance(price_data, dict):
                missing_fields = [
                    name for name in self._OHLC_FIELDS if price_data.get(name) is None
                ]
            else:
                missing_fields = list(self._OHLC_FIELDS)

            if missing_fields:
                gaps.append({
                    "gap_type": GapType.MISSING_OHLC.value,
                    "index": index,
                    "missing_fields": missing_fields,
                    "severity": "medium",
                    "recommended_strategy": GapFillStrategy.INTERPOLATION.value
                })
        return gaps

    async def fill_gap(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Fill a single gap using best available strategy
        Priority: HF Models → HF Space API → External Providers

        The gap's recommended strategy is tried first (when its prerequisites
        are met); the external provider is always the final fallback.

        Args:
            gap: Gap definition from detect_gaps()
            data: Original data containing the gap
            context: Additional context for gap filling

        Returns:
            Filled data with metadata about the fill operation
        """
        start_time = time.time()
        strategy = gap.get("recommended_strategy")

        result = {
            "gap": gap,
            "filled": False,
            "strategy_used": None,
            "confidence": 0.0,
            "filled_data": None,
            "attempts": [],
            "execution_time_ms": 0,
            "error": None
        }

        try:
            ai = GapFillStrategy.AI_MODEL_SYNTHESIS.value
            interpolation = GapFillStrategy.INTERPOLATION.value
            statistical = GapFillStrategy.STATISTICAL_ESTIMATION.value
            external = GapFillStrategy.EXTERNAL_PROVIDER.value

            # (strategy name, eligible, handler, default confidence), in
            # priority order. Only the recommended strategy is eligible,
            # except the external provider which is the universal fallback.
            pipeline = (
                (ai, strategy == ai and bool(self.models), self._fill_with_ai_model, 0.7),
                (interpolation, strategy == interpolation, self._fill_with_interpolation, 0.8),
                (statistical, strategy == statistical, self._fill_with_statistics, 0.65),
                (external, bool(self.providers), self._fill_with_external_provider, 0.9),
            )

            for name, eligible, handler, default_confidence in pipeline:
                if result["filled"]:
                    break
                if not eligible:
                    continue

                attempt = await handler(gap, data, context)
                result["attempts"].append(attempt)

                if attempt.get("success"):
                    result["filled"] = True
                    result["strategy_used"] = name
                    result["confidence"] = attempt.get("confidence", default_confidence)
                    result["filled_data"] = attempt.get("data")
        except Exception as e:
            logger.error(f"Error filling gap: {e}")
            result["error"] = str(e)

        result["execution_time_ms"] = int((time.time() - start_time) * 1000)

        # Record the attempt (successful or not) in the audit trail.
        await self._log_gap_fill_attempt(result)
        return result

    async def _fill_with_ai_model(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using AI models.

        Returns an attempt dict with ``success`` and, on success, ``data``,
        ``confidence`` and ``method``; on failure an ``error`` message.
        """
        try:
            gap_type = gap.get("gap_type")

            if gap_type == GapType.MISSING_SENTIMENT.value:
                # Prefer text from the context, then from the data itself.
                text = self._context_value(context, "text", "")
                if not text and "text" in data:
                    text = data["text"]

                if text:
                    from ai_models import ensemble_crypto_sentiment
                    sentiment = ensemble_crypto_sentiment(text)
                    return {
                        "success": True,
                        "data": sentiment,
                        "confidence": sentiment.get("confidence", 0.7),
                        "method": "ai_sentiment_model"
                    }

            elif gap_type == GapType.MISSING_OHLC.value:
                symbol = self._context_value(context, "symbol", "BTC")
                existing_data = data.get("prices") or []
                missing_timestamps = gap.get("missing_timestamps") or []

                if existing_data and missing_timestamps:
                    # Imported lazily and only on the path that needs it.
                    from ai_models import get_gap_filler
                    gap_filler = get_gap_filler()
                    result = await gap_filler.fill_missing_ohlc(
                        symbol, existing_data, missing_timestamps
                    )
                    if result["status"] == "success":
                        return {
                            "success": True,
                            "data": result["filled_data"],
                            "confidence": result["average_confidence"],
                            "method": "ai_ohlc_interpolation"
                        }

            return {"success": False, "error": "No suitable AI model found"}
        except Exception as e:
            logger.warning(f"AI model fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def _fill_with_interpolation(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using interpolation over the existing ``prices`` series."""
        try:
            existing_data = data.get("prices") or []
            missing_timestamps = gap.get("missing_timestamps") or []
            if not existing_data or not missing_timestamps:
                return {"success": False, "error": "Insufficient data for interpolation"}

            from ai_models import get_gap_filler
            gap_filler = get_gap_filler()

            symbol = self._context_value(context, "symbol", "UNKNOWN")
            result = await gap_filler.fill_missing_ohlc(symbol, existing_data, missing_timestamps)

            if result["status"] == "success":
                return {
                    "success": True,
                    "data": result["filled_data"],
                    "confidence": result["average_confidence"],
                    "method": "linear_interpolation"
                }
            return {"success": False, "error": result.get("message", "Interpolation failed")}
        except Exception as e:
            logger.warning(f"Interpolation fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def _fill_with_statistics(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using statistical estimation (currently order book depth only)."""
        try:
            if gap.get("gap_type") == GapType.MISSING_DEPTH.value:
                from ai_models import get_gap_filler
                gap_filler = get_gap_filler()

                symbol = self._context_value(context, "symbol", "BTCUSDT")
                # data price -> context price -> default; never None.
                mid_price = self._resolve_mid_price(data, context)

                result = await gap_filler.estimate_orderbook_depth(symbol, mid_price)
                if result["status"] == "success":
                    return {
                        "success": True,
                        "data": result,
                        "confidence": result["confidence"],
                        "method": "statistical_orderbook_estimation"
                    }

            return {"success": False, "error": "No suitable statistical method found"}
        except Exception as e:
            logger.warning(f"Statistical fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def _fill_with_external_provider(
        self,
        gap: Dict[str, Any],
        data: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Fill gap using external provider API.

        NOTE(review): the success payloads below are placeholders (no real
        API call is made), yet they are reported with confidence 0.9 and
        will be merged into the data by fill_all_gaps(). Replace them with
        real provider calls before relying on the filled values.
        """
        try:
            if not self.providers:
                return {"success": False, "error": "No provider manager available"}

            gap_type = gap.get("gap_type")

            if gap_type in (GapType.MISSING_OHLC.value, GapType.INCOMPLETE_METADATA.value):
                # Market data comes from CoinMarketCap.
                provider = self.providers.get_provider("coinmarketcap")
                if provider and provider.is_available:
                    return {
                        "success": True,
                        "data": {"source": "coinmarketcap", "provider_used": True},
                        "confidence": 0.9,
                        "method": "external_coinmarketcap"
                    }

            elif gap_type == GapType.MISSING_TRANSACTIONS.value:
                # Transactions come from the explorer matching the chain.
                explorers = {
                    "ethereum": "etherscan",
                    "bsc": "bscscan",
                    "tron": "tronscan",
                }
                chain = self._context_value(context, "chain", "ethereum")
                explorer_id = explorers.get(chain)
                provider = self.providers.get_provider(explorer_id) if explorer_id else None

                if provider and provider.is_available:
                    return {
                        "success": True,
                        "data": {"source": provider.name, "provider_used": True},
                        "confidence": 0.9,
                        "method": f"external_{provider.provider_id}"
                    }

            return {"success": False, "error": "No suitable provider found"}
        except Exception as e:
            logger.warning(f"External provider fill failed: {e}")
            return {"success": False, "error": str(e)}

    async def fill_all_gaps(
        self,
        data: Dict[str, Any],
        required_fields: List[str],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Detect and fill all gaps in one operation

        ``data`` is enriched in place (and returned as ``enriched_data``);
        ``original_data`` is a deep copy taken before any gap is filled.

        Returns:
            Enriched data with metadata about what was filled
        """
        import copy

        start_time = time.time()

        # Snapshot first: the filling loop below mutates ``data``.
        original_data = copy.deepcopy(data)

        gaps = await self.detect_gaps(data, required_fields, context)

        fill_results = []
        for gap in gaps:
            result = await self.fill_gap(gap, data, context)
            fill_results.append(result)

            # Merge filled values back under the gap's field name, if any.
            field = gap.get("field")
            if field and result["filled"] and result["filled_data"]:
                data[field] = result["filled_data"]

        execution_time = int((time.time() - start_time) * 1000)

        gaps_detected = len(gaps)
        gaps_filled = sum(1 for r in fill_results if r["filled"])
        if gaps_detected > 0:
            avg_confidence = sum(r["confidence"] for r in fill_results) / gaps_detected
            fill_rate = gaps_filled / gaps_detected
        else:
            avg_confidence = 0
            fill_rate = 0

        return {
            "status": "success",
            "original_data": original_data,
            "enriched_data": data,
            "gaps_detected": gaps_detected,
            "gaps_filled": gaps_filled,
            "fill_rate": fill_rate,
            "fill_results": fill_results,
            "average_confidence": avg_confidence,
            "execution_time_ms": execution_time,
            "metadata": {
                "strategies_used": list(
                    {r["strategy_used"] for r in fill_results if r["strategy_used"]}
                ),
                "timestamp": datetime.utcnow().isoformat()
            }
        }

    async def _log_gap_fill_attempt(self, result: Dict[str, Any]):
        """Log gap fill attempt for audit trail (keeps the last 1000 entries)."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "gap_type": result["gap"].get("gap_type"),
            "field": result["gap"].get("field"),
            "filled": result["filled"],
            "strategy_used": result["strategy_used"],
            "confidence": result["confidence"],
            "execution_time_ms": result["execution_time_ms"],
            "attempts_count": len(result["attempts"])
        }
        self.audit_log.append(log_entry)

        # Bound memory use by dropping the oldest entries.
        if len(self.audit_log) > self._MAX_AUDIT_ENTRIES:
            self.audit_log = self.audit_log[-self._MAX_AUDIT_ENTRIES:]

        if self.db:
            try:
                # TODO: persist log_entry to the gap_filling_audit table.
                pass
            except Exception as e:
                logger.warning(f"Failed to save audit log to database: {e}")

    def get_audit_log(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Get recent gap filling audit logs.

        Args:
            limit: Maximum number of most-recent entries to return

        Returns:
            Up to ``limit`` entries, oldest first; empty when ``limit <= 0``
        """
        # ``[-0:]`` would return the whole list, so handle it explicitly.
        if limit <= 0:
            return []
        return self.audit_log[-limit:]

    def get_statistics(self) -> Dict[str, Any]:
        """Get gap filling statistics computed over the in-memory audit log.

        The same keys are returned whether or not any attempts were logged.
        """
        total = len(self.audit_log)
        if total == 0:
            return {
                "total_attempts": 0,
                "successful_fills": 0,
                "success_rate": 0,
                "average_confidence": 0,
                "average_execution_time_ms": 0,
                "strategies_used": {}
            }

        successful = sum(1 for entry in self.audit_log if entry["filled"])
        avg_confidence = sum(entry["confidence"] for entry in self.audit_log) / total
        avg_time = sum(entry["execution_time_ms"] for entry in self.audit_log) / total

        # Count successful fills per strategy (unfilled entries have None).
        strategy_counts = {}
        for entry in self.audit_log:
            strategy = entry.get("strategy_used")
            if strategy:
                strategy_counts[strategy] = strategy_counts.get(strategy, 0) + 1

        return {
            "total_attempts": total,
            "successful_fills": successful,
            "success_rate": successful / total,
            "average_confidence": avg_confidence,
            "average_execution_time_ms": avg_time,
            "strategies_used": strategy_counts
        }