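"""
Gradio app that extracts structured fields (invoice number, date, place, total
amount, category) from uploaded invoice images using Qwen2-VL, collects them in a
pandas DataFrame, and uploads the results to Azure Blob Storage as a Parquet file.
"""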
import gradio as gr
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import pandas as pd
from datetime import datetime
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import re
# Azure Storage Account details
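# NOTE: credentials are hardcoded here for simplicity; in practice they would
# typically be read from environment variables or a secrets store instead.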
STORAGE_ACCOUNT_NAME = "piointernaldestrg"
STORAGE_ACCOUNT_KEY = "Pd91QXwgXkiRyd4njM06B9rRFSvtMBijk99N9s7n1M405Kmn4vWzMUmm0vstoYtLLepFmKb9iBaJ+ASt6q+jwg=="
CONTAINER_NAME = "invoices"
# Initialize model and processor
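# NOTE: loading this AWQ-quantized checkpoint generally requires the `autoawq` package.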
model = Qwen2VLForConditionalGeneration.from_pretrained("Qwen/Qwen2-VL-2B-Instruct-AWQ", torch_dtype="auto")
if torch.cuda.is_available():
    model.to("cuda")
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct-AWQ")
# Function to process a batch of images
def process_image_batch(model, processor, image_paths):
    results = []
    for image_path in image_paths:
        try:
            prompt = (
                "Please extract the following details from the invoice:\n"
                "- 'invoice_number'\n"
                "- 'date'\n"
                "- 'place of invoice (city)'\n"
                "- 'total amount'\n"
                "- 'category of invoice (like food, stay, travel, other)'"
            )
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image_path},
                        {"type": "text", "text": prompt},
                    ],
                }
            ]
            text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            image_inputs, video_inputs = process_vision_info(messages)
            inputs = processor(
                text=[text],
                images=image_inputs,
                videos=video_inputs,
                padding=True,
                return_tensors="pt",
            )
            inputs = inputs.to(model.device)
            generated_ids = model.generate(**inputs, max_new_tokens=128)
            generated_ids_trimmed = [
                out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
            ]
            output_text = processor.batch_decode(
                generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
            )
            structured_data = {
                "invoice_number": None,
                "date": None,
                "place_of_invoice": None,
                "total_amount": None,
                "category_of_invoice": None,
            }
            total_amount_found = False
            for line in output_text[0].split("\n"):
                # Invoice number mapping logic
                if any(keyword in line.lower() for keyword in ["invoice_number", "number in bold", "number", "bill number", "estimate number"]):
                    structured_data["invoice_number"] = line.split(":")[-1].strip()
                # Date mapping logic
                elif "date" in line.lower():
                    date = line.split(":")[-1].strip()
                    structured_data["date"] = process_date(date)
                # Place of invoice mapping logic
                elif "place of invoice" in line.lower():
                    structured_data["place_of_invoice"] = line.split(":")[-1].strip()
                # Total amount mapping logic
                elif any(keyword in line.lower() for keyword in ["total", "total amount", "grand total", "final amount", "balance due"]):
                    amounts = re.findall(r"\d+\.\d{2}", line)
                    if amounts:
                        structured_data["total_amount"] = amounts[-1]
                        total_amount_found = True
                elif not total_amount_found and re.match(r"^\s*TOTAL\s*:\s*\d+\.\d{2}\s*$", line, re.IGNORECASE):
                    structured_data["total_amount"] = re.findall(r"\d+\.\d{2}", line)[0]
                    total_amount_found = True
                # Category of invoice mapping logic
                elif "category of invoice" in line.lower():
                    structured_data["category_of_invoice"] = line.split(":")[-1].strip()
            results.append(structured_data)
        except Exception as e:
            results.append({
                "invoice_number": "Error",
                "date": "Error",
                "place_of_invoice": "Error",
                "total_amount": "Error",
                "category_of_invoice": str(e),
            })
    return pd.DataFrame(results)

# Function to process and format dates
def process_date(date_str):
    try:
        if re.match(r"\d{2}/\d{2}/\d{4}", date_str):
            return date_str
        elif re.match(r"\d{2} \w+ \d{4}", date_str):
            date_obj = datetime.strptime(date_str, "%d %b %Y")
            return date_obj.strftime("%d/%m/%Y")
        elif re.match(r"\d{2} \w+", date_str):
            date_obj = datetime.strptime(date_str, "%d %b")
            return date_obj.strftime("%d/%m") + "/YYYY"
        else:
            return date_str
    except Exception:
        return date_str

# Upload extracted data to Azure Blob Storage as a Parquet file
def upload_to_azure_blob(df):
    try:
        # Convert DataFrame to Parquet format
        parquet_buffer = BytesIO()
        df.to_parquet(parquet_buffer, index=False)
        # Create the BlobServiceClient object
        blob_service_client = BlobServiceClient(
            account_url=f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
            credential=STORAGE_ACCOUNT_KEY,
        )
        # Get the BlobClient object
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        blob_client = blob_service_client.get_blob_client(
            container=CONTAINER_NAME, blob=f"invoice_data_{timestamp}.parquet"
        )
        # Upload the Parquet file
        blob_client.upload_blob(parquet_buffer.getvalue(), overwrite=True)
        # Return the file URL
        return f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{CONTAINER_NAME}/invoice_data_{timestamp}.parquet"
    except Exception as e:
        return {"error": str(e)}

# Gradio interface function
def gradio_interface(username, email, image_files):
    df = process_image_batch(model, processor, image_files)
    file_url = upload_to_azure_blob(df)
    user_info = f"Username: {username}\nEmail: {email}"
    return user_info, df, f"Parquet File URL: {file_url}"

# Define the Gradio interface
grpc_interface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="Username"),
        gr.Textbox(label="Email"),
        gr.Files(label="Upload Invoice Images", type="filepath"),
    ],
    outputs=[
        gr.Textbox(label="User Info"),
        gr.Dataframe(label="Extracted Invoice Data"),
        gr.Textbox(label="Parquet File URL"),
    ],
    title="Invoice Extraction System",
    description="Upload invoices, extract details, and save to Azure Blob Storage.",
)

# Launch the Gradio interface
if __name__ == "__main__":
    grpc_interface.launch(share=True)