dc-ml-emea-ar/utils/pdf_util.py

1236 lines
56 KiB
Python

# Import Libraries
from typing import Tuple
from io import BytesIO
import os
import argparse
import re
import fitz
import json
from traceback import print_exc
from tqdm import tqdm
import base64
from utils.similarity import Similarity
from utils.logger import logger
class PDFUtil:
def __init__(self, pdf_file: str) -> None:
self.pdf_file = pdf_file
self.simple_pdf_file = os.path.basename(self.pdf_file)
self.is_valid_path()
self.similarity = Similarity()
def is_valid_path(self):
"""
Validates the path inputted and checks whether it is a file path or a folder path
"""
if not self.pdf_file:
raise ValueError(f"Invalid Path")
if os.path.isfile(self.pdf_file) and self.pdf_file.endswith(".pdf"):
return True
else:
raise ValueError(
f"Invalid Path {self.pdf_file}, please input the correct pdf file path."
)
def extract_info(self) -> Tuple[bool, dict]:
"""
Extracts file info
"""
logger.info(f"Extracting file info from {self.pdf_file}")
# Open the PDF
pdf_doc = fitz.open(self.pdf_file)
try:
pdf_encrypted = pdf_doc.isEncrypted
except:
pdf_encrypted = pdf_doc.is_encrypted
output = {
"File": self.pdf_file,
"Encrypted": ("True" if pdf_encrypted else "False"),
}
# If PDF is encrypted the file metadata cannot be extracted
if not pdf_encrypted:
for key, value in pdf_doc.metadata.items():
output[key] = value
# To Display File Info
logger.info(
"## File Information ##################################################"
)
logger.info("\n".join("{}:{}".format(i, j) for i, j in output.items()))
logger.info(
"######################################################################"
)
pdf_doc.close()
return True, output
def extract_text(self, output_folder: str = None) -> Tuple[bool, str, dict]:
"""
Extracts text from PDF
"""
# Extract text
try:
logger.info(f"Extracting text from {self.pdf_file}")
text = ""
page_text_dict = {}
pdf_doc = fitz.open(self.pdf_file)
try:
pdf_encrypted = pdf_doc.isEncrypted
except:
pdf_encrypted = pdf_doc.is_encrypted
if pdf_encrypted:
pdf_doc.authenticate("")
for page in pdf_doc:
page_text = page.get_text()
text += page_text + "\n"
page_text_dict[page.number] = page_text
# To Display Extracted Text
# logger.info(
# "## Extracted Text ####################################################"
# )
# logger.info(text)
# logger.info(
# "######################################################################"
# )
# Save to file
if output_folder:
txt_output_folder = os.path.join(output_folder, 'pdf_text/')
os.makedirs(txt_output_folder, exist_ok=True)
txt_file = os.path.join(txt_output_folder, self.simple_pdf_file.replace(".pdf", ".txt"))
with open(txt_file, "w", encoding="utf-8") as file:
file.write(text.strip())
json_output_folder = os.path.join(output_folder, 'pdf_json/')
os.makedirs(json_output_folder, exist_ok=True)
json_file = os.path.join(json_output_folder, self.simple_pdf_file.replace(".pdf", ".json"))
with open(json_file, "w", encoding="utf-8") as file:
json.dump(page_text_dict, file, indent=4)
pdf_doc.close()
return True, text, page_text_dict
except Exception as e:
logger.error(f"Error extracting text: {e}")
print_exc()
return False, str(e), {}
def extract_images(self,
zoom:float = 2.0,
pdf_page_index_list: list = None,
output_folder: str = None):
try:
pdf_doc = fitz.open(self.pdf_file)
try:
pdf_encrypted = pdf_doc.isEncrypted
except:
pdf_encrypted = pdf_doc.is_encrypted
if pdf_encrypted:
pdf_doc.authenticate("")
if pdf_page_index_list is None or len(pdf_page_index_list) == 0:
pdf_page_index_list = range(pdf_doc.page_count)
pdf_base_name = os.path.basename(self.pdf_file).replace(".pdf", "")
mat = fitz.Matrix(zoom, zoom)
output_data = {}
for page_num in tqdm(pdf_page_index_list, disable=False):
page = pdf_doc[page_num]
pix = page.get_pixmap(matrix=mat)
img_buffer = pix.tobytes(output='png')
output_data[page_num] = {}
img_base64 = base64.b64encode(img_buffer).decode('utf-8')
if output_folder and len(output_folder) > 0:
os.makedirs(output_folder, exist_ok=True)
image_file = os.path.join(output_folder, f"{pdf_base_name}_{page_num}.png")
pix.save(image_file)
output_data[page_num]["img_file"] = image_file
output_data[page_num]["img_base64"] = img_base64
return output_data
except Exception as e:
logger.error(f"Error extracting images: {e}")
print_exc()
return {}
def parse_blocks_page(self, page: fitz.Page):
blocks = page.get_text("blocks")
list_of_blocks = []
for block in blocks:
x0, y0, x1, y1, lines_in_the_block, block_no, block_type = block
list_of_blocks.append(
{
"bbox": [x0, y0, x1, y1],
"lines_in_the_block": lines_in_the_block,
"block_no": block_no,
"block_type": block_type,
}
)
return list_of_blocks
def parse_all_blocks(self):
pdf_doc = fitz.open(self.pdf_file)
try:
pdf_encrypted = pdf_doc.isEncrypted
except:
pdf_encrypted = pdf_doc.is_encrypted
if pdf_encrypted:
pdf_doc.authenticate("")
pdf_blocks = {}
for page_num in tqdm(range(pdf_doc.page_count), disable=False):
page = pdf_doc[page_num]
blocks = self.parse_blocks_page(page)
pdf_blocks[page_num] = blocks
return pdf_blocks
def search_for_text(self, page_text, search_str):
"""
Search for the search string within the document lines
"""
# Find all matches within one line
result_iter = re.finditer(search_str, page_text, re.IGNORECASE)
results = [
result.group() for result in result_iter if result.group().strip() != ""
]
# In case multiple matches within one line
return results
def redact_matching_data(self, page, matched_value):
"""
Redacts matching values
"""
logger.info(f"Redacting matching values in {self.pdf_file}")
matching_val_area = page.search_for(matched_value)
# Redact matching values
[
page.add_redact_annot(area, text=" ", fill=(0, 0, 0))
for area in matching_val_area
]
# Apply the redaction
page.apply_redactions()
return matching_val_area
def frame_matching_data(self, page, matched_value):
"""
frames matching values
"""
matching_val_area = page.search_for(matched_value)
for area in matching_val_area:
if isinstance(area, fitz.fitz.Rect):
# Draw a rectangle around matched values
annot = page.add_redact_annot(area)
# , fill = fitz.utils.getColor('black')
annot.setColors(stroke=fitz.utils.getColor("red"))
# If you want to remove matched data
# page.addFreetextAnnot(area, ' ')
annot.update()
return matching_val_area
def highlight_rectangle(self,
pdf_doc: fitz.Document,
page_index: int,
bbox: list,
title: str = "",
content: dict = {}):
"""
Highlight rectangle
"""
rectangle = fitz.Rect(bbox[0], bbox[1], bbox[2], bbox[3])
page = pdf_doc[page_index]
highlight = page.add_highlight_annot([rectangle])
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
def highlight_matching_data(
self,
page,
text_block,
within_bbox: list = None,
highlight_text_inside_block: str = None,
content: dict = {},
title: str = "",
only_hightlight_first: bool = False,
exact_match: bool = False,
):
"""
Highlight matching values
"""
# logger.info(f"Highlighting matching values in {self.pdf_file}")
if within_bbox is not None:
matching_val_area = page.search_for(
text_block, clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])
)
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''),
clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3]))
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.replace('-\n', ''),
clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3]))
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block)
else:
matching_val_area = page.search_for(text_block)
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''))
if len(matching_val_area) == 0:
matching_val_area = page.search_for(text_block.replace('-\n', ''))
if (
highlight_text_inside_block is not None
and len(highlight_text_inside_block) > 0
):
highlight_bbox_list = []
for area in matching_val_area:
text_bbox_area = page.search_for(
highlight_text_inside_block,
clip=[area.x0, area.y0, area.x1, area.y1],
)
if text_bbox_area is not None and len(text_bbox_area) > 0:
if only_hightlight_first:
highlight_bbox_list.append(text_bbox_area[0])
break
else:
highlight_bbox_list.extend(text_bbox_area)
matching_val_area = highlight_bbox_list
else:
if only_hightlight_first:
matching_val_area = [matching_val_area[0]]
if matching_val_area is not None and len(matching_val_area) > 0:
matching_val_area = self.merge_matching_val_area(matching_val_area)
if exact_match:
matching_val_area = self.get_exact_match_area(page, matching_val_area, text_block)
# matching_val_area = self.merge_matching_val_area(matching_val_area)
for area in matching_val_area:
highlight = page.add_highlight_annot([area])
bbox_list = [area.x0, area.y0, area.x1, area.y1]
content["bbox"] = bbox_list
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
return matching_val_area
def get_exact_match_area(self, page, matching_val_area, search_text):
results = []
for area in matching_val_area:
area_text = page.get_text("text", clip=area).strip()
area_text_list = area_text.split()
search_text_list = search_text.split()
capital_not_match = False
any_word_match = False
for search_split in search_text_list:
if search_split in area_text_list:
any_word_match = True
search_split_lower = search_split.lower()
if search_split_lower in area_text_list and \
search_split not in area_text_list:
capital_not_match = True
break
if capital_not_match:
continue
elif any_word_match:
results.append(area)
else:
pass
return results
def merge_matching_val_area(self, matching_val_area):
"""
Merge the matching val areas which with same y0 and y1,
the x0 is the min x0, x1 is the max x1
"""
if matching_val_area is None or len(matching_val_area) == 0:
return matching_val_area
if len(matching_val_area) == 1:
return matching_val_area
# unify the y0 and y1 which are close to each other (less than 5 pixels)
y0_list = []
y1_list = []
for area in matching_val_area:
y0 = area.y0
y1 = area.y1
if len(y0_list) == 0:
y0_list.append(y0)
y1_list.append(y1)
else:
for t_y0 in y0_list:
if abs(t_y0 - y0) < 5:
area.y0 = t_y0
else:
if y0 not in y0_list:
y0_list.append(y0)
for t_y1 in y1_list:
if abs(t_y1 - y1) < 5:
area.y1 = t_y1
else:
if y1 not in y1_list:
y1_list.append(y1)
# get area list which with same y0 and y1
y0_y1_list = list(set([(area.y0, area.y1) for area in matching_val_area]))
new_matching_val_area = []
for y0_y1 in y0_y1_list:
y0 = y0_y1[0]
y1 = y0_y1[1]
x0_list = [area.x0 for area in matching_val_area if area.y0 == y0 and area.y1 == y1]
x1_list = [area.x1 for area in matching_val_area if area.y0 == y0 and area.y1 == y1]
min_x0 = min(x0_list)
max_x1 = max(x1_list)
new_matching_val_area.append(fitz.Rect(min_x0, y0, max_x1, y1))
return new_matching_val_area
def highlight_matching_paragraph_text(
self,
pdf_doc: fitz.Document,
page_index: int,
search_paragraph_text: str,
sibling_paragraph_text_list: list = [],
next_page_found_lines: list = [],
content: dict = {},
title: str = "",
):
page = pdf_doc[page_index]
page_text = page.get_text("text")
page_lines = [
line for line in page_text.split("\n") if len(line.strip()) > 0
]
matching_val_area = []
find_begin = False
search_paragraph_text_words = [
word.strip()
for word in search_paragraph_text.lower().split()
if len(word.strip()) > 0
]
found_words = []
found_lines = []
jacard_similarity = 0
found_matched = False
found_lines_dict = {}
for index, line in enumerate(page_lines):
if len(next_page_found_lines) > 0:
if line in next_page_found_lines:
continue
words = [
word.strip() for word in line.lower().split() if len(word.strip()) > 0
]
if len(words) == 0:
continue
if find_begin:
found_words.extend(words)
new_jacard_similarity = self.similarity.jaccard_similarity(
search_paragraph_text_words, found_words
)
if new_jacard_similarity > jacard_similarity:
jacard_similarity = new_jacard_similarity
found_lines.append(line)
else:
if jacard_similarity > 0.4:
found_matched = True
break
else:
if search_paragraph_text_words[0].lower() in line.lower() and \
search_paragraph_text_words[1].lower() in line.lower() and \
search_paragraph_text_words[2].lower() in line.lower():
jacard_similarity = self.similarity.jaccard_similarity(
search_paragraph_text_words, words
)
if jacard_similarity > 0.05:
find_begin = True
found_words.extend(words)
found_lines.append(line)
if jacard_similarity > 0.4:
found_matched = True
if found_matched and len(found_lines) > 0:
total_matching_val_area = []
for line in found_lines:
matching_val_area = page.search_for(line)
if len(matching_val_area) == 0:
matching_val_area = page.search_for(line.strip())
if len(matching_val_area) == 0:
continue
elif len(matching_val_area) == 1:
total_matching_val_area.extend(matching_val_area)
else:
y1_list = [area.y1 for area in matching_val_area]
if len(total_matching_val_area) == 0:
y1_min = max(y1_list)
y1_min_index = y1_list.index(y1_min)
total_matching_val_area.append(matching_val_area[y1_min_index])
else:
last_y1 = total_matching_val_area[-1].y1
latest_bigger_y1_list = max(
[y1 for y1 in y1_list if y1 > last_y1]
)
latest_bigger_y1_index = y1_list.index(latest_bigger_y1_list)
total_matching_val_area.append(
matching_val_area[latest_bigger_y1_index]
)
# get min x0, min y0, max x1, max y1 from total_matching_val_area
x0_list = [area.x0 for area in total_matching_val_area]
y0_list = [area.y0 for area in total_matching_val_area]
x1_list = [area.x1 for area in total_matching_val_area]
y1_list = [area.y1 for area in total_matching_val_area]
min_x0 = min(x0_list)
min_y0 = min(y0_list)
max_x1 = max(x1_list)
max_y1 = max(y1_list)
matching_val_area = [fitz.Rect(min_x0, min_y0, max_x1, max_y1)]
highlight = page.add_highlight_annot(matching_val_area)
bbox_list = [[min_x0, min_y0, max_x1, max_y1]]
content["bbox"] = bbox_list
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
found_lines_dict = {
page_index: {"bbox_list": bbox_list, "found_lines": found_lines}
}
# jacard_similarity is between 0.4 and 0.9,
# perhaps there are left lines in next page, so we need to check the next page.
if jacard_similarity > 0.4 and jacard_similarity < 0.9:
next_page_index = page_index + 1
if next_page_index < pdf_doc.page_count:
next_page = pdf_doc[next_page_index]
next_page_text = next_page.get_text("text")
next_page_lines = [
line
for line in next_page_text.split("\n")
if len(line.strip()) > 0
]
found_line_index = -1
for i in range(10):
if len(next_page_lines) < i + 1:
break
next_page_line = next_page_lines[i]
words = [
word.strip()
for word in next_page_line.lower().split()
if len(word.strip()) > 0
]
if len(words) == 0:
continue
temp_found_words = found_words + words
new_jacard_similarity = self.similarity.jaccard_similarity(
search_paragraph_text_words, temp_found_words
)
if new_jacard_similarity > jacard_similarity:
found_line_index = i
break
if found_line_index != -1:
new_found_words = found_words
new_found_lines = []
found_matched = False
for index, line in enumerate(next_page_lines):
if index < found_line_index:
continue
words = [
word.strip()
for word in line.lower().split()
if len(word.strip()) > 0
]
if len(words) == 0:
continue
new_found_words.extend(words)
new_jacard_similarity = (
self.similarity.jaccard_similarity(
search_paragraph_text_words, new_found_words
)
)
if new_jacard_similarity > jacard_similarity:
jacard_similarity = new_jacard_similarity
new_found_lines.append(line)
else:
break
if len(new_found_lines) > 0:
total_matching_val_area = []
for line in new_found_lines:
matching_val_area = next_page.search_for(line)
if len(matching_val_area) == 0:
matching_val_area = page.search_for(line.strip())
if len(matching_val_area) == 0:
continue
elif len(matching_val_area) == 1:
total_matching_val_area.extend(matching_val_area)
else:
y1_list = [area.y1 for area in matching_val_area]
if len(total_matching_val_area) == 0:
y1_min = max(y1_list)
y1_min_index = y1_list.index(y1_min)
total_matching_val_area.append(
matching_val_area[y1_min_index]
)
else:
last_y1 = total_matching_val_area[-1].y1
latest_bigger_y1_list = max(
[y1 for y1 in y1_list if y1 > last_y1]
)
latest_bigger_y1_index = y1_list.index(
latest_bigger_y1_list
)
total_matching_val_area.append(
matching_val_area[latest_bigger_y1_index]
)
# get min x0, min y0, max x1, max y1 from total_matching_val_area
x0_list = [area.x0 for area in total_matching_val_area]
y0_list = [area.y0 for area in total_matching_val_area]
x1_list = [area.x1 for area in total_matching_val_area]
y1_list = [area.y1 for area in total_matching_val_area]
min_x0 = min(x0_list)
min_y0 = min(y0_list)
max_x1 = max(x1_list)
max_y1 = max(y1_list)
matching_val_area = [
fitz.Rect(min_x0, min_y0, max_x1, max_y1)
]
highlight = next_page.add_highlight_annot(matching_val_area)
new_bbox_list = [[min_x0, min_y0, max_x1, max_y1]]
content["found_page"] = next_page_index
content["bbox"] = new_bbox_list
content_text = json.dumps(content)
highlight.set_info(content=content_text, title=title)
highlight.update()
found_lines_dict[next_page_index] = {
"bbox_list": new_bbox_list,
"found_lines": new_found_lines,
}
found_lines_dict_keys = list(found_lines_dict.keys())
exact_match = True
exact_match_search_paragraph_text = search_paragraph_text
if len(found_lines_dict_keys) > 0 and len(sibling_paragraph_text_list) > 0:
found_line_list = []
for key in found_lines_dict_keys:
found_line_list.extend(found_lines_dict[key]["found_lines"])
found_line_words = []
for line in found_line_list:
words = [
word.strip()
for word in line.lower().split()
if len(word.strip()) > 0
]
found_line_words.extend(words)
max_sibling_jacard_similarity = 0
max_sibling_jacard_similarity_index = -1
for index, sibling_paragraph_text in enumerate(
sibling_paragraph_text_list
):
sibling_paragraph_text_words = [
word.strip()
for word in sibling_paragraph_text.lower().split()
if len(word.strip()) > 0
]
sibling_jacard_similarity = self.similarity.jaccard_similarity(
sibling_paragraph_text_words, found_line_words
)
if sibling_jacard_similarity > max_sibling_jacard_similarity:
max_sibling_jacard_similarity = sibling_jacard_similarity
max_sibling_jacard_similarity_index = index
if max_sibling_jacard_similarity > jacard_similarity:
exact_match = False
exact_match_search_paragraph_text = sibling_paragraph_text_list[
max_sibling_jacard_similarity_index
]
return {
"found_lines_dict": found_lines_dict,
"exact_match": exact_match,
"exact_match_search_paragraph_text": exact_match_search_paragraph_text,
}
def get_page_range_by_keywords(
self,
pdf_doc,
start_keywords,
end_keywords,
return_page_text_list=False,
):
"""
Get page range by keywords
pdf_doc: pdf document
page_range_start_keywords: list of start keywords
page_range_end_keywords: list of end keywords
return_page_text_list: return page text list or not
"""
start_page = -1
end_page = -1
if len(start_keywords) == 0 or len(end_keywords) == 0:
start_page = 0
if len(start_keywords) == 0 and len(end_keywords) == 0:
end_page = pdf_doc.page_count - 1
search_start = 0
# avoid to search the TOC part
if pdf_doc.page_count > 20:
search_start = 8
for page_index in range(search_start, pdf_doc.page_count):
if start_page >= 0 and end_page >= 0:
break
page = pdf_doc[page_index]
page_text = page.get_text("text").strip()
page_text_list = [
split.strip()
for split in page_text.split("\n")
if len(split.strip()) > 0
]
if start_page == -1:
find = self.find_keywords_in_text_list(page_text_list, start_keywords)
if find:
start_page = page_index
if start_page >= 0 and end_page == -1:
find = self.find_keywords_in_text_list(page_text_list, end_keywords)
if find:
end_page = page_index
break
# return page_list which starts from start_page and ends at end_page
page_text_list = []
if start_page >= 0 and end_page >= 0:
page_list = [i for i in range(start_page, end_page)]
if return_page_text_list:
for page_index in page_list:
page = pdf_doc[page_index]
page_text = page.get_text("text").strip()
page_text_list.append(page_text)
else:
page_list = []
return page_list, page_text_list
def exist_keywords_in_text_list(self, page_text, keywords):
page_text_list = [
split.strip() for split in page_text.split("\n") if len(split.strip()) > 0
]
find = self.find_keywords_in_text_list(page_text_list, keywords)
return find
def find_keywords_in_text_list(self, text_list, keywords):
"""
Find keywords in text list
"""
find = False
for keyword in keywords:
for index, line in enumerate(text_list):
if line.lower().startswith(keyword.lower()):
lower_case_begin_words_count = (
self.get_lower_case_begin_words_count(line)
)
if lower_case_begin_words_count > 3:
continue
if line.upper() == line:
find = True
break
if index != 0:
lower_case_begin_words_count = (
self.get_lower_case_begin_words_count(text_list[index - 1])
)
if lower_case_begin_words_count > 3:
continue
if index > 5:
if "." in line or "," in line:
continue
find = True
break
if find:
break
return find
def get_lower_case_begin_words_count(self, text):
count = 0
for word in text.split():
if word[0].islower():
count += 1
return count
def process_data(
self,
output_file: str,
dp_Value_info: dict,
pages: Tuple = None,
action: str = "Highlight",
):
"""
Process the pages of the PDF File
1. Open the input file.
2. Create a memory buffer for storing temporarily the output file.
3. Initialize a variable for storing the total number of matches of the string we were searching for.
4. Iterate throughout the selected pages of the input file and split the current page into lines.
5. Search for the string within the page.
6. Apply the corresponding action (i.e "Redact", "Frame", "Highlight", etc.)
7. Display a message signaling the status of the search process.
8. Save and close the input file.
9. Save the memory buffer to the output file.
output_file: The path of the PDF file to generate after processing.
dp_Value_info: The information for data points.
pages: The pages to consider while processing the PDF file.
action: The action to perform on the PDF file.
"""
logger.info(f"Processing {self.pdf_file}")
data_list = []
try:
# Save the generated PDF to memory buffer
pdf_doc = fitz.open(self.pdf_file)
try:
pdf_encrypted = pdf_doc.isEncrypted
except:
pdf_encrypted = pdf_doc.is_encrypted
if pdf_encrypted:
pdf_doc.authenticate("")
output_buffer = BytesIO()
find_value_dp_list = []
matching_val_area_list = []
page_list = [i for i in range(pdf_doc.page_count)]
dp_range_page_list = {}
for dp_name, dp_detail in dp_Value_info.items():
if not isinstance(dp_detail, dict):
continue
page_range_start_keywords = dp_detail.get(
"page_range_start_keywords", []
)
page_range_end_keywords = dp_detail.get("page_range_end_keywords", [])
if (
len(page_range_start_keywords) > 0
and len(page_range_end_keywords) > 0
):
page_list, page_text_list = self.get_page_range_by_keywords(
pdf_doc,
page_range_start_keywords,
page_range_end_keywords,
return_page_text_list=False,
)
dp_range_page_list[dp_name] = page_list
# Iterate through pages
next_page_found_lines = []
for page_index in page_list:
# If required for specific pages
if pages:
if page_index not in pages:
continue
# Select the page
page = pdf_doc[page_index]
# Get Matching Data
# Split page by lines
page_text = page.get_text("text")
# if page_index in [24, 25]:
# print(page_text)
for dp_name, dp_detail in dp_Value_info.items():
if not isinstance(dp_detail, dict):
continue
dp_biz_name = dp_detail.get("biz_name", "")
dp_level = dp_detail.get("level", "")
dp_value = dp_detail.get("value", "")
value_text_type = dp_detail.get("value_text_type", "string")
if value_text_type == "string":
dp_value_text = dp_detail.get("value_text", "")
elif value_text_type == "list":
dp_value_text = dp_detail.get("value_text", [])
else:
dp_value_text = dp_detail.get("value_text", "")
value_text_structure = dp_detail.get("value_text_structure", "word")
inner_context_regex = dp_detail.get("inner_context_regex", "")
text_value_dict = dp_detail.get("text_value_dict", {})
search_str_list = dp_detail.get("regex_list", [])
only_hightlight_value_text = dp_detail.get(
"only_hightlight_value_text", False
)
only_hightlight_first = dp_detail.get(
"only_hightlight_first", False
)
# logger.info(f"Processing Data Point: {dp_name}")
page_range_start_keywords = dp_detail.get(
"page_range_start_keywords", []
)
page_range_end_keywords = dp_detail.get(
"page_range_end_keywords", []
)
if (
len(page_range_start_keywords) > 0
and len(page_range_end_keywords) > 0
):
if not page_index in dp_range_page_list.get(dp_name, []):
continue
# find_value = False
if value_text_structure == "paragraph":
found = dp_detail.get("found", False)
if found:
continue
if (
dp_detail.get("matched_page", -1) != -1
and len(dp_detail.get("bbox_list", [])) > 0
):
continue
sibling_paragraph_text_list = dp_detail.get(
"sibling_paragraph_text_list", []
)
content = {
"data_point": dp_biz_name,
"data_point_db_name": dp_name,
"data_point_level": dp_level,
"found_page": page_index,
"bbox": None,
}
found_dict = self.highlight_matching_paragraph_text(
pdf_doc=pdf_doc,
page_index=page_index,
search_paragraph_text=dp_value_text,
sibling_paragraph_text_list=sibling_paragraph_text_list,
next_page_found_lines=next_page_found_lines,
content=content,
title=dp_biz_name,
)
found_lines_dict = found_dict.get("found_lines_dict", {})
exact_match = found_dict.get("exact_match", True)
exact_match_search_paragraph_text = found_dict.get(
"exact_match_search_paragraph_text", dp_value_text
)
found_lines_dict_keys = list(found_lines_dict.keys())
if len(found_lines_dict_keys) > 0:
found_next_page_lines = False
if exact_match:
dp_detail["found"] = True
for found_page_index, found_lines_info in found_lines_dict.items():
bbox_list = found_lines_info.get("bbox_list", [])
bbox_normalized_list = self.get_bbox_normalized(
page, bbox_list
)
found_lines = found_lines_info.get("found_lines", [])
if found_page_index == page_index + 1:
next_page_found_lines = found_lines
found_next_page_lines = True
found_text = " ".join(found_lines).strip()
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": dp_value_text,
"ground_truth_text": dp_value_text,
"search_str": "",
"found_page": found_page_index,
"found_value": found_text,
"found_value_context": found_text,
"found_bbox": bbox_list,
"found_bbox_normalized": bbox_normalized_list,
"output_file": output_file,
"action": action,
"comment": "found page number is page number, as page number starts from 0.",
}
if dp_name not in find_value_dp_list:
find_value_dp_list.append(dp_name)
data_list.append(data)
else:
for dp_name, dp_detail in dp_Value_info.items():
value_text_structure = dp_detail.get("value_text_structure", "word")
if value_text_structure == "paragraph":
dp_value_text = dp_detail.get("value_text", "")
if dp_value_text == exact_match_search_paragraph_text:
dp_detail["found"] = True
break
for found_page_index, found_lines_info in found_lines_dict.items():
bbox_list = found_lines_info.get("bbox_list", [])
bbox_normalized_list = self.get_bbox_normalized(
page, bbox_list
)
found_lines = found_lines_info.get("found_lines", [])
if found_page_index == page_index + 1:
next_page_found_lines = found_lines
found_next_page_lines = True
found_text = " ".join(found_lines).strip()
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": exact_match_search_paragraph_text,
"ground_truth_text": exact_match_search_paragraph_text,
"search_str": "",
"found_page": found_page_index,
"found_value": found_text,
"found_value_context": found_text,
"found_bbox": bbox_list,
"found_bbox_normalized": bbox_normalized_list,
"output_file": output_file,
"action": action,
"comment": "found page number is page number, as page number starts from 0.",
}
if dp_name not in find_value_dp_list:
find_value_dp_list.append(dp_name)
data_list.append(data)
if not found_next_page_lines:
next_page_found_lines = []
else:
if search_str_list is not None and len(search_str_list) > 0:
matched_blocks = []
for search_str in search_str_list:
found_blocks = self.search_for_text(
page_text, search_str
)
if found_blocks:
matched_blocks.extend(found_blocks)
else:
# get matched blocks by similarity for dp_value_text
# the data point value is string, not totally same as the value in the database.
# such as, dp_name is FundName or ShareClassName or Advisor or Strategy
matched_blocks = []
if matched_blocks:
for matched_block in matched_blocks:
dp_value = ""
if inner_context_regex != "":
dp_value_text_search = re.search(inner_context_regex, matched_block, re.IGNORECASE)
if dp_value_text_search is not None:
dp_value_text = dp_value_text_search.group(0).strip()
# remove special characters
dp_value_text = re.sub(r"\W+", " ", dp_value_text).strip()
if dp_value_text == "may":
continue
if dp_value_text != "" and len(text_value_dict.keys()) > 0:
dp_value = text_value_dict.get(dp_value_text.lower(), "")
else:
dp_value_text = matched_block
dp_value = matched_block
else:
if dp_value_text == "":
dp_value_text = matched_block
dp_value = matched_block
content = {
"data_point_name": dp_biz_name,
"data_point_db_name": dp_name,
"data_point_level": dp_level,
"page_index": page_index,
"dp_value_text": dp_value_text,
"dp_value": dp_value,
"bbox": None,
}
if only_hightlight_value_text and dp_value_text != "" and dp_value_text != matched_block:
matching_val_area = self.highlight_matching_data(
page=page,
text_block=matched_block,
highlight_text_inside_block=dp_value_text,
content=content,
title=dp_biz_name,
only_hightlight_first=only_hightlight_first,
)
else:
matching_val_area = self.highlight_matching_data(
page=page,
text_block=matched_block,
highlight_text_inside_block=None,
content=content,
title=dp_biz_name,
only_hightlight_first=only_hightlight_first,
)
if len(matching_val_area) > 0:
matching_val_area_list.extend(matching_val_area)
if len(matching_val_area) > 0:
bbox_list = []
for area in matching_val_area:
bbox_list.append(
[area.x0, area.y0, area.x1, area.y1]
)
bbox_normalized_list = self.get_bbox_normalized(
page, bbox_list
)
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": dp_value,
"ground_truth_text": dp_value_text,
"search_str": search_str,
"found_page": page_index,
"found_value_text": dp_value_text,
"found_value": dp_value,
"found_value_context": matched_block.strip(),
"found_bbox": bbox_list,
"found_bbox_normalized": bbox_normalized_list,
"output_file": output_file,
"action": action,
"comment": "found page number is page number, as page number starts from 0.",
}
if dp_name not in find_value_dp_list:
find_value_dp_list.append(dp_name)
data_list.append(data)
# find_value = True
if only_hightlight_first:
break
if len(find_value_dp_list) == 0:
output_file = ""
for dp_name, dp_detail in dp_Value_info.items():
if dp_name not in find_value_dp_list:
dp_biz_name = dp_detail.get("biz_name", "")
dp_level = dp_detail.get("level", "")
dp_value = dp_detail.get("value", "")
dp_value_text = dp_detail.get("value_text", "")
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": dp_name,
"dp_biz_name": dp_biz_name,
"dp_level": dp_level,
"ground_truth": dp_value,
"ground_truth_text": dp_value_text,
"search_str": "",
"found_page": -1,
"found_value_text": "",
"found_value": -1,
"found_value_context": "",
"found_bbox": [],
"found_bbox_normalized": [],
"output_file": output_file,
"action": action,
"comment": f"Not found {dp_biz_name} in the document.",
}
data_list.append(data)
logger.info(
f"{len(matching_val_area_list)} Match(es) In Input File: {self.pdf_file}"
)
# Save to output
pdf_doc.save(output_buffer)
pdf_doc.close()
if len(find_value_dp_list) > 0 and \
output_file is not None and \
output_file != "":
# Save the output buffer to the output file
with open(output_file, mode="wb") as f:
f.write(output_buffer.getbuffer())
logger.info(f"File saved to {output_file}")
except Exception as e:
logger.error(f"Error processing file: {e}")
print_exc()
if len(data_list) == 0:
data = {
"pdf_file": self.simple_pdf_file,
"dp_name": "",
"dp_biz_name": "",
"dp_level": "",
"ground_truth": "",
"ground_truth_text": "",
"search_str": "",
"found_page": -1,
"found_value_text": "",
"found_value": -1,
"found_value_context": "",
"found_bbox": [],
"found_bbox_normalized": [],
"output_file": output_file,
"action": action,
"comment": "",
}
data_list.append(data)
return data_list
def get_bbox_normalized(self, page, bbox):
page_width = page.rect.width
page_height = page.rect.height
bbox_normalized = []
for box in bbox:
x0 = box[0] / page_width
y0 = box[1] / page_height
x1 = box[2] / page_width
y1 = box[3] / page_height
bbox_normalized.append([x0, y0, x1, y1])
return bbox_normalized
def find_value_by_regex(self, page_text, search_str: str):
pass
def get_high_similarity_text(
self, page_text, search_str: str, threshold: float = 0.8
):
matched_values = []
page_text_list = page_text.split("\n")
return matched_values
def remove_highlght(self, output_file: str, pages: Tuple = None):
"""
1. Open the input file.
2. Create a memory buffer for storing temporarily the output file.
3. Iterate throughout the pages of the input file and checks if annotations are found.
4. Delete these annotations.
5. Display a message signaling the status of this process.
6. Close the input file.
7. Save the memory buffer to the output file.
"""
logger.info(f"Removing Highlights from {self.pdf_file}")
try:
# Save the generated PDF to memory buffer
pdf_doc = fitz.open(self.pdf_file)
try:
pdf_encrypted = pdf_doc.isEncrypted
except:
pdf_encrypted = pdf_doc.is_encrypted
if pdf_encrypted:
pdf_doc.authenticate("")
output_buffer = BytesIO()
# Initialize a counter for annotations
annot_found = 0
# Iterate through pages
for pg in range(pdf_doc.page_count):
# If required for specific pages
if pages:
if str(pg) not in pages:
continue
# Select the page
page = pdf_doc[pg]
annot = page.first_annot
while annot:
annot_found += 1
page.delete_annot(annot)
annot = annot.next
if annot_found >= 0:
print(f"Annotation(s) Found In The Input File: {self.pdf_file}")
# Save to output
pdf_doc.save(output_buffer)
pdf_doc.close()
# Save the output buffer to the output file
with open(output_file, mode="wb") as f:
f.write(output_buffer.getbuffer())
except Exception as e:
logger.error(f"Error removing highlights: {e}")
print_exc()
def process_file(
self,
output_file: str,
dp_Value_info: dict = None,
pages: Tuple = None,
action: str = "Highlight",
):
"""
To process one single file
Redact, Frame, Highlight... one PDF File
Remove Highlights from a single PDF File
action: Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove
"""
logger.info(f"Processing {self.pdf_file}")
if output_file is None:
output_file = self.pdf_file
data_list = []
# Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove
if action == "Remove":
# Remove the Highlights except Redactions
self.remove_highlght(output_file=output_file, pages=pages)
else:
data_list = self.process_data(
output_file=output_file,
dp_Value_info=dp_Value_info,
pages=pages,
action=action,
)
return data_list