# Import Libraries
from typing import Tuple
from io import BytesIO
import os
import argparse
import re
import json
import base64
from copy import deepcopy
from traceback import print_exc

import fitz
from tqdm import tqdm

from utils.similarity import Similarity
from utils.logger import logger


class PDFUtil:

    def __init__(self, pdf_file: str) -> None:
        self.pdf_file = pdf_file
        self.simple_pdf_file = os.path.basename(self.pdf_file)
        self.is_valid_path()
        self.similarity = Similarity()

    def is_valid_path(self):
        """
        Validate that the supplied path points to an existing PDF file.
        """
        if not self.pdf_file:
            raise ValueError("Invalid Path")
        if os.path.isfile(self.pdf_file) and self.pdf_file.endswith(".pdf"):
            return True
        else:
            raise ValueError(
                f"Invalid Path {self.pdf_file}, please input the correct pdf file path."
            )

    def extract_info(self) -> Tuple[bool, dict]:
        """
        Extract basic file information and metadata from the PDF.
        """
        logger.info(f"Extracting file info from {self.pdf_file}")
        # Open the PDF
        pdf_doc = fitz.open(self.pdf_file)
        try:
            pdf_encrypted = pdf_doc.isEncrypted
        except AttributeError:
            pdf_encrypted = pdf_doc.is_encrypted
        output = {
            "File": self.pdf_file,
            "Encrypted": ("True" if pdf_encrypted else "False"),
        }
        # If the PDF is encrypted, the file metadata cannot be extracted
        if not pdf_encrypted:
            for key, value in pdf_doc.metadata.items():
                output[key] = value
        # Display file info
        logger.info(
            "## File Information ##################################################"
        )
        logger.info("\n".join("{}:{}".format(i, j) for i, j in output.items()))
        logger.info(
            "######################################################################"
        )
        pdf_doc.close()
        return True, output

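    # Usage sketch (illustrative only; "sample.pdf" is a placeholder path):
    #
    #     util = PDFUtil("sample.pdf")
    #     ok, info = util.extract_info()
    #     if ok:
    #         print(info["Encrypted"], info.get("title"), info.get("author"))
    #
    # `info` combines a couple of fixed keys ("File", "Encrypted") with the
    # PyMuPDF metadata dictionary when the document is not encrypted.
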
    def extract_text(self, output_folder: str = None) -> Tuple[bool, str, dict]:
        """
        Extract text from the PDF, optionally saving it as .txt and per-page .json files.
        """
        # Extract text
        try:
            logger.info(f"Extracting text from {self.pdf_file}")
            text = ""
            page_text_dict = {}
            pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted
            if pdf_encrypted:
                pdf_doc.authenticate("")
            for page in pdf_doc:
                page_text = page.get_text()
                text += page_text + "\n"
                page_text_dict[page.number] = page_text
            # To display the extracted text, uncomment the following lines:
            # logger.info(
            #     "## Extracted Text ####################################################"
            # )
            # logger.info(text)
            # logger.info(
            #     "######################################################################"
            # )
            # Save to file
            if output_folder:
                txt_output_folder = os.path.join(output_folder, 'pdf_text/')
                os.makedirs(txt_output_folder, exist_ok=True)
                txt_file = os.path.join(txt_output_folder, self.simple_pdf_file.replace(".pdf", ".txt"))
                with open(txt_file, "w", encoding="utf-8") as file:
                    file.write(text.strip())

                json_output_folder = os.path.join(output_folder, 'pdf_json/')
                os.makedirs(json_output_folder, exist_ok=True)
                json_file = os.path.join(json_output_folder, self.simple_pdf_file.replace(".pdf", ".json"))
                with open(json_file, "w", encoding="utf-8") as file:
                    json.dump(page_text_dict, file, indent=4)
            pdf_doc.close()
            return True, text, page_text_dict
        except Exception as e:
            logger.error(f"Error extracting text: {e}")
            print_exc()
            return False, str(e), {}

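    # Usage sketch (illustrative; paths are placeholders):
    #
    #     ok, full_text, per_page = PDFUtil("sample.pdf").extract_text(output_folder="out/")
    #     # full_text is the concatenated text of all pages;
    #     # per_page maps 0-based page numbers to that page's text.
    #     # With output_folder set, out/pdf_text/sample.txt and out/pdf_json/sample.json are written.
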
    def extract_images(self,
                       zoom: float = 2.0,
                       pdf_page_index_list: list = None,
                       output_folder: str = None):
        """
        Render pages to PNG images and return them as base64 strings
        (and as files when output_folder is given).
        """
        try:
            pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted
            if pdf_encrypted:
                pdf_doc.authenticate("")
            if pdf_page_index_list is None or len(pdf_page_index_list) == 0:
                pdf_page_index_list = range(pdf_doc.page_count)
            pdf_base_name = os.path.basename(self.pdf_file).replace(".pdf", "")
            mat = fitz.Matrix(zoom, zoom)
            output_data = {}
            for page_num in tqdm(pdf_page_index_list, disable=False):
                page = pdf_doc[page_num]
                pix = page.get_pixmap(matrix=mat)
                img_buffer = pix.tobytes(output='png')
                output_data[page_num] = {}
                img_base64 = base64.b64encode(img_buffer).decode('utf-8')
                if output_folder and len(output_folder) > 0:
                    os.makedirs(output_folder, exist_ok=True)
                    image_file = os.path.join(output_folder, f"{pdf_base_name}_{page_num}.png")
                    pix.save(image_file)
                    output_data[page_num]["img_file"] = image_file
                output_data[page_num]["img_base64"] = img_base64
            pdf_doc.close()
            return output_data
        except Exception as e:
            logger.error(f"Error extracting images: {e}")
            print_exc()
            return {}

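    # Usage sketch (illustrative): render pages 0 and 3 at 2x zoom.
    #
    #     images = PDFUtil("sample.pdf").extract_images(zoom=2.0, pdf_page_index_list=[0, 3])
    #     png_bytes = base64.b64decode(images[0]["img_base64"])  # raw PNG data for page 0
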
    def extract_image_from_page(self,
                                page_index: int,
                                zoom: float = 2.0,
                                output_folder: str = None):
        """
        Render a single page to a PNG image and return it as a base64 string.
        """
        try:
            pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted
            if pdf_encrypted:
                pdf_doc.authenticate("")
            pdf_base_name = os.path.basename(self.pdf_file).replace(".pdf", "")
            mat = fitz.Matrix(zoom, zoom)
            page = pdf_doc[page_index]
            pix = page.get_pixmap(matrix=mat)
            img_buffer = pix.tobytes(output='png')
            img_base64 = base64.b64encode(img_buffer).decode('utf-8')
            if output_folder and len(output_folder) > 0:
                os.makedirs(output_folder, exist_ok=True)
                image_file = os.path.join(output_folder, f"{pdf_base_name}_{page_index}.png")
                pix.save(image_file)
            pdf_doc.close()
            return img_base64
        except Exception as e:
            logger.error(f"Error extracting image from page: {e}")
            print_exc()
            return None

    def parse_blocks_page(self, page: fitz.Page):
        """
        Return the text blocks of a page as a list of dictionaries.
        """
        blocks = page.get_text("blocks")
        list_of_blocks = []
        for block in blocks:
            x0, y0, x1, y1, lines_in_the_block, block_no, block_type = block
            list_of_blocks.append(
                {
                    "bbox": [x0, y0, x1, y1],
                    "lines_in_the_block": lines_in_the_block,
                    "block_no": block_no,
                    "block_type": block_type,
                }
            )
        return list_of_blocks

    def parse_all_blocks(self):
        """
        Parse the text blocks of every page; returns {page_number: [blocks]}.
        """
        pdf_doc = fitz.open(self.pdf_file)
        try:
            pdf_encrypted = pdf_doc.isEncrypted
        except AttributeError:
            pdf_encrypted = pdf_doc.is_encrypted
        if pdf_encrypted:
            pdf_doc.authenticate("")
        pdf_blocks = {}
        for page_num in tqdm(range(pdf_doc.page_count), disable=False):
            page = pdf_doc[page_num]
            blocks = self.parse_blocks_page(page)
            pdf_blocks[page_num] = blocks
        pdf_doc.close()
        return pdf_blocks

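    # Usage sketch (illustrative): inspect the first block of page 0.
    #
    #     blocks_by_page = PDFUtil("sample.pdf").parse_all_blocks()
    #     first_block = blocks_by_page[0][0]
    #     print(first_block["bbox"], first_block["lines_in_the_block"])
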
    def search_for_text(self, page_text, search_str):
        """
        Search for the search string (treated as a regular expression,
        case-insensitively) within the page text.
        """
        # Find all matches, keeping any that are not pure whitespace
        result_iter = re.finditer(search_str, page_text, re.IGNORECASE)
        results = [
            result.group() for result in result_iter if result.group().strip() != ""
        ]
        return results

    def redact_matching_data(self, page, matched_value):
        """
        Redacts matching values on the page.
        """
        logger.info(f"Redacting matching values in {self.pdf_file}")
        matching_val_area = page.search_for(matched_value)
        # Redact matching values
        for area in matching_val_area:
            page.add_redact_annot(area, text=" ", fill=(0, 0, 0))
        # Apply the redaction
        page.apply_redactions()
        return matching_val_area

    def frame_matching_data(self, page, matched_value):
        """
        Draws a frame around matching values.
        """
        matching_val_area = page.search_for(matched_value)
        for area in matching_val_area:
            if isinstance(area, fitz.Rect):
                # Draw a rectangle around matched values
                annot = page.add_redact_annot(area)
                # , fill = fitz.utils.getColor('black')
                annot.set_colors(stroke=fitz.utils.getColor("red"))
                # If you want to remove matched data
                # page.addFreetextAnnot(area, ' ')
                annot.update()
        return matching_val_area

    def highlight_rectangle(self,
                            pdf_doc: fitz.Document,
                            page_index: int,
                            bbox: list,
                            title: str = "",
                            content: dict = {}):
        """
        Highlight a rectangle and attach the content dictionary as annotation info.
        """
        rectangle = fitz.Rect(bbox[0], bbox[1], bbox[2], bbox[3])
        page = pdf_doc[page_index]
        highlight = page.add_highlight_annot([rectangle])
        content_text = json.dumps(content)
        highlight.set_info(content=content_text, title=title)
        highlight.update()

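    # Usage sketch (illustrative): highlight a box on page 0 of an open document.
    #
    #     util = PDFUtil("sample.pdf")
    #     doc = fitz.open("sample.pdf")
    #     util.highlight_rectangle(doc, page_index=0, bbox=[72, 72, 300, 90],
    #                              title="Example", content={"note": "manual highlight"})
    #     doc.save("sample_highlighted.pdf")
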
    def batch_drilldown(self,
                        drilldown_data_list: list,
                        output_pdf_folder: str = None):
        """
        Highlight a batch of drill-down values and optionally save the annotated PDF.
        Each item in drilldown_data_list is a dict with at least
        "page_index", "data_point" and "value".
        """
        pdf_doc = fitz.open(self.pdf_file)
        annotation_list = []
        for drilldown_data in drilldown_data_list:
            page_index = drilldown_data["page_index"]
            data_point = drilldown_data["data_point"]
            if isinstance(data_point, list):
                data_point = ", ".join(data_point)
            parent_text_block = drilldown_data.get("parent_text_block", None)
            highlight_value = drilldown_data["value"]
            annotation_attribute = drilldown_data.get("annotation_attribute", {})
            if isinstance(highlight_value, str):
                annotation_list.append(self.highlight_pdf_doc(
                    pdf_doc=pdf_doc,
                    page_index=page_index,
                    highlight_value=highlight_value,
                    parent_text_block=parent_text_block,
                    data_point=data_point,
                    annotation_attribute=annotation_attribute
                ))
            elif isinstance(highlight_value, list):
                for value in highlight_value:
                    annotation_list.append(self.highlight_pdf_doc(
                        pdf_doc=pdf_doc,
                        page_index=page_index,
                        highlight_value=value,
                        parent_text_block=parent_text_block,
                        data_point=data_point,
                        annotation_attribute=annotation_attribute
                    ))
            elif isinstance(highlight_value, dict):
                for key, value in highlight_value.items():
                    annotation_list.append(self.highlight_pdf_doc(
                        pdf_doc=pdf_doc,
                        page_index=page_index,
                        highlight_value=value,
                        parent_text_block=parent_text_block,
                        data_point=f"{data_point}, {key}",
                        annotation_attribute=annotation_attribute
                    ))
            else:
                highlight_value = str(highlight_value)
                annotation_list.append(self.highlight_pdf_doc(
                    pdf_doc=pdf_doc,
                    page_index=page_index,
                    highlight_value=highlight_value,
                    parent_text_block=parent_text_block,
                    data_point=data_point,
                    annotation_attribute=annotation_attribute
                ))
        if output_pdf_folder is not None and len(output_pdf_folder) > 0:
            os.makedirs(output_pdf_folder, exist_ok=True)
            # Note: save_annotated_pdf closes pdf_doc after writing the file.
            pdf_file_path = self.save_annotated_pdf(pdf_doc=pdf_doc,
                                                    output_pdf_folder=output_pdf_folder)
        result = {"drilldown_pdf_doc": pdf_doc,
                  "annotation_list": annotation_list}
        return result

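    # Usage sketch (illustrative): the expected shape of one drill-down item.
    #
    #     drilldown_items = [{
    #         "page_index": 0,
    #         "data_point": "FundName",
    #         "value": "Example Growth Fund",             # str, list or dict are accepted
    #         "parent_text_block": None,                   # optional surrounding text
    #         "annotation_attribute": {"source": "llm"},   # optional extra annotation info
    #     }]
    #     result = PDFUtil("sample.pdf").batch_drilldown(drilldown_items, output_pdf_folder="out/")
    #     print(result["annotation_list"][0]["matching_val_area"])
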
    def save_annotated_pdf(self, pdf_doc: fitz.Document, output_pdf_folder: str):
        """
        Write the (annotated) document to <output_pdf_folder>/pdf/ and close it.
        """
        try:
            if output_pdf_folder is None or len(output_pdf_folder) == 0 or not os.path.exists(output_pdf_folder):
                return
            if pdf_doc is None or pdf_doc.is_closed:
                return
            pdf_file_name = os.path.basename(self.pdf_file)
            pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf")
            output_pdf_dir = os.path.join(output_pdf_folder, "pdf/")
            os.makedirs(output_pdf_dir, exist_ok=True)
            pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name)
            output_buffer = BytesIO()
            pdf_doc.save(output_buffer)

            # Save the output buffer to the output file
            with open(pdf_file_path, mode="wb") as f:
                f.write(output_buffer.getbuffer())
            pdf_doc.close()
            logger.info(f"File saved to {pdf_file_path}")
            return pdf_file_path
        except Exception as e:
            print_exc()
            logger.error(f"Error when saving output file: {e}")

    def highlight_pdf_doc(self,
                          pdf_doc: fitz.Document,
                          page_index: int,
                          highlight_value: str,
                          parent_text_block: str = None,
                          data_point: str = None,
                          annotation_attribute: dict = {}):
        """
        Search for highlight_value on the given page (optionally inside parent_text_block),
        highlight it and return an annotation_data dict describing what was matched.
        """
        page = pdf_doc[page_index]
        page_text = page.get_text()

        parent_text_block_search_text = None
        if parent_text_block is not None:
            parent_text_block_regex = self.add_slash_to_text_as_regex(parent_text_block)
            parent_text_block_search = re.search(parent_text_block_regex, page_text)
            if parent_text_block_search is not None:
                parent_text_block_search_text = parent_text_block_search.group()

        highlight_value_search_text = ""

        if highlight_value is not None and len(highlight_value.strip()) > 0:
            pure_highlight_value = highlight_value.strip()

            highlight_value_search_text = None
            if len(pure_highlight_value.split()) == 1 and \
                    (len(pure_highlight_value) < 3 or pure_highlight_value[0].upper() == pure_highlight_value[0]):
                highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value)
                highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, page_text, ignore_case=False)
            else:
                highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value, match_special_char_after_space=False)
                highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, page_text, ignore_case=True)
                if highlight_value_search_text is None:
                    highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value, match_special_char_after_space=True)
                    highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, page_text, ignore_case=False)

        annotation_data = {"pdf_file": self.simple_pdf_file,
                           "page_index": page_index,
                           "data_point": data_point,
                           "value": highlight_value,
                           "matching_val_area": []}
        if highlight_value_search_text is not None:
            content = {
                "data_point": data_point,
                "data_value": highlight_value
            }
            # append annotation_attribute to content
            content.update(annotation_attribute)

            if len(highlight_value_search_text.strip().split()) > 3:
                merge_nearby_lines = True
            else:
                merge_nearby_lines = False
            if parent_text_block_search_text is not None:
                matching_val_area = self.highlight_matching_data(
                    page=page,
                    text_block=parent_text_block_search_text,
                    highlight_text_inside_block=highlight_value_search_text,
                    content=content,
                    title=data_point,
                    only_hightlight_first=False,
                    merge_nearby_lines=merge_nearby_lines
                )
            else:
                matching_val_area = self.highlight_matching_data(
                    page=page,
                    text_block=highlight_value_search_text,
                    content=content,
                    title=data_point,
                    only_hightlight_first=False,
                    merge_nearby_lines=merge_nearby_lines
                )

            bbox_list = []
            for area in matching_val_area:
                bbox = [area.x0, area.y0, area.x1, area.y1]
                bbox_list.append(bbox)
            # order bbox_list by y0, x0, y1, x1
            bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2]))
            annotation_data["matching_val_area"] = bbox_list
            if len(bbox_list) > 0:
                annotation_data["normalized_bbox"] = self.get_bbox_normalized(page, bbox_list)
        return annotation_data

    def get_proper_search_text(self, raw_value: str, highlight_value_regex: str, page_text: str, ignore_case: bool = True):
        """
        Run the regex over the page text and prefer a match that equals raw_value;
        otherwise return the last match found (or None when there is no match).
        """
        if ignore_case:
            highlight_value_search_iter = re.finditer(highlight_value_regex, page_text, re.IGNORECASE)
        else:
            highlight_value_search_iter = re.finditer(highlight_value_regex, page_text)

        highlight_value_search_text = None
        for highlight_value_search in highlight_value_search_iter:
            highlight_value_search_text = highlight_value_search.group().strip()
            if highlight_value_search_text == raw_value:
                return highlight_value_search_text
        return highlight_value_search_text

    def add_slash_to_text_as_regex(self, text: str, match_special_char_after_space: bool = True):
        """
        Turn a literal text snippet into a tolerant regex: non-alphanumeric characters
        are generalized to \\W and runs of spaces become flexible whitespace matchers.
        """
        if text is None or len(text) == 0:
            return text
        special_char_iter = re.finditer(r"\W", text)
        for special_iter in special_char_iter:
            if len(special_iter.group().strip()) == 0:
                continue
            replace = r"\{0}".format(special_iter.group())
            if replace not in text:
                special_iter_text = special_iter.group()
                if special_iter_text == ")" and text.strip()[-1] == ")" and \
                        text.strip().count(")") == 1:
                    text = text.replace(")", r"\)")
                else:
                    text = re.sub(replace, r"\\W", text)

        text = re.sub(r"( ){2,}", " ", text)
        if match_special_char_after_space:
            text = text.replace(" ", r"\s*\W*")
        else:
            text = text.replace(" ", r"\s*")
        return text

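    # Worked example (illustrative trace, not part of the original code):
    #
    #     PDFUtil("sample.pdf").add_slash_to_text_as_regex("Net Assets (USD)")
    #     # -> roughly r"Net\s*\W*Assets\s*\W*\WUSD\)": "(" is generalized to \W, the final
    #     #    ")" stays escaped, and spaces become flexible, so the pattern still matches
    #     #    when the PDF text layer inserts line breaks or extra punctuation.
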
    def highlight_matching_data(
        self,
        page,
        text_block,
        within_bbox: list = None,
        highlight_text_inside_block: str = None,
        content: dict = {},
        title: str = "",
        only_hightlight_first: bool = False,
        exact_match: bool = False,
        merge_nearby_lines: bool = False,
    ):
        """
        Highlight matching values.

        page: page object of a fitz.Document
        text_block: text to search for in the page text
        within_bbox: bounding box to restrict the search to
        highlight_text_inside_block: text to highlight inside text_block
        content: dictionary serialized to JSON and attached to the highlight annotation;
            customize according to the relevant business logic
        title: title of the highlight annotation
        only_hightlight_first: only highlight the first match
        exact_match: whether to keep only exact matches
        merge_nearby_lines: whether to merge nearby lines
        """
        # logger.info(f"Highlighting matching values in {self.pdf_file}")
        if text_block is None or len(text_block.strip()) == 0:
            return []
        if within_bbox is not None:
            clip_rect = fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])
            matching_val_area = page.search_for(text_block, clip=clip_rect)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''),
                                                    clip=clip_rect)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(text_block.replace('-\n', ''),
                                                    clip=clip_rect)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(text_block)
        else:
            matching_val_area = page.search_for(text_block)

        if len(matching_val_area) == 0:
            matching_val_area = page.search_for(text_block.strip())
        if len(matching_val_area) == 0:
            matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''))
        if len(matching_val_area) == 0:
            matching_val_area = page.search_for(text_block.replace('-\n', ''))
        if len(matching_val_area) > 0 and len(text_block.strip().split()) == 1:
            new_matching_val_area = []
            for area in matching_val_area:
                # get the text around the matched bbox
                pure_text_block = text_block.strip()
                raw_area_text = page.get_text("text", clip=area).strip()
                if len(text_block.strip()) < 3 and text_block.strip() != raw_area_text.strip():
                    continue
                copy_area = deepcopy(area)
                copy_area.x0 -= 15
                copy_area.x1 += 15
                text = page.get_text("text", clip=copy_area).strip()
                if text == pure_text_block:
                    new_matching_val_area.append(area)
                else:
                    # get start and end index of the pure_text_block in text
                    start_index = text.find(pure_text_block)
                    if start_index == -1:
                        continue
                    if start_index > 0:
                        previous_char = text[start_index - 1].strip()
                        if previous_char not in ["", " ", "("]:
                            continue
                    end_index = start_index + len(pure_text_block)
                    if end_index < len(text):
                        next_char = text[end_index].strip()
                        if next_char not in ["", " ", "%", ")", "0"]:
                            continue
                    new_matching_val_area.append(area)
            matching_val_area = new_matching_val_area
        if (
            highlight_text_inside_block is not None
            and len(highlight_text_inside_block) > 0
            and len(matching_val_area) > 0
        ):
            highlight_bbox_list = []
            merged_matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines)
            pure_number_regex = re.compile(r"^\d+$")
            for area in merged_matching_val_area:
                text_bbox_area = page.search_for(
                    highlight_text_inside_block,
                    clip=[area.x0, area.y0, area.x1, area.y1],
                )
                if text_bbox_area is not None and len(text_bbox_area) > 0:
                    if only_hightlight_first:
                        highlight_bbox_list.append(text_bbox_area[0])
                        break
                    else:
                        pure_number_match = pure_number_regex.match(highlight_text_inside_block)
                        if pure_number_match is not None and pure_number_match.group() == highlight_text_inside_block:
                            for sub_area in text_bbox_area:
                                # get the text around the matched bbox
                                copy_area = deepcopy(sub_area)
                                copy_area.x0 -= 15
                                copy_area.x1 += 15
                                text = page.get_text("text", clip=copy_area).strip()
                                if text == highlight_text_inside_block:
                                    highlight_bbox_list.append(sub_area)
                                else:
                                    # get start and end index of the highlight_text_inside_block in text
                                    start_index = text.find(highlight_text_inside_block)
                                    if start_index > 0:
                                        previous_char = text[start_index - 1].strip()
                                        if previous_char not in ["", " ", "("]:
                                            continue
                                    end_index = start_index + len(highlight_text_inside_block)
                                    if end_index < len(text):
                                        next_char = text[end_index].strip()
                                        if next_char not in ["", " ", "%", ")"]:
                                            continue
                                    highlight_bbox_list.append(sub_area)
                        else:
                            highlight_bbox_list.extend(text_bbox_area)
            if len(highlight_bbox_list) == 0 and len(highlight_text_inside_block.strip().split()) > 2:
                highlight_bbox_list = page.search_for(highlight_text_inside_block)
            matching_val_area = highlight_bbox_list
        else:
            # guard against an empty result
            if only_hightlight_first and len(matching_val_area) > 0:
                matching_val_area = [matching_val_area[0]]

        if matching_val_area is not None and len(matching_val_area) > 0:
            if (highlight_text_inside_block is not None and len(highlight_text_inside_block.strip().split()) > 1) or \
                    (highlight_text_inside_block is None and len(text_block.strip().split()) > 1):
                matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines)
            if exact_match:
                matching_val_area = self.get_exact_match_area(page, matching_val_area, text_block)
            # matching_val_area = self.merge_matching_val_area(matching_val_area)
            for area in matching_val_area:
                highlight = page.add_highlight_annot([area])
                bbox_list = [area.x0, area.y0, area.x1, area.y1]
                content["bbox"] = bbox_list
                normalized_bbox = self.get_bbox_normalized(page, [bbox_list])
                if len(normalized_bbox) > 0:
                    content["normalized_bbox"] = normalized_bbox[0]
                else:
                    content["normalized_bbox"] = []
                content_text = json.dumps(content)
                highlight.set_info(content=content_text, title=title)
                highlight.update()
        return matching_val_area

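    # Usage sketch (illustrative): highlight every occurrence of a value on page 0.
    #
    #     util = PDFUtil("sample.pdf")
    #     doc = fitz.open("sample.pdf")
    #     areas = util.highlight_matching_data(page=doc[0], text_block="Total Net Assets",
    #                                          content={"data_point": "TNA"}, title="TNA")
    #     # `areas` holds the fitz.Rect regions that were annotated.
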
    def get_exact_match_area(self, page, matching_val_area, search_text):
        """
        Keep only the areas whose text contains search_text words with the
        same capitalization.
        """
        results = []
        for area in matching_val_area:
            area_text = page.get_text("text", clip=area).strip()
            area_text_list = area_text.split()
            search_text_list = search_text.split()
            capital_not_match = False
            any_word_match = False
            for search_split in search_text_list:
                if search_split in area_text_list:
                    any_word_match = True
                search_split_lower = search_split.lower()
                if search_split_lower in area_text_list and \
                        search_split not in area_text_list:
                    capital_not_match = True
                    break
            if capital_not_match:
                continue
            elif any_word_match:
                results.append(area)
        return results

    def merge_matching_val_area(self, matching_val_area, merge_nearby_lines=False):
        """
        Merge the matching areas that share the same y0 and y1:
        the merged x0 is the minimum x0 and the merged x1 is the maximum x1.
        """
        if matching_val_area is None or len(matching_val_area) == 0:
            return matching_val_area
        if len(matching_val_area) == 1:
            return matching_val_area
        # Snap y0 and y1 values that are close to each other (less than 5 pixels)
        # onto a shared coordinate so that rows can be grouped below.
        y0_list = []
        y1_list = []
        for area in matching_val_area:
            y0 = area.y0
            y1 = area.y1
            if len(y0_list) == 0:
                y0_list.append(y0)
                y1_list.append(y1)
            else:
                for t_y0 in y0_list:
                    if abs(t_y0 - y0) < 5:
                        area.y0 = t_y0
                    else:
                        if y0 not in y0_list:
                            y0_list.append(y0)
                for t_y1 in y1_list:
                    if abs(t_y1 - y1) < 5:
                        area.y1 = t_y1
                    else:
                        if y1 not in y1_list:
                            y1_list.append(y1)
        # Group the areas that now share the same (y0, y1) pair
        y0_y1_list = list(set([(area.y0, area.y1) for area in matching_val_area]))

        new_matching_val_area = []
        for y0_y1 in y0_y1_list:
            y0 = y0_y1[0]
            y1 = y0_y1[1]
            x0_list = [area.x0 for area in matching_val_area if area.y0 == y0 and area.y1 == y1]
            x1_list = [area.x1 for area in matching_val_area if area.y0 == y0 and area.y1 == y1]
            min_x0 = min(x0_list)
            max_x1 = max(x1_list)
            new_matching_val_area.append(fitz.Rect(min_x0, y0, max_x1, y1))
        if merge_nearby_lines and len(new_matching_val_area) > 1:
            new_matching_val_area = self.merge_nearby_lines(new_matching_val_area)
            # merge again
            if len(new_matching_val_area) > 1:
                new_matching_val_area = self.merge_nearby_lines(new_matching_val_area)
        elif len(new_matching_val_area) > 1:
            new_matching_val_area = self.remove_small_pitches(new_matching_val_area)
        return new_matching_val_area

    def remove_small_pitches(self, matching_val_area):
        # Drop areas narrower than a few pixels; they are usually stray match fragments.
        x_mini_threshold = 5
        new_matching_val_area = []
        for area in matching_val_area:
            if area.x1 - area.x0 > x_mini_threshold:
                new_matching_val_area.append(area)
        return new_matching_val_area

    def merge_nearby_lines(self, matching_val_area):
        bbox_list = []

        for bbox in matching_val_area:
            bbox = [bbox.x0, bbox.y0, bbox.x1, bbox.y1]
            bbox_list.append(bbox)
        # order bbox_list by y0, x0, y1, x1
        bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2]))
        new_matching_val_area = []

        last_x0 = None
        last_x1 = None
        last_y0 = None
        last_y1 = None
        x_mini_threshold = 5
        y_threshold = 15
        x_threshold = 10
        for index, bbox in enumerate(bbox_list):
            if bbox[2] - bbox[0] <= x_mini_threshold:
                continue

            if index == 0 or last_x0 is None:
                last_x0 = bbox[0]
                last_y0 = bbox[1]
                last_x1 = bbox[2]
                last_y1 = bbox[3]
                continue

            x0 = bbox[0]
            y0 = bbox[1]
            x1 = bbox[2]
            y1 = bbox[3]

            last_x0_x1_range = [i for i in range(int(last_x0), int(last_x1))]
            x0_x1_range = [i for i in range(int(x0), int(x1))]
            x_intersection = list(set(last_x0_x1_range).intersection(set(x0_x1_range)))

            # Merge when any of the following holds:
            #  - the boxes are vertically adjacent (y0 - last_y1 <= y_threshold) and their
            #    horizontal ranges intersect or nearly touch (x gap <= x_threshold);
            #  - the boxes sit on the same baseline (y0 gap <= y_threshold) and the current
            #    box starts right after the previous one (x gap <= x_threshold);
            #  - the boxes share the same bottom (y1 gap <= y_threshold) and their
            #    horizontal ranges intersect or nearly touch.
            if (abs(y0 - last_y1) <= y_threshold and
                    (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)) or \
                    (abs(y0 - last_y0) <= y_threshold and abs(x0 - last_x1) <= x_threshold) or \
                    (abs(y1 - last_y1) <= y_threshold and
                     (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)):
                last_x0 = min(last_x0, x0)
                last_x1 = max(last_x1, x1)
                last_y0 = min(last_y0, y0)
                last_y1 = max(last_y1, y1)
            else:
                new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1))
                last_x0 = x0
                last_x1 = x1
                last_y0 = y0
                last_y1 = y1
        if last_x0 is not None:
            new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1))
        return new_matching_val_area

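    # Illustrative behaviour (assumed inputs): two fragments on consecutive text lines
    # are fused into one rectangle when they overlap horizontally.
    #
    #     util = PDFUtil("sample.pdf")
    #     rects = [fitz.Rect(50, 100, 200, 112), fitz.Rect(50, 114, 180, 126)]
    #     merged = util.merge_nearby_lines(rects)  # -> [fitz.Rect(50.0, 100.0, 200.0, 126.0)]
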
    def highlight_matching_paragraph_text(
        self,
        pdf_doc: fitz.Document,
        page_index: int,
        search_paragraph_text: str,
        sibling_paragraph_text_list: list = [],
        next_page_found_lines: list = [],
        content: dict = {},
        title: str = "",
    ):
        """
        Find and highlight the page lines that best match search_paragraph_text
        (scored by Jaccard similarity), following the paragraph onto the next page
        when it appears to continue there.
        """
        page = pdf_doc[page_index]
        page_text = page.get_text("text")
        page_lines = [
            line for line in page_text.split("\n") if len(line.strip()) > 0
        ]
        matching_val_area = []
        find_begin = False
        search_paragraph_text_words = [
            word.strip()
            for word in search_paragraph_text.lower().split()
            if len(word.strip()) > 0
        ]
        found_words = []
        found_lines = []
        jaccard_similarity = 0
        found_matched = False
        found_lines_dict = {}
        for index, line in enumerate(page_lines):
            if len(next_page_found_lines) > 0:
                if line in next_page_found_lines:
                    continue
            words = [
                word.strip() for word in line.lower().split() if len(word.strip()) > 0
            ]
            if len(words) == 0:
                continue
            if find_begin:
                found_words.extend(words)
                new_jaccard_similarity = self.similarity.jaccard_similarity(
                    search_paragraph_text_words, found_words
                )
                if new_jaccard_similarity > jaccard_similarity:
                    jaccard_similarity = new_jaccard_similarity
                    found_lines.append(line)
                else:
                    if jaccard_similarity > 0.4:
                        found_matched = True
                    break
            else:
                # Anchor the paragraph on a line containing its first three words
                # (paragraphs shorter than three words cannot be anchored reliably).
                if len(search_paragraph_text_words) >= 3 and \
                        search_paragraph_text_words[0].lower() in line.lower() and \
                        search_paragraph_text_words[1].lower() in line.lower() and \
                        search_paragraph_text_words[2].lower() in line.lower():
                    jaccard_similarity = self.similarity.jaccard_similarity(
                        search_paragraph_text_words, words
                    )
                    if jaccard_similarity > 0.05:
                        find_begin = True
                        found_words.extend(words)
                        found_lines.append(line)
                        if jaccard_similarity > 0.4:
                            found_matched = True

        if found_matched and len(found_lines) > 0:
            total_matching_val_area = []
            for line in found_lines:
                matching_val_area = page.search_for(line)
                if len(matching_val_area) == 0:
                    matching_val_area = page.search_for(line.strip())

                if len(matching_val_area) == 0:
                    continue
                elif len(matching_val_area) == 1:
                    total_matching_val_area.extend(matching_val_area)
                else:
                    y1_list = [area.y1 for area in matching_val_area]
                    if len(total_matching_val_area) == 0:
                        # When the first line matches several places, start from the
                        # lowest occurrence on the page (largest y1).
                        y1_min = max(y1_list)
                        y1_min_index = y1_list.index(y1_min)
                        total_matching_val_area.append(matching_val_area[y1_min_index])
                    else:
                        last_y1 = total_matching_val_area[-1].y1
                        latest_bigger_y1_list = max(
                            [y1 for y1 in y1_list if y1 > last_y1]
                        )
                        latest_bigger_y1_index = y1_list.index(latest_bigger_y1_list)
                        total_matching_val_area.append(
                            matching_val_area[latest_bigger_y1_index]
                        )
            # get min x0, min y0, max x1, max y1 from total_matching_val_area
            x0_list = [area.x0 for area in total_matching_val_area]
            y0_list = [area.y0 for area in total_matching_val_area]
            x1_list = [area.x1 for area in total_matching_val_area]
            y1_list = [area.y1 for area in total_matching_val_area]
            min_x0 = min(x0_list)
            min_y0 = min(y0_list)
            max_x1 = max(x1_list)
            max_y1 = max(y1_list)
            matching_val_area = [fitz.Rect(min_x0, min_y0, max_x1, max_y1)]
            highlight = page.add_highlight_annot(matching_val_area)
            bbox_list = [[min_x0, min_y0, max_x1, max_y1]]
            content["bbox"] = bbox_list
            content_text = json.dumps(content)
            highlight.set_info(content=content_text, title=title)
            highlight.update()

            found_lines_dict = {
                page_index: {"bbox_list": bbox_list, "found_lines": found_lines}
            }

            # When jaccard_similarity is between 0.4 and 0.9, the remaining lines may be
            # on the next page, so check the next page as well.
            if jaccard_similarity > 0.4 and jaccard_similarity < 0.9:
                next_page_index = page_index + 1
                if next_page_index < pdf_doc.page_count:
                    next_page = pdf_doc[next_page_index]
                    next_page_text = next_page.get_text("text")
                    next_page_lines = [
                        line
                        for line in next_page_text.split("\n")
                        if len(line.strip()) > 0
                    ]

                    found_line_index = -1
                    for i in range(10):
                        if len(next_page_lines) < i + 1:
                            break
                        next_page_line = next_page_lines[i]
                        words = [
                            word.strip()
                            for word in next_page_line.lower().split()
                            if len(word.strip()) > 0
                        ]
                        if len(words) == 0:
                            continue
                        temp_found_words = found_words + words
                        new_jaccard_similarity = self.similarity.jaccard_similarity(
                            search_paragraph_text_words, temp_found_words
                        )
                        if new_jaccard_similarity > jaccard_similarity:
                            found_line_index = i
                            break
                    if found_line_index != -1:
                        new_found_words = found_words
                        new_found_lines = []
                        found_matched = False
                        for index, line in enumerate(next_page_lines):
                            if index < found_line_index:
                                continue
                            words = [
                                word.strip()
                                for word in line.lower().split()
                                if len(word.strip()) > 0
                            ]
                            if len(words) == 0:
                                continue
                            new_found_words.extend(words)
                            new_jaccard_similarity = (
                                self.similarity.jaccard_similarity(
                                    search_paragraph_text_words, new_found_words
                                )
                            )
                            if new_jaccard_similarity > jaccard_similarity:
                                jaccard_similarity = new_jaccard_similarity
                                new_found_lines.append(line)
                            else:
                                break
                        if len(new_found_lines) > 0:
                            total_matching_val_area = []
                            for line in new_found_lines:
                                matching_val_area = next_page.search_for(line)
                                if len(matching_val_area) == 0:
                                    matching_val_area = page.search_for(line.strip())

                                if len(matching_val_area) == 0:
                                    continue
                                elif len(matching_val_area) == 1:
                                    total_matching_val_area.extend(matching_val_area)
                                else:
                                    y1_list = [area.y1 for area in matching_val_area]
                                    if len(total_matching_val_area) == 0:
                                        y1_min = max(y1_list)
                                        y1_min_index = y1_list.index(y1_min)
                                        total_matching_val_area.append(
                                            matching_val_area[y1_min_index]
                                        )
                                    else:
                                        last_y1 = total_matching_val_area[-1].y1
                                        latest_bigger_y1_list = max(
                                            [y1 for y1 in y1_list if y1 > last_y1]
                                        )
                                        latest_bigger_y1_index = y1_list.index(
                                            latest_bigger_y1_list
                                        )
                                        total_matching_val_area.append(
                                            matching_val_area[latest_bigger_y1_index]
                                        )
                            # get min x0, min y0, max x1, max y1 from total_matching_val_area
                            x0_list = [area.x0 for area in total_matching_val_area]
                            y0_list = [area.y0 for area in total_matching_val_area]
                            x1_list = [area.x1 for area in total_matching_val_area]
                            y1_list = [area.y1 for area in total_matching_val_area]
                            min_x0 = min(x0_list)
                            min_y0 = min(y0_list)
                            max_x1 = max(x1_list)
                            max_y1 = max(y1_list)
                            matching_val_area = [
                                fitz.Rect(min_x0, min_y0, max_x1, max_y1)
                            ]
                            highlight = next_page.add_highlight_annot(matching_val_area)
                            new_bbox_list = [[min_x0, min_y0, max_x1, max_y1]]
                            content["found_page"] = next_page_index
                            content["bbox"] = new_bbox_list
                            content_text = json.dumps(content)
                            highlight.set_info(content=content_text, title=title)
                            highlight.update()
                            found_lines_dict[next_page_index] = {
                                "bbox_list": new_bbox_list,
                                "found_lines": new_found_lines,
                            }

        found_lines_dict_keys = list(found_lines_dict.keys())
        exact_match = True
        exact_match_search_paragraph_text = search_paragraph_text
        if len(found_lines_dict_keys) > 0 and len(sibling_paragraph_text_list) > 0:
            found_line_list = []
            for key in found_lines_dict_keys:
                found_line_list.extend(found_lines_dict[key]["found_lines"])
            found_line_words = []
            for line in found_line_list:
                words = [
                    word.strip()
                    for word in line.lower().split()
                    if len(word.strip()) > 0
                ]
                found_line_words.extend(words)

            max_sibling_jaccard_similarity = 0
            max_sibling_jaccard_similarity_index = -1
            for index, sibling_paragraph_text in enumerate(
                sibling_paragraph_text_list
            ):
                sibling_paragraph_text_words = [
                    word.strip()
                    for word in sibling_paragraph_text.lower().split()
                    if len(word.strip()) > 0
                ]
                sibling_jaccard_similarity = self.similarity.jaccard_similarity(
                    sibling_paragraph_text_words, found_line_words
                )
                if sibling_jaccard_similarity > max_sibling_jaccard_similarity:
                    max_sibling_jaccard_similarity = sibling_jaccard_similarity
                    max_sibling_jaccard_similarity_index = index
            if max_sibling_jaccard_similarity > jaccard_similarity:
                exact_match = False
                exact_match_search_paragraph_text = sibling_paragraph_text_list[
                    max_sibling_jaccard_similarity_index
                ]

        return {
            "found_lines_dict": found_lines_dict,
            "exact_match": exact_match,
            "exact_match_search_paragraph_text": exact_match_search_paragraph_text,
        }

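    # Return-shape sketch (illustrative values):
    #
    #     {"found_lines_dict": {3: {"bbox_list": [[x0, y0, x1, y1]], "found_lines": ["..."]}},
    #      "exact_match": True,
    #      "exact_match_search_paragraph_text": "..."}
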
    def get_page_range_by_keywords(
        self,
        pdf_doc,
        start_keywords,
        end_keywords,
        return_page_text_list=False,
    ):
        """
        Get a page range by keywords.

        pdf_doc: the PDF document
        start_keywords: list of start keywords
        end_keywords: list of end keywords
        return_page_text_list: whether to also return the page texts
        """
        start_page = -1
        end_page = -1
        if len(start_keywords) == 0 or len(end_keywords) == 0:
            start_page = 0
        if len(start_keywords) == 0 and len(end_keywords) == 0:
            end_page = pdf_doc.page_count - 1
        search_start = 0
        # Skip the first few pages of long documents to avoid matching the table of contents
        if pdf_doc.page_count > 20:
            search_start = 8
        for page_index in range(search_start, pdf_doc.page_count):
            if start_page >= 0 and end_page >= 0:
                break
            page = pdf_doc[page_index]
            page_text = page.get_text("text").strip()
            page_text_list = [
                split.strip()
                for split in page_text.split("\n")
                if len(split.strip()) > 0
            ]
            if start_page == -1:
                find = self.find_keywords_in_text_list(page_text_list, start_keywords)
                if find:
                    start_page = page_index

            if start_page >= 0 and end_page == -1:
                find = self.find_keywords_in_text_list(page_text_list, end_keywords)
                if find:
                    end_page = page_index
                    break
        # Build the page list from start_page up to (but not including) end_page
        page_text_list = []
        if start_page >= 0 and end_page >= 0:
            page_list = [i for i in range(start_page, end_page)]
            if return_page_text_list:
                for page_index in page_list:
                    page = pdf_doc[page_index]
                    page_text = page.get_text("text").strip()
                    page_text_list.append(page_text)
        else:
            page_list = []
        return page_list, page_text_list

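    # Usage sketch (illustrative keywords):
    #
    #     util = PDFUtil("sample.pdf")
    #     doc = fitz.open("sample.pdf")
    #     pages, _ = util.get_page_range_by_keywords(doc, ["Fund Summary"], ["Fee Table"])
    #     # `pages` lists the 0-based indices from the page where a start keyword is found
    #     # up to (but not including) the page where an end keyword is found.
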
    def exist_keywords_in_text_list(self, page_text, keywords):
        """
        Return True if any keyword is found in the (line-split) page text.
        """
        page_text_list = [
            split.strip() for split in page_text.split("\n") if len(split.strip()) > 0
        ]
        find = self.find_keywords_in_text_list(page_text_list, keywords)
        return find

    def find_keywords_in_text_list(self, text_list, keywords):
        """
        Find keywords in the text list; a line only counts when it looks like a
        heading (all caps, or not dominated by lower-case sentence text).
        """
        find = False
        for keyword in keywords:
            for index, line in enumerate(text_list):
                if line.lower().startswith(keyword.lower()):
                    lower_case_begin_words_count = (
                        self.get_lower_case_begin_words_count(line)
                    )
                    if lower_case_begin_words_count > 3:
                        continue

                    if line.upper() == line:
                        find = True
                        break

                    if index != 0:
                        lower_case_begin_words_count = (
                            self.get_lower_case_begin_words_count(text_list[index - 1])
                        )
                        if lower_case_begin_words_count > 3:
                            continue

                    if index > 5:
                        if "." in line or "," in line:
                            continue
                    find = True
                    break
            if find:
                break
        return find

    def get_lower_case_begin_words_count(self, text):
        # Count the words that start with a lower-case letter.
        count = 0
        for word in text.split():
            if word[0].islower():
                count += 1
        return count

def process_data(
|
|
|
|
|
self,
|
|
|
|
|
output_file: str,
|
|
|
|
|
dp_Value_info: dict,
|
|
|
|
|
pages: Tuple = None,
|
|
|
|
|
action: str = "Highlight",
|
|
|
|
|
):
|
|
|
|
|
"""
|
|
|
|
|
Process the pages of the PDF File
|
|
|
|
|
1. Open the input file.
|
|
|
|
|
2. Create a memory buffer for storing temporarily the output file.
|
|
|
|
|
3. Initialize a variable for storing the total number of matches of the string we were searching for.
|
|
|
|
|
4. Iterate throughout the selected pages of the input file and split the current page into lines.
|
|
|
|
|
5. Search for the string within the page.
|
|
|
|
|
6. Apply the corresponding action (i.e "Redact", "Frame", "Highlight", etc.)
|
|
|
|
|
7. Display a message signaling the status of the search process.
|
|
|
|
|
8. Save and close the input file.
|
|
|
|
|
9. Save the memory buffer to the output file.
|
|
|
|
|
|
|
|
|
|
output_file: The path of the PDF file to generate after processing.
|
|
|
|
|
dp_Value_info: The information for data points.
|
|
|
|
|
pages: The pages to consider while processing the PDF file.
|
|
|
|
|
action: The action to perform on the PDF file.
|
|
|
|
|
"""
|
|
|
|
|
logger.info(f"Processing {self.pdf_file}")
|
|
|
|
|
data_list = []
|
|
|
|
|
try:
|
|
|
|
|
# Save the generated PDF to memory buffer
|
|
|
|
|
pdf_doc = fitz.open(self.pdf_file)
|
|
|
|
|
try:
|
|
|
|
|
pdf_encrypted = pdf_doc.isEncrypted
|
|
|
|
|
except:
|
|
|
|
|
pdf_encrypted = pdf_doc.is_encrypted
|
|
|
|
|
if pdf_encrypted:
|
|
|
|
|
pdf_doc.authenticate("")
|
|
|
|
|
output_buffer = BytesIO()
|
|
|
|
|
find_value_dp_list = []
|
|
|
|
|
matching_val_area_list = []
|
|
|
|
|
|
|
|
|
|
page_list = [i for i in range(pdf_doc.page_count)]
|
|
|
|
|
|
|
|
|
|
dp_range_page_list = {}
|
|
|
|
|
|
|
|
|
|
for dp_name, dp_detail in dp_Value_info.items():
|
|
|
|
|
if not isinstance(dp_detail, dict):
|
|
|
|
|
continue
|
|
|
|
|
page_range_start_keywords = dp_detail.get(
|
|
|
|
|
"page_range_start_keywords", []
|
|
|
|
|
)
|
|
|
|
|
page_range_end_keywords = dp_detail.get("page_range_end_keywords", [])
|
|
|
|
|
if (
|
|
|
|
|
len(page_range_start_keywords) > 0
|
|
|
|
|
and len(page_range_end_keywords) > 0
|
|
|
|
|
):
|
|
|
|
|
page_list, page_text_list = self.get_page_range_by_keywords(
|
|
|
|
|
pdf_doc,
|
|
|
|
|
page_range_start_keywords,
|
|
|
|
|
page_range_end_keywords,
|
|
|
|
|
return_page_text_list=False,
|
|
|
|
|
)
|
|
|
|
|
dp_range_page_list[dp_name] = page_list
|
|
|
|
|
|
|
|
|
|
# Iterate through pages
|
|
|
|
|
next_page_found_lines = []
|
|
|
|
|
for page_index in page_list:
|
|
|
|
|
# If required for specific pages
|
|
|
|
|
if pages:
|
|
|
|
|
if page_index not in pages:
|
|
|
|
|
continue
|
|
|
|
|
# Select the page
|
|
|
|
|
page = pdf_doc[page_index]
|
|
|
|
|
# Get Matching Data
|
|
|
|
|
# Split page by lines
|
|
|
|
|
page_text = page.get_text("text")
|
|
|
|
|
# if page_index in [24, 25]:
|
|
|
|
|
# print(page_text)
|
|
|
|
|
for dp_name, dp_detail in dp_Value_info.items():
|
|
|
|
|
if not isinstance(dp_detail, dict):
|
|
|
|
|
continue
|
|
|
|
|
dp_biz_name = dp_detail.get("biz_name", "")
|
|
|
|
|
dp_level = dp_detail.get("level", "")
|
|
|
|
|
dp_value = dp_detail.get("value", "")
|
|
|
|
|
value_text_type = dp_detail.get("value_text_type", "string")
|
|
|
|
|
if value_text_type == "string":
|
|
|
|
|
dp_value_text = dp_detail.get("value_text", "")
|
|
|
|
|
elif value_text_type == "list":
|
|
|
|
|
dp_value_text = dp_detail.get("value_text", [])
|
|
|
|
|
else:
|
|
|
|
|
dp_value_text = dp_detail.get("value_text", "")
|
|
|
|
|
value_text_structure = dp_detail.get("value_text_structure", "word")
|
|
|
|
|
|
|
|
|
|
inner_context_regex = dp_detail.get("inner_context_regex", "")
|
|
|
|
|
text_value_dict = dp_detail.get("text_value_dict", {})
|
|
|
|
|
|
|
|
|
|
search_str_list = dp_detail.get("regex_list", [])
|
|
|
|
|
only_hightlight_value_text = dp_detail.get(
|
|
|
|
|
"only_hightlight_value_text", False
|
|
|
|
|
)
|
|
|
|
|
only_hightlight_first = dp_detail.get(
|
|
|
|
|
"only_hightlight_first", False
|
|
|
|
|
)
|
|
|
|
|
# logger.info(f"Processing Data Point: {dp_name}")
|
|
|
|
|
|
|
|
|
|
page_range_start_keywords = dp_detail.get(
|
|
|
|
|
"page_range_start_keywords", []
|
|
|
|
|
)
|
|
|
|
|
page_range_end_keywords = dp_detail.get(
|
|
|
|
|
"page_range_end_keywords", []
|
|
|
|
|
)
|
|
|
|
|
if (
|
|
|
|
|
len(page_range_start_keywords) > 0
|
|
|
|
|
and len(page_range_end_keywords) > 0
|
|
|
|
|
):
|
|
|
|
|
if not page_index in dp_range_page_list.get(dp_name, []):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# find_value = False
|
|
|
|
|
if value_text_structure == "paragraph":
|
|
|
|
|
found = dp_detail.get("found", False)
|
|
|
|
|
if found:
|
|
|
|
|
continue
|
|
|
|
|
if (
|
|
|
|
|
dp_detail.get("matched_page", -1) != -1
|
|
|
|
|
and len(dp_detail.get("bbox_list", [])) > 0
|
|
|
|
|
):
|
|
|
|
|
continue
|
|
|
|
|
sibling_paragraph_text_list = dp_detail.get(
|
|
|
|
|
"sibling_paragraph_text_list", []
|
|
|
|
|
)
|
|
|
|
|
content = {
|
|
|
|
|
"data_point": dp_biz_name,
|
|
|
|
|
"data_point_db_name": dp_name,
|
|
|
|
|
"data_point_level": dp_level,
|
|
|
|
|
"found_page": page_index,
|
|
|
|
|
"bbox": None,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
found_dict = self.highlight_matching_paragraph_text(
|
|
|
|
|
pdf_doc=pdf_doc,
|
|
|
|
|
page_index=page_index,
|
|
|
|
|
search_paragraph_text=dp_value_text,
|
|
|
|
|
sibling_paragraph_text_list=sibling_paragraph_text_list,
|
|
|
|
|
next_page_found_lines=next_page_found_lines,
|
|
|
|
|
content=content,
|
|
|
|
|
title=dp_biz_name,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
found_lines_dict = found_dict.get("found_lines_dict", {})
|
|
|
|
|
exact_match = found_dict.get("exact_match", True)
|
|
|
|
|
exact_match_search_paragraph_text = found_dict.get(
|
|
|
|
|
"exact_match_search_paragraph_text", dp_value_text
|
|
|
|
|
)
|
|
|
|
|
found_lines_dict_keys = list(found_lines_dict.keys())
|
|
|
|
|
if len(found_lines_dict_keys) > 0:
|
|
|
|
|
found_next_page_lines = False
|
|
|
|
|
if exact_match:
|
|
|
|
|
dp_detail["found"] = True
|
|
|
|
|
for found_page_index, found_lines_info in found_lines_dict.items():
|
|
|
|
|
bbox_list = found_lines_info.get("bbox_list", [])
|
|
|
|
|
bbox_normalized_list = self.get_bbox_normalized(
|
|
|
|
|
page, bbox_list
|
|
|
|
|
)
|
|
|
|
|
found_lines = found_lines_info.get("found_lines", [])
|
|
|
|
|
if found_page_index == page_index + 1:
|
|
|
|
|
next_page_found_lines = found_lines
|
|
|
|
|
found_next_page_lines = True
|
|
|
|
|
found_text = " ".join(found_lines).strip()
|
|
|
|
|
data = {
|
|
|
|
|
"pdf_file": self.simple_pdf_file,
|
|
|
|
|
"dp_name": dp_name,
|
|
|
|
|
"dp_biz_name": dp_biz_name,
|
|
|
|
|
"dp_level": dp_level,
|
|
|
|
|
"ground_truth": dp_value_text,
|
|
|
|
|
"ground_truth_text": dp_value_text,
|
|
|
|
|
"search_str": "",
|
|
|
|
|
"found_page": found_page_index,
|
|
|
|
|
"found_value": found_text,
|
|
|
|
|
"found_value_context": found_text,
|
|
|
|
|
"found_bbox": bbox_list,
|
|
|
|
|
"found_bbox_normalized": bbox_normalized_list,
|
|
|
|
|
"output_file": output_file,
|
|
|
|
|
"action": action,
|
|
|
|
|
"comment": "found page number is page number, as page number starts from 0.",
|
|
|
|
|
}
|
|
|
|
|
if dp_name not in find_value_dp_list:
|
|
|
|
|
find_value_dp_list.append(dp_name)
|
|
|
|
|
data_list.append(data)
|
|
|
|
|
                            else:
                                for dp_name, dp_detail in dp_Value_info.items():
                                    value_text_structure = dp_detail.get("value_text_structure", "word")
                                    if value_text_structure == "paragraph":
                                        dp_value_text = dp_detail.get("value_text", "")
                                        if dp_value_text == exact_match_search_paragraph_text:
                                            dp_detail["found"] = True
                                            break
                                for found_page_index, found_lines_info in found_lines_dict.items():
                                    bbox_list = found_lines_info.get("bbox_list", [])
                                    bbox_normalized_list = self.get_bbox_normalized(
                                        page, bbox_list
                                    )
                                    found_lines = found_lines_info.get("found_lines", [])
                                    if found_page_index == page_index + 1:
                                        next_page_found_lines = found_lines
                                        found_next_page_lines = True
                                    found_text = " ".join(found_lines).strip()
                                    data = {
                                        "pdf_file": self.simple_pdf_file,
                                        "dp_name": dp_name,
                                        "dp_biz_name": dp_biz_name,
                                        "dp_level": dp_level,
                                        "ground_truth": exact_match_search_paragraph_text,
                                        "ground_truth_text": exact_match_search_paragraph_text,
                                        "search_str": "",
                                        "found_page": found_page_index,
                                        "found_value": found_text,
                                        "found_value_context": found_text,
                                        "found_bbox": bbox_list,
                                        "found_bbox_normalized": bbox_normalized_list,
                                        "output_file": output_file,
                                        "action": action,
                                        "comment": "found_page is a 0-based page index.",
                                    }
                                    if dp_name not in find_value_dp_list:
                                        find_value_dp_list.append(dp_name)
                                    data_list.append(data)
                            if not found_next_page_lines:
                                next_page_found_lines = []
                    else:
                        if search_str_list is not None and len(search_str_list) > 0:
                            matched_blocks = []
                            for search_str in search_str_list:
                                found_blocks = self.search_for_text(
                                    page_text, search_str
                                )
                                if found_blocks:
                                    matched_blocks.extend(found_blocks)
                        else:
                            # Get matched blocks by similarity for dp_value_text:
                            # the data point value is a free-form string that may not
                            # exactly match the value stored in the database
                            # (e.g. dp_name is FundName, ShareClassName, Advisor or Strategy).
                            matched_blocks = []
                        if matched_blocks:
                            for matched_block in matched_blocks:
                                dp_value = ""
                                if inner_context_regex != "":
                                    dp_value_text_search = re.search(
                                        inner_context_regex, matched_block, re.IGNORECASE
                                    )
                                    if dp_value_text_search is not None:
                                        dp_value_text = dp_value_text_search.group(0).strip()
                                        # remove special characters
                                        dp_value_text = re.sub(r"\W+", " ", dp_value_text).strip()
                                        # skip if only the bare word "may" was captured
                                        if dp_value_text == "may":
                                            continue
                                        if dp_value_text != "" and len(text_value_dict.keys()) > 0:
                                            dp_value = text_value_dict.get(dp_value_text.lower(), "")
                                    else:
                                        dp_value_text = matched_block
                                        dp_value = matched_block
                                else:
                                    if dp_value_text == "":
                                        dp_value_text = matched_block
                                        dp_value = matched_block
                                content = {
                                    "data_point_name": dp_biz_name,
                                    "data_point_db_name": dp_name,
                                    "data_point_level": dp_level,
                                    "page_index": page_index,
                                    "dp_value_text": dp_value_text,
                                    "dp_value": dp_value,
                                    "bbox": None,
                                }
                                if (
                                    only_hightlight_value_text
                                    and dp_value_text != ""
                                    and dp_value_text != matched_block
                                ):
                                    matching_val_area = self.highlight_matching_data(
                                        page=page,
                                        text_block=matched_block,
                                        highlight_text_inside_block=dp_value_text,
                                        content=content,
                                        title=dp_biz_name,
                                        only_hightlight_first=only_hightlight_first,
                                    )
                                else:
                                    matching_val_area = self.highlight_matching_data(
                                        page=page,
                                        text_block=matched_block,
                                        highlight_text_inside_block=None,
                                        content=content,
                                        title=dp_biz_name,
                                        only_hightlight_first=only_hightlight_first,
                                    )
                                if len(matching_val_area) > 0:
                                    matching_val_area_list.extend(matching_val_area)

                                if len(matching_val_area) > 0:
                                    bbox_list = []
                                    for area in matching_val_area:
                                        bbox_list.append(
                                            [area.x0, area.y0, area.x1, area.y1]
                                        )
                                    bbox_normalized_list = self.get_bbox_normalized(
                                        page, bbox_list
                                    )
                                    data = {
                                        "pdf_file": self.simple_pdf_file,
                                        "dp_name": dp_name,
                                        "dp_biz_name": dp_biz_name,
                                        "dp_level": dp_level,
                                        "ground_truth": dp_value,
                                        "ground_truth_text": dp_value_text,
                                        "search_str": search_str,
                                        "found_page": page_index,
                                        "found_value_text": dp_value_text,
                                        "found_value": dp_value,
                                        "found_value_context": matched_block.strip(),
                                        "found_bbox": bbox_list,
                                        "found_bbox_normalized": bbox_normalized_list,
                                        "output_file": output_file,
                                        "action": action,
                                        "comment": "found_page is a 0-based page index.",
                                    }
                                    if dp_name not in find_value_dp_list:
                                        find_value_dp_list.append(dp_name)
                                    data_list.append(data)
                                    # find_value = True
                                    if only_hightlight_first:
                                        break

            if len(find_value_dp_list) == 0:
                output_file = ""
            for dp_name, dp_detail in dp_Value_info.items():
                if dp_name not in find_value_dp_list:
                    dp_biz_name = dp_detail.get("biz_name", "")
                    dp_level = dp_detail.get("level", "")
                    dp_value = dp_detail.get("value", "")
                    dp_value_text = dp_detail.get("value_text", "")
                    data = {
                        "pdf_file": self.simple_pdf_file,
                        "dp_name": dp_name,
                        "dp_biz_name": dp_biz_name,
                        "dp_level": dp_level,
                        "ground_truth": dp_value,
                        "ground_truth_text": dp_value_text,
                        "search_str": "",
                        "found_page": -1,
                        "found_value_text": "",
                        "found_value": -1,
                        "found_value_context": "",
                        "found_bbox": [],
                        "found_bbox_normalized": [],
                        "output_file": output_file,
                        "action": action,
                        "comment": f"{dp_biz_name} was not found in the document.",
                    }
                    data_list.append(data)
            logger.info(
                f"{len(matching_val_area_list)} Match(es) In Input File: {self.pdf_file}"
            )
            # Save to output
            pdf_doc.save(output_buffer)
            pdf_doc.close()
            if (
                len(find_value_dp_list) > 0
                and output_file is not None
                and output_file != ""
            ):
                # Save the output buffer to the output file
                with open(output_file, mode="wb") as f:
                    f.write(output_buffer.getbuffer())
                logger.info(f"File saved to {output_file}")
        except Exception as e:
            logger.error(f"Error processing file: {e}")
            print_exc()
        if len(data_list) == 0:
            data = {
                "pdf_file": self.simple_pdf_file,
                "dp_name": "",
                "dp_biz_name": "",
                "dp_level": "",
                "ground_truth": "",
                "ground_truth_text": "",
                "search_str": "",
                "found_page": -1,
                "found_value_text": "",
                "found_value": -1,
                "found_value_context": "",
                "found_bbox": [],
                "found_bbox_normalized": [],
                "output_file": output_file,
                "action": action,
                "comment": "",
            }
            data_list.append(data)
        return data_list

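    # Illustrative shape of a record in the returned data_list (keys mirror the
    # dictionaries built above; the concrete values below are hypothetical):
    #
    #   {
    #       "pdf_file": "fund_prospectus.pdf",
    #       "dp_name": "FundName",
    #       "dp_biz_name": "Fund Name",
    #       "dp_level": "fund",
    #       "ground_truth": "Example Growth Fund",
    #       "ground_truth_text": "Example Growth Fund",
    #       "search_str": "",
    #       "found_page": 2,
    #       "found_value": "Example Growth Fund",
    #       "found_value_context": "Example Growth Fund",
    #       "found_bbox": [[72.0, 96.0, 310.5, 110.2]],
    #       "found_bbox_normalized": [[0.118, 0.121, 0.507, 0.139]],
    #       "output_file": "fund_prospectus_highlighted.pdf",
    #       "action": "Highlight",
    #       "comment": "found_page is a 0-based page index.",
    #   }
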
    def get_bbox_normalized(self, page, bbox):
        """
        Normalizes a list of bounding boxes to the page size, so every coordinate
        is expressed as a fraction of the page width/height (0.0 - 1.0).
        """
        page_width = page.rect.width
        page_height = page.rect.height
        bbox_normalized = []
        for box in bbox:
            x0 = box[0] / page_width
            y0 = box[1] / page_height
            x1 = box[2] / page_width
            y1 = box[3] / page_height
            bbox_normalized.append([x0, y0, x1, y1])
        return bbox_normalized

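    # Example (illustrative): on a US-Letter page (612 x 792 pt), the rectangle
    # [61.2, 79.2, 122.4, 158.4] normalizes to [0.1, 0.1, 0.2, 0.2]:
    #
    #   util = PDFUtil("sample.pdf")  # hypothetical file name
    #   doc = fitz.open(util.pdf_file)
    #   util.get_bbox_normalized(doc[0], [[61.2, 79.2, 122.4, 158.4]])
    #   # -> [[0.1, 0.1, 0.2, 0.2]]
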
    def find_value_by_regex(self, page_text, search_str: str):
        """
        Placeholder: regex-based value lookup is not implemented yet.
        """
        pass

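    # A minimal sketch of what this placeholder could do (an assumption, not part
    # of the original code): mirror the inner_context_regex handling above and
    # return the first case-insensitive match of search_str in page_text.
    #
    #   match = re.search(search_str, page_text, re.IGNORECASE)
    #   return match.group(0).strip() if match else ""
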
    def get_high_similarity_text(
        self, page_text, search_str: str, threshold: float = 0.8
    ):
        # NOTE: completion of an unfinished stub (assumption): score each line with
        # difflib; the project's utils.similarity.Similarity may be the intended scorer.
        from difflib import SequenceMatcher

        matched_values = []
        page_text_list = page_text.split("\n")
        for line in page_text_list:
            ratio = SequenceMatcher(None, line.lower(), search_str.lower()).ratio()
            if ratio >= threshold:
                matched_values.append(line)
        return matched_values

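    # Example (illustrative, given the stub completion above):
    #
    #   util.get_high_similarity_text("Example Growth Fund\nFee Table", "example growth fund")
    #   # -> ["Example Growth Fund"] with the default 0.8 threshold
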
    def remove_highlght(self, output_file: str, pages: Tuple = None):
        """
        1. Open the input file.
        2. Create a memory buffer for temporarily storing the output file.
        3. Iterate through the pages of the input file and check for annotations.
        4. Delete any annotations that are found.
        5. Log a message reporting the status of this process.
        6. Close the input file.
        7. Save the memory buffer to the output file.
        """
        logger.info(f"Removing Highlights from {self.pdf_file}")
        try:
            # Save the generated PDF to memory buffer
            pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted
            except:
                pdf_encrypted = pdf_doc.is_encrypted
            if pdf_encrypted:
                pdf_doc.authenticate("")
            output_buffer = BytesIO()
            # Initialize a counter for annotations
            annot_found = 0
            # Iterate through pages
            for pg in range(pdf_doc.page_count):
                # If required for specific pages
                if pages:
                    if str(pg) not in pages:
                        continue
                # Select the page
                page = pdf_doc[pg]
                annot = page.first_annot
                while annot:
                    annot_found += 1
                    # delete_annot returns the next annotation on the page
                    annot = page.delete_annot(annot)
            if annot_found > 0:
                logger.info(
                    f"{annot_found} Annotation(s) Found In The Input File: {self.pdf_file}"
                )
            # Save to output
            pdf_doc.save(output_buffer)
            pdf_doc.close()
            # Save the output buffer to the output file
            with open(output_file, mode="wb") as f:
                f.write(output_buffer.getbuffer())
        except Exception as e:
            logger.error(f"Error removing highlights: {e}")
            print_exc()

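    # Example (illustrative): strip all annotations from pages 0 and 2 only. The
    # `pages` entries are compared against str(page_number), so they are passed
    # as strings; the file names are hypothetical.
    #
    #   util = PDFUtil("annotated.pdf")
    #   util.remove_highlght(output_file="clean.pdf", pages=("0", "2"))
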
    def process_file(
        self,
        output_file: str,
        dp_Value_info: dict = None,
        pages: Tuple = None,
        action: str = "Highlight",
    ):
        """
        Process a single PDF file: Redact, Frame, Highlight... or remove existing
        highlights, depending on the requested action.

        action: Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove
        """
        logger.info(f"Processing {self.pdf_file}")

        if output_file is None:
            output_file = self.pdf_file

        data_list = []
        # Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove
        if action == "Remove":
            # Remove the Highlights except Redactions
            self.remove_highlght(output_file=output_file, pages=pages)
        else:
            data_list = self.process_data(
                output_file=output_file,
                dp_Value_info=dp_Value_info,
                pages=pages,
                action=action,
            )
        return data_list

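    # Example (illustrative): highlight a data point and collect the per-data-point
    # result records. The file names and payload below are hypothetical.
    #
    #   util = PDFUtil("prospectus.pdf")
    #   dp_value_info = {
    #       "FundName": {
    #           "biz_name": "Fund Name",
    #           "level": "fund",
    #           "value": "Example Growth Fund",
    #           "value_text": "Example Growth Fund",
    #       },
    #   }
    #   results = util.process_file(
    #       output_file="prospectus_highlighted.pdf",
    #       dp_Value_info=dp_value_info,
    #       action="Highlight",
    #   )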