Spaces:
Sleeping
Sleeping
| """ | |
| Urdu text preprocessing utilities. | |
| Handles Unicode normalization, RTL text cleanup, and Urdu-specific processing. | |
| """ | |
| import re | |
| import unicodedata | |
def normalize_unicode(text: str) -> str:
    """Return ``text`` converted to Unicode Normalization Form C (NFC).

    Args:
        text: Input string.

    Returns:
        The NFC-normalized string produced by ``unicodedata.normalize``.
    """
    composed = unicodedata.normalize("NFC", text)
    return composed
def remove_diacritics(text: str) -> str:
    """Strip Arabic/Urdu diacritical marks (tashkeel/harakat) from ``text``.

    Every code point in the ranges U+0610-U+061A, U+064B-U+065F, U+0670,
    U+06D6-U+06DC, U+06DF-U+06E4, U+06E7-U+06E8 and U+06EA-U+06ED is deleted;
    all other characters are returned unchanged.

    Args:
        text: Input string.

    Returns:
        ``text`` with the listed code points removed.
    """
    return re.sub(
        r'[\u0610-\u061A\u064B-\u065F\u0670\u06D6-\u06DC\u06DF-\u06E4\u06E7-\u06E8\u06EA-\u06ED]',
        '',
        text,
    )
def normalize_whitespace(text: str) -> str:
    """Collapse runs of spaces/tabs and tidy line breaks.

    Steps, in order:
      1. Convert Windows line endings (CRLF) to LF.
      2. Replace every run of spaces and/or tabs with a single space.
      3. Remove spaces at the start and end of every line.
      4. Collapse three or more consecutive newlines into exactly two.
      5. Strip leading/trailing whitespace from the whole text.

    Only the space and tab characters are treated as horizontal whitespace.

    Args:
        text: Input string.

    Returns:
        The whitespace-normalized string.
    """
    # CRLF must be converted first: with a "\r" still sitting in front of the
    # "\n", a trailing space is not at the end of its line for `$`, so the
    # per-line trim below would leave it in place.
    text = text.replace('\r\n', '\n')
    # Replace multiple spaces/tabs with a single space
    text = re.sub(r'[ \t]+', ' ', text)
    # Remove spaces at line beginnings/ends
    text = re.sub(r'^ +| +$', '', text, flags=re.MULTILINE)
    # Limit consecutive blank lines to one
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()
def normalize_urdu_characters(text: str) -> str:
    """Map Arabic variant letters and Arabic-Indic digits to the forms used here.

    Every occurrence of a key in the table below is replaced by its value.
    The Heh and Teh Marbuta mappings are context-dependent in principle, but
    this function applies them unconditionally to every occurrence.

    Args:
        text: Input string.

    Returns:
        ``text`` with each listed character replaced.
    """
    variant_to_standard = {
        '\u0643': '\u06A9',  # Arabic Kaf ك → Urdu Kaf ک
        '\u064A': '\u06CC',  # Arabic Yeh ي → Urdu Yeh ی
        '\u0647': '\u06BE',  # Arabic Heh → U+06BE (context-dependent choice)
        '\u0629': '\u06C3',  # Teh Marbuta ة → ۃ (context-dependent choice)
        # Arabic-Indic digits → ASCII digits
        '٠': '0', '١': '1', '٢': '2', '٣': '3', '٤': '4',
        '٥': '5', '٦': '6', '٧': '7', '٨': '8', '٩': '9',
    }
    # No replacement value is itself a key, so one translate pass gives the
    # same result as applying the replacements one after another.
    return text.translate(str.maketrans(variant_to_standard))
def clean_ocr_artifacts(text: str) -> str:
    """Delete characters and sequences that are typical OCR noise.

    Three removals are applied in order:
      1. ASCII control characters, except tab, line feed and carriage return.
      2. Runs of three or more periods that are neither preceded nor followed
         by a word character.
      3. A fixed set of box, bullet, star and card-suit symbols.

    Args:
        text: Input string.

    Returns:
        ``text`` with the noise removed; nothing is inserted in its place.
    """
    noise_patterns = (
        r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]',
        r'(?<!\w)\.{3,}(?!\w)',
        r'[□■▪▫●○◆◇★☆♦♣♠♥]',
    )
    cleaned = text
    for noise in noise_patterns:
        cleaned = re.sub(noise, '', cleaned)
    return cleaned
def _utf8_boundary_at_or_before(data: bytes, pos: int, floor: int = 0) -> int:
    """Step ``pos`` back to the nearest UTF-8 character start, never below ``floor``."""
    # Continuation bytes have the bit pattern 10xxxxxx.
    while floor < pos < len(data) and (data[pos] & 0xC0) == 0x80:
        pos -= 1
    return pos


def _utf8_boundary_after(data: bytes, pos: int) -> int:
    """Step ``pos`` forward to the next UTF-8 character start, or to ``len(data)``."""
    while pos < len(data) and (data[pos] & 0xC0) == 0x80:
        pos += 1
    return pos


def segment_into_chunks(text: str, max_bytes: int = 256, overlap_bytes: int = 64) -> list:
    """
    Segment long text into overlapping byte-length chunks for ByT5 processing.

    Chunk boundaries always fall between characters, never inside a multi-byte
    UTF-8 sequence. Each chunk after the first starts ``overlap_bytes`` before
    the end of the previous one (moved back to a character boundary). If that
    would not advance past the previous chunk's start, the overlap is dropped
    for that step so the loop always makes progress.

    Args:
        text: Input text to segment
        max_bytes: Maximum UTF-8 bytes per chunk (ByT5 input limit). A single
            character whose encoding is longer than ``max_bytes`` is still
            emitted whole, as its own chunk.
        overlap_bytes: Overlap between consecutive chunks, in bytes

    Returns:
        List of (chunk_text, start_pos, end_pos) tuples, where the positions
        are character offsets such that ``text[start_pos:end_pos] == chunk_text``.

    Raises:
        ValueError: If ``max_bytes`` is less than 1 or ``overlap_bytes`` is negative.
    """
    if max_bytes < 1:
        raise ValueError("max_bytes must be at least 1")
    if overlap_bytes < 0:
        raise ValueError("overlap_bytes must not be negative")

    text_bytes = text.encode('utf-8')
    total_bytes = len(text_bytes)
    if total_bytes <= max_bytes:
        return [(text, 0, len(text))]

    chunks = []
    byte_start = 0
    char_start = 0  # character offset matching byte_start, updated incrementally
    while True:
        byte_end = _utf8_boundary_at_or_before(
            text_bytes, min(byte_start + max_bytes, total_bytes), byte_start
        )
        if byte_end <= byte_start:
            # max_bytes is smaller than this character's encoding; take the
            # whole character rather than emitting an empty chunk forever.
            byte_end = _utf8_boundary_after(text_bytes, byte_start + 1)

        # Both ends are character boundaries, so a strict decode cannot fail.
        chunk_text = text_bytes[byte_start:byte_end].decode('utf-8')
        chunks.append((chunk_text, char_start, char_start + len(chunk_text)))

        # The chunk that reaches the end of the text is the last one. Without
        # this check the overlap step would rewind and re-emit the tail forever.
        if byte_end >= total_bytes:
            break

        # Move forward with overlap, staying on a character boundary.
        next_start = _utf8_boundary_at_or_before(
            text_bytes, max(byte_end - overlap_bytes, 0)
        )
        if next_start <= byte_start:
            # Overlap is too large to make progress; continue without it.
            next_start = byte_end

        char_start += len(text_bytes[byte_start:next_start].decode('utf-8'))
        byte_start = next_start

    return chunks
def _expected_overlap(prev_chunk, next_chunk, fallback: int) -> int:
    """Return how many characters two consecutive chunks are expected to share."""
    prev_end = prev_chunk[2] if len(prev_chunk) > 2 else None
    next_start = next_chunk[1] if len(next_chunk) > 1 else None
    if isinstance(prev_end, int) and isinstance(next_start, int):
        # Positions are available: the overlap is exactly how far the next
        # chunk starts before the previous one ends (zero if there is a gap).
        return max(prev_end - next_start, 0)
    return max(fallback, 0)


def merge_chunks(chunks: list, overlap_chars: int = 32) -> str:
    """
    Merge overlapping chunks back into a single text.

    At every seam the overlapping region is split in the middle: the first
    part is kept from the earlier chunk and the second part from the later
    chunk, so no overlap text appears twice. The overlap size is taken from
    the chunks' start/end positions when both are integers; otherwise
    ``overlap_chars`` is used. It is never assumed to be larger than either
    of the two chunk texts.

    Args:
        chunks: List of (corrected_text, start_pos, end_pos) tuples
        overlap_chars: Character overlap to assume between chunks whose
            positions are missing or not integers

    Returns:
        Merged text string ("" for an empty list)
    """
    if not chunks:
        return ""
    if len(chunks) == 1:
        return chunks[0][0]

    # One overlap size per seam between consecutive chunks.
    overlaps = []
    for prev_chunk, next_chunk in zip(chunks, chunks[1:]):
        overlap = _expected_overlap(prev_chunk, next_chunk, overlap_chars)
        overlaps.append(min(overlap, len(prev_chunk[0]), len(next_chunk[0])))

    # The later chunk drops the first half of the overlap from its head and
    # the earlier chunk drops the rest from its tail.
    head_cuts = [0] + [overlap // 2 for overlap in overlaps]
    tail_cuts = [overlap - overlap // 2 for overlap in overlaps] + [0]

    pieces = []
    for chunk, head, tail in zip(chunks, head_cuts, tail_cuts):
        chunk_text = chunk[0]
        # Guard against a short chunk whose two cuts would cross.
        stop = max(len(chunk_text) - tail, head)
        pieces.append(chunk_text[head:stop])
    return "".join(pieces)
def preprocess_for_byt5(text: str) -> str:
    """Run the ByT5 preprocessing pipeline on ``text``.

    Applies, in order: ``normalize_unicode``, ``clean_ocr_artifacts`` and
    ``normalize_whitespace``.

    Args:
        text: Raw input string.

    Returns:
        The string after all three stages.
    """
    for stage in (normalize_unicode, clean_ocr_artifacts, normalize_whitespace):
        text = stage(text)
    return text
def preprocess_for_ner(text: str) -> str:
    """Run the NER preprocessing pipeline on ``text``.

    Applies, in order: ``normalize_unicode``, ``normalize_urdu_characters``
    and ``normalize_whitespace``.

    Args:
        text: Raw input string.

    Returns:
        The string after all three stages.
    """
    for stage in (normalize_unicode, normalize_urdu_characters, normalize_whitespace):
        text = stage(text)
    return text
def _store_section(sections: dict, name: str, lines: list) -> None:
    """Add the joined, stripped ``lines`` to ``sections[name]`` if non-empty."""
    content = '\n'.join(lines).strip()
    if not content:
        return
    if name in sections:
        # A tag that appears again continues its section instead of
        # overwriting what was collected earlier.
        sections[name] = sections[name] + '\n' + content
    else:
        sections[name] = content


def extract_section_blocks(text: str) -> dict:
    """
    Parse section-tagged text from VLM output into a structured dictionary.

    A tag is an upper-case/underscore name in square brackets, anywhere on a
    line. Text up to the first tag is stored under "UNTAGGED". Text before a
    tag on the same line stays with the section that was open. Several tags
    on one line each start their own section. A tag that occurs more than
    once has its pieces joined with a newline. Sections with no non-blank
    content are omitted.

    Args:
        text: Text with section tags like [HEADER], [INCIDENT_DETAILS], etc.

    Returns:
        Dictionary mapping section names to their text content
    """
    section_pattern = re.compile(r'\[([A-Z_]+)\]')
    sections = {}
    current_section = "UNTAGGED"
    current_lines = []

    for line in text.split('\n'):
        cursor = 0
        saw_tag = False
        for match in section_pattern.finditer(line):
            # Text in front of the tag belongs to the section being closed.
            leading = line[cursor:match.start()].strip()
            if leading:
                current_lines.append(leading)
            _store_section(sections, current_section, current_lines)

            # Start the new section
            current_section = match.group(1)
            current_lines = []
            cursor = match.end()
            saw_tag = True

        if saw_tag:
            # Text after the last tag on the line opens the new section
            remaining = line[cursor:].strip()
            if remaining:
                current_lines.append(remaining)
        else:
            current_lines.append(line)

    # Save last section
    _store_section(sections, current_section, current_lines)
    return sections
def get_full_text_from_sections(sections: dict) -> str:
    """Join the contents of a section dictionary into one text, without tags.

    Section values are stripped, blank ones are skipped, and the rest are
    joined in the dictionary's iteration order with one blank line between
    them. Section names are not included in the output.

    Args:
        sections: Mapping of section name to section text.

    Returns:
        The concatenated section contents ("" if nothing is non-blank).
    """
    trimmed_bodies = (body.strip() for body in sections.values())
    return '\n\n'.join(body for body in trimmed_bodies if body)