| """ |
| Document Parser |
| |
| Main orchestrator for document parsing pipeline. |
| Coordinates OCR, layout detection, and chunk generation. |
| """ |
|
|
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Dict, Iterator, List, Optional, Tuple, Union |
|
|
| import numpy as np |
|
|
| from ..chunks.models import ( |
| BoundingBox, |
| ChunkType, |
| DocumentChunk, |
| PageResult, |
| ParseResult, |
| TableChunk, |
| ChartChunk, |
| ) |
| from ..io import ( |
| DocumentFormat, |
| DocumentInfo, |
| RenderOptions, |
| load_document, |
| get_document_cache, |
| ) |
| from ..models import ( |
| OCRModel, |
| OCRResult, |
| LayoutModel, |
| LayoutResult, |
| LayoutRegion, |
| LayoutRegionType, |
| TableModel, |
| TableStructure, |
| ChartModel, |
| ChartStructure, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class ParserConfig:
    """Configuration for document parser."""

    # --- Page rendering ---
    # DPI used when rasterizing pages before OCR / layout detection.
    render_dpi: int = 200
    # Hard cap on the number of pages processed per parse (None = no cap).
    max_pages: Optional[int] = None

    # --- OCR ---
    ocr_enabled: bool = True
    # NOTE(review): ocr_languages and ocr_min_confidence are not read anywhere
    # in this module; presumably consumed by the OCR model — confirm.
    ocr_languages: List[str] = field(default_factory=lambda: ["en"])
    ocr_min_confidence: float = 0.5

    # --- Layout analysis ---
    layout_enabled: bool = True
    reading_order_enabled: bool = True

    # --- Structured extraction ---
    table_extraction_enabled: bool = True
    chart_extraction_enabled: bool = True

    # --- Chunk post-processing ---
    merge_adjacent_text: bool = True
    # Layout-region text shorter than this (after strip) is discarded.
    min_chunk_chars: int = 10
    # Merging of adjacent chunks stops once the combined text would exceed this.
    max_chunk_chars: int = 4000

    # --- Caching ---
    cache_enabled: bool = True

    # --- Output ---
    include_markdown: bool = True
    # NOTE(review): include_raw_ocr is not read in this module — confirm intent.
    include_raw_ocr: bool = False
|
|
|
class DocumentParser:
    """
    Main document parsing orchestrator.

    Coordinates the full pipeline:
        1. Load document and render pages
        2. Run OCR on each page
        3. Detect layout regions
        4. Extract tables and charts
        5. Generate semantic chunks
        6. Merge adjacent text chunks
        7. Produce the final ParseResult
    """

    def __init__(
        self,
        config: Optional[ParserConfig] = None,
        ocr_model: Optional[OCRModel] = None,
        layout_model: Optional[LayoutModel] = None,
        table_model: Optional[TableModel] = None,
        chart_model: Optional[ChartModel] = None,
    ):
        """
        Args:
            config: Parser configuration; defaults to ``ParserConfig()``.
            ocr_model: OCR backend; when None, OCR is skipped entirely.
            layout_model: Layout detector; when None, layout is skipped.
            table_model: Table-structure extractor used for TABLE regions.
            chart_model: Chart extractor used for CHART/FIGURE regions.
        """
        self.config = config or ParserConfig()
        self.ocr_model = ocr_model
        self.layout_model = layout_model
        self.table_model = table_model
        self.chart_model = chart_model

        # Shared document cache; None disables caching for this parser.
        self._cache = get_document_cache() if self.config.cache_enabled else None

    def parse(
        self,
        path: Union[str, Path],
        page_range: Optional[Tuple[int, int]] = None,
    ) -> ParseResult:
        """
        Parse a document and return structured results.

        Args:
            path: Path to document file
            page_range: Optional (start, end) page range (1-indexed, inclusive)

        Returns:
            ParseResult with chunks and metadata
        """
        path = Path(path)
        start_time = time.time()

        logger.info(f"Parsing document: {path}")

        loader, renderer = load_document(path)
        doc_info = loader.info
        doc_id = doc_info.doc_id

        # Resolve the requested page window (defaults to the whole document),
        # then clamp it to the configured max_pages budget.
        start_page = page_range[0] if page_range else 1
        end_page = page_range[1] if page_range else doc_info.num_pages

        if self.config.max_pages:
            end_page = min(end_page, start_page + self.config.max_pages - 1)

        page_numbers = list(range(start_page, end_page + 1))

        logger.info(f"Processing pages {start_page}-{end_page} of {doc_info.num_pages}")

        page_results: List[PageResult] = []
        all_chunks: List[DocumentChunk] = []
        markdown_by_page: Dict[int, str] = {}
        # Global chunk ordinal across all pages; each page continues where the
        # previous one left off so reading order is preserved document-wide.
        sequence_index = 0

        render_options = RenderOptions(dpi=self.config.render_dpi)

        # BUGFIX: the loader is closed in a finally block so the document
        # handle is released even when OCR/layout/extraction raises mid-page
        # (previously an exception here leaked the open loader).
        try:
            for page_num, page_image in renderer.render_pages(page_numbers, render_options):
                logger.debug(f"Processing page {page_num}")

                page_result, page_chunks = self._process_page(
                    page_image=page_image,
                    page_number=page_num,
                    doc_id=doc_id,
                    sequence_start=sequence_index,
                )

                page_results.append(page_result)
                all_chunks.extend(page_chunks)
                sequence_index += len(page_chunks)

                if self.config.include_markdown:
                    markdown_by_page[page_num] = self._generate_page_markdown(page_chunks)
        finally:
            loader.close()

        # Stitch per-page markdown into a single document, separated by
        # horizontal rules and headed by page labels.
        markdown_full = "\n\n---\n\n".join(
            f"## Page {p}\n\n{md}"
            for p, md in sorted(markdown_by_page.items())
        )

        processing_time = time.time() - start_time
        logger.info(f"Parsed {len(all_chunks)} chunks in {processing_time:.2f}s")

        return ParseResult(
            doc_id=doc_id,
            source_path=str(path.absolute()),
            filename=path.name,
            num_pages=doc_info.num_pages,
            pages=page_results,
            chunks=all_chunks,
            markdown_full=markdown_full,
            markdown_by_page=markdown_by_page,
            processing_time_ms=processing_time * 1000,
            metadata={
                "format": doc_info.format.value,
                "has_text_layer": doc_info.has_text_layer,
                "is_scanned": doc_info.is_scanned,
                "render_dpi": self.config.render_dpi,
            }
        )

    def _process_page(
        self,
        page_image: np.ndarray,
        page_number: int,
        doc_id: str,
        sequence_start: int,
    ) -> Tuple[PageResult, List[DocumentChunk]]:
        """Process a single page: OCR, layout, chunking, and merging.

        Args:
            page_image: Rendered page as an (H, W, ...) ndarray.
            page_number: 1-indexed page number.
            doc_id: Document identifier used in chunk IDs.
            sequence_start: First sequence index to assign on this page.

        Returns:
            Tuple of (PageResult, list of chunks for this page).
        """
        height, width = page_image.shape[:2]
        chunks: List[DocumentChunk] = []
        sequence_index = sequence_start

        # Stage 1: OCR (optional).
        ocr_result: Optional[OCRResult] = None
        if self.config.ocr_enabled and self.ocr_model:
            ocr_result = self.ocr_model.recognize(page_image)

        # Stage 2: layout detection (optional).
        layout_result: Optional[LayoutResult] = None
        if self.config.layout_enabled and self.layout_model:
            layout_result = self.layout_model.detect(page_image)

        # Stage 3: chunk generation. Prefer layout regions (in reading order);
        # fall back to raw OCR blocks when no layout is available.
        if layout_result and layout_result.regions:
            for region in layout_result.get_ordered_regions():
                region_chunks = self._process_region(
                    page_image=page_image,
                    region=region,
                    ocr_result=ocr_result,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    image_size=(width, height),
                )
                chunks.extend(region_chunks)
                sequence_index += len(region_chunks)

        elif ocr_result and ocr_result.blocks:
            # No layout: every OCR block becomes a paragraph chunk.
            # NOTE(review): block.bbox is used as-is here, while the layout
            # path normalizes bboxes to [0, 1] — confirm OCR blocks are
            # already normalized, otherwise _chunks_adjacent's 0.05 gap
            # threshold will not behave consistently across the two paths.
            for block in ocr_result.blocks:
                chunk = self._create_text_chunk(
                    text=block.text,
                    bbox=block.bbox,
                    confidence=block.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.PARAGRAPH,
                )
                chunks.append(chunk)
                sequence_index += 1

        # Stage 4: optional merge of adjacent same-type text chunks.
        if self.config.merge_adjacent_text:
            chunks = self._merge_adjacent_chunks(chunks)

        page_result = PageResult(
            page_number=page_number,
            width=width,
            height=height,
            chunks=[c.chunk_id for c in chunks],
            ocr_confidence=ocr_result.confidence if ocr_result else None,
        )

        return page_result, chunks

    def _process_region(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
        page_number: int,
        doc_id: str,
        sequence_index: int,
        image_size: Tuple[int, int],
    ) -> List[DocumentChunk]:
        """Convert one layout region into zero or more chunks.

        Tables go through the table model, charts/figures through the chart
        model (with a text fallback), and everything else becomes a text
        chunk typed from the region type.
        """
        chunks: List[DocumentChunk] = []
        width, height = image_size

        # Work in normalized coordinates so downstream merging/markdown is
        # independent of render DPI.
        bbox = region.bbox
        if not bbox.normalized:
            bbox = bbox.to_normalized(width, height)

        if region.region_type == LayoutRegionType.TABLE:
            table_chunk = self._extract_table(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if table_chunk:
                chunks.append(table_chunk)

        elif region.region_type in {LayoutRegionType.CHART, LayoutRegionType.FIGURE}:
            chart_chunk = self._extract_chart(
                page_image=page_image,
                region=region,
                page_number=page_number,
                doc_id=doc_id,
                sequence_index=sequence_index,
            )
            if chart_chunk:
                chunks.append(chart_chunk)
            else:
                # Chart extraction unavailable/failed: fall back to a FIGURE
                # text chunk from OCR, with a placeholder when OCR found nothing.
                text = self._get_region_text(region, ocr_result) or "[Figure]"
                chunk = self._create_text_chunk(
                    text=text,
                    bbox=bbox,
                    confidence=region.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=ChunkType.FIGURE,
                )
                chunks.append(chunk)

        else:
            # Plain text region: keep only if it clears the minimum size.
            text = self._get_region_text(region, ocr_result)
            if text and len(text.strip()) >= self.config.min_chunk_chars:
                chunk_type = region.region_type.to_chunk_type()
                chunk = self._create_text_chunk(
                    text=text,
                    bbox=bbox,
                    confidence=region.confidence,
                    page_number=page_number,
                    doc_id=doc_id,
                    sequence_index=sequence_index,
                    chunk_type=chunk_type,
                )
                chunks.append(chunk)

        return chunks

    def _get_region_text(
        self,
        region: LayoutRegion,
        ocr_result: Optional[OCRResult],
    ) -> str:
        """Return the OCR text overlapping a region ("" when no OCR ran)."""
        if not ocr_result:
            return ""

        # 0.3 overlap threshold: OCR blocks need only partially intersect the
        # region to be attributed to it.
        return ocr_result.get_text_in_region(region.bbox, threshold=0.3)

    def _extract_table(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[TableChunk]:
        """Extract table structure from a region.

        Returns None when extraction is disabled, no model is configured,
        the table is empty, or extraction fails (failure is logged, not
        raised — a bad table must not abort the whole page).
        """
        if not self.config.table_extraction_enabled or not self.table_model:
            return None

        try:
            table_structure = self.table_model.extract_structure(
                page_image,
                region.bbox
            )

            if table_structure.num_rows > 0:
                return table_structure.to_table_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            logger.warning(f"Table extraction failed: {e}")

        return None

    def _extract_chart(
        self,
        page_image: np.ndarray,
        region: LayoutRegion,
        page_number: int,
        doc_id: str,
        sequence_index: int,
    ) -> Optional[ChartChunk]:
        """Extract chart data from a region.

        Returns None when extraction is disabled, no model is configured,
        the chart type is unrecognized, or extraction fails (failure is
        logged, not raised).
        """
        if not self.config.chart_extraction_enabled or not self.chart_model:
            return None

        try:
            chart_structure = self.chart_model.extract_chart(
                page_image,
                region.bbox
            )

            # Only accept charts the model could actually classify.
            if chart_structure.chart_type.value != "unknown":
                return chart_structure.to_chart_chunk(
                    doc_id=doc_id,
                    page=page_number,
                    sequence_index=sequence_index,
                )
        except Exception as e:
            logger.warning(f"Chart extraction failed: {e}")

        return None

    def _create_text_chunk(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float,
        page_number: int,
        doc_id: str,
        sequence_index: int,
        chunk_type: ChunkType,
    ) -> DocumentChunk:
        """Build a DocumentChunk with a deterministic, content-derived ID."""
        chunk_id = DocumentChunk.generate_chunk_id(
            doc_id=doc_id,
            page=page_number,
            bbox=bbox,
            chunk_type_str=chunk_type.value,
        )

        return DocumentChunk(
            chunk_id=chunk_id,
            doc_id=doc_id,
            chunk_type=chunk_type,
            text=text,
            page=page_number,
            bbox=bbox,
            confidence=confidence,
            sequence_index=sequence_index,
        )

    def _merge_adjacent_chunks(
        self,
        chunks: List[DocumentChunk],
    ) -> List[DocumentChunk]:
        """Merge vertically adjacent TEXT/PARAGRAPH chunks of the same type.

        Merging is greedy and in sequence order; a merge is skipped when the
        combined text would exceed ``max_chunk_chars``. The merged chunk
        keeps the first chunk's ID and sequence index, the union bbox, and
        the minimum confidence of its parts.
        """
        if len(chunks) <= 1:
            return chunks

        merged: List[DocumentChunk] = []
        current: Optional[DocumentChunk] = None

        # Only plain running text is merged; headings, tables, etc. stay intact.
        mergeable_types = {
            ChunkType.TEXT,
            ChunkType.PARAGRAPH,
        }

        for chunk in chunks:
            if current is None:
                current = chunk
                continue

            can_merge = (
                current.chunk_type in mergeable_types and
                chunk.chunk_type in mergeable_types and
                current.chunk_type == chunk.chunk_type and
                current.page == chunk.page and
                self._chunks_adjacent(current, chunk)
            )

            if can_merge:
                merged_text = current.text + "\n" + chunk.text
                if len(merged_text) <= self.config.max_chunk_chars:
                    current = DocumentChunk(
                        chunk_id=current.chunk_id,
                        doc_id=current.doc_id,
                        chunk_type=current.chunk_type,
                        text=merged_text,
                        page=current.page,
                        bbox=self._merge_bboxes(current.bbox, chunk.bbox),
                        confidence=min(current.confidence, chunk.confidence),
                        sequence_index=current.sequence_index,
                    )
                else:
                    # Size cap reached: flush the accumulated chunk as-is.
                    merged.append(current)
                    current = chunk
            else:
                merged.append(current)
                current = chunk

        if current:
            merged.append(current)

        return merged

    def _chunks_adjacent(
        self,
        chunk1: DocumentChunk,
        chunk2: DocumentChunk,
        gap_threshold: float = 0.05,
    ) -> bool:
        """Check whether chunk2 starts within ``gap_threshold`` below chunk1.

        Overlapping chunks (negative gap) are NOT considered adjacent.
        NOTE(review): the 0.05 default reads as a normalized-coordinate gap
        (5% of page height) — confirm both bboxes are normalized here.
        """
        gap = chunk2.bbox.y_min - chunk1.bbox.y_max
        return 0 <= gap <= gap_threshold

    def _merge_bboxes(
        self,
        bbox1: BoundingBox,
        bbox2: BoundingBox,
    ) -> BoundingBox:
        """Return the smallest bounding box enclosing both inputs."""
        return BoundingBox(
            x_min=min(bbox1.x_min, bbox2.x_min),
            y_min=min(bbox1.y_min, bbox2.y_min),
            x_max=max(bbox1.x_max, bbox2.x_max),
            y_max=max(bbox1.y_max, bbox2.y_max),
            normalized=bbox1.normalized,
        )

    def _generate_page_markdown(
        self,
        chunks: List[DocumentChunk],
    ) -> str:
        """Render a page's chunks as markdown.

        Each chunk is preceded by an HTML comment carrying its chunk ID so
        markdown can be traced back to source chunks.
        """
        lines: List[str] = []

        for chunk in chunks:
            lines.append(f"<!-- chunk:{chunk.chunk_id} -->")

            if chunk.chunk_type == ChunkType.TITLE:
                lines.append(f"# {chunk.text}")
            elif chunk.chunk_type == ChunkType.HEADING:
                lines.append(f"## {chunk.text}")
            elif chunk.chunk_type == ChunkType.TABLE:
                # TableChunk knows its own markdown; plain chunks fall back to text.
                if isinstance(chunk, TableChunk):
                    lines.append(chunk.to_markdown())
                else:
                    lines.append(chunk.text)
            elif chunk.chunk_type == ChunkType.LIST:
                # Re-bullet each non-empty line of the list text.
                for item in chunk.text.split("\n"):
                    if item.strip():
                        lines.append(f"- {item.strip()}")
            elif chunk.chunk_type == ChunkType.CODE:
                lines.append(f"```\n{chunk.text}\n```")
            elif chunk.chunk_type == ChunkType.FIGURE:
                lines.append(f"[Figure: {chunk.text}]")
            elif chunk.chunk_type == ChunkType.CHART:
                if isinstance(chunk, ChartChunk):
                    lines.append(f"[Chart: {chunk.title or chunk.chart_type}]")
                    lines.append(chunk.text)
                else:
                    lines.append(f"[Chart: {chunk.text}]")
            else:
                lines.append(chunk.text)

            # Blank line between chunks.
            lines.append("")

        return "\n".join(lines)
|
|
|
|
def parse_document(
    path: Union[str, Path],
    config: Optional[ParserConfig] = None,
) -> ParseResult:
    """
    Parse a document in a single call, without managing a parser instance.

    Args:
        path: Location of the document on disk.
        config: Parser settings; defaults are used when omitted.

    Returns:
        ParseResult containing the extracted chunks and metadata.
    """
    return DocumentParser(config=config).parse(path)
|
|