make789 committed
Commit 20d4651 · verified · Parent(s): 855f6ac

Upload ocr_service.py

Files changed (1): ocr_service.py (+393, -7)
ocr_service.py CHANGED
 
@@ -11,8 +11,9 @@ from time import monotonic
 from typing import Any, Deque, DefaultDict, Optional
 
 import numpy as np
-from fastapi import Depends, FastAPI, Form, HTTPException, Request, UploadFile, status
+from fastapi import Depends, FastAPI, Form, HTTPException, Request, UploadFile, status, BackgroundTasks
 from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import StreamingResponse
 from fastapi.security import APIKeyHeader
 from PIL import Image
 
@@ -127,6 +128,11 @@ _ocr_model = None
 _ocr_tokenizer = None
 _model_lock = asyncio.Lock()
 
+# Job management for async processing and cancellation
+_jobs: dict[str, dict] = {} # job_id -> {status, progress, result, error, cancelled}
+_jobs_lock = asyncio.Lock()
+_cancellation_tokens: dict[str, asyncio.Event] = {} # job_id -> cancellation event
+
 
 def _download_and_patch_model_locally(model_id: str, revision: str) -> str:
     """
 
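The new globals implement cooperative cancellation: each job gets an asyncio.Event, and the pipeline polls it between stages rather than interrupting work mid-flight. A minimal, self-contained sketch of that pattern (names here are illustrative, not part of the commit):

    import asyncio

    async def worker(cancel: asyncio.Event) -> str:
        for _ in range(5):            # stand-in for the OCR stages
            if cancel.is_set():       # cooperative check between stages
                raise asyncio.CancelledError("cancelled between stages")
            await asyncio.sleep(0.1)  # stand-in for real work
        return "done"

    async def main() -> None:
        cancel = asyncio.Event()
        task = asyncio.create_task(worker(cancel))
        cancel.set()                  # the effect of POST /jobs/{job_id}/cancel
        try:
            print(await task)
        except asyncio.CancelledError:
            print("job cancelled")

    asyncio.run(main())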
@@ -274,16 +280,37 @@ async def get_ocr_model():
 async def run_deepseek_ocr(
     image_path: str,
     prompt: str = "<image>\n<|grounding|>Convert the document to markdown with preserved layout.",
-    use_grounding: bool = True
+    use_grounding: bool = True,
+    job_id: Optional[str] = None,
+    progress_callback=None,
+    detect_fields: bool = True
 ) -> dict:
     """
     Run DeepSeek-OCR on an image file with advanced grounding support.
+    Supports cancellation via job_id and progress updates via callback.
+
+    If detect_fields=True, also runs locator queries to detect specific fields:
+    - Recipe title
+    - Ingredients list
+    - Instructions/steps
+    Returns additional 'field_boxes' with highlighted locations.
     """
+    # Check for cancellation before starting
+    if job_id:
+        async with _jobs_lock:
+            cancel_event = _cancellation_tokens.get(job_id)
+            if cancel_event and cancel_event.is_set():
+                raise asyncio.CancelledError(f"Job {job_id} was cancelled")
+
     model, tokenizer = await get_ocr_model()
 
     output_path = tempfile.mkdtemp()
 
     try:
+        # Update progress: Preprocessing (0-10%)
+        if progress_callback:
+            await progress_callback(0.05, "Preprocessing image...")
+
         # OCR quality settings - Gundam preset recommended for CPU/Spaces
         torch = _get_torch()
         if USE_GPU and torch.cuda.is_available():
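With the new signature, callers pass an optional async progress_callback(fraction, message) plus a job_id for cancellation checks. A hypothetical one-off call showing the callback protocol and the result keys (the image path and callback are illustrative, not from the commit):

    import asyncio

    async def log_progress(fraction: float, message: str) -> None:
        # Callback protocol used above: fraction in [0, 1] plus a stage message
        print(f"{fraction:6.1%}  {message}")

    async def demo() -> None:
        result = await run_deepseek_ocr(
            "/tmp/recipe.jpg",               # illustrative path
            job_id=None,                     # no cancellation token for a one-off call
            progress_callback=log_progress,
            detect_fields=True,
        )
        # Keys returned by the function: "text" (clean markdown),
        # "lines" (grounding boxes), "field_boxes" (per-field locator hits)
        print(result["text"][:200])

    # asyncio.run(demo())  # inside the service this runs on the existing loop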
 
@@ -296,9 +323,23 @@ async def run_deepseek_ocr(
             actual_image_size = 640
             print(f" - Using CPU-optimized quality: base_size={actual_base_size}, image_size={actual_image_size}")
 
+        # Check for cancellation before inference
+        if job_id:
+            async with _jobs_lock:
+                cancel_event = _cancellation_tokens.get(job_id)
+                if cancel_event and cancel_event.is_set():
+                    raise asyncio.CancelledError(f"Job {job_id} was cancelled")
+
+        # Update progress: Starting inference (10-90%)
+        if progress_callback:
+            await progress_callback(0.10, "Starting OCR inference...")
+
         # Use torch.inference_mode() to reduce overhead on CPU
+        # Note: We can't interrupt inference mid-process, but we can check before/after
         torch = _get_torch()
         with torch.inference_mode():
+            # Estimate inference takes ~80% of time (10-90%)
+            # We'll update progress during post-processing
             result = model.infer(
                 tokenizer,
                 prompt=prompt,
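As the inline note says, model.infer cannot be interrupted mid-run, and it also blocks the event loop while it computes. A possible variant, not what this commit does, would push the blocking call onto a worker thread with asyncio.to_thread so SSE frames and /cancel requests stay responsive during inference:

    import asyncio

    # Hypothetical variant: run blocking inference off the event loop.
    def _infer_blocking(torch, model, tokenizer, **infer_kwargs):
        with torch.inference_mode():
            return model.infer(tokenizer, **infer_kwargs)

    # Inside run_deepseek_ocr this would replace the direct call, e.g.:
    # result = await asyncio.to_thread(_infer_blocking, torch, model, tokenizer,
    #                                  prompt=prompt, ...)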
 
@@ -311,6 +352,17 @@ async def run_deepseek_ocr(
                 test_compress=False,
             )
 
+        # Check for cancellation after inference
+        if job_id:
+            async with _jobs_lock:
+                cancel_event = _cancellation_tokens.get(job_id)
+                if cancel_event and cancel_event.is_set():
+                    raise asyncio.CancelledError(f"Job {job_id} was cancelled")
+
+        # Update progress: Post-processing (90-95%)
+        if progress_callback:
+            await progress_callback(0.90, "Parsing OCR results...")
+
         # Parse result - DeepSeek-OCR returns structured markdown output
         raw_text = result if isinstance(result, str) else str(result)
 
 
@@ -318,12 +370,69 @@
         # This parses grounding annotations to get bounding boxes
         lines = _parse_deepseek_output(raw_text)
 
+        # Update progress: Cleaning output (95-98%)
+        if progress_callback:
+            await progress_callback(0.95, "Cleaning output...")
+
         # Convert to clean markdown (remove tags, keep text)
         clean_markdown = _deepseek_to_markdown(raw_text)
 
+        # Detect specific fields using locator pattern if requested
+        field_boxes = {}
+        if detect_fields:
+            if progress_callback:
+                await progress_callback(0.96, "Detecting recipe fields...")
+
+            # Define field detection prompts using locator pattern
+            field_prompts = {
+                "title": "<image>\nLocate <|ref|>Recipe title<|/ref|> in the image.",
+                "ingredients": "<image>\nLocate <|ref|>Ingredients list<|/ref|> in the image.",
+                "instructions": "<image>\nLocate <|ref|>Instructions or steps<|/ref|> in the image.",
+                "quantity": "<image>\nLocate <|ref|>Total amount or servings<|/ref|> in the image.",
+                "cooking_time": "<image>\nLocate <|ref|>Cooking time or prep time<|/ref|> in the image.",
+            }
+
+            torch = _get_torch()
+            for field_name, locator_prompt in field_prompts.items():
+                try:
+                    # Check for cancellation
+                    if job_id:
+                        async with _jobs_lock:
+                            cancel_event = _cancellation_tokens.get(job_id)
+                            if cancel_event and cancel_event.is_set():
+                                break
+
+                    # Run locator query for this field
+                    with torch.inference_mode():
+                        locator_result = model.infer(
+                            tokenizer,
+                            prompt=locator_prompt,
+                            image_file=image_path,
+                            output_path=output_path,
+                            base_size=actual_base_size,
+                            image_size=actual_image_size,
+                            crop_mode=CROP_MODE,
+                            save_results=False,
+                            test_compress=False,
+                        )
+
+                    # Parse locator boxes from result
+                    locator_text = locator_result if isinstance(locator_result, str) else str(locator_result)
+                    locator_boxes = _parse_locator_boxes(locator_text, field_name)
+                    if locator_boxes:
+                        field_boxes[field_name] = locator_boxes
+                except Exception as e:
+                    print(f" ⚠️ Field detection for {field_name} failed: {e}")
+                    continue # Continue with other fields
+
+        # Update progress: Done (100%)
+        if progress_callback:
+            await progress_callback(1.0, "Complete")
+
         return {
             "text": clean_markdown, # Return clean markdown without tags
             "lines": lines, # Structured lines with bounding boxes
+            "field_boxes": field_boxes if detect_fields else {}, # Field-specific highlight boxes
         }
     except Exception as e:
         print(f"DeepSeek-OCR error: {e}")
 
@@ -343,6 +452,38 @@ async def run_deepseek_ocr(
             pass
 
 
+def _parse_locator_boxes(locator_text: str, field_name: str) -> list:
+    """
+    Parse bounding boxes from locator pattern output.
+    Locator returns: <|ref|>FIELD_NAME<|/ref|><|det|>[x1,y1,x2,y2]<|/det|>
+    """
+    import re
+
+    boxes = []
+
+    # Pattern: <|ref|>FIELD<|/ref|><|det|>[x1,y1,x2,y2]<|/det|>
+    # Note: Locator uses [x1,y1,x2,y2] format (not [x,y,w,h])
+    locator_pattern = re.compile(
+        r'<\|ref\|>[^<]*<\|\/ref\|><\|det\|>\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]<\|\/det\|>',
+        re.DOTALL
+    )
+
+    for match in locator_pattern.finditer(locator_text):
+        x1 = int(match.group(1))
+        y1 = int(match.group(2))
+        x2 = int(match.group(3))
+        y2 = int(match.group(4))
+
+        # Already [x1, y1, x2, y2] (top-left and bottom-right corners)
+        boxes.append({
+            "bbox": [x1, y1, x2, y2],
+            "field": field_name,
+            "confidence": 0.95 # Hardcoded placeholder; the locator emits no score
+        })
+
+    return boxes
+
+
 def _deepseek_to_markdown(s: str) -> str:
     """
     Convert DeepSeek-OCR tagged output to clean Markdown.
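A quick sanity check of the parser against the tag format documented in its docstring (the coordinates are invented):

    sample = "<|ref|>Recipe title<|/ref|><|det|>[112, 40, 920, 128]<|/det|>"
    print(_parse_locator_boxes(sample, "title"))
    # -> [{'bbox': [112, 40, 920, 128], 'field': 'title', 'confidence': 0.95}]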
 
@@ -750,10 +891,45 @@ async def predict_options():
 @app.post("/api/predict") # HuggingFace Spaces may auto-route POST requests here
 async def ocr_page(
     file: UploadFile,
+    job_id: Optional[str] = Form(None),
+    background_tasks: BackgroundTasks = None,
     _: None = Depends(enforce_rate_limit),
 ):
-    """OCR endpoint using DeepSeek-OCR"""
+    """OCR endpoint using DeepSeek-OCR - supports async job processing with SSE streaming"""
+    # Import progress bus
+    try:
+        from progress_bus import bus
+    except ImportError:
+        # Fallback if progress_bus not available
+        bus = None
+
+    # Generate job_id if not provided
+    if not job_id:
+        if bus:
+            job_id = bus.new_job()
+        else:
+            job_id = secrets.token_urlsafe(16)
+
+    # Initialize job status (for polling compatibility)
+    async with _jobs_lock:
+        _jobs[job_id] = {
+            "status": "processing",
+            "progress": 0.0,
+            "message": "Initializing...",
+            "result": None,
+            "error": None
+        }
+        _cancellation_tokens[job_id] = asyncio.Event()
+
+    # Start background task for async processing
+    if background_tasks and bus:
+        # Async mode: return job_id immediately, process in background
+        background_tasks.add_task(run_ocr_job_async, job_id, file, bus)
+        return {"job_id": job_id, "status": "processing", "message": "Job started - use /progress/{job_id} for SSE or /jobs/{job_id}/status for polling"}
+
+    # Synchronous mode: process immediately
     img, img_path = await load_img(file)
+
     try:
         # Save PIL image to temporary file for DeepSeek-OCR
         with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_file:
 
@@ -761,18 +937,69 @@ async def ocr_page(
             tmp_img_path = tmp_file.name
 
         try:
+            # Progress callback to update job status (async-safe)
+            async def update_progress(progress: float, message: str):
+                async with _jobs_lock:
+                    if job_id in _jobs:
+                        _jobs[job_id]["progress"] = progress
+                        _jobs[job_id]["message"] = message
+
+                # Also send to SSE bus if available; strip the trailing "..." before slugifying
+                if bus:
+                    await bus.send(job_id, pct=progress * 100, stage=message.lower().rstrip(".").replace(" ", "_"))
+
+            # Start OCR processing (can be cancelled)
+            await update_progress(0.0, "Starting OCR...")
+
+            # Check for cancellation before processing
+            cancel_event = _cancellation_tokens.get(job_id)
+            if cancel_event and cancel_event.is_set():
+                async with _jobs_lock:
+                    _jobs[job_id]["status"] = "cancelled"
+                    _jobs[job_id]["message"] = "Job was cancelled"
+                raise HTTPException(status_code=499, detail="Job was cancelled")
+
             # Use grounding prompt for better structure extraction
             result = await run_deepseek_ocr(
                 tmp_img_path,
                 prompt="<image>\n<|grounding|>Convert the document to markdown with preserved layout.",
-                use_grounding=True
+                use_grounding=True,
+                job_id=job_id,
+                progress_callback=update_progress
             )
-            return result
+
+            # Update job with result
+            async with _jobs_lock:
+                if job_id in _jobs:
+                    _jobs[job_id]["status"] = "completed"
+                    _jobs[job_id]["progress"] = 1.0
+                    _jobs[job_id]["result"] = result
+                    _jobs[job_id]["message"] = "Complete"
+
+            # Finalize SSE stream if available
+            if bus:
+                await bus.finalize(job_id, pct=100, stage="done", **result)
+
+            return {"job_id": job_id, **result}
+        except asyncio.CancelledError:
+            # Job was cancelled
+            async with _jobs_lock:
+                if job_id in _jobs:
+                    _jobs[job_id]["status"] = "cancelled"
+                    _jobs[job_id]["message"] = "Job was cancelled"
+                _cancellation_tokens.pop(job_id, None)
+            raise HTTPException(status_code=499, detail="Job was cancelled")
         except Exception as e:
-            # Log the error but don't crash - return a helpful error message
+            # Log the error and update job status
             error_msg = str(e)
             print(f"OCR processing error: {error_msg}")
 
+            async with _jobs_lock:
+                if job_id in _jobs:
+                    _jobs[job_id]["status"] = "failed"
+                    _jobs[job_id]["error"] = error_msg
+                    _jobs[job_id]["message"] = f"Error: {error_msg}"
+
             # Check if it's a model loading issue
             if "matplotlib" in error_msg or "torchvision" in error_msg or "ImportError" in error_msg:
                 raise HTTPException(
1024
  os.unlink(img_path)
1025
 
1026
 
1027
+ async def run_ocr_job_async(job_id: str, file: UploadFile, bus):
1028
+ """Background task to run OCR job with SSE updates"""
1029
+ img_path = None
1030
+ tmp_img_path = None
1031
+
1032
+ try:
1033
+ # Update progress: Decode (0-5%)
1034
+ await bus.send(job_id, pct=1, stage="queued")
1035
+
1036
+ img, img_path = await load_img(file)
1037
+ await bus.send(job_id, pct=5, stage="decode")
1038
+
1039
+ # Save PIL image to temporary file for DeepSeek-OCR
1040
+ with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_file:
1041
+ img.save(tmp_file, 'JPEG', quality=95)
1042
+ tmp_img_path = tmp_file.name
1043
+
1044
+ # Update progress: Preprocess (5-20%)
1045
+ async with _jobs_lock:
1046
+ if job_id not in _jobs:
1047
+ return # Job was cancelled before starting
1048
+ _jobs[job_id]["progress"] = 0.05
1049
+ _jobs[job_id]["message"] = "Preprocessing image..."
1050
+
1051
+ await bus.send(job_id, pct=20, stage="preprocess")
1052
+
1053
+ # Progress callback that updates both job status and SSE
1054
+ async def update_progress(progress: float, message: str):
1055
+ # Update job status
1056
+ async with _jobs_lock:
1057
+ if job_id in _jobs:
1058
+ _jobs[job_id]["progress"] = progress
1059
+ _jobs[job_id]["message"] = message
1060
+
1061
+ # Send to SSE stream
1062
+ pct = progress * 100
1063
+ stage_map = {
1064
+ "preprocessing": "preprocess",
1065
+ "starting ocr inference": "encoding",
1066
+ "parsing ocr results": "postprocess",
1067
+ "cleaning output": "postprocess",
1068
+ "complete": "done"
1069
+ }
1070
+ stage = stage_map.get(message.lower(), message.lower().replace(" ", "_"))
1071
+ await bus.send(job_id, pct=pct, stage=stage, msg=message)
1072
+
1073
+ # Check for cancellation
1074
+ async with _jobs_lock:
1075
+ cancel_event = _cancellation_tokens.get(job_id)
1076
+ if cancel_event and cancel_event.is_set():
1077
+ await bus.error(job_id, "Job was cancelled")
1078
+ return
1079
+
1080
+ # Run OCR
1081
+ result = await run_deepseek_ocr(
1082
+ tmp_img_path,
1083
+ prompt="<image>\n<|grounding|>Convert the document to markdown with preserved layout.",
1084
+ use_grounding=True,
1085
+ job_id=job_id,
1086
+ progress_callback=update_progress
1087
+ )
1088
+
1089
+ # Update job status
1090
+ async with _jobs_lock:
1091
+ if job_id in _jobs:
1092
+ _jobs[job_id]["status"] = "completed"
1093
+ _jobs[job_id]["progress"] = 1.0
1094
+ _jobs[job_id]["result"] = result
1095
+ _jobs[job_id]["message"] = "Complete"
1096
+
1097
+ # Finalize SSE stream
1098
+ await bus.finalize(job_id, pct=100, stage="done", **result)
1099
+
1100
+ except asyncio.CancelledError:
1101
+ async with _jobs_lock:
1102
+ if job_id in _jobs:
1103
+ _jobs[job_id]["status"] = "cancelled"
1104
+ _jobs[job_id]["message"] = "Job was cancelled"
1105
+ await bus.error(job_id, "Job was cancelled")
1106
+ except Exception as e:
1107
+ error_msg = str(e)
1108
+ async with _jobs_lock:
1109
+ if job_id in _jobs:
1110
+ _jobs[job_id]["status"] = "failed"
1111
+ _jobs[job_id]["error"] = error_msg
1112
+ _jobs[job_id]["message"] = f"Error: {error_msg}"
1113
+ await bus.error(job_id, error_msg)
1114
+ finally:
1115
+ # Cleanup temp files
1116
+ if tmp_img_path and os.path.exists(tmp_img_path):
1117
+ os.unlink(tmp_img_path)
1118
+ if img_path and os.path.exists(img_path):
1119
+ os.unlink(img_path)
1120
+
1121
+
1122
+ @app.get("/progress/{job_id}")
1123
+ async def get_progress_stream(job_id: str):
1124
+ """SSE stream for real-time OCR progress updates"""
1125
+ try:
1126
+ from progress_bus import bus
1127
+ except ImportError:
1128
+ raise HTTPException(status_code=503, detail="SSE streaming not available")
1129
+
1130
+ return StreamingResponse(
1131
+ bus.stream(job_id),
1132
+ media_type="text/event-stream",
1133
+ headers={
1134
+ "Cache-Control": "no-cache",
1135
+ "Connection": "keep-alive",
1136
+ "X-Accel-Buffering": "no", # Disable nginx buffering
1137
+ }
1138
+ )
1139
+
1140
+
1141
+ @app.get("/jobs/{job_id}/status")
1142
+ async def get_job_status(job_id: str):
1143
+ """Get status of an OCR job (polling endpoint)"""
1144
+ async with _jobs_lock:
1145
+ if job_id not in _jobs:
1146
+ raise HTTPException(status_code=404, detail="Job not found")
1147
+ job = _jobs[job_id]
1148
+ return {
1149
+ "job_id": job_id,
1150
+ "status": job["status"], # processing, completed, failed, cancelled
1151
+ "progress": job["progress"], # 0.0 to 1.0
1152
+ "message": job["message"],
1153
+ "result": job.get("result"),
1154
+ "error": job.get("error")
1155
+ }
1156
+
1157
+
1158
+ @app.post("/jobs/{job_id}/cancel")
1159
+ async def cancel_job(job_id: str):
1160
+ """Cancel a running OCR job"""
1161
+ async with _jobs_lock:
1162
+ if job_id not in _jobs:
1163
+ raise HTTPException(status_code=404, detail="Job not found")
1164
+
1165
+ job = _jobs[job_id]
1166
+ if job["status"] in ("completed", "failed", "cancelled"):
1167
+ return {"message": f"Job already {job['status']}"}
1168
+
1169
+ # Set cancellation flag
1170
+ if job_id in _cancellation_tokens:
1171
+ _cancellation_tokens[job_id].set()
1172
+
1173
+ job["status"] = "cancelling"
1174
+ job["message"] = "Cancellation requested..."
1175
+
1176
+ # Send cancellation to SSE stream
1177
+ try:
1178
+ from progress_bus import bus
1179
+ await bus.error(job_id, "Cancellation requested")
1180
+ except ImportError:
1181
+ pass
1182
+
1183
+ return {"message": "Cancellation requested", "job_id": job_id}
1184
+
1185
+
1186
  @app.post("/split")
1187
  async def split(
1188
  file: UploadFile,
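The progress_bus module referenced above is not part of this commit. Inferring its interface from the call sites (new_job, send, finalize, error, stream), a minimal in-memory implementation might look like the following sketch, assuming a single SSE consumer per job; the real module may differ:

    # progress_bus.py - hypothetical sketch; one asyncio.Queue per job,
    # drained by the /progress/{job_id} StreamingResponse.
    import asyncio
    import json
    import secrets

    class ProgressBus:
        def __init__(self):
            self._queues: dict[str, asyncio.Queue] = {}

        def new_job(self) -> str:
            job_id = secrets.token_urlsafe(16)
            self._queues[job_id] = asyncio.Queue()
            return job_id

        async def send(self, job_id: str, **fields):
            await self._queues.setdefault(job_id, asyncio.Queue()).put(fields)

        async def finalize(self, job_id: str, **fields):
            await self.send(job_id, done=True, **fields)

        async def error(self, job_id: str, message: str):
            await self.send(job_id, done=True, error=message)

        async def stream(self, job_id: str):
            # Async generator of SSE frames for StreamingResponse
            queue = self._queues.setdefault(job_id, asyncio.Queue())
            while True:
                event = await queue.get()
                yield f"data: {json.dumps(event)}\n\n"
                if event.get("done"):
                    self._queues.pop(job_id, None)
                    break

    bus = ProgressBus()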
 
@@ -829,7 +1215,7 @@ async def split(
     try:
         # Use DeepSeek-OCR with grounding prompt for better structured extraction
         prompt = "<image>\n<|grounding|>Convert the document region to markdown with preserved layout."
-        ocr_result = await run_deepseek_ocr(crop_path, prompt=prompt, use_grounding=True)
+        ocr_result = await run_deepseek_ocr(crop_path, prompt=prompt, use_grounding=True, detect_fields=False)
 
         # Parse OCR result to extract lines
        child_lines = ocr_result.get("lines", [])