| """ |
| Retry utilities for SPARKNET |
| Provides robust retry mechanisms for LLM calls and external services |
| Following FAANG best practices for fault tolerance |
| """ |
|
|
| import asyncio |
| from functools import wraps |
| from typing import Callable, Type, Tuple, Optional, Any |
| from loguru import logger |
| from tenacity import ( |
| retry, |
| stop_after_attempt, |
| wait_exponential, |
| retry_if_exception_type, |
| before_sleep_log, |
| RetryError, |
| ) |
|
|
|
|
| |
| TRANSIENT_EXCEPTIONS: Tuple[Type[Exception], ...] = ( |
| ConnectionError, |
| TimeoutError, |
| asyncio.TimeoutError, |
| ) |
|
|
|
|
| def with_retry( |
| max_attempts: int = 3, |
| min_wait: float = 1.0, |
| max_wait: float = 60.0, |
| exceptions: Optional[Tuple[Type[Exception], ...]] = None, |
| ): |
| """ |
| Decorator for adding retry logic to functions. |
| |
| Uses exponential backoff with jitter for optimal retry behavior. |
| |
| Args: |
| max_attempts: Maximum number of retry attempts |
| min_wait: Minimum wait time between retries (seconds) |
| max_wait: Maximum wait time between retries (seconds) |
| exceptions: Tuple of exception types to retry on |
| |
| Returns: |
| Decorated function with retry logic |
| |
| Example: |
| @with_retry(max_attempts=3) |
| async def call_llm(prompt: str) -> str: |
| ... |
| """ |
| retry_exceptions = exceptions or TRANSIENT_EXCEPTIONS |
|
|
| def decorator(func: Callable) -> Callable: |
| @wraps(func) |
| @retry( |
| stop=stop_after_attempt(max_attempts), |
| wait=wait_exponential(multiplier=1, min=min_wait, max=max_wait), |
| retry=retry_if_exception_type(retry_exceptions), |
| before_sleep=before_sleep_log(logger, log_level="WARNING"), |
| reraise=True, |
| ) |
| async def async_wrapper(*args, **kwargs): |
| return await func(*args, **kwargs) |
|
|
| @wraps(func) |
| @retry( |
| stop=stop_after_attempt(max_attempts), |
| wait=wait_exponential(multiplier=1, min=min_wait, max=max_wait), |
| retry=retry_if_exception_type(retry_exceptions), |
| before_sleep=before_sleep_log(logger, log_level="WARNING"), |
| reraise=True, |
| ) |
| def sync_wrapper(*args, **kwargs): |
| return func(*args, **kwargs) |
|
|
| if asyncio.iscoroutinefunction(func): |
| return async_wrapper |
| return sync_wrapper |
|
|
| return decorator |
|
|
|
|
| def with_fallback( |
| fallback_value: Any, |
| exceptions: Optional[Tuple[Type[Exception], ...]] = None, |
| log_level: str = "WARNING", |
| ): |
| """ |
| Decorator that returns a fallback value on exception. |
| |
| Args: |
| fallback_value: Value to return if function raises |
| exceptions: Exception types to catch (default: all) |
| log_level: Log level for exception logging |
| |
| Returns: |
| Decorated function with fallback behavior |
| |
| Example: |
| @with_fallback(fallback_value=[], exceptions=(ValueError,)) |
| def get_results() -> List[str]: |
| ... |
| """ |
| catch_exceptions = exceptions or (Exception,) |
|
|
| def decorator(func: Callable) -> Callable: |
| @wraps(func) |
| async def async_wrapper(*args, **kwargs): |
| try: |
| return await func(*args, **kwargs) |
| except catch_exceptions as e: |
| getattr(logger, log_level.lower())( |
| f"{func.__name__} failed with {type(e).__name__}: {e}, returning fallback" |
| ) |
| return fallback_value |
|
|
| @wraps(func) |
| def sync_wrapper(*args, **kwargs): |
| try: |
| return func(*args, **kwargs) |
| except catch_exceptions as e: |
| getattr(logger, log_level.lower())( |
| f"{func.__name__} failed with {type(e).__name__}: {e}, returning fallback" |
| ) |
| return fallback_value |
|
|
| if asyncio.iscoroutinefunction(func): |
| return async_wrapper |
| return sync_wrapper |
|
|
| return decorator |
|
|
|
|
| class CircuitBreaker: |
| """ |
| Circuit breaker pattern implementation for protecting external services. |
| |
| States: |
| - CLOSED: Normal operation, requests pass through |
| - OPEN: Failures exceeded threshold, requests fail immediately |
| - HALF_OPEN: Testing if service recovered |
| |
| Example: |
| breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30) |
| |
| async with breaker: |
| result = await call_external_service() |
| """ |
|
|
| CLOSED = "CLOSED" |
| OPEN = "OPEN" |
| HALF_OPEN = "HALF_OPEN" |
|
|
| def __init__( |
| self, |
| failure_threshold: int = 5, |
| recovery_timeout: float = 30.0, |
| name: str = "default", |
| ): |
| self.failure_threshold = failure_threshold |
| self.recovery_timeout = recovery_timeout |
| self.name = name |
|
|
| self._state = self.CLOSED |
| self._failure_count = 0 |
| self._last_failure_time: Optional[float] = None |
| self._lock = asyncio.Lock() |
|
|
| @property |
| def state(self) -> str: |
| return self._state |
|
|
| async def __aenter__(self): |
| async with self._lock: |
| if self._state == self.OPEN: |
| |
| if self._last_failure_time: |
| elapsed = asyncio.get_event_loop().time() - self._last_failure_time |
| if elapsed >= self.recovery_timeout: |
| self._state = self.HALF_OPEN |
| logger.info(f"Circuit breaker '{self.name}' entering HALF_OPEN state") |
| else: |
| raise CircuitBreakerError( |
| f"Circuit breaker '{self.name}' is OPEN, retry in {self.recovery_timeout - elapsed:.1f}s" |
| ) |
| return self |
|
|
| async def __aexit__(self, exc_type, exc_val, exc_tb): |
| async with self._lock: |
| if exc_type is not None: |
| self._failure_count += 1 |
| self._last_failure_time = asyncio.get_event_loop().time() |
|
|
| if self._failure_count >= self.failure_threshold: |
| self._state = self.OPEN |
| logger.warning( |
| f"Circuit breaker '{self.name}' OPENED after {self._failure_count} failures" |
| ) |
| else: |
| |
| if self._state == self.HALF_OPEN: |
| self._state = self.CLOSED |
| self._failure_count = 0 |
| logger.info(f"Circuit breaker '{self.name}' recovered to CLOSED state") |
| elif self._state == self.CLOSED: |
| self._failure_count = 0 |
|
|
| return False |
|
|
|
|
| class CircuitBreakerError(Exception): |
| """Raised when circuit breaker is open.""" |
| pass |
|
|