# Import Libraries
from typing import Tuple
from io import BytesIO
import os
import argparse
import re
import fitz
import json
from traceback import print_exc
from tqdm import tqdm
from utils.similarity import Similarity
from utils.logger import logger


class PDFUtil:
    """
    Helper around a single PDF file: metadata/text extraction, block parsing,
    and annotation (highlight / redact / frame / remove) of matched content.
    """

    def __init__(self, pdf_file: str) -> None:
        """
        pdf_file: path of an existing ``.pdf`` file.
        Raises ValueError when the path is empty, missing or not a ``.pdf``.
        """
        self.pdf_file = pdf_file
        # Validate before deriving anything from the path, so that an empty or
        # None path is reported as ValueError rather than failing in basename().
        self.is_valid_path()
        self.simple_pdf_file = os.path.basename(self.pdf_file)
        self.similarity = Similarity()

    def is_valid_path(self):
        """
        Validates that the configured path is an existing file ending in ".pdf".
        Returns True when valid, raises ValueError otherwise.
        """
        if not self.pdf_file:
            raise ValueError(f"Invalid Path")
        if os.path.isfile(self.pdf_file) and self.pdf_file.endswith(".pdf"):
            return True
        raise ValueError(
            f"Invalid Path {self.pdf_file}, please input the correct pdf file path."
        )

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _is_encrypted(pdf_doc) -> bool:
        """Read the encryption flag under either the new or the legacy attribute name."""
        try:
            return bool(pdf_doc.is_encrypted)
        except AttributeError:
            # Older PyMuPDF releases only expose the camelCase name.
            return bool(pdf_doc.isEncrypted)

    def _open_document(self):
        """Open self.pdf_file and authenticate with an empty password if encrypted."""
        pdf_doc = fitz.open(self.pdf_file)
        try:
            if self._is_encrypted(pdf_doc):
                pdf_doc.authenticate("")
        except Exception:
            pdf_doc.close()
            raise
        return pdf_doc

    @staticmethod
    def _non_empty_lines(text):
        """Split text on newlines and keep (unstripped) lines that are not blank."""
        return [line for line in text.split("\n") if len(line.strip()) > 0]

    @staticmethod
    def _locate_line_areas(page, lines):
        """
        Find one rectangle per line on the page.

        Lines that cannot be located are skipped. When a line is found several
        times, the first such line takes the hit with the largest y1, and later
        lines take the hit with the largest y1 among those below the previously
        chosen rectangle (lines with no hit below it are skipped).
        """
        located = []
        for line in lines:
            hits = page.search_for(line)
            if len(hits) == 0:
                hits = page.search_for(line.strip())
            if len(hits) == 0:
                continue
            if len(hits) == 1:
                located.extend(hits)
                continue
            y1_list = [area.y1 for area in hits]
            if len(located) == 0:
                chosen_y1 = max(y1_list)
            else:
                last_y1 = located[-1].y1
                below = [y1 for y1 in y1_list if y1 > last_y1]
                if not below:
                    # No candidate below the previous line: nothing sensible to pick.
                    continue
                chosen_y1 = max(below)
            located.append(hits[y1_list.index(chosen_y1)])
        return located

    @staticmethod
    def _highlight_bounding_box(page, areas, content, title):
        """
        Highlight the bounding box of all areas on the page, store it in
        content["bbox"] and return it as ``[[x0, y0, x1, y1]]``.
        """
        min_x0 = min(area.x0 for area in areas)
        min_y0 = min(area.y0 for area in areas)
        max_x1 = max(area.x1 for area in areas)
        max_y1 = max(area.y1 for area in areas)
        highlight = page.add_highlight_annot(
            [fitz.Rect(min_x0, min_y0, max_x1, max_y1)]
        )
        bbox_list = [[min_x0, min_y0, max_x1, max_y1]]
        content["bbox"] = bbox_list
        highlight.set_info(content=json.dumps(content), title=title)
        highlight.update()
        return bbox_list

    @staticmethod
    def _snap_coordinate(value, known_values, tolerance):
        """
        Return the first known value within tolerance of value; otherwise
        register value as a new known value and return it unchanged.
        """
        for known in known_values:
            if abs(known - value) < tolerance:
                return known
        known_values.append(value)
        return value

    # ------------------------------------------------------------------ #
    # Extraction
    # ------------------------------------------------------------------ #
    def extract_info(self) -> Tuple[bool, dict]:
        """
        Extracts file info.

        Returns (True, info) where info holds the file path, the encryption
        flag (as the string "True"/"False") and, for unencrypted files, the
        document metadata entries.
        """
        logger.info(f"Extracting file info from {self.pdf_file}")
        # Open the PDF
        pdf_doc = fitz.open(self.pdf_file)
        try:
            pdf_encrypted = self._is_encrypted(pdf_doc)
            output = {
                "File": self.pdf_file,
                "Encrypted": ("True" if pdf_encrypted else "False"),
            }
            # If PDF is encrypted the file metadata cannot be extracted
            if not pdf_encrypted:
                for key, value in pdf_doc.metadata.items():
                    output[key] = value
        finally:
            pdf_doc.close()

        # To Display File Info
        logger.info(
            "## File Information ##################################################"
        )
        logger.info("\n".join("{}:{}".format(i, j) for i, j in output.items()))
        logger.info(
            "######################################################################"
        )
        return True, output

    def extract_text(self, output_folder: str = None) -> Tuple[bool, str, dict]:
        """
        Extracts text from PDF.

        output_folder: when given, the full text is written to
            ``<output_folder>/pdf_text/<name>.txt`` and the per-page text to
            ``<output_folder>/pdf_json/<name>.json``.

        Returns (True, text, {page_number: page_text}) on success and
        (False, error_message, {}) when any step raises.
        """
        pdf_doc = None
        try:
            logger.info(f"Extracting text from {self.pdf_file}")
            page_text_dict = {}
            pdf_doc = self._open_document()
            for page in pdf_doc:
                page_text_dict[page.number] = page.get_text()
            # Every page's text is followed by a newline.
            text = "".join(
                page_text + "\n" for page_text in page_text_dict.values()
            )

            # Save to file
            if output_folder:
                # splitext only touches the extension, unlike str.replace which
                # would also rewrite ".pdf" occurring inside the file name.
                base_name = os.path.splitext(self.simple_pdf_file)[0]

                txt_output_folder = os.path.join(output_folder, 'pdf_text/')
                os.makedirs(txt_output_folder, exist_ok=True)
                txt_file = os.path.join(txt_output_folder, base_name + ".txt")
                with open(txt_file, "w", encoding="utf-8") as file:
                    file.write(text.strip())

                json_output_folder = os.path.join(output_folder, 'pdf_json/')
                os.makedirs(json_output_folder, exist_ok=True)
                json_file = os.path.join(json_output_folder, base_name + ".json")
                with open(json_file, "w", encoding="utf-8") as file:
                    json.dump(page_text_dict, file, indent=4)
            return True, text, page_text_dict
        except Exception as e:
            logger.error(f"Error extracting text: {e}")
            print_exc()
            return False, str(e), {}
        finally:
            if pdf_doc is not None:
                pdf_doc.close()

    def parse_blocks_page(self, page: fitz.Page):
        """
        Return the page's text blocks as a list of dicts with the keys
        "bbox", "lines_in_the_block", "block_no" and "block_type".
        """
        list_of_blocks = []
        for block in page.get_text("blocks"):
            x0, y0, x1, y1, lines_in_the_block, block_no, block_type = block
            list_of_blocks.append(
                {
                    "bbox": [x0, y0, x1, y1],
                    "lines_in_the_block": lines_in_the_block,
                    "block_no": block_no,
                    "block_type": block_type,
                }
            )
        return list_of_blocks

    def parse_all_blocks(self):
        """Return {page_number: blocks} for every page (see parse_blocks_page)."""
        pdf_doc = self._open_document()
        try:
            pdf_blocks = {}
            for page_num in tqdm(range(pdf_doc.page_count), disable=False):
                pdf_blocks[page_num] = self.parse_blocks_page(pdf_doc[page_num])
            return pdf_blocks
        finally:
            pdf_doc.close()

    def search_for_text(self, page_text, search_str):
        """
        Search page_text for the regular expression search_str (case-insensitive)
        and return the matched strings, skipping whitespace-only matches.
        """
        return [
            result.group()
            for result in re.finditer(search_str, page_text, re.IGNORECASE)
            if result.group().strip() != ""
        ]

    # ------------------------------------------------------------------ #
    # Annotation primitives
    # ------------------------------------------------------------------ #
    def redact_matching_data(self, page, matched_value):
        """
        Redacts every occurrence of matched_value on the page (black fill)
        and returns the redacted areas.
        """
        logger.info(f"Redacting matching values in {self.pdf_file}")
        matching_val_area = page.search_for(matched_value)
        # Redact matching values
        for area in matching_val_area:
            page.add_redact_annot(area, text=" ", fill=(0, 0, 0))
        # Apply the redaction
        page.apply_redactions()
        return matching_val_area

    def frame_matching_data(self, page, matched_value):
        """
        Frames every occurrence of matched_value with a red-stroked annotation
        and returns the matched areas.
        """
        matching_val_area = page.search_for(matched_value)
        for area in matching_val_area:
            if isinstance(area, fitz.Rect):
                # Draw a rectangle around matched values
                annot = page.add_redact_annot(area)
                # Prefer the snake_case API used elsewhere in this file and fall
                # back to the legacy camelCase name on old PyMuPDF releases.
                set_colors = getattr(annot, "set_colors", None) or annot.setColors
                set_colors(stroke=fitz.utils.getColor("red"))
                annot.update()
        return matching_val_area

    def highlight_rectangle(self,
                            pdf_doc: fitz.Document,
                            page_index: int,
                            bbox: list,
                            title: str = "",
                            content: dict = None):
        """
        Highlight the rectangle bbox ([x0, y0, x1, y1]) on the given page and
        attach content (JSON-encoded) and title to the annotation.
        """
        content = {} if content is None else content
        rectangle = fitz.Rect(bbox[0], bbox[1], bbox[2], bbox[3])
        page = pdf_doc[page_index]
        highlight = page.add_highlight_annot([rectangle])
        highlight.set_info(content=json.dumps(content), title=title)
        highlight.update()

    def highlight_matching_data(
        self,
        page,
        text_block,
        within_bbox: list = None,
        highlight_text_inside_block: str = None,
        content: dict = None,
        title: str = "",
        only_hightlight_first: bool = False,
        exact_match: bool = False,
    ):
        """
        Highlight matching values.

        page: page to search and annotate.
        text_block: text to locate; variants with newlines/hyphens removed are
            tried when the literal text is not found.
        within_bbox: optional [x0, y0, x1, y1] clip tried before the whole page.
        highlight_text_inside_block: when non-empty, only this text is
            highlighted, searched inside each area matched for text_block.
        content: dict stored (JSON-encoded) on each annotation; its "bbox" key
            is overwritten with the annotated area.
        title: annotation title.
        only_hightlight_first: keep only the first match.
        exact_match: filter merged areas through get_exact_match_area.

        Returns the list of highlighted areas (empty when nothing matched).
        """
        content = {} if content is None else content
        dehyphenated = text_block.replace('\n', '').replace('-', '')
        rejoined = text_block.replace('-\n', '')

        if within_bbox is not None:
            clip = fitz.Rect(
                within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3]
            )
            matching_val_area = page.search_for(text_block, clip=clip)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(dehyphenated, clip=clip)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(rejoined, clip=clip)
            if len(matching_val_area) == 0:
                # Last resort: ignore the clip and search the whole page.
                matching_val_area = page.search_for(text_block)
        else:
            matching_val_area = page.search_for(text_block)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(dehyphenated)
            if len(matching_val_area) == 0:
                matching_val_area = page.search_for(rejoined)

        if (
            highlight_text_inside_block is not None
            and len(highlight_text_inside_block) > 0
        ):
            highlight_bbox_list = []
            for area in matching_val_area:
                text_bbox_area = page.search_for(
                    highlight_text_inside_block,
                    clip=[area.x0, area.y0, area.x1, area.y1],
                )
                if text_bbox_area is not None and len(text_bbox_area) > 0:
                    if only_hightlight_first:
                        highlight_bbox_list.append(text_bbox_area[0])
                        break
                    highlight_bbox_list.extend(text_bbox_area)
            matching_val_area = highlight_bbox_list
        elif only_hightlight_first and len(matching_val_area) > 0:
            # Guarded: indexing an empty result would raise IndexError.
            matching_val_area = [matching_val_area[0]]

        if matching_val_area is not None and len(matching_val_area) > 0:
            matching_val_area = self.merge_matching_val_area(matching_val_area)
            if exact_match:
                matching_val_area = self.get_exact_match_area(
                    page, matching_val_area, text_block
                )
            for area in matching_val_area:
                highlight = page.add_highlight_annot([area])
                content["bbox"] = [area.x0, area.y0, area.x1, area.y1]
                highlight.set_info(content=json.dumps(content), title=title)
                highlight.update()
        return matching_val_area

    def get_exact_match_area(self, page, matching_val_area, search_text):
        """
        Keep the areas whose text contains at least one search word with the
        same capitalisation, dropping any area where a search word only appears
        in its lower-cased form.
        """
        search_text_list = search_text.split()
        results = []
        for area in matching_val_area:
            area_text_list = page.get_text("text", clip=area).strip().split()
            capital_not_match = False
            any_word_match = False
            for search_split in search_text_list:
                if search_split in area_text_list:
                    any_word_match = True
                elif search_split.lower() in area_text_list:
                    # Present only with different capitalisation.
                    capital_not_match = True
                    break
            if not capital_not_match and any_word_match:
                results.append(area)
        return results

    def merge_matching_val_area(self, matching_val_area):
        """
        Merge the matching areas that share the same y0 and y1: the merged
        x0 is the min x0 and x1 is the max x1.

        y0/y1 values closer than 5 units to an already seen value are unified
        to that value first. Input rectangles are not modified; merged
        rectangles are returned in order of first appearance. Inputs that are
        None, empty or hold a single area are returned unchanged.
        """
        if matching_val_area is None or len(matching_val_area) <= 1:
            return matching_val_area

        tolerance = 5
        known_y0 = []
        known_y1 = []
        merged = {}  # (y0, y1) -> [min_x0, max_x1]
        for area in matching_val_area:
            y0 = self._snap_coordinate(area.y0, known_y0, tolerance)
            y1 = self._snap_coordinate(area.y1, known_y1, tolerance)
            span = merged.get((y0, y1))
            if span is None:
                merged[(y0, y1)] = [area.x0, area.x1]
            else:
                span[0] = min(span[0], area.x0)
                span[1] = max(span[1], area.x1)
        return [
            fitz.Rect(x0, y0, x1, y1) for (y0, y1), (x0, x1) in merged.items()
        ]

    def highlight_matching_paragraph_text(
        self,
        pdf_doc: fitz.Document,
        page_index: int,
        search_paragraph_text: str,
        sibling_paragraph_text_list: list = None,
        next_page_found_lines: list = None,
        content: dict = None,
        title: str = "",
    ):
        """
        Locate a paragraph on a page by word-set (Jaccard) similarity and
        highlight its bounding box, continuing on the following page when the
        paragraph appears to be cut by a page break.

        pdf_doc: open document.
        page_index: page to search.
        search_paragraph_text: paragraph text to look for.
        sibling_paragraph_text_list: other candidate paragraphs; when one of
            them is more similar to the found lines than the searched text,
            the result is flagged as not exact.
        next_page_found_lines: lines already consumed as a continuation from
            the previous page; they are skipped here.
        content: dict stored (JSON-encoded) on the annotation; its "bbox" (and
            "found_page" for continuation pages) keys are overwritten.
        title: annotation title.

        Returns a dict with:
            "found_lines_dict": {page_index: {"bbox_list", "found_lines"}},
            "exact_match": bool,
            "exact_match_search_paragraph_text": best matching paragraph text.
        """
        sibling_paragraph_text_list = (
            [] if sibling_paragraph_text_list is None else sibling_paragraph_text_list
        )
        next_page_found_lines = (
            [] if next_page_found_lines is None else next_page_found_lines
        )
        content = {} if content is None else content

        found_lines_dict = {}
        search_paragraph_text_words = search_paragraph_text.lower().split()
        if not search_paragraph_text_words:
            # Nothing to search for.
            return {
                "found_lines_dict": found_lines_dict,
                "exact_match": True,
                "exact_match_search_paragraph_text": search_paragraph_text,
            }
        # A paragraph start is a line containing the first (up to) three words.
        lead_words = search_paragraph_text_words[:3]

        page = pdf_doc[page_index]
        page_lines = self._non_empty_lines(page.get_text("text"))

        find_begin = False
        found_words = []
        found_lines = []
        jacard_similarity = 0
        found_matched = False
        for line in page_lines:
            if len(next_page_found_lines) > 0 and line in next_page_found_lines:
                continue
            line_lower = line.lower()
            words = line_lower.split()
            if len(words) == 0:
                continue
            if find_begin:
                found_words.extend(words)
                new_jacard_similarity = self.similarity.jaccard_similarity(
                    search_paragraph_text_words, found_words
                )
                if new_jacard_similarity > jacard_similarity:
                    jacard_similarity = new_jacard_similarity
                    found_lines.append(line)
                else:
                    # Similarity stopped improving: the paragraph ended here.
                    if jacard_similarity > 0.4:
                        found_matched = True
                    break
            elif all(word in line_lower for word in lead_words):
                jacard_similarity = self.similarity.jaccard_similarity(
                    search_paragraph_text_words, words
                )
                if jacard_similarity > 0.05:
                    find_begin = True
                    found_words.extend(words)
                    found_lines.append(line)
        if jacard_similarity > 0.4:
            found_matched = True

        if found_matched and len(found_lines) > 0:
            located_areas = self._locate_line_areas(page, found_lines)
            if located_areas:
                bbox_list = self._highlight_bounding_box(
                    page, located_areas, content, title
                )
                found_lines_dict[page_index] = {
                    "bbox_list": bbox_list,
                    "found_lines": found_lines,
                }

            # jacard_similarity is between 0.4 and 0.9,
            # perhaps there are left lines in next page, so we need to check the next page.
            next_page_index = page_index + 1
            if (
                0.4 < jacard_similarity < 0.9
                and next_page_index < pdf_doc.page_count
            ):
                next_page = pdf_doc[next_page_index]
                next_page_lines = self._non_empty_lines(next_page.get_text("text"))

                # Look for the continuation within the first 10 lines.
                found_line_index = -1
                for i, next_page_line in enumerate(next_page_lines[:10]):
                    words = next_page_line.lower().split()
                    if len(words) == 0:
                        continue
                    new_jacard_similarity = self.similarity.jaccard_similarity(
                        search_paragraph_text_words, found_words + words
                    )
                    if new_jacard_similarity > jacard_similarity:
                        found_line_index = i
                        break

                if found_line_index != -1:
                    new_found_words = list(found_words)
                    new_found_lines = []
                    for line in next_page_lines[found_line_index:]:
                        words = line.lower().split()
                        if len(words) == 0:
                            continue
                        new_found_words.extend(words)
                        new_jacard_similarity = self.similarity.jaccard_similarity(
                            search_paragraph_text_words, new_found_words
                        )
                        if new_jacard_similarity > jacard_similarity:
                            jacard_similarity = new_jacard_similarity
                            new_found_lines.append(line)
                        else:
                            break

                    if len(new_found_lines) > 0:
                        # Search on the next page (including the stripped
                        # fallback), since that is where these lines live.
                        located_areas = self._locate_line_areas(
                            next_page, new_found_lines
                        )
                        if located_areas:
                            content["found_page"] = next_page_index
                            new_bbox_list = self._highlight_bounding_box(
                                next_page, located_areas, content, title
                            )
                            found_lines_dict[next_page_index] = {
                                "bbox_list": new_bbox_list,
                                "found_lines": new_found_lines,
                            }

        exact_match = True
        exact_match_search_paragraph_text = search_paragraph_text
        if len(found_lines_dict) > 0 and len(sibling_paragraph_text_list) > 0:
            found_line_words = []
            for found_info in found_lines_dict.values():
                for line in found_info["found_lines"]:
                    found_line_words.extend(line.lower().split())

            max_sibling_jacard_similarity = 0
            max_sibling_jacard_similarity_index = -1
            for index, sibling_paragraph_text in enumerate(
                sibling_paragraph_text_list
            ):
                sibling_jacard_similarity = self.similarity.jaccard_similarity(
                    sibling_paragraph_text.lower().split(), found_line_words
                )
                if sibling_jacard_similarity > max_sibling_jacard_similarity:
                    max_sibling_jacard_similarity = sibling_jacard_similarity
                    max_sibling_jacard_similarity_index = index
            if max_sibling_jacard_similarity > jacard_similarity:
                exact_match = False
                exact_match_search_paragraph_text = sibling_paragraph_text_list[
                    max_sibling_jacard_similarity_index
                ]

        return {
            "found_lines_dict": found_lines_dict,
            "exact_match": exact_match,
            "exact_match_search_paragraph_text": exact_match_search_paragraph_text,
        }

    # ------------------------------------------------------------------ #
    # Keyword based page ranges
    # ------------------------------------------------------------------ #
    def get_page_range_by_keywords(
        self,
        pdf_doc,
        start_keywords,
        end_keywords,
        return_page_text_list=False,
    ):
        """
        Get page range by keywords.

        pdf_doc: pdf document
        start_keywords: list of keywords marking the first page of the range
        end_keywords: list of keywords marking the end of the range
        return_page_text_list: return page text list or not

        Returns (page_list, page_text_list). page_list runs from the start
        page up to, but not including, the end page; it is empty when either
        boundary is not found. Documents with more than 20 pages are searched
        from page index 8 onwards to avoid the table of contents.
        """
        start_page = -1
        end_page = -1
        if len(start_keywords) == 0 or len(end_keywords) == 0:
            start_page = 0
        if len(start_keywords) == 0 and len(end_keywords) == 0:
            end_page = pdf_doc.page_count - 1

        # avoid to search the TOC part
        search_start = 8 if pdf_doc.page_count > 20 else 0
        for page_index in range(search_start, pdf_doc.page_count):
            if start_page >= 0 and end_page >= 0:
                break
            page_text = pdf_doc[page_index].get_text("text").strip()
            page_lines = [
                split.strip()
                for split in page_text.split("\n")
                if len(split.strip()) > 0
            ]
            if start_page == -1:
                if self.find_keywords_in_text_list(page_lines, start_keywords):
                    start_page = page_index
            if start_page >= 0 and end_page == -1:
                if self.find_keywords_in_text_list(page_lines, end_keywords):
                    end_page = page_index
                    break

        # return page_list which starts from start_page and ends at end_page
        page_list = []
        page_text_list = []
        if start_page >= 0 and end_page >= 0:
            page_list = list(range(start_page, end_page))
            if return_page_text_list:
                page_text_list = [
                    pdf_doc[page_index].get_text("text").strip()
                    for page_index in page_list
                ]
        return page_list, page_text_list

    def exist_keywords_in_text_list(self, page_text, keywords):
        """Split page_text into non-blank stripped lines and look for keywords in them."""
        page_text_list = [
            split.strip() for split in page_text.split("\n") if len(split.strip()) > 0
        ]
        return self.find_keywords_in_text_list(page_text_list, keywords)

    def find_keywords_in_text_list(self, text_list, keywords):
        """
        Find keywords in text list.

        Returns True when some line starts (case-insensitively) with a keyword
        and passes the checks below; these appear intended to tell section
        headings from running text.
        """
        for keyword in keywords:
            keyword_lower = keyword.lower()
            for index, line in enumerate(text_list):
                if not line.lower().startswith(keyword_lower):
                    continue
                # Too many lower-case-initial words: skip this line.
                if self.get_lower_case_begin_words_count(line) > 3:
                    continue
                # A line with no lower-case letters is accepted immediately.
                if line.upper() == line:
                    return True
                # Same lower-case-word limit applied to the preceding line.
                if (
                    index != 0
                    and self.get_lower_case_begin_words_count(text_list[index - 1]) > 3
                ):
                    continue
                # Past the sixth line, reject lines containing "." or ",".
                if index > 5 and ("." in line or "," in line):
                    continue
                return True
        return False

    def get_lower_case_begin_words_count(self, text):
        """Count the whitespace-separated words that start with a lower-case letter."""
        return sum(1 for word in text.split() if word[0].islower())

    # ------------------------------------------------------------------ #
    # Main processing
    # ------------------------------------------------------------------ #
    def process_data(
        self,
        output_file: str,
        dp_Value_info: dict,
        pages: Tuple = None,
        action: str = "Highlight",
    ):
        """
        Process the pages of the PDF File

        1. Open the input file.
        2. Resolve the keyword-delimited page range of every data point that
           declares one.
        3. Iterate through the selected pages; for each data point either
           locate its paragraph (value_text_structure == "paragraph") or search
           the page text with the data point's regular expressions.
        4. Highlight the matches and collect one record per match.
        5. Add a "not found" record for every data point without a match.
        6. When something was found, save the annotated PDF to output_file.

        output_file: The path of the PDF file to generate after processing.
        dp_Value_info: The information for data points ({name: detail dict});
            entries whose detail is not a dict are ignored. Paragraph data
            points get detail["found"] = True once matched.
        pages: The pages (0-based indexes) to consider while processing the PDF file.
        action: The action label stored in every record.

        Returns the list of records. Errors are logged, not raised; when no
        record could be produced a single empty record is returned.
        """
        logger.info(f"Processing {self.pdf_file}")
        data_list = []
        pdf_doc = None
        try:
            pdf_doc = self._open_document()
            find_value_dp_list = []
            matching_val_area_list = []

            # Resolve per data point page ranges. The pages to visit are all
            # pages, unless every data point is restricted to a range, in which
            # case only the union of the ranges needs to be visited.
            dp_range_page_list = {}
            has_unranged_dp = False
            for dp_name, dp_detail in dp_Value_info.items():
                if not isinstance(dp_detail, dict):
                    continue
                page_range_start_keywords = dp_detail.get(
                    "page_range_start_keywords", []
                )
                page_range_end_keywords = dp_detail.get("page_range_end_keywords", [])
                if (
                    len(page_range_start_keywords) > 0
                    and len(page_range_end_keywords) > 0
                ):
                    dp_pages, _ = self.get_page_range_by_keywords(
                        pdf_doc,
                        page_range_start_keywords,
                        page_range_end_keywords,
                        return_page_text_list=False,
                    )
                    dp_range_page_list[dp_name] = dp_pages
                else:
                    has_unranged_dp = True
            if has_unranged_dp or not dp_range_page_list:
                page_list = list(range(pdf_doc.page_count))
            else:
                page_list = sorted(set().union(*dp_range_page_list.values()))

            # Iterate through pages
            next_page_found_lines = []
            for page_index in page_list:
                # If required for specific pages
                if pages and page_index not in pages:
                    continue
                # Select the page
                page = pdf_doc[page_index]
                page_text = page.get_text("text")

                for dp_name, dp_detail in dp_Value_info.items():
                    if not isinstance(dp_detail, dict):
                        continue
                    dp_biz_name = dp_detail.get("biz_name", "")
                    dp_level = dp_detail.get("level", "")
                    value_text_type = dp_detail.get("value_text_type", "string")
                    dp_value_text = dp_detail.get(
                        "value_text", [] if value_text_type == "list" else ""
                    )
                    value_text_structure = dp_detail.get("value_text_structure", "word")
                    inner_context_regex = dp_detail.get("inner_context_regex", "")
                    text_value_dict = dp_detail.get("text_value_dict", {})
                    search_str_list = dp_detail.get("regex_list", [])
                    only_hightlight_value_text = dp_detail.get(
                        "only_hightlight_value_text", False
                    )
                    only_hightlight_first = dp_detail.get(
                        "only_hightlight_first", False
                    )

                    # Data points restricted to a page range skip other pages.
                    if (
                        len(dp_detail.get("page_range_start_keywords", [])) > 0
                        and len(dp_detail.get("page_range_end_keywords", [])) > 0
                    ):
                        if page_index not in dp_range_page_list.get(dp_name, []):
                            continue

                    if value_text_structure == "paragraph":
                        if dp_detail.get("found", False):
                            continue
                        if (
                            dp_detail.get("matched_page", -1) != -1
                            and len(dp_detail.get("bbox_list", [])) > 0
                        ):
                            continue
                        content = {
                            "data_point": dp_biz_name,
                            "data_point_db_name": dp_name,
                            "data_point_level": dp_level,
                            "found_page": page_index,
                            "bbox": None,
                        }
                        found_dict = self.highlight_matching_paragraph_text(
                            pdf_doc=pdf_doc,
                            page_index=page_index,
                            search_paragraph_text=dp_value_text,
                            sibling_paragraph_text_list=dp_detail.get(
                                "sibling_paragraph_text_list", []
                            ),
                            next_page_found_lines=next_page_found_lines,
                            content=content,
                            title=dp_biz_name,
                        )
                        found_lines_dict = found_dict.get("found_lines_dict", {})
                        if len(found_lines_dict) == 0:
                            continue

                        # By default the record is credited to this data point.
                        record_dp_name = dp_name
                        record_biz_name = dp_biz_name
                        record_level = dp_level
                        ground_truth = dp_value_text
                        if found_dict.get("exact_match", True):
                            dp_detail["found"] = True
                        else:
                            # A sibling paragraph matched better: credit the
                            # paragraph data point holding that sibling text.
                            ground_truth = found_dict.get(
                                "exact_match_search_paragraph_text", dp_value_text
                            )
                            for other_name, other_detail in dp_Value_info.items():
                                if not isinstance(other_detail, dict):
                                    continue
                                if (
                                    other_detail.get("value_text_structure", "word")
                                    != "paragraph"
                                ):
                                    continue
                                if other_detail.get("value_text", "") == ground_truth:
                                    other_detail["found"] = True
                                    record_dp_name = other_name
                                    record_biz_name = other_detail.get("biz_name", "")
                                    record_level = other_detail.get("level", "")
                                    break

                        found_next_page_lines = False
                        for found_page_index, found_lines_info in found_lines_dict.items():
                            bbox_list = found_lines_info.get("bbox_list", [])
                            # Normalise against the page the box was found on.
                            bbox_normalized_list = self.get_bbox_normalized(
                                pdf_doc[found_page_index], bbox_list
                            )
                            found_lines = found_lines_info.get("found_lines", [])
                            if found_page_index == page_index + 1:
                                # Remember continuation lines so the next page
                                # does not match them again.
                                next_page_found_lines = found_lines
                                found_next_page_lines = True
                            found_text = " ".join(found_lines).strip()
                            data = {
                                "pdf_file": self.simple_pdf_file,
                                "dp_name": record_dp_name,
                                "dp_biz_name": record_biz_name,
                                "dp_level": record_level,
                                "ground_truth": ground_truth,
                                "ground_truth_text": ground_truth,
                                "search_str": "",
                                "found_page": found_page_index,
                                "found_value": found_text,
                                "found_value_context": found_text,
                                "found_bbox": bbox_list,
                                "found_bbox_normalized": bbox_normalized_list,
                                "output_file": output_file,
                                "action": action,
                                "comment": "found page number is page number, as page number starts from 0.",
                            }
                            if record_dp_name not in find_value_dp_list:
                                find_value_dp_list.append(record_dp_name)
                            data_list.append(data)
                        if not found_next_page_lines:
                            next_page_found_lines = []
                    else:
                        # Pair every matched block with the regex that produced
                        # it, so the record reports the right search string.
                        matched_blocks = []
                        for search_str in search_str_list or []:
                            matched_blocks.extend(
                                (search_str, found_block)
                                for found_block in self.search_for_text(
                                    page_text, search_str
                                )
                            )

                        for search_str, matched_block in matched_blocks:
                            # Per-block values: one block's extracted text must
                            # not leak into the next block.
                            block_value = ""
                            block_value_text = dp_value_text
                            if inner_context_regex != "":
                                inner_match = re.search(
                                    inner_context_regex, matched_block, re.IGNORECASE
                                )
                                if inner_match is not None:
                                    # remove special characters
                                    block_value_text = re.sub(
                                        r"\W+", " ", inner_match.group(0).strip()
                                    ).strip()
                                    if block_value_text == "may":
                                        continue
                                    if block_value_text != "" and len(text_value_dict) > 0:
                                        block_value = text_value_dict.get(
                                            block_value_text.lower(), ""
                                        )
                                else:
                                    block_value_text = matched_block
                                    block_value = matched_block
                            elif block_value_text == "":
                                block_value_text = matched_block
                                block_value = matched_block

                            content = {
                                "data_point_name": dp_biz_name,
                                "data_point_db_name": dp_name,
                                "data_point_level": dp_level,
                                "page_index": page_index,
                                "dp_value_text": block_value_text,
                                "dp_value": block_value,
                                "bbox": None,
                            }
                            highlight_inner_text = (
                                only_hightlight_value_text
                                and block_value_text != ""
                                and block_value_text != matched_block
                            )
                            matching_val_area = self.highlight_matching_data(
                                page=page,
                                text_block=matched_block,
                                highlight_text_inside_block=(
                                    block_value_text if highlight_inner_text else None
                                ),
                                content=content,
                                title=dp_biz_name,
                                only_hightlight_first=only_hightlight_first,
                            )
                            if len(matching_val_area) == 0:
                                continue

                            matching_val_area_list.extend(matching_val_area)
                            bbox_list = [
                                [area.x0, area.y0, area.x1, area.y1]
                                for area in matching_val_area
                            ]
                            bbox_normalized_list = self.get_bbox_normalized(
                                page, bbox_list
                            )
                            data = {
                                "pdf_file": self.simple_pdf_file,
                                "dp_name": dp_name,
                                "dp_biz_name": dp_biz_name,
                                "dp_level": dp_level,
                                "ground_truth": block_value,
                                "ground_truth_text": block_value_text,
                                "search_str": search_str,
                                "found_page": page_index,
                                "found_value_text": block_value_text,
                                "found_value": block_value,
                                "found_value_context": matched_block.strip(),
                                "found_bbox": bbox_list,
                                "found_bbox_normalized": bbox_normalized_list,
                                "output_file": output_file,
                                "action": action,
                                "comment": "found page number is page number, as page number starts from 0.",
                            }
                            if dp_name not in find_value_dp_list:
                                find_value_dp_list.append(dp_name)
                            data_list.append(data)
                            if only_hightlight_first:
                                break

            # Nothing found at all: no output file will be written.
            if len(find_value_dp_list) == 0:
                output_file = ""

            for dp_name, dp_detail in dp_Value_info.items():
                if not isinstance(dp_detail, dict):
                    continue
                if dp_name in find_value_dp_list:
                    continue
                dp_biz_name = dp_detail.get("biz_name", "")
                data_list.append(
                    {
                        "pdf_file": self.simple_pdf_file,
                        "dp_name": dp_name,
                        "dp_biz_name": dp_biz_name,
                        "dp_level": dp_detail.get("level", ""),
                        "ground_truth": dp_detail.get("value", ""),
                        "ground_truth_text": dp_detail.get("value_text", ""),
                        "search_str": "",
                        "found_page": -1,
                        "found_value_text": "",
                        "found_value": -1,
                        "found_value_context": "",
                        "found_bbox": [],
                        "found_bbox_normalized": [],
                        "output_file": output_file,
                        "action": action,
                        "comment": f"Not found {dp_biz_name} in the document.",
                    }
                )

            logger.info(
                f"{len(matching_val_area_list)} Match(es) In Input File: {self.pdf_file}"
            )

            if (
                len(find_value_dp_list) > 0
                and output_file is not None
                and output_file != ""
            ):
                # Serialise to memory and close the input first, so that
                # output_file may be the same path as the input file.
                output_buffer = BytesIO()
                pdf_doc.save(output_buffer)
                pdf_doc.close()
                pdf_doc = None
                with open(output_file, mode="wb") as f:
                    f.write(output_buffer.getbuffer())
                logger.info(f"File saved to {output_file}")
        except Exception as e:
            logger.error(f"Error processing file: {e}")
            print_exc()
        finally:
            if pdf_doc is not None:
                pdf_doc.close()

        if len(data_list) == 0:
            data_list.append(
                {
                    "pdf_file": self.simple_pdf_file,
                    "dp_name": "",
                    "dp_biz_name": "",
                    "dp_level": "",
                    "ground_truth": "",
                    "ground_truth_text": "",
                    "search_str": "",
                    "found_page": -1,
                    "found_value_text": "",
                    "found_value": -1,
                    "found_value_context": "",
                    "found_bbox": [],
                    "found_bbox_normalized": [],
                    "output_file": output_file,
                    "action": action,
                    "comment": "",
                }
            )
        return data_list

    def get_bbox_normalized(self, page, bbox):
        """
        Scale every [x0, y0, x1, y1] box in bbox by the page width (x values)
        and page height (y values) and return the scaled boxes.
        """
        page_width = page.rect.width
        page_height = page.rect.height
        return [
            [
                box[0] / page_width,
                box[1] / page_height,
                box[2] / page_width,
                box[3] / page_height,
            ]
            for box in bbox
        ]

    def find_value_by_regex(self, page_text, search_str: str):
        """Placeholder: not implemented, returns None."""
        pass

    def get_high_similarity_text(
        self, page_text, search_str: str, threshold: float = 0.8
    ):
        """Placeholder: similarity search is not implemented, always returns []."""
        matched_values = []
        page_text_list = page_text.split("\n")
        return matched_values

    def remove_highlght(self, output_file: str, pages: Tuple = None):
        """
        Remove annotations from the PDF.

        1. Open the input file.
        2. Iterate through the (selected) pages and delete every annotation.
        3. Save the document to a memory buffer and close the input file.
        4. Write the memory buffer to the output file.

        output_file: path of the PDF file to write.
        pages: optional page indexes to process, given as ints or as strings.

        Errors are logged, not raised.
        """
        logger.info(f"Removing Highlights from {self.pdf_file}")
        pdf_doc = None
        try:
            pdf_doc = self._open_document()
            # Initialize a counter for annotations
            annot_found = 0
            # Iterate through pages
            for pg in range(pdf_doc.page_count):
                # If required for specific pages (accept 3 as well as "3")
                if pages and pg not in pages and str(pg) not in pages:
                    continue
                page = pdf_doc[pg]
                annot = page.first_annot
                while annot:
                    annot_found += 1
                    # delete_annot returns the annotation following the deleted
                    # one; the deleted object itself must not be used any more.
                    annot = page.delete_annot(annot)
            if annot_found > 0:
                print(f"Annotation(s) Found In The Input File: {self.pdf_file}")

            # Serialise to memory and close the input first, so that
            # output_file may be the same path as the input file.
            output_buffer = BytesIO()
            pdf_doc.save(output_buffer)
            pdf_doc.close()
            pdf_doc = None
            with open(output_file, mode="wb") as f:
                f.write(output_buffer.getbuffer())
        except Exception as e:
            logger.error(f"Error removing highlights: {e}")
            print_exc()
        finally:
            if pdf_doc is not None:
                pdf_doc.close()

    def process_file(
        self,
        output_file: str,
        dp_Value_info: dict = None,
        pages: Tuple = None,
        action: str = "Highlight",
    ):
        """
        To process one single file.

        output_file: path of the PDF to write; the input path is used when None.
        dp_Value_info: data point information, forwarded to process_data.
        pages: optional page indexes to restrict processing to.
        action: "Remove" deletes the annotations; any other value is forwarded
            to process_data (documented values: Redact, Frame, Highlight,
            Squiggly, Underline, Strikeout).

        Returns the records produced by process_data, or [] for "Remove".
        """
        logger.info(f"Processing {self.pdf_file}")
        if output_file is None:
            output_file = self.pdf_file

        if action == "Remove":
            # Remove the Highlights except Redactions
            self.remove_highlght(output_file=output_file, pages=pages)
            return []
        return self.process_data(
            output_file=output_file,
            dp_Value_info=dp_Value_info,
            pages=pages,
            action=action,
        )