| """ |
| SPARKNET Backend API - GPU-Accelerated Document Processing |
| |
| This FastAPI service runs on a GPU server (e.g., Lytos) and provides: |
| - Document processing with PaddleOCR |
| - Layout detection |
| - RAG indexing and querying |
| - Embedding generation |
| - LLM inference via Ollama |
| |
| Deploy this on your GPU server and connect Streamlit Cloud to it. |
| """ |
|
|
import asyncio
import hashlib
import logging
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any

from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
|
|
| |
# Put the directory two levels above this file (PROJECT_ROOT) first on
# sys.path so the lazily imported ``src.*`` packages used by the endpoints
# below can be resolved.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))


app = FastAPI(
    title="SPARKNET Backend API",
    description="GPU-accelerated document processing for Technology Transfer Office automation",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)


# Allowed CORS origins are read from SPARKNET_CORS_ORIGINS as a comma-separated
# list (e.g. "https://my-app.streamlit.app"). The default "*" keeps the
# previous allow-every-origin behaviour. Credentials are allowed as well, so
# an explicit origin list is strongly recommended for any deployment that is
# reachable from the public internet.
CORS_ORIGINS = [
    origin.strip()
    for origin in os.environ.get("SPARKNET_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
| |
| |
|
|
# Response body of the liveness endpoints "/" and "/api/health".
class HealthResponse(BaseModel):
    # Both liveness endpoints in this module always send "healthy".
    status: str
    # Server-local time as an ISO-8601 string (datetime.now().isoformat()).
    timestamp: str
    # Defaults to the same version string the FastAPI app advertises.
    version: str = "1.0.0"
|
|
|
|
# Response body of GET /api/status: Ollama, GPU and RAG readiness in one payload.
class SystemStatus(BaseModel):
    # True when Ollama's /api/tags endpoint answered with HTTP 200.
    ollama_available: bool
    # Model names reported by Ollama (empty when it is unreachable).
    ollama_models: List[str] = []
    # True when torch reports an available CUDA device.
    gpu_available: bool = False
    # Name of CUDA device 0, if any.
    gpu_name: Optional[str] = None
    # True once get_rag_system() has produced a RAG system.
    rag_ready: bool = False
    # Number of chunks currently in the vector store (store.count()).
    indexed_chunks: int = 0
    # Ollama embedding model chosen at RAG initialisation.
    embedding_model: Optional[str] = None
    # Ollama LLM chosen at RAG initialisation.
    llm_model: Optional[str] = None
|
|
|
|
# Request model describing a document to process plus free-form options.
# NOTE(review): /api/process in this module takes multipart form fields
# instead of this model; confirm whether any client still relies on it
# before removing.
class ProcessRequest(BaseModel):
    filename: str
    # Arbitrary processing options; a fresh dict per instance.
    options: Dict[str, Any] = Field(default_factory=dict)
|
|
|
|
# Response body of POST /api/process, produced by both the full pipeline and
# the PyMuPDF/plain-text fallback.
class ProcessResponse(BaseModel):
    success: bool
    # Short hash derived from filename, timestamp and the first 1000 bytes.
    doc_id: str
    filename: str
    # Full extracted text of the processed pages.
    raw_text: str = ""
    # Dicts with keys chunk_id, text, page, chunk_type, confidence, bbox.
    chunks: List[Dict[str, Any]] = []
    page_count: int = 0
    # Dicts with keys text, confidence, page, bbox (empty in fallback mode).
    ocr_regions: List[Dict[str, Any]] = []
    # Dicts with keys id, type, confidence, page, bbox (empty in fallback mode).
    layout_regions: List[Dict[str, Any]] = []
    ocr_confidence: float = 0.0
    layout_confidence: float = 0.0
    # Wall-clock seconds since the request started.
    processing_time: float = 0.0
    # In fallback mode this is "Fallback mode: <reason>" even though success is True.
    error: Optional[str] = None
|
|
|
|
# Request body of POST /api/index.
class IndexRequest(BaseModel):
    doc_id: str
    # NOTE(review): /api/index does not read this field; only `chunks` are
    # indexed. Confirm whether text-only requests were meant to be chunked here.
    text: str
    # Chunk dicts; "text" is required for indexing, while "chunk_id" and
    # "page" are optional. Chunks whose stripped text is shorter than 20
    # characters are skipped.
    chunks: List[Dict[str, Any]] = []
    # Only the "filename" key is read; it is stored as each chunk's source_path.
    metadata: Dict[str, Any] = Field(default_factory=dict)
|
|
|
|
# Response body of POST /api/index.
class IndexResponse(BaseModel):
    success: bool
    doc_id: str
    # Number of chunks actually written to the vector store.
    num_chunks: int = 0
    error: Optional[str] = None
|
|
|
|
# Request body of POST /api/query.
class QueryRequest(BaseModel):
    # Natural-language question passed to AgenticRAG.query().
    question: str
    # Forwarded unchanged as AgenticRAG.query(filters=...).
    filters: Optional[Dict[str, Any]] = None
    # NOTE(review): /api/query does not forward this value; the number of
    # sources comes from RAGConfig.final_top_k set in get_rag_system().
    top_k: int = 5
|
|
|
|
# Response body of POST /api/query.
class QueryResponse(BaseModel):
    success: bool
    answer: str = ""
    # Dicts with keys index, text_snippet, relevance_score, document_id, page.
    sources: List[Dict[str, Any]] = []
    confidence: float = 0.0
    # Milliseconds spent in get_rag_system() plus the RAG query.
    latency_ms: float = 0.0
    # Copied from the RAG response's `validated` attribute.
    validated: bool = False
    error: Optional[str] = None
|
|
|
|
# Request body of POST /api/search.
class SearchRequest(BaseModel):
    # Text that is embedded and compared against stored chunks.
    query: str
    # Passed to store.search(top_k=...).
    top_k: int = 5
    # When set, restricts results to chunks whose document_id equals this value.
    doc_filter: Optional[str] = None
|
|
|
|
# One entry of GET /api/documents, aggregated from per-chunk metadata.
class DocumentInfo(BaseModel):
    doc_id: str
    # The "source_path" metadata of the first chunk seen for this document.
    filename: str = ""
    chunk_count: int = 0
    # NOTE(review): list_documents() never populates this field.
    indexed_at: Optional[str] = None
|
|
|
|
| |
| |
| |
|
|
# Cached RAG components; built on first successful get_rag_system() call.
_rag_system: Optional[Dict[str, Any]] = None
# NOTE(review): not referenced elsewhere in this module; kept in case
# external code imports it.
_processing_queue: Dict[str, Any] = dict()
|
|
|
|
def get_rag_system():
    """Return the shared RAG system, initialising and caching it on first use.

    Initialisation runs these steps:

    - Check that Ollama is reachable.
    - Pick an embedding model and an LLM from Ollama's installed models. For
      each, fall back to the first entry of its preference list.
    - Call ``reset_vector_store()`` / ``reset_embedding_adapter()``, then
      create the embedder and vector store.
    - Wire everything into an ``AgenticRAG``.

    The result is cached in the module-level ``_rag_system`` and returned
    directly on later calls.

    Returns:
        A dict with keys ``"rag"``, ``"store"``, ``"embedder"``,
        ``"embed_model"`` and ``"llm_model"``. Returns ``None`` if Ollama is
        unreachable or any initialisation step raises. Nothing is cached on
        failure, so the next call retries from scratch.
    """
    global _rag_system

    if _rag_system is not None:
        return _rag_system

    ollama_url = "http://localhost:11434"

    try:
        # Imported lazily: if the RAG stack is missing, this degrades to
        # "RAG unavailable" (None) instead of breaking module import.
        from src.rag.agentic import AgenticRAG, RAGConfig
        from src.rag.store import get_vector_store, VectorStoreConfig, reset_vector_store
        from src.rag.embeddings import get_embedding_adapter, EmbeddingConfig, reset_embedding_adapter

        ollama_ok, models = check_ollama_sync()
        if not ollama_ok:
            return None

        # Preference-ordered candidates: the first one Ollama lists wins.
        # NOTE(review): names are compared exactly against Ollama's reported
        # names, which appear to include a tag (e.g. "name:latest"); verify
        # whether the untagged "nomic-embed-text" can ever match. Changing the
        # matching could switch the embedding model (and vector size) of an
        # existing store, so confirm before altering it.
        embedding_models = ["nomic-embed-text", "mxbai-embed-large:latest", "mxbai-embed-large"]
        llm_models = ["llama3.2:latest", "llama3.1:8b", "mistral:latest", "qwen2.5:14b"]
        embed_model = next((m for m in embedding_models if m in models), embedding_models[0])
        llm_model = next((m for m in llm_models if m in models), llm_models[0])

        # Drop any previously created store/embedder so the new config applies.
        reset_vector_store()
        reset_embedding_adapter()

        embedder = get_embedding_adapter(
            config=EmbeddingConfig(
                ollama_model=embed_model,
                ollama_base_url=ollama_url,
            )
        )

        # NOTE: relative path, resolved against the process working
        # directory rather than PROJECT_ROOT.
        store = get_vector_store(
            config=VectorStoreConfig(
                persist_directory="data/sparknet_unified_rag",
                collection_name="sparknet_documents",
                similarity_threshold=0.0,
            )
        )

        rag_config = RAGConfig(
            model=llm_model,
            base_url=ollama_url,
            max_revision_attempts=1,
            enable_query_planning=True,
            enable_reranking=True,
            enable_validation=True,
            retrieval_top_k=10,
            final_top_k=5,
            min_confidence=0.3,
            verbose=False,
        )

        rag = AgenticRAG(
            config=rag_config,
            vector_store=store,
            embedding_adapter=embedder,
        )

        _rag_system = {
            "rag": rag,
            "store": store,
            "embedder": embedder,
            "embed_model": embed_model,
            "llm_model": llm_model,
        }
        return _rag_system

    except Exception as e:
        # Log with traceback (print() lost it) and report "unavailable".
        logging.getLogger(__name__).exception("RAG init error: %s", e)
        return None
|
|
|
|
def check_ollama_sync(base_url: str = "http://localhost:11434", timeout: float = 3.0):
    """Check synchronously whether an Ollama server is reachable.

    Args:
        base_url: Root URL of the Ollama server. A trailing slash is tolerated.
        timeout: httpx client timeout in seconds.

    Returns:
        ``(True, [model names])`` when ``GET <base_url>/api/tags`` answers
        with HTTP 200. Otherwise ``(False, [])``, which covers a non-200
        answer, a network error, a malformed body or a missing httpx install.
    """
    try:
        import httpx

        with httpx.Client(timeout=timeout) as client:
            resp = client.get(f"{base_url.rstrip('/')}/api/tags")
            if resp.status_code == 200:
                models = [m["name"] for m in resp.json().get("models", [])]
                return True, models
    except Exception:
        # Best-effort probe: any failure simply means "Ollama unavailable".
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt/SystemExit propagate.
        pass
    return False, []
|
|
|
|
def check_gpu():
    """Report whether a CUDA GPU is usable through PyTorch.

    Returns:
        ``(True, <name of CUDA device 0>)`` when ``torch.cuda.is_available()``
        is true. Otherwise ``(False, None)``, including when torch is not
        installed or the CUDA query raises.
    """
    try:
        import torch

        if torch.cuda.is_available():
            return True, torch.cuda.get_device_name(0)
    except Exception:
        # Best-effort probe: a missing or broken torch install means "no GPU".
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        pass
    return False, None
|
|
|
|
| |
| |
| |
|
|
@app.get("/", response_model=HealthResponse)
async def root():
    """Liveness probe at the service root.

    Always reports ``status="healthy"`` along with the server's local time in
    ISO-8601 form. It does not inspect Ollama, the GPU or the RAG store.
    """
    now = datetime.now()
    return HealthResponse(timestamp=now.isoformat(), status="healthy")
|
|
|
|
@app.get("/api/health", response_model=HealthResponse)
async def health():
    """Liveness probe under the ``/api`` prefix.

    Returns ``status="healthy"`` with the current server-local ISO timestamp.
    """
    stamp = datetime.now().isoformat()
    payload = {"status": "healthy", "timestamp": stamp}
    return HealthResponse(**payload)
|
|
|
|
@app.get("/api/status", response_model=SystemStatus)
async def get_status():
    """Report Ollama, GPU and RAG availability in a single payload.

    This calls ``get_rag_system()``, so the first status request after
    startup may also trigger RAG initialisation.
    """
    ollama_ok, models = check_ollama_sync()
    gpu_ok, gpu_name = check_gpu()

    rag = get_rag_system()

    indexed_chunks = 0
    embed_model = None
    llm_model = None

    if rag:
        # Plain dict lookups; these cannot fail, so report them even if the
        # store count below does.
        embed_model = rag.get("embed_model")
        llm_model = rag.get("llm_model")
        try:
            indexed_chunks = rag["store"].count()
        except Exception:
            # Keep the status endpoint up, but leave a trace instead of
            # silently reporting 0 chunks.
            logging.getLogger(__name__).warning(
                "Could not count indexed chunks", exc_info=True
            )

    return SystemStatus(
        ollama_available=ollama_ok,
        ollama_models=models,
        gpu_available=gpu_ok,
        gpu_name=gpu_name,
        rag_ready=rag is not None,
        indexed_chunks=indexed_chunks,
        embedding_model=embed_model,
        llm_model=llm_model,
    )
|
|
|
|
def _bbox_xyxy(item):
    """Return ``item.bbox.to_xyxy()``, or ``None`` when the item has no bbox."""
    return item.bbox.to_xyxy() if item.bbox else None


def _run_document_pipeline(path, ocr_engine, max_pages, enable_layout, preserve_tables):
    """Run the OCR + layout + chunking pipeline on the file at ``path``.

    The imports are local, so a missing pipeline dependency raises here.
    The caller can then catch it and switch to the fallback.
    """
    from src.document.pipeline.processor import DocumentProcessor, PipelineConfig
    from src.document.ocr import OCRConfig
    from src.document.layout import LayoutConfig
    from src.document.chunking.chunker import ChunkerConfig

    chunker_config = ChunkerConfig(
        preserve_table_structure=preserve_tables,
        detect_table_headers=True,
        chunk_tables=True,
        chunk_figures=True,
        include_captions=True,
    )

    layout_config = LayoutConfig(
        method="rule_based",
        detect_tables=True,
        detect_figures=True,
        detect_headers=True,
        detect_titles=True,
        detect_lists=True,
        min_confidence=0.3,
        heading_font_ratio=1.1,
    )

    config = PipelineConfig(
        ocr=OCRConfig(engine=ocr_engine),
        layout=layout_config,
        chunking=chunker_config,
        max_pages=max_pages,
        include_ocr_regions=True,
        include_layout_regions=enable_layout,
        generate_full_text=True,
    )

    processor = DocumentProcessor(config)
    processor.initialize()
    return processor.process(path)


def _serialize_pipeline_result(result):
    """Convert pipeline chunks/regions into the plain dicts used by ProcessResponse.

    Returns a ``(chunks, ocr_regions, layout_regions)`` tuple of lists.
    """
    chunks = [
        {
            "chunk_id": chunk.chunk_id,
            "text": chunk.text,
            "page": chunk.page,
            "chunk_type": chunk.chunk_type.value,
            "confidence": chunk.confidence,
            "bbox": _bbox_xyxy(chunk),
        }
        for chunk in result.chunks
    ]
    ocr_regions = [
        {
            "text": region.text,
            "confidence": region.confidence,
            "page": region.page,
            "bbox": _bbox_xyxy(region),
        }
        for region in result.ocr_regions
    ]
    layout_regions = [
        {
            "id": region.id,
            "type": region.type.value,
            "confidence": region.confidence,
            "page": region.page,
            "bbox": _bbox_xyxy(region),
        }
        for region in result.layout_regions
    ]
    return chunks, ocr_regions, layout_regions


@app.post("/api/process", response_model=ProcessResponse)
async def process_document(
    file: UploadFile = File(...),
    ocr_engine: str = Form(default="paddleocr"),
    max_pages: int = Form(default=10),
    enable_layout: bool = Form(default=True),
    preserve_tables: bool = Form(default=True),
):
    """
    Process a document with OCR and layout detection.

    This endpoint uses GPU-accelerated PaddleOCR for text extraction.

    The upload is written to a temporary file and run through the document
    pipeline. Any exception from the pipeline or from serialising its result
    switches to ``process_document_fallback``, which includes a missing
    pipeline dependency. The fallback records the reason in the response's
    ``error`` field. The temporary file is always removed.

    NOTE(review): the pipeline runs synchronously inside this async handler
    and so blocks the event loop for the whole run. Consider offloading it
    (e.g. ``asyncio.to_thread``) once the pipeline is known to be thread-safe.
    """
    import time

    start_time = time.time()

    file_bytes = await file.read()
    # UploadFile.filename is Optional; never hand None to Path() or to the
    # response model's `filename: str`.
    filename = file.filename or ""

    # Non-cryptographic ID: filename + second-resolution timestamp + a hash
    # of the first 1000 bytes.
    content_hash = hashlib.md5(file_bytes[:1000]).hexdigest()[:8]
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    doc_id = hashlib.md5(f"{filename}_{timestamp}_{content_hash}".encode()).hexdigest()[:12]

    tmp_path = None
    try:
        # delete=False lets the closed file be reopened by path in the
        # pipeline. It is removed in the finally clause, which now also
        # covers a failed write.
        with tempfile.NamedTemporaryFile(suffix=Path(filename).suffix, delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(file_bytes)

        try:
            result = _run_document_pipeline(
                tmp_path, ocr_engine, max_pages, enable_layout, preserve_tables
            )
            chunks_list, ocr_regions, layout_regions = _serialize_pipeline_result(result)
            processing_time = time.time() - start_time

            return ProcessResponse(
                success=True,
                doc_id=doc_id,
                filename=filename,
                raw_text=result.full_text,
                chunks=chunks_list,
                page_count=result.metadata.num_pages,
                ocr_regions=ocr_regions,
                layout_regions=layout_regions,
                ocr_confidence=result.metadata.ocr_confidence_avg or 0.0,
                layout_confidence=result.metadata.layout_confidence_avg or 0.0,
                processing_time=processing_time,
            )
        except Exception as e:
            return await process_document_fallback(
                file_bytes, filename, doc_id, max_pages, str(e), start_time
            )
    finally:
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)
|
|
|
|
def _extract_fallback_text(file_bytes: bytes, suffix: str, max_pages: int):
    """Extract text without the OCR pipeline.

    Returns a ``(text, page_count)`` tuple:

    - PDFs use PyMuPDF. Only the first ``max_pages`` pages are read, each
      prefixed with a "--- Page N ---" marker, but ``page_count`` is the
      document's total.
    - ``.txt``/``.md`` files are decoded as UTF-8, falling back to latin-1.
    - Any other type, or a failed PDF extraction, yields an explanatory
      message as the text.
    """
    if suffix == ".pdf":
        page_count = 1
        try:
            import fitz
            import io

            # The context manager closes the document even if extraction fails.
            with fitz.open(stream=io.BytesIO(file_bytes), filetype="pdf") as doc:
                page_count = len(doc)
                text_parts = [
                    f"--- Page {page_num + 1} ---\n{doc[page_num].get_text()}"
                    for page_num in range(min(max_pages, page_count))
                ]
            return "\n\n".join(text_parts), page_count
        except Exception as e:
            return f"PDF extraction failed: {e}", page_count

    if suffix in (".txt", ".md"):
        try:
            return file_bytes.decode("utf-8"), 1
        except UnicodeDecodeError:
            return file_bytes.decode("latin-1", errors="ignore"), 1

    return f"Unsupported file type: {suffix}", 1


def _split_fallback_chunks(text: str, doc_id: str, chunk_size: int = 500, overlap: int = 50):
    """Split ``text`` into overlapping fixed-size character windows.

    Windows of at most ``chunk_size`` characters start every
    ``chunk_size - overlap`` characters. Windows with 20 or fewer
    non-whitespace-stripped characters are dropped. Chunk IDs are numbered
    consecutively over the kept windows.

    Raises:
        ValueError: if ``overlap >= chunk_size`` (the window would not advance).
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError("chunk_size must be greater than overlap")

    chunks = []
    for start in range(0, len(text), step):
        chunk_text = text[start:start + chunk_size]
        if len(chunk_text.strip()) > 20:
            chunks.append({
                "chunk_id": f"{doc_id}_chunk_{len(chunks)}",
                "text": chunk_text,
                "page": 0,
                "chunk_type": "text",
                "confidence": 0.9,
                "bbox": None,
            })
    return chunks


async def process_document_fallback(
    file_bytes: bytes,
    filename: str,
    doc_id: str,
    max_pages: int,
    reason: str,
    start_time: float,
) -> ProcessResponse:
    """Fallback document processing using PyMuPDF.

    This is used when the OCR pipeline cannot run. It extracts plain text
    (PDF via PyMuPDF, or .txt/.md decoding) and cuts it into 500-character
    chunks with a 50-character overlap.

    The response always has ``success=True``. It also carries:

    - ``error="Fallback mode: <reason>"``;
    - a fixed ``ocr_confidence`` of 0.9;
    - ``page`` 0 on every chunk;
    - no OCR or layout regions.

    ``processing_time`` is measured from ``start_time``, which comes from
    ``time.time()`` in the caller.
    """
    import time

    suffix = Path(filename).suffix.lower()
    text, page_count = _extract_fallback_text(file_bytes, suffix, max_pages)
    chunks = _split_fallback_chunks(text, doc_id)

    processing_time = time.time() - start_time

    return ProcessResponse(
        success=True,
        doc_id=doc_id,
        filename=filename,
        raw_text=text,
        chunks=chunks,
        page_count=page_count,
        ocr_regions=[],
        layout_regions=[],
        ocr_confidence=0.9,
        layout_confidence=0.0,
        processing_time=processing_time,
        error=f"Fallback mode: {reason}",
    )
|
|
|
|
@app.post("/api/index", response_model=IndexResponse)
async def index_document(request: IndexRequest):
    """Index a document's chunks into the RAG vector store.

    Each chunk's ``"text"`` is embedded individually with the shared embedder.
    Chunks whose stripped text is shorter than 20 characters are skipped. The
    remaining chunks are written with ``store.add_chunks(chunk_dicts,
    embeddings)``. A missing or empty ``chunk_id`` becomes
    ``"<doc_id>_chunk_<i>"``, and a missing or None ``page`` becomes 0.
    ``sequence_index`` is the chunk's position in the request, so skipped
    chunks leave gaps.

    Failures are reported in-band (``success=False`` plus ``error``), never
    as HTTP errors.

    NOTE(review): ``request.text`` is not used, so a request without chunks
    indexes nothing. Confirm whether text-only requests should be chunked here.
    """
    rag = get_rag_system()

    if not rag:
        return IndexResponse(
            success=False,
            doc_id=request.doc_id,
            error="RAG system not available. Check Ollama status.",
        )

    try:
        store = rag["store"]
        embedder = rag["embedder"]
        source_path = request.metadata.get("filename", "")

        chunk_dicts: List[Dict[str, Any]] = []
        embeddings = []

        for i, chunk in enumerate(request.chunks):
            # `chunks` is validated as List[Dict[str, Any]], so every item is a
            # dict. Individual values may still be missing, None or non-str.
            chunk_text = str(chunk.get("text") or "")
            if len(chunk_text.strip()) < 20:
                continue

            chunk_dicts.append({
                "chunk_id": chunk.get("chunk_id") or f"{request.doc_id}_chunk_{i}",
                "document_id": request.doc_id,
                "text": chunk_text,
                "page": chunk.get("page") or 0,
                "chunk_type": "text",
                "source_path": source_path,
                "sequence_index": i,
            })
            embeddings.append(embedder.embed_text(chunk_text))

        if not chunk_dicts:
            return IndexResponse(
                success=False,
                doc_id=request.doc_id,
                error="No valid chunks to index",
            )

        store.add_chunks(chunk_dicts, embeddings)

        return IndexResponse(
            success=True,
            doc_id=request.doc_id,
            num_chunks=len(chunk_dicts),
        )

    except Exception as e:
        return IndexResponse(
            success=False,
            doc_id=request.doc_id,
            error=str(e),
        )
|
|
|
|
def _citation_to_source(cite) -> Dict[str, Any]:
    """Flatten one citation object into a ``QueryResponse.sources`` entry.

    Missing attributes fall back to neutral defaults. A missing
    ``text_snippet`` falls back to ``str(cite)``.
    """
    return {
        "index": getattr(cite, "index", 0),
        "text_snippet": cite.text_snippet if hasattr(cite, "text_snippet") else str(cite),
        "relevance_score": getattr(cite, "relevance_score", 0.0),
        "document_id": getattr(cite, "document_id", ""),
        "page": getattr(cite, "page", 0),
    }


@app.post("/api/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
    """Answer a question with the agentic RAG pipeline.

    ``latency_ms`` covers RAG lookup/initialisation plus the query itself.
    Failures are reported in-band as ``success=False`` with ``error`` set.

    NOTE(review): ``request.top_k`` is not forwarded; the number of sources
    comes from the RAG configuration built in ``get_rag_system()``.
    """
    import time

    started = time.time()

    rag = get_rag_system()
    if not rag:
        return QueryResponse(
            success=False,
            error="RAG system not available. Check Ollama status.",
        )

    try:
        reply = rag["rag"].query(request.question, filters=request.filters)
        elapsed_ms = (time.time() - started) * 1000

        citations = getattr(reply, "citations", None)
        sources = [_citation_to_source(c) for c in citations] if citations else []

        return QueryResponse(
            success=True,
            answer=reply.answer,
            sources=sources,
            confidence=reply.confidence,
            latency_ms=elapsed_ms,
            validated=reply.validated,
        )
    except Exception as exc:
        return QueryResponse(
            success=False,
            error=str(exc),
        )
|
|
|
|
@app.post("/api/search")
async def search_similar(request: SearchRequest):
    """Return the stored chunks most similar to ``request.query``.

    The query is embedded with the shared embedder and searched in the vector
    store. The search is optionally restricted to a single ``document_id``.
    Failures are reported in-band as
    ``{"success": False, "error": ..., "results": []}``.
    """
    rag = get_rag_system()
    if not rag:
        return {"success": False, "error": "RAG system not available", "results": []}

    try:
        embedder, store = rag["embedder"], rag["store"]

        query_vector = embedder.embed_text(request.query)
        where = {"document_id": request.doc_filter} if request.doc_filter else None

        hits = store.search(
            query_embedding=query_vector,
            top_k=request.top_k,
            filters=where,
        )

        serialized = [
            {
                "chunk_id": hit.chunk_id,
                "document_id": hit.document_id,
                "text": hit.text,
                "similarity": hit.similarity,
                "page": hit.page,
                "metadata": hit.metadata,
            }
            for hit in hits
        ]
        return {"success": True, "results": serialized}
    except Exception as exc:
        return {"success": False, "error": str(exc), "results": []}
|
|
|
|
@app.get("/api/documents", response_model=List[DocumentInfo])
async def list_documents():
    """List indexed documents by grouping chunk metadata on ``document_id``.

    This reads every chunk's metadata from the store's underlying collection
    (the private ``_collection`` attribute). Each document's ``filename`` is
    the ``source_path`` of its first chunk. Chunks without a ``document_id``
    are grouped under ``"unknown"``.

    Returns an empty list when the RAG system is unavailable or the store
    cannot be read; the latter is logged.
    """
    rag = get_rag_system()
    if not rag:
        return []

    try:
        collection = rag["store"]._collection

        results = collection.get(include=["metadatas"])
        if not results or not results.get("metadatas"):
            return []

        doc_info: Dict[str, Dict[str, Any]] = {}
        for meta in results["metadatas"]:
            # A chunk stored without metadata yields None; skip the lookup
            # failure instead of aborting the whole listing.
            meta = meta or {}
            doc_id = meta.get("document_id", "unknown")
            if doc_id not in doc_info:
                doc_info[doc_id] = {
                    "doc_id": doc_id,
                    "filename": meta.get("source_path") or "",
                    "chunk_count": 0,
                }
            doc_info[doc_id]["chunk_count"] += 1

        return [DocumentInfo(**info) for info in doc_info.values()]

    except Exception:
        logging.getLogger(__name__).exception("Failed to list indexed documents")
        return []
|
|
|
|
@app.delete("/api/documents/{doc_id}")
async def delete_document(doc_id: str):
    """Remove every chunk whose ``document_id`` metadata equals ``doc_id``.

    This works directly on the store's underlying collection (the private
    ``_collection`` attribute). Returns ``{"success": True,
    "deleted_chunks": n}`` on success. Otherwise it returns
    ``{"success": False, "error": ...}``, including when no chunk matches.
    """
    rag = get_rag_system()
    if not rag:
        return {"success": False, "error": "RAG system not available"}

    try:
        collection = rag["store"]._collection

        # include=[]: only the IDs are needed to delete.
        matched = collection.get(where={"document_id": doc_id}, include=[])
        ids = matched.get("ids") if matched else None

        if not ids:
            return {"success": False, "error": "Document not found"}

        collection.delete(ids=ids)
        return {"success": True, "deleted_chunks": len(ids)}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    import uvicorn

    # Serve on 0.0.0.0:8000 by default. SPARKNET_HOST / SPARKNET_PORT allow
    # overriding the bind address without editing the file.
    uvicorn.run(
        app,
        host=os.environ.get("SPARKNET_HOST", "0.0.0.0"),
        port=int(os.environ.get("SPARKNET_PORT", "8000")),
    )
|
|