""" Progress Bus for SSE streaming of OCR job progress Enables real-time progress updates without polling """ import asyncio import json import uuid from typing import Dict, Optional class ProgressBus: """Manages progress streams for async OCR jobs""" def __init__(self): self.queues: Dict[str, asyncio.Queue] = {} self.lock = asyncio.Lock() def new_job(self) -> str: """Create a new job and return its ID""" jid = uuid.uuid4().hex[:16] # Short job ID asyncio.create_task(self._ensure_queue(jid)) return jid async def _ensure_queue(self, jid: str): """Ensure queue exists for job""" async with self.lock: if jid not in self.queues: self.queues[jid] = asyncio.Queue() async def send(self, jid: str, **payload): """Send progress update for a job""" async with self.lock: q = self.queues.get(jid) if q: await q.put({"event": "progress", **payload}) async def finalize(self, jid: str, **payload): """Finalize job and close stream""" async with self.lock: q = self.queues.get(jid) if q: await q.put({"event": "done", **payload}) await q.put(None) # Sentinel to close stream async def error(self, jid: str, error: str): """Send error for a job""" async with self.lock: q = self.queues.get(jid) if q: await q.put({"event": "error", "error": error}) await q.put(None) # Close stream on error async def stream(self, jid: str): """SSE stream for a job - yields SSE-formatted events""" # Ensure queue exists await self._ensure_queue(jid) q = self.queues.get(jid) if not q: yield f"data: {json.dumps({'event': 'error', 'error': 'Job not found'}, ensure_ascii=False)}\n\n" return try: while True: # Wait for next event with timeout try: item = await asyncio.wait_for(q.get(), timeout=300.0) # 5 minute timeout except asyncio.TimeoutError: yield f"data: {json.dumps({'event': 'timeout', 'error': 'Stream timeout'}, ensure_ascii=False)}\n\n" break if item is None: break # Sentinel - close stream # SSE format: data: {json}\n\n yield f"data: {json.dumps(item, ensure_ascii=False)}\n\n" finally: # Cleanup queue async with self.lock: self.queues.pop(jid, None) def cleanup(self, jid: str): """Remove job from bus (after completion or timeout)""" async def _cleanup(): async with self.lock: self.queues.pop(jid, None) asyncio.create_task(_cleanup()) # Global progress bus instance bus = ProgressBus()