Spaces:
Running
Running
| import os | |
| import shutil | |
| import logging | |
| from typing import Optional | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Vector database fallback priority order. get_next_fallback_db() returns the
# entry that follows the current one, so earlier entries are tried first.
VECTOR_DB_FALLBACK_ORDER = ["chroma", "faiss"]

# Track which vector DB is currently active (for automatic fallback).
# "type" is read by get_active_vector_db() and written by
# set_active_vector_db(); "fallback_count" is not touched by the code
# visible in this part of the file.
_active_vector_db = {"type": "chroma", "fallback_count": 0}
def get_active_vector_db() -> str:
    """Return the type name of the vector database currently marked active."""
    active_type = _active_vector_db["type"]
    return active_type
def set_active_vector_db(db_type: str):
    """Mark ``db_type`` as the active vector database and log the switch."""
    _active_vector_db.update({"type": db_type})
    logger.info(f"Active vector database set to: {db_type}")
def get_next_fallback_db(current_db: str) -> Optional[str]:
    """Return the database that follows ``current_db`` in the fallback order.

    Returns ``None`` when ``current_db`` is the last entry of
    ``VECTOR_DB_FALLBACK_ORDER`` or is not listed in it at all.
    """
    try:
        position = VECTOR_DB_FALLBACK_ORDER.index(current_db)
    except ValueError:
        # Unknown database name: there is nothing to fall back to.
        return None

    successor = position + 1
    has_successor = successor < len(VECTOR_DB_FALLBACK_ORDER)
    return VECTOR_DB_FALLBACK_ORDER[successor] if has_successor else None
# Global ChromaDB client cache to avoid the "different settings" error.
# Keyed by persist_directory; get_chroma_client() stores one client per path
# here and returns the stored client on later calls.
_chroma_clients = {}
def reset_chroma_clients():
    """Drop every cached ChromaDB client.

    Intended to be called when database corruption is detected, so that the
    next get_chroma_client() call builds a new client instead of reusing a
    cached one.
    """
    global _chroma_clients
    # Rebind to a brand-new empty cache rather than emptying the old dict.
    _chroma_clients = dict()
    logger.info("Reset ChromaDB client cache")
def get_chroma_client(persist_directory: str):
    """Get or create the shared ChromaDB client for ``persist_directory``.

    One ``chromadb.PersistentClient`` is cached per directory in the
    module-level ``_chroma_clients`` dict and reused on later calls.

    On first use the client is created and checked with ``heartbeat()``.
    If either step fails, the directory is deleted, recreated empty, and a
    fresh client is built on it. This is meant to recover from common
    ChromaDB errors:
    - tenant default_tenant connection errors
    - Database corruption
    - Version mismatch issues

    WARNING: recovery deletes everything stored under ``persist_directory``
    and runs for *any* creation/verification failure, not only for errors
    that look like corruption.

    Args:
        persist_directory: Filesystem path of the ChromaDB store. Created if
            it does not exist.

    Returns:
        The cached or newly created ``chromadb.PersistentClient``.

    Raises:
        Exception: Whatever the single recovery attempt raised, when clearing
            the directory or recreating the client fails as well. Nothing is
            cached in that case, so the next call starts from scratch.
    """
    # Ensure directory exists (done on cache hits too).
    os.makedirs(persist_directory, exist_ok=True)

    if persist_directory in _chroma_clients:
        return _chroma_clients[persist_directory]

    # Function-scope import: chromadb is only loaded when a client is needed.
    import chromadb
    from chromadb.config import Settings

    def create_client():
        """Helper to create a new ChromaDB client."""
        return chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True,
            ),
        )

    def is_corruption_error(error: Exception) -> bool:
        """Check if the error message contains a known corruption indicator."""
        error_str = str(error).lower()
        corruption_indicators = (
            'tenant',  # "Could not connect to tenant default_tenant"
            'default_tenant',
            'sqlite',  # SQLite database issues
            'database',
            'corrupt',
            'no such table',
            'disk i/o error',
            'malformed',
            'locked',
        )
        return any(indicator in error_str for indicator in corruption_indicators)

    def recover(cause: Exception):
        """Wipe the store after ``cause`` and return a fresh client."""
        if not is_corruption_error(cause):
            # The store is reset for every failure; make it visible in the
            # logs when the trigger was not a recognised corruption message.
            logger.warning(
                f"ChromaDB error does not look like corruption; resetting {persist_directory} anyway"
            )
        logger.warning(f"Clearing corrupted ChromaDB at {persist_directory} and recreating...")
        # NOTE(review): chromadb may keep its own per-path state for the
        # failed client; confirm that recreating on the same path after
        # rmtree really yields a fresh store.
        try:
            if os.path.exists(persist_directory):
                shutil.rmtree(persist_directory)
            os.makedirs(persist_directory, exist_ok=True)
            return create_client()
        except Exception as recovery_error:
            logger.error(f"Recovery also failed: {recovery_error}")
            raise

    try:
        client = create_client()
    except Exception as create_error:
        logger.error(f"Failed to create ChromaDB client: {create_error}")
        client = recover(create_error)
    else:
        # Verify the client works by attempting a simple operation.
        try:
            client.heartbeat()
        except Exception as verify_error:
            logger.error(f"ChromaDB verification failed: {verify_error}")
            # Drop the failed client before its files are deleted.
            client = None
            client = recover(verify_error)

    # Cache only the client that is actually returned, so a failed attempt
    # never leaves a broken client behind for later calls.
    _chroma_clients[persist_directory] = client
    return client