# dc-ml-emea-ar/utils/pdf_util.py
# Import Libraries
from typing import Tuple
from io import BytesIO
import os
import argparse
import re
import fitz
import json
from traceback import print_exc
from tqdm import tqdm
import base64
from copy import deepcopy
from utils.similarity import Similarity
from utils.logger import logger
class PDFUtil:
def __init__(self, pdf_file: str) -> None:
self.pdf_file = pdf_file
self.simple_pdf_file = os.path.basename(self.pdf_file)
self.is_valid_path()
self.similarity = Similarity()
    def is_valid_path(self):
        """
        Validates that the input path points to an existing .pdf file.
        """
        if not self.pdf_file:
            raise ValueError("Invalid path: no PDF file path provided.")
        if os.path.isfile(self.pdf_file) and self.pdf_file.endswith(".pdf"):
            return True
        raise ValueError(
            f"Invalid path {self.pdf_file}, please input a correct PDF file path."
        )
def extract_info(self) -> Tuple[bool, dict]:
"""
Extracts file info
"""
logger.info(f"Extracting file info from {self.pdf_file}")
# Open the PDF
pdf_doc = fitz.open(self.pdf_file)
        try:
            pdf_encrypted = pdf_doc.isEncrypted  # older PyMuPDF attribute
        except AttributeError:
            pdf_encrypted = pdf_doc.is_encrypted  # newer PyMuPDF attribute
output = {
"File": self.pdf_file,
"Encrypted": ("True" if pdf_encrypted else "False"),
}
# If PDF is encrypted the file metadata cannot be extracted
if not pdf_encrypted:
for key, value in pdf_doc.metadata.items():
output[key] = value
# To Display File Info
logger.info(
"## File Information ##################################################"
)
logger.info("\n".join("{}:{}".format(i, j) for i, j in output.items()))
logger.info(
"######################################################################"
)
pdf_doc.close()
return True, output
def extract_text(self, output_folder: str = None) -> Tuple[bool, str, dict]:
"""
Extracts text from PDF
"""
# Extract text
try:
logger.info(f"Extracting text from {self.pdf_file}")
text = ""
page_text_dict = {}
pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted  # older PyMuPDF attribute
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted  # newer PyMuPDF attribute
if pdf_encrypted:
pdf_doc.authenticate("")
for page in pdf_doc:
page_text = page.get_text()
text += page_text + "\n"
page_text_dict[page.number] = page_text
# To Display Extracted Text
# logger.info(
# "## Extracted Text ####################################################"
# )
# logger.info(text)
# logger.info(
# "######################################################################"
# )
# Save to file
if output_folder:
txt_output_folder = os.path.join(output_folder, 'pdf_text/')
os.makedirs(txt_output_folder, exist_ok=True)
txt_file = os.path.join(txt_output_folder, self.simple_pdf_file.replace(".pdf", ".txt"))
with open(txt_file, "w", encoding="utf-8") as file:
file.write(text.strip())
json_output_folder = os.path.join(output_folder, 'pdf_json/')
os.makedirs(json_output_folder, exist_ok=True)
json_file = os.path.join(json_output_folder, self.simple_pdf_file.replace(".pdf", ".json"))
with open(json_file, "w", encoding="utf-8") as file:
json.dump(page_text_dict, file, indent=4)
pdf_doc.close()
return True, text, page_text_dict
except Exception as e:
logger.error(f"Error extracting text: {e}")
print_exc()
return False, str(e), {}
def extract_images(self,
zoom:float = 2.0,
pdf_page_index_list: list = None,
output_folder: str = None):
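        """
        Render pages to PNG images.
        zoom: render scale factor (2.0 doubles the native resolution).
        pdf_page_index_list: 0-based page indices; defaults to all pages.
        output_folder: when set, each rendered page is also saved to disk.
        Returns a dict keyed by page index containing the base64-encoded PNG
        and, when saved, the image file path.
        """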
try:
pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted  # older PyMuPDF attribute
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted  # newer PyMuPDF attribute
if pdf_encrypted:
pdf_doc.authenticate("")
if pdf_page_index_list is None or len(pdf_page_index_list) == 0:
pdf_page_index_list = range(pdf_doc.page_count)
pdf_base_name = os.path.basename(self.pdf_file).replace(".pdf", "")
mat = fitz.Matrix(zoom, zoom)
output_data = {}
for page_num in tqdm(pdf_page_index_list, disable=False):
page = pdf_doc[page_num]
pix = page.get_pixmap(matrix=mat)
img_buffer = pix.tobytes(output='png')
output_data[page_num] = {}
img_base64 = base64.b64encode(img_buffer).decode('utf-8')
if output_folder and len(output_folder) > 0:
os.makedirs(output_folder, exist_ok=True)
image_file = os.path.join(output_folder, f"{pdf_base_name}_{page_num}.png")
pix.save(image_file)
output_data[page_num]["img_file"] = image_file
output_data[page_num]["img_base64"] = img_base64
            pdf_doc.close()
            return output_data
except Exception as e:
logger.error(f"Error extracting images: {e}")
print_exc()
return {}
def extract_image_from_page(self,
page_index: int,
zoom:float = 2.0,
output_folder: str = None):
try:
pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted  # older PyMuPDF attribute
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted  # newer PyMuPDF attribute
if pdf_encrypted:
pdf_doc.authenticate("")
pdf_base_name = os.path.basename(self.pdf_file).replace(".pdf", "")
mat = fitz.Matrix(zoom, zoom)
page = pdf_doc[page_index]
pix = page.get_pixmap(matrix=mat)
img_buffer = pix.tobytes(output='png')
img_base64 = base64.b64encode(img_buffer).decode('utf-8')
if output_folder and len(output_folder) > 0:
os.makedirs(output_folder, exist_ok=True)
image_file = os.path.join(output_folder, f"{pdf_base_name}_{page_index}.png")
pix.save(image_file)
pdf_doc.close()
return img_base64
except Exception as e:
logger.error(f"Error extracting image from page: {e}")
print_exc()
return None
def parse_blocks_page(self, page: fitz.Page):
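        """
        Wrap the tuples from page.get_text("blocks") into dicts with the
        block bbox, its text lines, block number and block type
        (0 = text, 1 = image).
        """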
blocks = page.get_text("blocks")
list_of_blocks = []
for block in blocks:
x0, y0, x1, y1, lines_in_the_block, block_no, block_type = block
list_of_blocks.append(
{
"bbox": [x0, y0, x1, y1],
"lines_in_the_block": lines_in_the_block,
"block_no": block_no,
"block_type": block_type,
}
)
return list_of_blocks
def parse_all_blocks(self):
pdf_doc = fitz.open(self.pdf_file)
        try:
            pdf_encrypted = pdf_doc.isEncrypted  # older PyMuPDF attribute
        except AttributeError:
            pdf_encrypted = pdf_doc.is_encrypted  # newer PyMuPDF attribute
if pdf_encrypted:
pdf_doc.authenticate("")
pdf_blocks = {}
for page_num in tqdm(range(pdf_doc.page_count), disable=False):
page = pdf_doc[page_num]
blocks = self.parse_blocks_page(page)
pdf_blocks[page_num] = blocks
        pdf_doc.close()
        return pdf_blocks
def search_for_text(self, page_text, search_str):
"""
Search for the search string within the document lines
"""
# Find all matches within one line
result_iter = re.finditer(search_str, page_text, re.IGNORECASE)
results = [
result.group() for result in result_iter if result.group().strip() != ""
]
# In case multiple matches within one line
return results
def redact_matching_data(self, page, matched_value):
"""
Redacts matching values
"""
logger.info(f"Redacting matching values in {self.pdf_file}")
matching_val_area = page.search_for(matched_value)
        # Redact matching values
        for area in matching_val_area:
            page.add_redact_annot(area, text=" ", fill=(0, 0, 0))
# Apply the redaction
page.apply_redactions()
return matching_val_area
    def frame_matching_data(self, page, matched_value):
        """
        Frames matching values with a red rectangle.
        """
        matching_val_area = page.search_for(matched_value)
        for area in matching_val_area:
            if isinstance(area, fitz.Rect):
                # Draw a rectangle around matched values; a rect annotation
                # frames the text, unlike a redaction annotation, which would
                # black it out when applied
                annot = page.add_rect_annot(area)
                annot.set_colors(stroke=fitz.utils.getColor("red"))
                # To remove matched data instead:
                # page.add_freetext_annot(area, ' ')
                annot.update()
        return matching_val_area
def highlight_rectangle(self,
pdf_doc: fitz.Document,
page_index: int,
bbox: list,
title: str = "",
content: dict = {}):
"""
Highlight rectangle
"""
rectangle = fitz.Rect(bbox[0], bbox[1], bbox[2], bbox[3])
page = pdf_doc[page_index]
highlight = page.add_highlight_annot([rectangle])
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
def batch_drilldown(self,
drilldown_data_list: list,
output_pdf_folder: str = None):
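        """
        Highlight a batch of drill-down entries in the PDF.
        Each entry in drilldown_data_list is a dict with keys "page_index",
        "data_point" and "value" (str, list or dict), plus optional
        "parent_text_block" and "annotation_attribute".
        Returns the annotated document and the list of annotation records.
        """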
pdf_doc = fitz.open(self.pdf_file)
annotation_list = []
for drilldown_data in drilldown_data_list:
page_index = drilldown_data["page_index"]
data_point = drilldown_data["data_point"]
if isinstance(data_point, list):
data_point = ", ".join(data_point)
parent_text_block = drilldown_data.get("parent_text_block", None)
highlight_value = drilldown_data["value"]
annotation_attribute = drilldown_data.get("annotation_attribute", {})
if isinstance(highlight_value, str):
annotation_list.append(self.highlight_pdf_doc(
pdf_doc=pdf_doc,
page_index=page_index,
highlight_value=highlight_value,
parent_text_block=parent_text_block,
data_point=data_point,
annotation_attribute=annotation_attribute
))
elif isinstance(highlight_value, list):
for value in highlight_value:
annotation_list.append(self.highlight_pdf_doc(
pdf_doc=pdf_doc,
page_index=page_index,
highlight_value=value,
parent_text_block=parent_text_block,
data_point=data_point,
annotation_attribute=annotation_attribute
))
elif isinstance(highlight_value, dict):
for key, value in highlight_value.items():
annotation_list.append(self.highlight_pdf_doc(
pdf_doc=pdf_doc,
page_index=page_index,
highlight_value=value,
parent_text_block=parent_text_block,
data_point=f"{data_point}, {key}",
annotation_attribute=annotation_attribute
))
else:
highlight_value = str(highlight_value)
annotation_list.append(self.highlight_pdf_doc(
pdf_doc=pdf_doc,
page_index=page_index,
highlight_value=highlight_value,
parent_text_block=parent_text_block,
data_point=data_point,
annotation_attribute=annotation_attribute
))
if output_pdf_folder is not None and len(output_pdf_folder) > 0:
os.makedirs(output_pdf_folder, exist_ok=True)
pdf_file_path = self.save_annotated_pdf(pdf_doc=pdf_doc,
output_pdf_folder=output_pdf_folder)
result = {"drilldown_pdf_doc": pdf_doc,
"annotation_list": annotation_list}
return result
def save_annotated_pdf(self, pdf_doc: fitz.Document, output_pdf_folder: str):
try:
            if output_pdf_folder is None or len(output_pdf_folder) == 0 or not os.path.exists(output_pdf_folder):
                return
            if pdf_doc is None or pdf_doc.is_closed:
                return
pdf_file_name = os.path.basename(self.pdf_file)
pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf")
output_pdf_dir = os.path.join(output_pdf_folder, "pdf/")
os.makedirs(output_pdf_dir, exist_ok=True)
pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name)
output_buffer = BytesIO()
pdf_doc.save(output_buffer)
# Save the output buffer to the output file
with open(pdf_file_path, mode="wb") as f:
f.write(output_buffer.getbuffer())
pdf_doc.close()
logger.info(f"File saved to {pdf_file_path}")
return pdf_file_path
except Exception as e:
print_exc()
logger.error(f"Error when save output file: {e}")
def highlight_pdf_doc(self,
pdf_doc: fitz.Document,
page_index: int,
highlight_value: str,
parent_text_block: str = None,
data_point: str = None,
annotation_attribute: dict = {}):
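        """
        Search for highlight_value on the given page (optionally narrowed to
        parent_text_block) and highlight every match. Returns an annotation
        record with the PDF name, page index, data point and the bounding
        boxes of the matched areas.
        """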
page = pdf_doc[page_index]
page_text = page.get_text()
parent_text_block_search_text = None
if parent_text_block is not None:
parent_text_block_regex = self.add_slash_to_text_as_regex(parent_text_block)
parent_text_block_search = re.search(parent_text_block_regex, page_text)
if parent_text_block_search is not None:
parent_text_block_search_text = parent_text_block_search.group()
highlight_value_search_text = ""
if highlight_value is not None and len(highlight_value.strip()) > 0:
pure_highlight_value = highlight_value.strip()
if len(pure_highlight_value.split()) == 1 and \
(len(pure_highlight_value) < 3 or pure_highlight_value[0].upper() == pure_highlight_value[0]):
highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value)
highlight_value_search = re.search(highlight_value_regex, page_text)
else:
highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value, match_special_char_after_space=False)
highlight_value_search = re.search(highlight_value_regex, page_text, re.IGNORECASE)
if highlight_value_search is None:
highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value, match_special_char_after_space=True)
highlight_value_search = re.search(highlight_value_regex, page_text)
if highlight_value_search is not None:
highlight_value_search_text = highlight_value_search.group()
annotation_data = {"pdf_file": self.simple_pdf_file,
"page_index": page_index,
"data_point": data_point,
"value": highlight_value,
"matching_val_area": []}
if highlight_value_search_text is not None:
content = {
"data_point": data_point,
"data_value": highlight_value
}
# append annotation_attribute to content
content.update(annotation_attribute)
if len(highlight_value_search_text.strip().split()) > 3:
merge_nearby_lines = True
else:
merge_nearby_lines = False
if parent_text_block_search_text is not None:
matching_val_area = self.highlight_matching_data(
page=page,
text_block=parent_text_block_search_text,
highlight_text_inside_block=highlight_value_search_text,
content=content,
title=data_point,
only_hightlight_first=False,
merge_nearby_lines=merge_nearby_lines
)
else:
matching_val_area = self.highlight_matching_data(
page=page,
text_block=highlight_value_search_text,
content=content,
title=data_point,
only_hightlight_first=False,
merge_nearby_lines=merge_nearby_lines
)
bbox_list = []
for area in matching_val_area:
bbox = [area.x0, area.y0, area.x1, area.y1]
bbox_list.append(bbox)
# order bbox_list by y0, x0, y1, x1
bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2]))
annotation_data["matching_val_area"] = bbox_list
return annotation_data
def add_slash_to_text_as_regex(self, text: str, match_special_char_after_space: bool = True):
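        r"""
        Escape a literal string into a tolerant regex: non-word characters
        become \W and spaces become \s*\W* (or \s* when
        match_special_char_after_space is False), so the pattern survives
        line breaks and punctuation drift in extracted PDF text.
        For example, "Net Assets" becomes r"Net\s*\W*Assets".
        """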
if text is None or len(text) == 0:
return text
        special_char_iter = re.finditer(r"\W", text)
for special_iter in special_char_iter:
if len(special_iter.group().strip()) == 0:
continue
replace = r"\{0}".format(special_iter.group())
if replace not in text:
text = re.sub(replace, r"\\W", text)
text = re.sub(r"( ){2,}", " ", text)
if match_special_char_after_space:
text = text.replace(" ", r"\s*\W*")
else:
text = text.replace(" ", r"\s*")
return text
    def highlight_matching_data(
        self,
        page,
        text_block,
        within_bbox: list = None,
        highlight_text_inside_block: str = None,
        content: dict = None,
        title: str = "",
        only_hightlight_first: bool = False,
        exact_match: bool = False,
        merge_nearby_lines: bool = False,
    ):
"""
Highlight matching values
2024-11-08 17:22:35 +00:00
page: page object in fitz.Document
within_bbox: bounding box to search for the text
text_block: text to search for in page text
highlight_text_inside_block: text to highlight inside parameter: text_block
content: content as JSON format to add to the highlight annotation,
please customize according to relevant business logic
title: title of the highlight annotation
only_hightlight_first: only highlight the first match
exact_match: exact match or not
merge_nearby_lines: merge nearby lines or not
2024-08-19 14:52:13 +00:00
"""
# logger.info(f"Highlighting matching values in {self.pdf_file}")
if text_block is None or len(text_block.strip()) == 0:
return []
        if within_bbox is not None:
            clip_rect = fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])
            matching_val_area = page.search_for(text_block, clip=clip_rect)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(
                    text_block.replace('\n', '').replace('-', ''), clip=clip_rect)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(text_block.replace('-\n', ''), clip=clip_rect)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(text_block)
        else:
            matching_val_area = page.search_for(text_block)
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.strip())
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''))
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.replace('-\n', ''))
if len(matching_val_area) > 0 and len(text_block.strip().split()) == 1:
new_matching_val_area = []
for area in matching_val_area:
# get text by text_bbox
pure_text_block = text_block.strip()
raw_area_text = page.get_text("text", clip=area).strip()
if len(text_block.strip()) < 3 and text_block.strip() != raw_area_text.strip():
continue
copy_area = deepcopy(area)
copy_area.x0 -= 15
copy_area.x1 += 15
text = page.get_text("text", clip=copy_area).strip()
if text == pure_text_block:
new_matching_val_area.append(area)
else:
# get start and end index of the pure_text_block in text
start_index = text.find(pure_text_block)
if start_index == -1:
continue
if start_index > 0:
previous_char = text[start_index - 1]
if previous_char not in [" ", "("]:
continue
end_index = start_index + len(pure_text_block)
if end_index < len(text):
next_char = text[end_index]
if next_char not in [" ", "%", ")"]:
continue
new_matching_val_area.append(area)
matching_val_area = new_matching_val_area
if (
highlight_text_inside_block is not None
and len(highlight_text_inside_block) > 0
and len(matching_val_area) > 0
):
highlight_bbox_list = []
merged_matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines)
pure_number_regex = re.compile(r"^\d+$")
for area in merged_matching_val_area:
text_bbox_area = page.search_for(
highlight_text_inside_block,
clip=[area.x0, area.y0, area.x1, area.y1],
)
if text_bbox_area is not None and len(text_bbox_area) > 0:
if only_hightlight_first:
highlight_bbox_list.append(text_bbox_area[0])
break
else:
pure_number_match = pure_number_regex.match(highlight_text_inside_block)
if pure_number_match is not None and pure_number_match.group() == highlight_text_inside_block:
for area in text_bbox_area:
# get text by text_bbox
copy_area = deepcopy(area)
copy_area.x0 -= 15
copy_area.x1 += 15
text = page.get_text("text", clip=copy_area).strip()
if text == highlight_text_inside_block:
highlight_bbox_list.append(area)
else:
# get start and end index of the highlight_text_inside_block in text
start_index = text.find(highlight_text_inside_block)
if start_index > 0:
previous_char = text[start_index - 1]
if previous_char not in [" ", "("]:
continue
end_index = start_index + len(highlight_text_inside_block)
if end_index < len(text):
next_char = text[end_index]
if next_char not in [" ", "%", ")"]:
continue
highlight_bbox_list.append(area)
else:
highlight_bbox_list.extend(text_bbox_area)
            if len(highlight_bbox_list) == 0 and len(highlight_text_inside_block.strip().split()) > 2:
                highlight_bbox_list = page.search_for(highlight_text_inside_block)
matching_val_area = highlight_bbox_list
else:
if only_hightlight_first:
matching_val_area = [matching_val_area[0]]
if matching_val_area is not None and len(matching_val_area) > 0:
if (highlight_text_inside_block is not None and len(highlight_text_inside_block.strip().split()) > 1) or \
(highlight_text_inside_block is None and len(text_block.strip().split()) > 1):
matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines)
if exact_match:
matching_val_area = self.get_exact_match_area(page, matching_val_area, text_block)
# matching_val_area = self.merge_matching_val_area(matching_val_area)
for area in matching_val_area:
highlight = page.add_highlight_annot([area])
bbox_list = [area.x0, area.y0, area.x1, area.y1]
content["bbox"] = bbox_list
normalized_bbox = self.get_bbox_normalized(page, [bbox_list])
if len(normalized_bbox) > 0:
content["normalized_bbox"] = normalized_bbox[0]
else:
content["normalized_bbox"] = []
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
return matching_val_area
def get_exact_match_area(self, page, matching_val_area, search_text):
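        """
        Keep only areas where at least one word of search_text appears with
        matching capitalization; drop areas where a word matches only in
        lower case.
        """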
results = []
for area in matching_val_area:
area_text = page.get_text("text", clip=area).strip()
area_text_list = area_text.split()
search_text_list = search_text.split()
capital_not_match = False
any_word_match = False
for search_split in search_text_list:
if search_split in area_text_list:
any_word_match = True
search_split_lower = search_split.lower()
if search_split_lower in area_text_list and \
search_split not in area_text_list:
capital_not_match = True
break
            if capital_not_match:
                continue
            if any_word_match:
                results.append(area)
        return results
def merge_matching_val_area(self, matching_val_area, merge_nearby_lines=False):
"""
Merge the matching val areas which with same y0 and y1,
the x0 is the min x0, x1 is the max x1
"""
if matching_val_area is None or len(matching_val_area) == 0:
return matching_val_area
if len(matching_val_area) == 1:
return matching_val_area
        # unify y0 and y1 values that are within 5 pixels of an earlier area,
        # snapping to the first close value instead of mutating the lists
        # while iterating over them
        y0_list = []
        y1_list = []
        for area in matching_val_area:
            matched_y0 = next((t_y0 for t_y0 in y0_list if abs(t_y0 - area.y0) < 5), None)
            if matched_y0 is not None:
                area.y0 = matched_y0
            else:
                y0_list.append(area.y0)
            matched_y1 = next((t_y1 for t_y1 in y1_list if abs(t_y1 - area.y1) < 5), None)
            if matched_y1 is not None:
                area.y1 = matched_y1
            else:
                y1_list.append(area.y1)
# get area list which with same y0 and y1
y0_y1_list = list(set([(area.y0, area.y1) for area in matching_val_area]))
new_matching_val_area = []
for y0_y1 in y0_y1_list:
y0 = y0_y1[0]
y1 = y0_y1[1]
x0_list = [area.x0 for area in matching_val_area if area.y0 == y0 and area.y1 == y1]
x1_list = [area.x1 for area in matching_val_area if area.y0 == y0 and area.y1 == y1]
min_x0 = min(x0_list)
max_x1 = max(x1_list)
new_matching_val_area.append(fitz.Rect(min_x0, y0, max_x1, y1))
        if merge_nearby_lines and len(new_matching_val_area) > 1:
            new_matching_val_area = self.merge_nearby_lines(new_matching_val_area)
            # merge again in case the first pass produced newly adjacent lines
            if len(new_matching_val_area) > 1:
                new_matching_val_area = self.merge_nearby_lines(new_matching_val_area)
        elif len(new_matching_val_area) > 1:
            new_matching_val_area = self.remove_small_pitches(new_matching_val_area)
        return new_matching_val_area
def remove_small_pitches(self, matching_val_area):
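        """
        Drop rectangles narrower than 5 points; these are usually stray
        fragments left over from line merging.
        """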
x_mini_threshold = 5
new_matching_val_area = []
for area in matching_val_area:
if area.x1 - area.x0 > x_mini_threshold:
new_matching_val_area.append(area)
return new_matching_val_area
def merge_nearby_lines(self, matching_val_area):
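        """
        Merge rectangles that are vertically adjacent (within 15 points) and
        horizontally overlapping or close (within 10 points) into single
        bounding rectangles, so a highlight spans a whole wrapped sentence.
        """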
        bbox_list = [[rect.x0, rect.y0, rect.x1, rect.y1] for rect in matching_val_area]
# order bbox_list by y0, x0, y1, x1
bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2]))
new_matching_val_area = []
last_x0 = None
last_x1 = None
last_y0 = None
last_y1 = None
x_mini_threshold = 5
y_threshold = 15
x_threshold = 10
for index, bbox in enumerate(bbox_list):
if bbox[2] - bbox[0] <= x_mini_threshold:
continue
if index == 0 or last_x0 is None:
last_x0 = bbox[0]
last_y0 = bbox[1]
last_x1 = bbox[2]
last_y1 = bbox[3]
continue
x0 = bbox[0]
y0 = bbox[1]
x1 = bbox[2]
y1 = bbox[3]
            x_intersection = set(range(int(last_x0), int(last_x1))).intersection(
                set(range(int(x0), int(x1)))
            )
            # Merge when one of the following holds:
            # 1. the current line sits directly below the previous one
            #    (y0 close to last_y1) and their horizontal ranges intersect
            #    or nearly touch;
            # 2. the current line continues the previous one horizontally
            #    (same top edge, left edge close to the previous right edge);
            # 3. both lines share the same baseline (y1 close to last_y1) and
            #    their horizontal ranges intersect or nearly touch.
            if (abs(y0 - last_y1) <= y_threshold and
                    (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)) or \
                    (abs(y0 - last_y0) <= y_threshold and abs(x0 - last_x1) <= x_threshold) or \
                    (abs(y1 - last_y1) <= y_threshold and
                     (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)):
last_x0 = min(last_x0, x0)
last_x1 = max(last_x1, x1)
last_y0 = min(last_y0, y0)
last_y1 = max(last_y1, y1)
else:
new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1))
last_x0 = x0
last_x1 = x1
last_y0 = y0
last_y1 = y1
new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1))
return new_matching_val_area
def highlight_matching_paragraph_text(
self,
pdf_doc: fitz.Document,
page_index: int,
search_paragraph_text: str,
        sibling_paragraph_text_list: list = None,
        next_page_found_lines: list = None,
        content: dict = None,
title: str = "",
):
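        """
        Find the paragraph on the page whose lines best match
        search_paragraph_text by Jaccard word similarity, highlight its
        bounding box, and spill over to the next page when the paragraph
        appears to continue there. Returns the found lines per page, whether
        the match was exact, and the best-matching paragraph text.
        """
        # Guard against the mutable-default pitfall (defaults are None above)
        sibling_paragraph_text_list = sibling_paragraph_text_list or []
        next_page_found_lines = next_page_found_lines or []
        content = content if content is not None else {}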
page = pdf_doc[page_index]
page_text = page.get_text("text")
page_lines = [
line for line in page_text.split("\n") if len(line.strip()) > 0
]
matching_val_area = []
find_begin = False
search_paragraph_text_words = [
word.strip()
for word in search_paragraph_text.lower().split()
if len(word.strip()) > 0
]
found_words = []
found_lines = []
jacard_similarity = 0
found_matched = False
found_lines_dict = {}
for index, line in enumerate(page_lines):
if len(next_page_found_lines) > 0:
if line in next_page_found_lines:
continue
words = [
word.strip() for word in line.lower().split() if len(word.strip()) > 0
]
if len(words) == 0:
continue
if find_begin:
found_words.extend(words)
new_jacard_similarity = self.similarity.jaccard_similarity(
search_paragraph_text_words, found_words
)
if new_jacard_similarity > jacard_similarity:
jacard_similarity = new_jacard_similarity
found_lines.append(line)
else:
if jacard_similarity > 0.4:
found_matched = True
break
else:
                if len(search_paragraph_text_words) >= 3 and \
                        search_paragraph_text_words[0].lower() in line.lower() and \
                        search_paragraph_text_words[1].lower() in line.lower() and \
                        search_paragraph_text_words[2].lower() in line.lower():
jacard_similarity = self.similarity.jaccard_similarity(
search_paragraph_text_words, words
)
if jacard_similarity > 0.05:
find_begin = True
found_words.extend(words)
found_lines.append(line)
if jacard_similarity > 0.4:
found_matched = True
if found_matched and len(found_lines) > 0:
total_matching_val_area = []
for line in found_lines:
matching_val_area = page.search_for(line)
if len(matching_val_area) == 0:
matching_val_area = page.search_for(line.strip())
if len(matching_val_area) == 0:
continue
elif len(matching_val_area) == 1:
total_matching_val_area.extend(matching_val_area)
else:
                    y1_list = [area.y1 for area in matching_val_area]
                    if len(total_matching_val_area) == 0:
                        # take the match lowest on the page (largest y1)
                        max_y1 = max(y1_list)
                        max_y1_index = y1_list.index(max_y1)
                        total_matching_val_area.append(matching_val_area[max_y1_index])
                    else:
                        last_y1 = total_matching_val_area[-1].y1
                        bigger_y1_list = [y1 for y1 in y1_list if y1 > last_y1]
                        if len(bigger_y1_list) == 0:
                            continue
                        next_bigger_y1 = max(bigger_y1_list)
                        next_bigger_y1_index = y1_list.index(next_bigger_y1)
                        total_matching_val_area.append(
                            matching_val_area[next_bigger_y1_index]
                        )
# get min x0, min y0, max x1, max y1 from total_matching_val_area
x0_list = [area.x0 for area in total_matching_val_area]
y0_list = [area.y0 for area in total_matching_val_area]
x1_list = [area.x1 for area in total_matching_val_area]
y1_list = [area.y1 for area in total_matching_val_area]
min_x0 = min(x0_list)
min_y0 = min(y0_list)
max_x1 = max(x1_list)
max_y1 = max(y1_list)
matching_val_area = [fitz.Rect(min_x0, min_y0, max_x1, max_y1)]
highlight = page.add_highlight_annot(matching_val_area)
bbox_list = [[min_x0, min_y0, max_x1, max_y1]]
content["bbox"] = bbox_list
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
found_lines_dict = {
page_index: {"bbox_list": bbox_list, "found_lines": found_lines}
}
        # jacard_similarity between 0.4 and 0.9 suggests the paragraph may
        # continue on the next page, so check the next page as well.
        if 0.4 < jacard_similarity < 0.9:
next_page_index = page_index + 1
if next_page_index < pdf_doc.page_count:
next_page = pdf_doc[next_page_index]
next_page_text = next_page.get_text("text")
next_page_lines = [
line
for line in next_page_text.split("\n")
if len(line.strip()) > 0
]
found_line_index = -1
for i in range(10):
if len(next_page_lines) < i + 1:
break
next_page_line = next_page_lines[i]
words = [
word.strip()
for word in next_page_line.lower().split()
if len(word.strip()) > 0
]
if len(words) == 0:
continue
temp_found_words = found_words + words
new_jacard_similarity = self.similarity.jaccard_similarity(
search_paragraph_text_words, temp_found_words
)
if new_jacard_similarity > jacard_similarity:
found_line_index = i
break
if found_line_index != -1:
new_found_words = found_words
new_found_lines = []
found_matched = False
for index, line in enumerate(next_page_lines):
if index < found_line_index:
continue
words = [
word.strip()
for word in line.lower().split()
if len(word.strip()) > 0
]
if len(words) == 0:
continue
new_found_words.extend(words)
new_jacard_similarity = (
self.similarity.jaccard_similarity(
search_paragraph_text_words, new_found_words
)
)
if new_jacard_similarity > jacard_similarity:
jacard_similarity = new_jacard_similarity
new_found_lines.append(line)
else:
break
if len(new_found_lines) > 0:
total_matching_val_area = []
for line in new_found_lines:
                            matching_val_area = next_page.search_for(line)
                            if len(matching_val_area) == 0:
                                matching_val_area = next_page.search_for(line.strip())
if len(matching_val_area) == 0:
continue
elif len(matching_val_area) == 1:
total_matching_val_area.extend(matching_val_area)
else:
                                y1_list = [area.y1 for area in matching_val_area]
                                if len(total_matching_val_area) == 0:
                                    # take the match lowest on the page (largest y1)
                                    max_y1 = max(y1_list)
                                    max_y1_index = y1_list.index(max_y1)
                                    total_matching_val_area.append(
                                        matching_val_area[max_y1_index]
                                    )
                                else:
                                    last_y1 = total_matching_val_area[-1].y1
                                    bigger_y1_list = [y1 for y1 in y1_list if y1 > last_y1]
                                    if len(bigger_y1_list) == 0:
                                        continue
                                    next_bigger_y1 = max(bigger_y1_list)
                                    next_bigger_y1_index = y1_list.index(next_bigger_y1)
                                    total_matching_val_area.append(
                                        matching_val_area[next_bigger_y1_index]
                                    )
# get min x0, min y0, max x1, max y1 from total_matching_val_area
x0_list = [area.x0 for area in total_matching_val_area]
y0_list = [area.y0 for area in total_matching_val_area]
x1_list = [area.x1 for area in total_matching_val_area]
y1_list = [area.y1 for area in total_matching_val_area]
min_x0 = min(x0_list)
min_y0 = min(y0_list)
max_x1 = max(x1_list)
max_y1 = max(y1_list)
matching_val_area = [
fitz.Rect(min_x0, min_y0, max_x1, max_y1)
]
highlight = next_page.add_highlight_annot(matching_val_area)
new_bbox_list = [[min_x0, min_y0, max_x1, max_y1]]
content["found_page"] = next_page_index
content["bbox"] = new_bbox_list
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
found_lines_dict[next_page_index] = {
"bbox_list": new_bbox_list,
"found_lines": new_found_lines,
}
found_lines_dict_keys = list(found_lines_dict.keys())
exact_match = True
exact_match_search_paragraph_text = search_paragraph_text
if len(found_lines_dict_keys) > 0 and len(sibling_paragraph_text_list) > 0:
found_line_list = []
for key in found_lines_dict_keys:
found_line_list.extend(found_lines_dict[key]["found_lines"])
found_line_words = []
for line in found_line_list:
words = [
word.strip()
for word in line.lower().split()
if len(word.strip()) > 0
]
found_line_words.extend(words)
max_sibling_jacard_similarity = 0
max_sibling_jacard_similarity_index = -1
for index, sibling_paragraph_text in enumerate(
sibling_paragraph_text_list
):
sibling_paragraph_text_words = [
word.strip()
for word in sibling_paragraph_text.lower().split()
if len(word.strip()) > 0
]
sibling_jacard_similarity = self.similarity.jaccard_similarity(
sibling_paragraph_text_words, found_line_words
)
if sibling_jacard_similarity > max_sibling_jacard_similarity:
max_sibling_jacard_similarity = sibling_jacard_similarity
max_sibling_jacard_similarity_index = index
if max_sibling_jacard_similarity > jacard_similarity:
exact_match = False
exact_match_search_paragraph_text = sibling_paragraph_text_list[
max_sibling_jacard_similarity_index
]
return {
"found_lines_dict": found_lines_dict,
"exact_match": exact_match,
"exact_match_search_paragraph_text": exact_match_search_paragraph_text,
}
def get_page_range_by_keywords(
self,
pdf_doc,
start_keywords,
end_keywords,
return_page_text_list=False,
):
"""
Get page range by keywords
pdf_doc: pdf document
page_range_start_keywords: list of start keywords
page_range_end_keywords: list of end keywords
return_page_text_list: return page text list or not
"""
start_page = -1
end_page = -1
if len(start_keywords) == 0 or len(end_keywords) == 0:
start_page = 0
if len(start_keywords) == 0 and len(end_keywords) == 0:
end_page = pdf_doc.page_count - 1
search_start = 0
        # skip the first pages of long documents to avoid matching the TOC
if pdf_doc.page_count > 20:
search_start = 8
for page_index in range(search_start, pdf_doc.page_count):
if start_page >= 0 and end_page >= 0:
break
page = pdf_doc[page_index]
page_text = page.get_text("text").strip()
page_text_list = [
split.strip()
for split in page_text.split("\n")
if len(split.strip()) > 0
]
if start_page == -1:
find = self.find_keywords_in_text_list(page_text_list, start_keywords)
if find:
start_page = page_index
if start_page >= 0 and end_page == -1:
find = self.find_keywords_in_text_list(page_text_list, end_keywords)
if find:
end_page = page_index
break
        # build page_list from start_page up to (but not including) end_page
        page_text_list = []
        if start_page >= 0 and end_page >= 0:
            page_list = list(range(start_page, end_page))
if return_page_text_list:
for page_index in page_list:
page = pdf_doc[page_index]
page_text = page.get_text("text").strip()
page_text_list.append(page_text)
else:
page_list = []
return page_list, page_text_list
def exist_keywords_in_text_list(self, page_text, keywords):
page_text_list = [
split.strip() for split in page_text.split("\n") if len(split.strip()) > 0
]
find = self.find_keywords_in_text_list(page_text_list, keywords)
return find
def find_keywords_in_text_list(self, text_list, keywords):
"""
Find keywords in text list
"""
find = False
for keyword in keywords:
for index, line in enumerate(text_list):
if line.lower().startswith(keyword.lower()):
lower_case_begin_words_count = (
self.get_lower_case_begin_words_count(line)
)
if lower_case_begin_words_count > 3:
continue
if line.upper() == line:
find = True
break
if index != 0:
lower_case_begin_words_count = (
self.get_lower_case_begin_words_count(text_list[index - 1])
)
if lower_case_begin_words_count > 3:
continue
if index > 5:
if "." in line or "," in line:
continue
find = True
break
if find:
break
return find
def get_lower_case_begin_words_count(self, text):
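        """
        Count the words in `text` that start with a lower-case letter;
        a cheap heuristic for telling running prose apart from headings.
        """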
count = 0
for word in text.split():
if word[0].islower():
count += 1
return count
def process_data(
self,
output_file: str,
dp_Value_info: dict,
pages: Tuple = None,
action: str = "Highlight",
):
"""
Process the pages of the PDF File
1. Open the input file.
2. Create a memory buffer for storing temporarily the output file.
3. Initialize a variable for storing the total number of matches of the string we were searching for.
        4. Iterate through the selected pages of the input file and split the current page into lines.
        5. Search for the string within the page.
        6. Apply the corresponding action (e.g. "Redact", "Frame", "Highlight").
7. Display a message signaling the status of the search process.
8. Save and close the input file.
9. Save the memory buffer to the output file.
output_file: The path of the PDF file to generate after processing.
dp_Value_info: The information for data points.
pages: The pages to consider while processing the PDF file.
action: The action to perform on the PDF file.
"""
logger.info(f"Processing {self.pdf_file}")
data_list = []
try:
# Save the generated PDF to memory buffer
pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted  # older PyMuPDF attribute
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted  # newer PyMuPDF attribute
if pdf_encrypted:
pdf_doc.authenticate("")
output_buffer = BytesIO()
find_value_dp_list = []
matching_val_area_list = []
            page_list = list(range(pdf_doc.page_count))
dp_range_page_list = {}
for dp_name, dp_detail in dp_Value_info.items():
if not isinstance(dp_detail, dict):
continue
page_range_start_keywords = dp_detail.get(
"page_range_start_keywords", []
)
page_range_end_keywords = dp_detail.get("page_range_end_keywords", [])
if (
len(page_range_start_keywords) > 0
and len(page_range_end_keywords) > 0
):
page_list, page_text_list = self.get_page_range_by_keywords(
pdf_doc,
page_range_start_keywords,
page_range_end_keywords,
return_page_text_list=False,
)
dp_range_page_list[dp_name] = page_list
# Iterate through pages
next_page_found_lines = []
for page_index in page_list:
# If required for specific pages
if pages:
if page_index not in pages:
continue
# Select the page
page = pdf_doc[page_index]
# Get Matching Data
# Split page by lines
page_text = page.get_text("text")
# if page_index in [24, 25]:
# print(page_text)
for dp_name, dp_detail in dp_Value_info.items():
if not isinstance(dp_detail, dict):
continue
dp_biz_name = dp_detail.get("biz_name", "")
dp_level = dp_detail.get("level", "")
dp_value = dp_detail.get("value", "")
value_text_type = dp_detail.get("value_text_type", "string")
if value_text_type == "string":
dp_value_text = dp_detail.get("value_text", "")
elif value_text_type == "list":
dp_value_text = dp_detail.get("value_text", [])
else:
dp_value_text = dp_detail.get("value_text", "")
value_text_structure = dp_detail.get("value_text_structure", "word")
inner_context_regex = dp_detail.get("inner_context_regex", "")
text_value_dict = dp_detail.get("text_value_dict", {})
search_str_list = dp_detail.get("regex_list", [])
only_hightlight_value_text = dp_detail.get(
"only_hightlight_value_text", False
)
only_hightlight_first = dp_detail.get(
"only_hightlight_first", False
)
# logger.info(f"Processing Data Point: {dp_name}")
page_range_start_keywords = dp_detail.get(
"page_range_start_keywords", []
)
page_range_end_keywords = dp_detail.get(
"page_range_end_keywords", []
)
if (
len(page_range_start_keywords) > 0
and len(page_range_end_keywords) > 0
):
                    if page_index not in dp_range_page_list.get(dp_name, []):
continue
# find_value = False
if value_text_structure == "paragraph":
found = dp_detail.get("found", False)
if found:
continue
if (
dp_detail.get("matched_page", -1) != -1
and len(dp_detail.get("bbox_list", [])) > 0
):
continue
sibling_paragraph_text_list = dp_detail.get(
"sibling_paragraph_text_list", []
)
content = {
"data_point": dp_biz_name,
"data_point_db_name": dp_name,
"data_point_level": dp_level,
"found_page": page_index,
"bbox": None,
}
found_dict = self.highlight_matching_paragraph_text(
pdf_doc=pdf_doc,
page_index=page_index,
search_paragraph_text=dp_value_text,
sibling_paragraph_text_list=sibling_paragraph_text_list,
next_page_found_lines=next_page_found_lines,
content=content,
title=dp_biz_name,
)
found_lines_dict = found_dict.get("found_lines_dict", {})
exact_match = found_dict.get("exact_match", True)
exact_match_search_paragraph_text = found_dict.get(
"exact_match_search_paragraph_text", dp_value_text
)
found_lines_dict_keys = list(found_lines_dict.keys())
if len(found_lines_dict_keys) > 0:
found_next_page_lines = False
if exact_match:
dp_detail["found"] = True
for found_page_index, found_lines_info in found_lines_dict.items():
bbox_list = found_lines_info.get("bbox_list", [])
bbox_normalized_list = self.get_bbox_normalized(
page, bbox_list
)
found_lines = found_lines_info.get("found_lines", [])
if found_page_index == page_index + 1:
next_page_found_lines = found_lines
found_next_page_lines = True
found_text = " ".join(found_lines).strip()
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": dp_value_text,
"ground_truth_text": dp_value_text,
"search_str": "",
"found_page": found_page_index,
"found_value": found_text,
"found_value_context": found_text,
"found_bbox": bbox_list,
"found_bbox_normalized": bbox_normalized_list,
"output_file": output_file,
"action": action,
"comment": "found page number is page number, as page number starts from 0.",
}
if dp_name not in find_value_dp_list:
find_value_dp_list.append(dp_name)
data_list.append(data)
                            else:
                                # use distinct loop names so the outer dp_name /
                                # dp_detail variables are not clobbered
                                for sib_dp_name, sib_dp_detail in dp_Value_info.items():
                                    sib_structure = sib_dp_detail.get("value_text_structure", "word")
                                    if sib_structure == "paragraph":
                                        sib_value_text = sib_dp_detail.get("value_text", "")
                                        if sib_value_text == exact_match_search_paragraph_text:
                                            sib_dp_detail["found"] = True
                                            break
for found_page_index, found_lines_info in found_lines_dict.items():
bbox_list = found_lines_info.get("bbox_list", [])
bbox_normalized_list = self.get_bbox_normalized(
page, bbox_list
)
found_lines = found_lines_info.get("found_lines", [])
if found_page_index == page_index + 1:
next_page_found_lines = found_lines
found_next_page_lines = True
found_text = " ".join(found_lines).strip()
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": exact_match_search_paragraph_text,
"ground_truth_text": exact_match_search_paragraph_text,
"search_str": "",
"found_page": found_page_index,
"found_value": found_text,
"found_value_context": found_text,
"found_bbox": bbox_list,
"found_bbox_normalized": bbox_normalized_list,
"output_file": output_file,
"action": action,
"comment": "found page number is page number, as page number starts from 0.",
}
if dp_name not in find_value_dp_list:
find_value_dp_list.append(dp_name)
data_list.append(data)
if not found_next_page_lines:
next_page_found_lines = []
else:
if search_str_list is not None and len(search_str_list) > 0:
matched_blocks = []
for search_str in search_str_list:
found_blocks = self.search_for_text(
page_text, search_str
)
if found_blocks:
matched_blocks.extend(found_blocks)
else:
# get matched blocks by similarity for dp_value_text
# the data point value is string, not totally same as the value in the database.
# such as, dp_name is FundName or ShareClassName or Advisor or Strategy
matched_blocks = []
if matched_blocks:
for matched_block in matched_blocks:
dp_value = ""
if inner_context_regex != "":
dp_value_text_search = re.search(inner_context_regex, matched_block, re.IGNORECASE)
if dp_value_text_search is not None:
dp_value_text = dp_value_text_search.group(0).strip()
# remove special characters
dp_value_text = re.sub(r"\W+", " ", dp_value_text).strip()
if dp_value_text == "may":
continue
if dp_value_text != "" and len(text_value_dict.keys()) > 0:
dp_value = text_value_dict.get(dp_value_text.lower(), "")
else:
dp_value_text = matched_block
dp_value = matched_block
else:
if dp_value_text == "":
dp_value_text = matched_block
dp_value = matched_block
content = {
"data_point_name": dp_biz_name,
"data_point_db_name": dp_name,
"data_point_level": dp_level,
"page_index": page_index,
"dp_value_text": dp_value_text,
"dp_value": dp_value,
"bbox": None,
}
if only_hightlight_value_text and dp_value_text != "" and dp_value_text != matched_block:
matching_val_area = self.highlight_matching_data(
page=page,
text_block=matched_block,
highlight_text_inside_block=dp_value_text,
content=content,
title=dp_biz_name,
only_hightlight_first=only_hightlight_first,
)
else:
matching_val_area = self.highlight_matching_data(
page=page,
text_block=matched_block,
highlight_text_inside_block=None,
content=content,
title=dp_biz_name,
only_hightlight_first=only_hightlight_first,
)
if len(matching_val_area) > 0:
matching_val_area_list.extend(matching_val_area)
if len(matching_val_area) > 0:
bbox_list = []
for area in matching_val_area:
bbox_list.append(
[area.x0, area.y0, area.x1, area.y1]
)
bbox_normalized_list = self.get_bbox_normalized(
page, bbox_list
)
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": dp_value,
"ground_truth_text": dp_value_text,
"search_str": search_str,
"found_page": page_index,
"found_value_text": dp_value_text,
"found_value": dp_value,
"found_value_context": matched_block.strip(),
"found_bbox": bbox_list,
"found_bbox_normalized": bbox_normalized_list,
"output_file": output_file,
"action": action,
"comment": "found page number is page number, as page number starts from 0.",
}
if dp_name not in find_value_dp_list:
find_value_dp_list.append(dp_name)
data_list.append(data)
# find_value = True
if only_hightlight_first:
break
if len(find_value_dp_list) == 0:
output_file = ""
for dp_name, dp_detail in dp_Value_info.items():
if dp_name not in find_value_dp_list:
dp_biz_name = dp_detail.get("biz_name", "")
dp_level = dp_detail.get("level", "")
dp_value = dp_detail.get("value", "")
dp_value_text = dp_detail.get("value_text", "")
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": dp_value,
"ground_truth_text": dp_value_text,
"search_str": "",
"found_page": -1,
"found_value_text": "",
"found_value": -1,
"found_value_context": "",
"found_bbox": [],
"found_bbox_normalized": [],
"output_file": output_file,
"action": action,
"comment": f"Not found {dp_biz_name} in the document.",
}
data_list.append(data)
logger.info(
f"{len(matching_val_area_list)} Match(es) In Input File: {self.pdf_file}"
)
# Save to output
pdf_doc.save(output_buffer)
pdf_doc.close()
if len(find_value_dp_list) > 0 and \
output_file is not None and \
output_file != "":
# Save the output buffer to the output file
with open(output_file, mode="wb") as f:
f.write(output_buffer.getbuffer())
logger.info(f"File saved to {output_file}")
except Exception as e:
logger.error(f"Error processing file: {e}")
print_exc()
if len(data_list) == 0:
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": "",
"dp_biz_name": "",
"dp_level": "",
"ground_truth": "",
"ground_truth_text": "",
"search_str": "",
"found_page": -1,
"found_value_text": "",
"found_value": -1,
"found_value_context": "",
"found_bbox": [],
"found_bbox_normalized": [],
"output_file": output_file,
"action": action,
"comment": "",
}
data_list.append(data)
return data_list
def get_bbox_normalized(self, page, bbox):
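        """
        Normalize bounding boxes to page-relative [0, 1] coordinates by
        dividing x values by the page width and y values by the page height.
        """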
page_width = page.rect.width
page_height = page.rect.height
bbox_normalized = []
for box in bbox:
x0 = box[0] / page_width
y0 = box[1] / page_height
x1 = box[2] / page_width
y1 = box[3] / page_height
bbox_normalized.append([x0, y0, x1, y1])
return bbox_normalized
    def find_value_by_regex(self, page_text, search_str: str):
        # Placeholder: regex-based value lookup is not implemented yet.
        pass
    def get_high_similarity_text(
        self, page_text, search_str: str, threshold: float = 0.8
    ):
        # Placeholder: similarity-based matching is not implemented yet;
        # currently always returns an empty list.
        matched_values = []
        page_text_list = page_text.split("\n")
        return matched_values
    def remove_highlight(self, output_file: str, pages: Tuple = None):
        """
        1. Open the input file.
        2. Create a memory buffer for temporarily storing the output file.
        3. Iterate through the pages of the input file and check whether annotations are found.
        4. Delete these annotations.
        5. Display a message signaling the status of this process.
        6. Close the input file.
        7. Save the memory buffer to the output file.
        """
logger.info(f"Removing Highlights from {self.pdf_file}")
try:
# Save the generated PDF to memory buffer
pdf_doc = fitz.open(self.pdf_file)
            try:
                pdf_encrypted = pdf_doc.isEncrypted  # older PyMuPDF attribute
            except AttributeError:
                pdf_encrypted = pdf_doc.is_encrypted  # newer PyMuPDF attribute
if pdf_encrypted:
pdf_doc.authenticate("")
output_buffer = BytesIO()
# Initialize a counter for annotations
annot_found = 0
# Iterate through pages
for pg in range(pdf_doc.page_count):
                # If required for specific pages
                if pages:
                    if pg not in pages:
                        continue
# Select the page
page = pdf_doc[pg]
annot = page.first_annot
while annot:
annot_found += 1
page.delete_annot(annot)
annot = annot.next
            if annot_found > 0:
                logger.info(f"{annot_found} annotation(s) found in the input file: {self.pdf_file}")
# Save to output
pdf_doc.save(output_buffer)
pdf_doc.close()
# Save the output buffer to the output file
with open(output_file, mode="wb") as f:
f.write(output_buffer.getbuffer())
except Exception as e:
logger.error(f"Error removing highlights: {e}")
print_exc()
def process_file(
self,
output_file: str,
dp_Value_info: dict = None,
pages: Tuple = None,
action: str = "Highlight",
):
"""
To process one single file
Redact, Frame, Highlight... one PDF File
Remove Highlights from a single PDF File
action: Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove
"""
logger.info(f"Processing {self.pdf_file}")
if output_file is None:
output_file = self.pdf_file
data_list = []
# Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove
if action == "Remove":
# Remove the Highlights except Redactions
            self.remove_highlight(output_file=output_file, pages=pages)
else:
data_list = self.process_data(
output_file=output_file,
dp_Value_info=dp_Value_info,
pages=pages,
action=action,
)
return data_list
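

# Minimal CLI sketch, an illustrative assumption rather than part of the
# original module: argparse is imported above but never used, so this shows
# one plausible way to drive PDFUtil from the command line.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="PDF extraction utilities")
    parser.add_argument("pdf_file", help="Path to the input .pdf file")
    parser.add_argument("--output-folder", default=None,
                        help="Optional folder for extracted text and JSON")
    args = parser.parse_args()
    util = PDFUtil(args.pdf_file)
    util.extract_info()
    ok, text, page_text_dict = util.extract_text(output_folder=args.output_folder)
    if ok:
        logger.info(f"Extracted {len(page_text_dict)} page(s), {len(text)} characters in total.")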