make789 commited on
Commit
1e4fbbe
·
verified ·
1 Parent(s): 20d4651

Upload progress_bus.py

Browse files
Files changed (1) hide show
  1. progress_bus.py +92 -0
progress_bus.py ADDED
@@ -0,0 +1,92 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Progress Bus for SSE streaming of OCR job progress
3
+ Enables real-time progress updates without polling
4
+ """
5
+ import asyncio
6
+ import json
7
+ import uuid
8
+ from typing import Dict, Optional
9
+
10
+
11
+ class ProgressBus:
12
+ """Manages progress streams for async OCR jobs"""
13
+
14
+ def __init__(self):
15
+ self.queues: Dict[str, asyncio.Queue] = {}
16
+ self.lock = asyncio.Lock()
17
+
18
+ def new_job(self) -> str:
19
+ """Create a new job and return its ID"""
20
+ jid = uuid.uuid4().hex[:16] # Short job ID
21
+ asyncio.create_task(self._ensure_queue(jid))
22
+ return jid
23
+
24
+ async def _ensure_queue(self, jid: str):
25
+ """Ensure queue exists for job"""
26
+ async with self.lock:
27
+ if jid not in self.queues:
28
+ self.queues[jid] = asyncio.Queue()
29
+
30
+ async def send(self, jid: str, **payload):
31
+ """Send progress update for a job"""
32
+ async with self.lock:
33
+ q = self.queues.get(jid)
34
+ if q:
35
+ await q.put({"event": "progress", **payload})
36
+
37
+ async def finalize(self, jid: str, **payload):
38
+ """Finalize job and close stream"""
39
+ async with self.lock:
40
+ q = self.queues.get(jid)
41
+ if q:
42
+ await q.put({"event": "done", **payload})
43
+ await q.put(None) # Sentinel to close stream
44
+
45
+ async def error(self, jid: str, error: str):
46
+ """Send error for a job"""
47
+ async with self.lock:
48
+ q = self.queues.get(jid)
49
+ if q:
50
+ await q.put({"event": "error", "error": error})
51
+ await q.put(None) # Close stream on error
52
+
53
+ async def stream(self, jid: str):
54
+ """SSE stream for a job - yields SSE-formatted events"""
55
+ # Ensure queue exists
56
+ await self._ensure_queue(jid)
57
+
58
+ q = self.queues.get(jid)
59
+ if not q:
60
+ yield f"data: {json.dumps({'event': 'error', 'error': 'Job not found'}, ensure_ascii=False)}\n\n"
61
+ return
62
+
63
+ try:
64
+ while True:
65
+ # Wait for next event with timeout
66
+ try:
67
+ item = await asyncio.wait_for(q.get(), timeout=300.0) # 5 minute timeout
68
+ except asyncio.TimeoutError:
69
+ yield f"data: {json.dumps({'event': 'timeout', 'error': 'Stream timeout'}, ensure_ascii=False)}\n\n"
70
+ break
71
+
72
+ if item is None:
73
+ break # Sentinel - close stream
74
+
75
+ # SSE format: data: {json}\n\n
76
+ yield f"data: {json.dumps(item, ensure_ascii=False)}\n\n"
77
+ finally:
78
+ # Cleanup queue
79
+ async with self.lock:
80
+ self.queues.pop(jid, None)
81
+
82
+ def cleanup(self, jid: str):
83
+ """Remove job from bus (after completion or timeout)"""
84
+ async def _cleanup():
85
+ async with self.lock:
86
+ self.queues.pop(jid, None)
87
+ asyncio.create_task(_cleanup())
88
+
89
+
90
+ # Global progress bus instance
91
+ bus = ProgressBus()
92
+