| """QueryExecutor — dispatches retrieval results to the appropriate executor by source_type.""" | |
| import asyncio | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src.middlewares.logging import get_logger | |
| from src.query.base import QueryResult | |
| from src.query.executors.db_executor import db_executor | |
| from src.query.executors.tabular import tabular_executor | |
| from src.rag.base import RetrievalResult | |
# Module-level logger shared by everything in this file. "query_executor" is the
# argument given to the project's get_logger helper (presumably the logger
# name/channel -- confirm against src.middlewares.logging).
| logger = get_logger("query_executor") | |
class QueryExecutor:
    """Run every source-type executor over a retrieval result set and merge the output.

    The full ``results`` list is handed to each executor unchanged; selecting
    the entries an executor is responsible for (presumably by ``source_type``
    -- confirm in the executors) is left to the executors themselves.
    """

    async def execute(
        self,
        results: list[RetrievalResult],
        user_id: str,
        db: AsyncSession,
        question: str,
        limit: int = 100,
    ) -> list[QueryResult]:
        """Execute all executors concurrently and return their combined results.

        Execution is best-effort: an executor that fails is logged and
        skipped, and the results of the remaining executors are still
        returned.

        Args:
            results: Retrieval results forwarded as-is to every executor.
            user_id: Forwarded to every executor.
            db: Async SQLAlchemy session forwarded to every executor.
            question: Forwarded to every executor.
            limit: Forwarded to every executor (default 100).

        Returns:
            The db executor's results followed by the tabular executor's
            results. Empty if both executors fail or return nothing.
        """
        # Labels are only used to make failure logs attributable.
        executors = (
            ("db", db_executor),
            ("tabular", tabular_executor),
        )

        # NOTE(review): both coroutines receive the *same* AsyncSession and run
        # concurrently. SQLAlchemy documents AsyncSession as unsafe for use by
        # multiple concurrent tasks -- confirm that at most one executor
        # actually touches ``db``, or give each its own session.
        batches = await asyncio.gather(
            *(
                executor.execute(results, user_id, db, question, limit)
                for _, executor in executors
            ),
            return_exceptions=True,
        )

        query_results: list[QueryResult] = []
        for (name, _), batch in zip(executors, batches):
            # gather(return_exceptions=True) may hand back BaseException
            # instances (e.g. asyncio.CancelledError, which is not an
            # Exception subclass since Python 3.8). Checking only Exception
            # would let those fall through to extend() and raise TypeError.
            if isinstance(batch, BaseException):
                logger.error(
                    "executor failed",
                    executor=name,
                    error_type=type(batch).__name__,
                    error=str(batch),
                )
                continue
            query_results.extend(batch)

        logger.info("query execution complete", total=len(query_results))
        return query_results
# Module-level QueryExecutor instance, created once at import time
# (presumably the shared instance callers are expected to import -- confirm).
| query_executor = QueryExecutor() | |