integrate pdf drilldown logic to pdf_util.py
This commit is contained in:
parent
c34e2e960e
commit
c6c3e99d3e
114
main.py
114
main.py
|
|
@ -137,8 +137,7 @@ class EMEA_AR_Parsing:
|
||||||
def drilldown_pdf_document(self, data_from_gpt: list) -> list:
|
def drilldown_pdf_document(self, data_from_gpt: list) -> list:
|
||||||
logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
|
logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
|
||||||
pdf_util = PDFUtil(self.pdf_file)
|
pdf_util = PDFUtil(self.pdf_file)
|
||||||
pdf_doc = self.get_pdf_doc(self.pdf_file)
|
drilldown_data_list = []
|
||||||
highlight_annotation = False
|
|
||||||
for data in data_from_gpt:
|
for data in data_from_gpt:
|
||||||
page_index = data.get("page_index", -1)
|
page_index = data.get("page_index", -1)
|
||||||
if page_index == -1:
|
if page_index == -1:
|
||||||
|
|
@ -152,104 +151,32 @@ class EMEA_AR_Parsing:
|
||||||
continue
|
continue
|
||||||
if data_point in ["ter", "ogc", "performance_fee"]:
|
if data_point in ["ter", "ogc", "performance_fee"]:
|
||||||
continue
|
continue
|
||||||
drilldown_data = self.highlight_pdf_doc(pdf_doc=pdf_doc,
|
drilldown_data = {
|
||||||
page_index=page_index,
|
"page_index": page_index,
|
||||||
highlight_value=value,
|
"data_point": data_point,
|
||||||
data_point=data_point,
|
"parent_text_block": None,
|
||||||
pdf_util=pdf_util)
|
"value": value,
|
||||||
if len(drilldown_data.get("matching_val_area", [])) > 0:
|
"annotation_attribute": {}
|
||||||
highlight_annotation = True
|
}
|
||||||
|
drilldown_data_list.append(drilldown_data)
|
||||||
highlighted_value_list.append(value)
|
highlighted_value_list.append(value)
|
||||||
|
|
||||||
for data_point, reported_name in dp_reported_name_dict.items():
|
for data_point, reported_name in dp_reported_name_dict.items():
|
||||||
if reported_name in highlighted_value_list:
|
if reported_name in highlighted_value_list:
|
||||||
continue
|
continue
|
||||||
data_point = f"{data_point}_reported_name"
|
data_point = f"{data_point}_reported_name"
|
||||||
drilldown_data = self.highlight_pdf_doc(pdf_doc=pdf_doc,
|
drilldown_data = {
|
||||||
page_index=page_index,
|
"page_index": page_index,
|
||||||
highlight_value=reported_name,
|
"data_point": data_point,
|
||||||
data_point=data_point,
|
"parent_text_block": None,
|
||||||
pdf_util=pdf_util)
|
"value": reported_name,
|
||||||
if len(drilldown_data.get("matching_val_area", [])) > 0:
|
"annotation_attribute": {}
|
||||||
highlight_annotation = True
|
}
|
||||||
|
drilldown_data_list.append(drilldown_data)
|
||||||
highlighted_value_list.append(reported_name)
|
highlighted_value_list.append(reported_name)
|
||||||
if highlight_annotation:
|
|
||||||
annotated_pdf_file = self.save_annotated_pdf(pdf_doc)
|
drilldown_result = pdf_util.batch_drilldown(drilldown_data_list=drilldown_data_list,
|
||||||
return annotated_pdf_file
|
output_pdf_folder=self.drilldown_folder)
|
||||||
|
|
||||||
def highlight_pdf_doc(self,
|
|
||||||
pdf_doc: fitz.Document,
|
|
||||||
page_index: int,
|
|
||||||
highlight_value: str,
|
|
||||||
data_point: str = None,
|
|
||||||
pdf_util: PDFUtil = None,):
|
|
||||||
page = pdf_doc[page_index]
|
|
||||||
page_text = page.get_text()
|
|
||||||
highlight_value = str(highlight_value)
|
|
||||||
highlight_value_regex = add_slash_to_text_as_regex(highlight_value)
|
|
||||||
highlight_value_search = re.search(highlight_value_regex, page_text)
|
|
||||||
highlight_value_search_text = None
|
|
||||||
if highlight_value_search is not None:
|
|
||||||
highlight_value_search_text = highlight_value_search.group()
|
|
||||||
drilldown_data = {"DocumentId": self.doc_id,
|
|
||||||
"page_index": page_index,
|
|
||||||
"data_point": data_point,
|
|
||||||
"value": highlight_value,
|
|
||||||
"matching_val_area": []}
|
|
||||||
if highlight_value_search_text is not None:
|
|
||||||
content = {
|
|
||||||
"data_point": data_point,
|
|
||||||
"data_value": highlight_value
|
|
||||||
}
|
|
||||||
matching_val_area = pdf_util.highlight_matching_data(
|
|
||||||
page=page,
|
|
||||||
text_block=highlight_value_search_text,
|
|
||||||
content=content,
|
|
||||||
title=data_point,
|
|
||||||
only_hightlight_first=False,
|
|
||||||
merge_nearby_lines=False
|
|
||||||
)
|
|
||||||
|
|
||||||
bbox_list = []
|
|
||||||
for area in matching_val_area:
|
|
||||||
bbox = [area.x0, area.y0, area.x1, area.y1]
|
|
||||||
bbox_list.append(bbox)
|
|
||||||
# order bbox_list by y0, x0, y1, x1
|
|
||||||
bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2]))
|
|
||||||
drilldown_data["matching_val_area"] = bbox_list
|
|
||||||
return drilldown_data
|
|
||||||
|
|
||||||
def get_pdf_doc(self, pdf_file):
|
|
||||||
pdf_doc = fitz.open(pdf_file)
|
|
||||||
try:
|
|
||||||
pdf_encrypted = pdf_doc.isEncrypted
|
|
||||||
except:
|
|
||||||
pdf_encrypted = pdf_doc.is_encrypted
|
|
||||||
if pdf_encrypted:
|
|
||||||
pdf_doc.authenticate("")
|
|
||||||
return pdf_doc
|
|
||||||
|
|
||||||
def save_annotated_pdf(self, pdf_doc: fitz.Document):
|
|
||||||
try:
|
|
||||||
if pdf_doc is None and pdf_doc.is_closed:
|
|
||||||
return
|
|
||||||
pdf_file_name = os.path.basename(self.pdf_file)
|
|
||||||
pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf")
|
|
||||||
output_pdf_dir = os.path.join(self.drilldown_folder, "pdf/")
|
|
||||||
os.makedirs(output_pdf_dir, exist_ok=True)
|
|
||||||
pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name)
|
|
||||||
output_buffer = BytesIO()
|
|
||||||
pdf_doc.save(output_buffer)
|
|
||||||
|
|
||||||
# Save the output buffer to the output file
|
|
||||||
with open(pdf_file_path, mode="wb") as f:
|
|
||||||
f.write(output_buffer.getbuffer())
|
|
||||||
pdf_doc.close()
|
|
||||||
logger.info(f"File saved to {pdf_file_path}")
|
|
||||||
return pdf_file_path
|
|
||||||
except Exception as e:
|
|
||||||
print_exc()
|
|
||||||
logger.error(f"Error when save output file: {e}")
|
|
||||||
|
|
||||||
def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
|
def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
|
||||||
if not re_run:
|
if not re_run:
|
||||||
|
|
@ -1183,6 +1110,7 @@ if __name__ == "__main__":
|
||||||
"546046730",
|
"546046730",
|
||||||
"546919329"
|
"546919329"
|
||||||
]
|
]
|
||||||
|
special_doc_id_list = ["501380497"]
|
||||||
output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
|
output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
|
||||||
output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"
|
output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"
|
||||||
re_run_extract_data = False
|
re_run_extract_data = False
|
||||||
|
|
|
||||||
|
|
@ -266,6 +266,170 @@ class PDFUtil:
|
||||||
content_text = json.dumps(content)
|
content_text = json.dumps(content)
|
||||||
highlight.set_info(content=content_text, title=title)
|
highlight.set_info(content=content_text, title=title)
|
||||||
highlight.update()
|
highlight.update()
|
||||||
|
|
||||||
|
def batch_drilldown(self,
                    drilldown_data_list: list,
                    output_pdf_folder: str = None):
    """Highlight every requested value in the PDF and optionally save the result.

    Each item of ``drilldown_data_list`` is a dict with the keys:

    - ``page_index``: index of the page to search.
    - ``data_point``: a name, or a list of names (joined with ``", "``).
    - ``value``: a string, a number, a list of values, or a dict of
      ``{sub_key: value}``. Dict entries are annotated under the data
      point ``"<data_point>, <sub_key>"``.
    - ``parent_text_block`` (optional): text that must contain the value.
    - ``annotation_attribute`` (optional): extra fields for the annotation.

    Args:
        drilldown_data_list: items to highlight, as described above.
        output_pdf_folder: when non-empty, the folder is created and the
            annotated PDF is written below it via ``save_annotated_pdf``.

    Returns:
        dict with:
        - ``drilldown_pdf_doc``: the opened document. When the annotated
          file was saved successfully, ``save_annotated_pdf`` has already
          closed it.
        - ``annotation_list``: one result of ``highlight_pdf_doc`` per
          highlighted value.
        - ``annotated_pdf_file``: path of the saved file, or ``None`` when
          nothing was saved.
    """
    pdf_doc = fitz.open(self.pdf_file)
    annotation_list = []
    for drilldown_data in drilldown_data_list:
        page_index = drilldown_data["page_index"]
        data_point = drilldown_data["data_point"]
        if isinstance(data_point, list):
            data_point = ", ".join(data_point)
        parent_text_block = drilldown_data.get("parent_text_block", None)
        highlight_value = drilldown_data["value"]
        annotation_attribute = drilldown_data.get("annotation_attribute", {})

        # Normalise the three accepted shapes of "value" into a flat list of
        # (data_point, value) pairs so the highlight call is written once.
        if isinstance(highlight_value, dict):
            targets = [(f"{data_point}, {key}", value)
                       for key, value in highlight_value.items()]
        elif isinstance(highlight_value, list):
            targets = [(data_point, value) for value in highlight_value]
        else:
            targets = [(data_point, highlight_value)]

        for target_data_point, target_value in targets:
            # Numeric values are searched by their text form; without this
            # they were silently dropped (scalars) or crashed (inside lists).
            if isinstance(target_value, (int, float)) and not isinstance(target_value, bool):
                target_value = str(target_value)
            # Anything that is still not text (None, nested containers, ...)
            # cannot be searched for on the page.
            if not isinstance(target_value, str):
                continue
            annotation_list.append(self.highlight_pdf_doc(
                pdf_doc=pdf_doc,
                page_index=page_index,
                highlight_value=target_value,
                parent_text_block=parent_text_block,
                data_point=target_data_point,
                annotation_attribute=annotation_attribute
            ))

    pdf_file_path = None
    if output_pdf_folder is not None and len(output_pdf_folder) > 0:
        os.makedirs(output_pdf_folder, exist_ok=True)
        pdf_file_path = self.save_annotated_pdf(pdf_doc=pdf_doc,
                                                output_pdf_folder=output_pdf_folder)
    result = {"drilldown_pdf_doc": pdf_doc,
              "annotation_list": annotation_list,
              "annotated_pdf_file": pdf_file_path}
    return result
|
||||||
|
|
||||||
|
def save_annotated_pdf(self, pdf_doc: fitz.Document, output_pdf_folder: str):
    """Write the annotated document to ``<output_pdf_folder>/pdf/``.

    The output file is named after ``self.pdf_file`` with ``.pdf`` replaced
    by ``_annotated.pdf``. The document is closed after a successful save.

    Args:
        pdf_doc: the annotated document to save.
        output_pdf_folder: existing folder under which ``pdf/`` is created.

    Returns:
        The path of the written file, or ``None`` when the folder is
        missing or empty, the document is ``None`` or already closed, or
        saving failed (the error is logged, not raised).
    """
    try:
        if output_pdf_folder is None or len(output_pdf_folder) == 0 or not os.path.exists(output_pdf_folder):
            return None
        # "or", not "and": with "and" a None document raised AttributeError
        # and an already-closed document was never rejected.
        if pdf_doc is None or pdf_doc.is_closed:
            return None
        pdf_file_name = os.path.basename(self.pdf_file)
        pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf")
        output_pdf_dir = os.path.join(output_pdf_folder, "pdf/")
        os.makedirs(output_pdf_dir, exist_ok=True)
        pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name)

        # Serialise into memory first so a failing save does not leave a
        # truncated file on disk.
        output_buffer = BytesIO()
        pdf_doc.save(output_buffer)

        # Save the output buffer to the output file
        with open(pdf_file_path, mode="wb") as f:
            f.write(output_buffer.getbuffer())
        pdf_doc.close()
        logger.info(f"File saved to {pdf_file_path}")
        return pdf_file_path
    except Exception as e:
        # Best-effort: a failed save must not abort the drilldown run.
        print_exc()
        logger.error(f"Error when save output file: {e}")
        return None
|
||||||
|
|
||||||
|
def highlight_pdf_doc(self,
                      pdf_doc: fitz.Document,
                      page_index: int,
                      highlight_value: str,
                      parent_text_block: str = None,
                      data_point: str = None,
                      annotation_attribute: dict = None):
    """Highlight one value on one page and report where it was found.

    The value (and the optional parent block) is located in the page text
    with a tolerant regex built by ``add_slash_to_text_as_regex``. The text
    actually found is then passed to ``highlight_matching_data``.

    Args:
        pdf_doc: opened document to annotate.
        page_index: index of the page to search.
        highlight_value: text to highlight; non-string values are converted
            with ``str()``. ``None`` or blank values are not searched.
        parent_text_block: optional surrounding text. When it is found on
            the page, the value is highlighted inside that block only.
        data_point: name stored in the annotation title and content.
        annotation_attribute: extra key/values merged into the annotation
            content. ``None`` means no extra attributes.

    Returns:
        dict with ``pdf_file``, ``page_index``, ``data_point``, ``value``
        and ``matching_val_area``, a list of ``[x0, y0, x1, y1]`` boxes
        sorted top-to-bottom then left-to-right. The list is empty when the
        value was not found.
    """
    page = pdf_doc[page_index]
    page_text = page.get_text()

    # Locate the parent block, if any. A blank parent block is ignored so it
    # cannot suppress highlighting of the value itself.
    parent_text_block_search_text = None
    if parent_text_block is not None and len(parent_text_block.strip()) > 0:
        parent_text_block_regex = self.add_slash_to_text_as_regex(parent_text_block)
        parent_text_block_search = re.search(parent_text_block_regex, page_text)
        if parent_text_block_search is not None and len(parent_text_block_search.group().strip()) > 0:
            parent_text_block_search_text = parent_text_block_search.group()

    # Locate the value. The result stays None unless real text was matched,
    # so the highlight step below is skipped for values missing on the page.
    highlight_value_search_text = None
    if highlight_value is not None:
        if not isinstance(highlight_value, str):
            highlight_value = str(highlight_value)
        pure_value = highlight_value.strip()
        if len(pure_value) > 0:
            highlight_value_regex = self.add_slash_to_text_as_regex(highlight_value)
            # Very short single tokens are matched case-sensitively.
            if len(pure_value.split()) == 1 and len(pure_value) < 3:
                highlight_value_search = re.search(highlight_value_regex, page_text)
            else:
                highlight_value_search = re.search(highlight_value_regex, page_text, re.IGNORECASE)
            if highlight_value_search is not None and len(highlight_value_search.group().strip()) > 0:
                highlight_value_search_text = highlight_value_search.group()

    annotation_data = {"pdf_file": self.simple_pdf_file,
                       "page_index": page_index,
                       "data_point": data_point,
                       "value": highlight_value,
                       "matching_val_area": []}
    if highlight_value_search_text is None:
        return annotation_data

    content = {
        "data_point": data_point,
        "data_value": highlight_value
    }
    # append annotation_attribute to content
    content.update(annotation_attribute or {})

    # Values longer than three tokens are highlighted with nearby lines merged.
    merge_nearby_lines = len(highlight_value_search_text.strip().split()) > 3

    search_kwargs = {
        "page": page,
        "content": content,
        "title": data_point,
        "only_hightlight_first": False,
        "merge_nearby_lines": merge_nearby_lines,
    }
    if parent_text_block_search_text is not None:
        # Search for the parent block and highlight the value inside it.
        search_kwargs["text_block"] = parent_text_block_search_text
        search_kwargs["highlight_text_inside_block"] = highlight_value_search_text
    else:
        search_kwargs["text_block"] = highlight_value_search_text
    matching_val_area = self.highlight_matching_data(**search_kwargs)

    # order bbox_list by y0, x0, y1, x1
    bbox_list = sorted(
        ([area.x0, area.y0, area.x1, area.y1] for area in matching_val_area),
        key=lambda box: (box[1], box[0], box[3], box[2]),
    )
    annotation_data["matching_val_area"] = bbox_list
    return annotation_data
|
||||||
|
|
||||||
|
def add_slash_to_text_as_regex(self, text: str):
    """Turn plain text into a tolerant regex for searching page text.

    Every non-word, non-whitespace character is replaced by ``\\W``. Runs
    of spaces are collapsed, and each remaining space becomes
    ``\\s*\\W*`` so the pattern tolerates different spacing and
    punctuation between words.

    A character is left untouched when the text already contains it in
    escaped form (a backslash directly followed by that character).

    Args:
        text: the plain text to convert.

    Returns:
        The regex pattern string; ``None`` or ``""`` is returned unchanged.
    """
    if text is None or len(text) == 0:
        return text

    # Distinct special characters, in order of first appearance.
    special_chars = []
    for special_match in re.finditer(r"\W", text):
        special_char = special_match.group()
        if len(special_char.strip()) == 0:
            continue
        if special_char not in special_chars:
            special_chars.append(special_char)

    # Handle the backslash before anything else: substituting it after other
    # characters would also rewrite the backslashes of the "\W" tokens
    # inserted for them (e.g. "a.\" used to become "a\WW\W").
    if "\\" in special_chars:
        special_chars.remove("\\")
        special_chars.insert(0, "\\")

    for special_char in special_chars:
        escaped = "\\" + special_char
        if escaped not in text:
            text = text.replace(special_char, r"\W")

    text = re.sub(r"( ){2,}", " ", text)
    text = text.replace(" ", r"\s*\W*")
    return text
|
||||||
|
|
||||||
def highlight_matching_data(
|
def highlight_matching_data(
|
||||||
self,
|
self,
|
||||||
|
|
@ -293,6 +457,8 @@ class PDFUtil:
|
||||||
merge_nearby_lines: merge nearby lines or not
|
merge_nearby_lines: merge nearby lines or not
|
||||||
"""
|
"""
|
||||||
# logger.info(f"Highlighting matching values in {self.pdf_file}")
|
# logger.info(f"Highlighting matching values in {self.pdf_file}")
|
||||||
|
if text_block is None or len(text_block.strip()) == 0:
|
||||||
|
return []
|
||||||
if within_bbox is not None:
|
if within_bbox is not None:
|
||||||
matching_val_area = page.search_for(
|
matching_val_area = page.search_for(
|
||||||
text_block, clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])
|
text_block, clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])
|
||||||
|
|
@ -307,6 +473,7 @@ class PDFUtil:
|
||||||
matching_val_area = page.search_for(text_block)
|
matching_val_area = page.search_for(text_block)
|
||||||
else:
|
else:
|
||||||
matching_val_area = page.search_for(text_block)
|
matching_val_area = page.search_for(text_block)
|
||||||
|
|
||||||
if len(matching_val_area) == 0:
|
if len(matching_val_area) == 0:
|
||||||
matching_val_area = page.search_for(text_block.strip())
|
matching_val_area = page.search_for(text_block.strip())
|
||||||
if len(matching_val_area) == 0:
|
if len(matching_val_area) == 0:
|
||||||
|
|
@ -318,15 +485,20 @@ class PDFUtil:
|
||||||
for area in matching_val_area:
|
for area in matching_val_area:
|
||||||
# get text by text_bbox
|
# get text by text_bbox
|
||||||
pure_text_block = text_block.strip()
|
pure_text_block = text_block.strip()
|
||||||
|
raw_area_text = page.get_text("text", clip=area).strip()
|
||||||
|
if len(text_block.strip()) < 3 and text_block.strip() != raw_area_text.strip():
|
||||||
|
continue
|
||||||
copy_area = deepcopy(area)
|
copy_area = deepcopy(area)
|
||||||
copy_area.x0 -= 10
|
copy_area.x0 -= 15
|
||||||
copy_area.x1 += 10
|
copy_area.x1 += 15
|
||||||
text = page.get_text("text", clip=copy_area).strip()
|
text = page.get_text("text", clip=copy_area).strip()
|
||||||
if text == pure_text_block:
|
if text == pure_text_block:
|
||||||
new_matching_val_area.append(area)
|
new_matching_val_area.append(area)
|
||||||
else:
|
else:
|
||||||
# get start and end index of the pure_text_block in text
|
# get start and end index of the pure_text_block in text
|
||||||
start_index = text.find(pure_text_block)
|
start_index = text.find(pure_text_block)
|
||||||
|
if start_index == -1:
|
||||||
|
continue
|
||||||
if start_index > 0:
|
if start_index > 0:
|
||||||
previous_char = text[start_index - 1]
|
previous_char = text[start_index - 1]
|
||||||
if previous_char not in [" ", "("]:
|
if previous_char not in [" ", "("]:
|
||||||
|
|
@ -361,8 +533,8 @@ class PDFUtil:
|
||||||
for area in text_bbox_area:
|
for area in text_bbox_area:
|
||||||
# get text by text_bbox
|
# get text by text_bbox
|
||||||
copy_area = deepcopy(area)
|
copy_area = deepcopy(area)
|
||||||
copy_area.x0 -= 10
|
copy_area.x0 -= 15
|
||||||
copy_area.x1 += 10
|
copy_area.x1 += 15
|
||||||
text = page.get_text("text", clip=copy_area).strip()
|
text = page.get_text("text", clip=copy_area).strip()
|
||||||
if text == highlight_text_inside_block:
|
if text == highlight_text_inside_block:
|
||||||
highlight_bbox_list.append(area)
|
highlight_bbox_list.append(area)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue