| import os |
| import json |
| from typing import List, Dict, Any |
| import pdfplumber |
| from docx import Document |
| from config.settings import Config |
|
|
class DocumentProcessor:
    """Load documents from disk and split their text into overlapping chunks.

    Supported extensions are ``.pdf``, ``.docx``, ``.txt`` and ``.json``.
    Limits are read from ``self.config``: ``MAX_FILE_SIZE_MB``, ``CHUNK_SIZE``
    and ``CHUNK_OVERLAP`` (the latter two are measured in characters).
    """

    def __init__(self):
        self.config = Config()

    def validate_file_size(self, file_path: str) -> bool:
        """Return True when the file is no larger than ``MAX_FILE_SIZE_MB``.

        The size is computed as bytes / (1024 * 1024).

        Raises:
            OSError: If the file does not exist or cannot be accessed.
        """
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        return file_size_mb <= self.config.MAX_FILE_SIZE_MB

    def load_document(self, file_path: str) -> str:
        """Load a document's text, choosing the loader by file extension.

        Args:
            file_path: Path to the file; the extension is matched
                case-insensitively.

        Returns:
            The extracted text.

        Raises:
            ValueError: If the file exceeds the size limit or the extension
                is not supported.
        """
        if not self.validate_file_size(file_path):
            raise ValueError(f"File size exceeds {self.config.MAX_FILE_SIZE_MB}MB limit")

        file_ext = os.path.splitext(file_path)[1].lower()
        loaders = {
            '.pdf': self._load_pdf,
            '.docx': self._load_docx,
            '.txt': self._load_txt,
            '.json': self._load_json,
        }
        loader = loaders.get(file_ext)
        if loader is None:
            raise ValueError(f"Unsupported file format: {file_ext}")
        return loader(file_path)

    def _load_pdf(self, file_path: str) -> str:
        """Return the text of every page, each followed by a newline.

        Pages for which ``extract_text()`` returns nothing are skipped.
        """
        with pdfplumber.open(file_path) as pdf:
            page_texts = [page.extract_text() for page in pdf.pages]
        return "".join(f"{page_text}\n" for page_text in page_texts if page_text)

    def _load_docx(self, file_path: str) -> str:
        """Return the text of every paragraph, each followed by a newline."""
        doc = Document(file_path)
        return "".join(f"{paragraph.text}\n" for paragraph in doc.paragraphs)

    def _load_txt(self, file_path: str) -> str:
        """Return the whole file decoded as UTF-8."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def _load_json(self, file_path: str) -> str:
        """Parse a UTF-8 JSON file and return it re-serialised with indent 2.

        Raises:
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        # ensure_ascii=False keeps non-ASCII characters readable instead of
        # turning them into \uXXXX escapes in the text handed on for chunking.
        return json.dumps(data, indent=2, ensure_ascii=False)

    def chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks of at most ``CHUNK_SIZE`` characters.

        Text that already fits in a single chunk is returned unchanged as a
        one-element list. Otherwise each chunk is stripped of surrounding
        whitespace and empty chunks are dropped. A chunk is cut just after
        the right-most ``.``, ``!`` or ``?`` in its window, provided that
        terminator lies past the midpoint of the window; otherwise it is cut
        at the full window size. Consecutive chunks share up to
        ``CHUNK_OVERLAP`` characters.

        Args:
            text: The text to split.

        Returns:
            The list of chunks in document order.

        Raises:
            ValueError: If the text needs splitting and ``CHUNK_SIZE`` is not
                positive.
        """
        chunk_size = self.config.CHUNK_SIZE
        overlap = self.config.CHUNK_OVERLAP
        text_len = len(text)

        if text_len <= chunk_size:
            return [text]
        if chunk_size <= 0:
            raise ValueError("CHUNK_SIZE must be a positive integer")

        chunks = []
        start = 0
        while start < text_len:
            end = start + chunk_size

            if end < text_len:
                # Right-most terminator of any kind, so an early '.' cannot
                # hide a later '!' or '?'. rfind returns -1 when absent.
                boundary = max(text.rfind(mark, start, end) for mark in '.!?')
                if boundary > start + chunk_size // 2:
                    end = boundary + 1

            piece = text[start:end].strip()
            if piece:
                chunks.append(piece)

            # Once a chunk reaches the end of the text we are done; stepping
            # back by the overlap would only emit a tail that is already
            # fully contained in this chunk.
            if end >= text_len:
                break

            # Always move forward by at least one character so an overlap
            # that is as large as the chunk cannot stall or rewind the loop.
            start = max(end - overlap, start + 1)

        return chunks

    def process_documents(self, file_paths: List[str], batch_mode: bool = False) -> List[Dict[str, Any]]:
        """Load and chunk documents, collecting one result dict per file tried.

        Args:
            file_paths: Paths to process, in order.
            batch_mode: When False, processing stops after the first document
                that is handled successfully; documents that fail before it
                are still recorded and do not stop the loop. When True, every
                path is processed.

        Returns:
            A list of dicts with keys ``file_path``, ``content``, ``chunks``
            and ``status`` (``'success'`` or ``'error'``); error entries also
            carry an ``error`` message and have empty content and chunks.
        """
        results = []

        for file_path in file_paths:
            try:
                content = self.load_document(file_path)
                chunks = self.chunk_text(content)
            except Exception as e:
                # Per-file boundary: record the failure and keep going.
                results.append({
                    'file_path': file_path,
                    'content': '',
                    'chunks': [],
                    'status': 'error',
                    'error': str(e)
                })
                continue

            results.append({
                'file_path': file_path,
                'content': content,
                'chunks': chunks,
                'status': 'success'
            })

            # NOTE(review): non-batch mode stops at the first success, not
            # after the first file - confirm this is the intended contract.
            if not batch_mode:
                break

        return results
|
|