File size: 3,046 Bytes
1e4fbbe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Progress Bus for SSE streaming of OCR job progress
Enables real-time progress updates without polling
"""
import asyncio
import json
import uuid
from typing import Dict, Optional


class ProgressBus:
    """Manages progress streams for async OCR jobs.

    Every job ID maps to an ``asyncio.Queue`` of pending events. Producers
    push events with ``send``, ``finalize`` and ``error``; a consumer drains
    them through ``stream``, which renders each event as one SSE frame
    (``data: <json>`` followed by a blank line). A ``None`` item on a queue
    is the sentinel that ends the stream.
    """

    def __init__(self):
        # Job ID -> queue of events waiting to be streamed for that job.
        self.queues: Dict[str, asyncio.Queue] = {}
        # Taken by the coroutine methods around registry access. No holder
        # ever suspends while owning it (queues are unbounded and filled with
        # put_nowait), which is why the synchronous methods below may touch
        # ``queues`` directly from the event-loop thread.
        self.lock = asyncio.Lock()

    @staticmethod
    def _format_sse(event: dict) -> str:
        """Render one event dict as an SSE ``data:`` frame."""
        return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

    def new_job(self) -> str:
        """Create a new job and return its ID.

        The queue is registered before this method returns, so events sent
        immediately afterwards are buffered instead of being dropped.

        Returns:
            A 16-character hexadecimal job ID.
        """
        jid = uuid.uuid4().hex[:16]  # Short job ID
        # Register synchronously: deferring this to a background task would
        # leave a window in which send()/finalize() find no queue.
        self.queues[jid] = asyncio.Queue()
        return jid

    async def _ensure_queue(self, jid: str):
        """Create the queue for ``jid`` if it is not registered yet."""
        async with self.lock:
            if jid not in self.queues:
                self.queues[jid] = asyncio.Queue()

    async def _publish(self, jid: str, *items) -> None:
        """Enqueue ``items`` in order for ``jid``; do nothing if it is unknown."""
        async with self.lock:
            q = self.queues.get(jid)
            if q is None:
                return
            for item in items:
                # Queues are unbounded, so put_nowait cannot raise QueueFull.
                q.put_nowait(item)

    async def send(self, jid: str, **payload):
        """Send a progress update for a job.

        Args:
            jid: Job ID returned by ``new_job``.
            **payload: Extra fields merged into the ``progress`` event.

        Updates for an unknown job ID are ignored.
        """
        await self._publish(jid, {"event": "progress", **payload})

    async def finalize(self, jid: str, **payload):
        """Send the final ``done`` event for a job and close its stream.

        Args:
            jid: Job ID returned by ``new_job``.
            **payload: Extra fields merged into the ``done`` event.

        Calls for an unknown job ID are ignored.
        """
        # The trailing None is the sentinel that makes stream() stop.
        await self._publish(jid, {"event": "done", **payload}, None)

    async def error(self, jid: str, error: str):
        """Send an ``error`` event for a job and close its stream.

        Args:
            jid: Job ID returned by ``new_job``.
            error: Message placed under the ``error`` key of the event.

        Calls for an unknown job ID are ignored.
        """
        # The trailing None is the sentinel that makes stream() stop.
        await self._publish(jid, {"event": "error", "error": error}, None)

    async def stream(self, jid: str, timeout: Optional[float] = 300.0):
        """Yield SSE-formatted events for a job until it finishes.

        Args:
            jid: Job ID to stream. If no queue is registered for it yet, one
                is created so that later events can still be delivered.
            timeout: Maximum number of seconds to wait for each next event
                (default: 5 minutes). ``None`` waits indefinitely.

        Yields:
            One ``data: <json>`` frame per event. If no event arrives within
            ``timeout`` a final ``timeout`` event is yielded and the stream
            ends.

        The job's queue is unregistered when the stream ends for any reason.
        """
        await self._ensure_queue(jid)
        q = self.queues[jid]

        try:
            while True:
                try:
                    item = await asyncio.wait_for(q.get(), timeout=timeout)
                except asyncio.TimeoutError:
                    yield self._format_sse(
                        {"event": "timeout", "error": "Stream timeout"}
                    )
                    break

                if item is None:
                    break  # Sentinel - close stream

                yield self._format_sse(item)
        finally:
            async with self.lock:
                # Only drop the queue this stream was reading, so a stale
                # stream cannot unregister a queue created later for the
                # same job ID.
                if self.queues.get(jid) is q:
                    del self.queues[jid]

    def cleanup(self, jid: str):
        """Remove job from bus (after completion or timeout).

        The removal happens immediately; unknown job IDs are ignored.
        """
        # A plain dict pop needs neither a running loop nor a background
        # task that could be garbage-collected before it runs.
        self.queues.pop(jid, None)


# Module-level ProgressBus instance, created once at import time.
bus: ProgressBus = ProgressBus()