support drilldown data to PDF

Blade He 2024-11-08 11:22:35 -06:00
parent 0349033eaf
commit 81f855f725
3 changed files with 405 additions and 27 deletions

main.py

@@ -4,9 +4,15 @@ import pandas as pd
from glob import glob
from tqdm import tqdm
import time
import fitz
import re
from io import BytesIO
from traceback import print_exc
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from utils.pdf_util import PDFUtil
from utils.biz_utils import add_slash_to_text_as_regex
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
@@ -21,6 +27,7 @@ class EMEA_AR_Parsing:
output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
extract_way: str = "text",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
) -> None:
self.doc_id = doc_id
self.pdf_folder = pdf_folder
@@ -67,6 +74,11 @@ class EMEA_AR_Parsing:
self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
self.datapoints = self.get_datapoints_from_datapoint_page_info()
if drilldown_folder is None or len(drilldown_folder) == 0:
drilldown_folder = r"/data/emea_ar/output/drilldown/"
os.makedirs(drilldown_folder, exist_ok=True)
self.drilldown_folder = drilldown_folder
def download_pdf(self) -> str:
pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
return pdf_file
@@ -85,6 +97,7 @@
self,
re_run: bool = False,
) -> list:
found_data = False
if not re_run:
output_data_json_folder = os.path.join(
self.output_extract_data_folder, "json/"
@@ -97,8 +110,9 @@
)
with open(json_file, "r", encoding="utf-8") as f:
data_from_gpt = json.load(f)
return data_from_gpt
found_data = True
if not found_data:
try:
data_extraction = DataExtraction(
self.doc_id,
@@ -115,8 +129,128 @@
except Exception as e:
logger.error(f"Error: {e}")
data_from_gpt = {"data": []}
# Drilldown data to relevant PDF document
self.drilldown_pdf_document(data_from_gpt)
return data_from_gpt
def drilldown_pdf_document(self, data_from_gpt: list) -> str:
logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
pdf_util = PDFUtil(self.pdf_file)
pdf_doc = self.get_pdf_doc(self.pdf_file)
highlight_annotation = False
for data in data_from_gpt:
page_index = data.get("page_index", -1)
if page_index == -1:
continue
extract_data_list = data.get("extract_data", {}).get("data", [])
dp_reported_name_dict = data.get("extract_data", {}).get("dp_reported_name", {})
highlighted_value_list = []
for extract_data in extract_data_list:
for data_point, value in extract_data.items():
if value in highlighted_value_list:
continue
if data_point in ["ter", "ogc", "performance_fee"]:
continue
drilldown_data = self.highlight_pdf_doc(pdf_doc=pdf_doc,
page_index=page_index,
highlight_value=value,
data_point=data_point,
pdf_util=pdf_util)
if len(drilldown_data.get("matching_val_area", [])) > 0:
highlight_annotation = True
highlighted_value_list.append(value)
for data_point, reported_name in dp_reported_name_dict.items():
if reported_name in highlighted_value_list:
continue
data_point = f"{data_point}_reported_name"
drilldown_data = self.highlight_pdf_doc(pdf_doc=pdf_doc,
page_index=page_index,
highlight_value=reported_name,
data_point=data_point,
pdf_util=pdf_util)
if len(drilldown_data.get("matching_val_area", [])) > 0:
highlight_annotation = True
highlighted_value_list.append(reported_name)
if highlight_annotation:
annotated_pdf_file = self.save_annotated_pdf(pdf_doc)
return annotated_pdf_file
# nothing was highlighted: close the document and return None instead of a file path
pdf_doc.close()
return None
def highlight_pdf_doc(self,
pdf_doc: fitz.Document,
page_index: int,
highlight_value: str,
data_point: str = None,
pdf_util: PDFUtil = None,):
page = pdf_doc[page_index]
page_text = page.get_text()
highlight_value = str(highlight_value)
highlight_value_regex = add_slash_to_text_as_regex(highlight_value)
highlight_value_search = re.search(highlight_value_regex, page_text)
highlight_value_search_text = None
if highlight_value_search is not None:
highlight_value_search_text = highlight_value_search.group()
drilldown_data = {"DocumentId": self.doc_id,
"page_index": page_index,
"data_point": data_point,
"value": highlight_value,
"matching_val_area": []}
if highlight_value_search_text is not None:
content = {
"data_point": data_point,
"data_value": highlight_value
}
matching_val_area = pdf_util.highlight_matching_data(
page=page,
text_block=highlight_value_search_text,
content=content,
title=data_point,
only_hightlight_first=False,
merge_nearby_lines=False
)
bbox_list = []
for area in matching_val_area:
bbox = [area.x0, area.y0, area.x1, area.y1]
bbox_list.append(bbox)
# order bbox_list by y0, x0, y1, x1
bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2]))
drilldown_data["matching_val_area"] = bbox_list
return drilldown_data
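# Illustrative only (hypothetical values, not produced by this commit): for a reported
# name "Ongoing Charges" matched once on page 3, the returned payload looks roughly like
#   {"DocumentId": "123456789", "page_index": 3, "data_point": "ter_reported_name",
#    "value": "Ongoing Charges", "matching_val_area": [[70.1, 512.7, 148.9, 523.1]]}
# i.e. one [x0, y0, x1, y1] box per highlighted occurrence, sorted top-to-bottom,
# then left-to-right.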
def get_pdf_doc(self, pdf_file):
pdf_doc = fitz.open(pdf_file)
try:
pdf_encrypted = pdf_doc.isEncrypted
except AttributeError:
# some PyMuPDF builds expose is_encrypted instead of the older isEncrypted
pdf_encrypted = pdf_doc.is_encrypted
if pdf_encrypted:
# an empty password opens documents that are encrypted without a user password
pdf_doc.authenticate("")
return pdf_doc
def save_annotated_pdf(self, pdf_doc: fitz.Document):
try:
if pdf_doc is None or pdf_doc.is_closed:
return
pdf_file_name = os.path.basename(self.pdf_file)
pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf")
output_pdf_dir = os.path.join(self.drilldown_folder, "pdf/")
os.makedirs(output_pdf_dir, exist_ok=True)
pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name)
output_buffer = BytesIO()
pdf_doc.save(output_buffer)
# Save the output buffer to the output file
with open(pdf_file_path, mode="wb") as f:
f.write(output_buffer.getbuffer())
pdf_doc.close()
logger.info(f"File saved to {pdf_file_path}")
return pdf_file_path
except Exception as e:
print_exc()
logger.error(f"Error when save output file: {e}")
def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
if not re_run:
output_data_json_folder = os.path.join(
@@ -988,12 +1122,121 @@ if __name__ == "__main__":
"539794746"
]
special_doc_id_list = check_db_mapping_doc_id_list
# special_doc_id_list = []
special_doc_id_list = ["532500349",
"535324239",
"532442891",
"543243650",
"528588598",
"532437639",
"527525440",
"534987291",
"534112055",
"533482585",
"544208174",
"534547266",
"544713166",
"526463547",
"534535569",
"534106067",
"532486560",
"532781760",
"533727067",
"527256381",
"533392425",
"532179676",
"534300608",
"539233950",
"532438414",
"533681744",
"537654645",
"533594905",
"537926443",
"533499655",
"533862814",
"544918611",
"539087870",
"536343790",
"479742284",
"501380497",
"501380553",
"501380775",
"501380801",
"501600428",
"501600429",
"501600541",
"501600549",
"503659548",
"506326520",
"507720522",
"507928179",
"508981020",
"509133771",
"509743502",
"514636951",
"514636952",
"514636953",
"514636954",
"514636955",
"514636957",
"514636958",
"514636959",
"514636985",
"514636988",
"514636990",
"514636993",
"514636994",
"539794746",
"292989214",
"316237292",
"321733631",
"323390570",
"327956364",
"332223498",
"333207452",
"334718372",
"344636875",
"362246081",
"366179419",
"380945052",
"382366116",
"387202452",
"389171486",
"391456740",
"391736837",
"394778487",
"401684600",
"402113224",
"402181770",
"402397014",
"405803396",
"445102363",
"445256897",
"448265376",
"449555622",
"449623976",
"458291624",
"458359181",
"463081566",
"469138353",
"471641628",
"476492237",
"478585901",
"478586066",
"479042264",
"479042269",
"479793787",
"481475385",
"483617247",
"486378555",
"486383912",
"492121213",
"497497599",
"502693599"]
output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"
re_run_extract_data = False
re_run_mapping_data = False
force_save_total_data = True
force_save_total_data = False
calculate_metrics = False
extract_ways = ["text"]

utils/biz_utils.py

@@ -65,8 +65,9 @@ def add_slash_to_text_as_regex(text: str):
continue
replace = r"\{0}".format(special_iter.group())
if replace not in text:
text = re.sub(replace, replace, text)
text = re.sub(r"\s+", r"\\s+", text)
text = re.sub(replace, r"\\W", text)
text = re.sub(r"( ){2,}", " ", text)
text = text.replace(" ", r"\s*")
return text

utils/pdf_util.py

@@ -9,6 +9,7 @@ import json
from traceback import print_exc
from tqdm import tqdm
import base64
from copy import deepcopy
from utils.similarity import Similarity
from utils.logger import logger
@@ -276,9 +277,20 @@ class PDFUtil:
title: str = "",
only_hightlight_first: bool = False,
exact_match: bool = False,
merge_nearby_lines: bool = False,
):
"""
Highlight matching values on a page.
page: page object from a fitz.Document
within_bbox: bounding box that restricts the text search
text_block: text to search for in the page text
highlight_text_inside_block: text to highlight inside the matched text_block
content: JSON-serializable payload stored in the highlight annotation;
customize it according to the relevant business logic
title: title of the highlight annotation
only_hightlight_first: highlight only the first match
exact_match: whether to keep only exact matches
merge_nearby_lines: whether to merge nearby line rectangles into one area
"""
# logger.info(f"Highlighting matching values in {self.pdf_file}")
if within_bbox is not None:
@@ -295,6 +307,8 @@
matching_val_area = page.search_for(text_block)
else:
matching_val_area = page.search_for(text_block)
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.strip())
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''))
if len(matching_val_area) == 0:
@@ -304,7 +318,9 @@
and len(highlight_text_inside_block) > 0
):
highlight_bbox_list = []
for area in matching_val_area:
merged_matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines)
pure_number_regex = re.compile(r"^\d+$")
for area in merged_matching_val_area:
text_bbox_area = page.search_for(
highlight_text_inside_block,
clip=[area.x0, area.y0, area.x1, area.y1],
@@ -313,15 +329,43 @@
if only_hightlight_first:
highlight_bbox_list.append(text_bbox_area[0])
break
else:
pure_number_match = pure_number_regex.match(highlight_text_inside_block)
if pure_number_match is not None and pure_number_match.group() == highlight_text_inside_block:
for text_bbox in text_bbox_area:
# read the text around text_bbox (widened by 10 points on each side) to check word boundaries
copy_text_bbox = deepcopy(text_bbox)
copy_text_bbox.x0 -= 10
copy_text_bbox.x1 += 10
text = page.get_text("text", clip=copy_text_bbox).strip()
if text == highlight_text_inside_block:
highlight_bbox_list.append(text_bbox)
else:
# get start and end index of the highlight_text_inside_block in text
start_index = text.find(highlight_text_inside_block)
if start_index > 0:
previous_char = text[start_index - 1]
if previous_char not in [" ", "("]:
continue
end_index = start_index + len(highlight_text_inside_block)
if end_index < len(text):
next_char = text[end_index]
if next_char not in [" ", "%", ")"]:
continue
highlight_bbox_list.append(text_bbox)
else:
highlight_bbox_list.extend(text_bbox_area)
if len(highlight_bbox_list) == 0 and len(highlight_text_inside_block.strip().split()) > 2:
highlight_bbox_list = text_bbox_area = page.search_for(
highlight_text_inside_block
)
matching_val_area = highlight_bbox_list
else:
if only_hightlight_first:
matching_val_area = [matching_val_area[0]]
if matching_val_area is not None and len(matching_val_area) > 0:
matching_val_area = self.merge_matching_val_area(matching_val_area)
matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines)
if exact_match:
matching_val_area = self.get_exact_match_area(page, matching_val_area, text_block)
# matching_val_area = self.merge_matching_val_area(matching_val_area)
@@ -329,6 +373,11 @@
highlight = page.add_highlight_annot([area])
bbox_list = [area.x0, area.y0, area.x1, area.y1]
content["bbox"] = bbox_list
normalized_bbox = self.get_bbox_normalized(page, [bbox_list])
if len(normalized_bbox) > 0:
content["normalized_bbox"] = normalized_bbox[0]
else:
content["normalized_bbox"] = []
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
@@ -358,7 +407,7 @@
pass
return results
def merge_matching_val_area(self, matching_val_area):
def merge_matching_val_area(self, matching_val_area, merge_nearby_lines=False):
"""
Merge the matching val areas that share the same y0 and y1:
the merged x0 is the minimum x0 and the merged x1 is the maximum x1
(a short sketch of the merge follows below)
@@ -401,6 +450,91 @@
min_x0 = min(x0_list)
max_x1 = max(x1_list)
new_matching_val_area.append(fitz.Rect(min_x0, y0, max_x1, y1))
if merge_nearby_lines and len(new_matching_val_area) > 1:
new_matching_val_area = self.merge_nearby_lines(new_matching_val_area)
# run a second merge pass in case the first pass produced rectangles that are now adjacent
if len(new_matching_val_area) > 1:
new_matching_val_area = self.merge_nearby_lines(new_matching_val_area)
elif len(new_matching_val_area) > 1:
new_matching_val_area = self.remove_small_pitches(new_matching_val_area)
else:
pass
return new_matching_val_area
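# Sketch of the same-line merge above (hypothetical coordinates, not part of the diff):
# two fragments fitz.Rect(50, 100, 120, 112) and fitz.Rect(130, 100, 210, 112) share
# y0/y1 and collapse into fitz.Rect(50, 100, 210, 112); with merge_nearby_lines=True
# those per-line rectangles are then handed to merge_nearby_lines, otherwise
# remove_small_pitches only drops slivers narrower than x_mini_threshold.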
def remove_small_pitches(self, matching_val_area):
x_mini_threshold = 5
new_matching_val_area = []
for area in matching_val_area:
if area.x1 - area.x0 > x_mini_threshold:
new_matching_val_area.append(area)
return new_matching_val_area
def merge_nearby_lines(self, matching_val_area):
bbox_list = []
for bbox in matching_val_area:
bbox = [bbox.x0, bbox.y0, bbox.x1, bbox.y1]
bbox_list.append(bbox)
# order bbox_list by y0, x0, y1, x1
bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2]))
new_matching_val_area = []
last_x0 = None
last_x1 = None
last_y0 = None
last_y1 = None
x_mini_threshold = 5
y_threshold = 15
x_threshold = 10
for index, bbox in enumerate(bbox_list):
if bbox[2] - bbox[0] <= x_mini_threshold:
continue
if index == 0 or last_x0 is None:
last_x0 = bbox[0]
last_y0 = bbox[1]
last_x1 = bbox[2]
last_y1 = bbox[3]
continue
x0 = bbox[0]
y0 = bbox[1]
x1 = bbox[2]
y1 = bbox[3]
last_x0_x1_range = [i for i in range(int(last_x0), int(last_x1))]
x0_x1_range = [i for i in range(int(x0), int(x1))]
x_intersection = list(set(last_x0_x1_range).intersection(set(x0_x1_range)))
# Merge the current box into the previous one when any of the following holds:
# abs(y0 - last_y1) <= y_threshold and (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold):
# the current line sits directly below the previous one and their horizontal
# ranges intersect or lie close to each other
# abs(y0 - last_y0) <= y_threshold and abs(x0 - last_x1) <= x_threshold:
# the current fragment continues the previous one on the same line,
# i.e. the previous fragment is the beginning of the current sentence
# abs(y1 - last_y1) <= y_threshold and (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold):
# the previous and current fragments share the same horizontal text line and their
# horizontal ranges intersect or lie close to each other
# (a worked example follows after this method)
if (abs(y0 - last_y1) <= y_threshold and
(len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)) or \
(abs(y0 - last_y0) <= y_threshold and abs(x0 - last_x1) <= x_threshold) or \
(abs(y1 - last_y1) <= y_threshold and
(len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)):
last_x0 = min(last_x0, x0)
last_x1 = max(last_x1, x1)
last_y0 = min(last_y0, y0)
last_y1 = max(last_y1, y1)
else:
new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1))
last_x0 = x0
last_x1 = x1
last_y0 = y0
last_y1 = y1
new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1))
return new_matching_val_area
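# Worked example for the merge above (hypothetical numbers, using the thresholds set
# in merge_nearby_lines): sorted boxes [50, 100, 200, 112] and [50, 114, 180, 126]
# lie on consecutive lines (abs(114 - 112) = 2 <= y_threshold) and their x-ranges
# intersect, so they merge into fitz.Rect(50, 100, 200, 126); a later box such as
# [50, 160, 180, 172] is more than y_threshold away on every compared edge and
# therefore starts a new rectangle.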
def highlight_matching_paragraph_text(