# Import Libraries from typing import Tuple from io import BytesIO import os import argparse import re import fitz import json from traceback import print_exc from tqdm import tqdm import base64 from copy import deepcopy from utils.similarity import Similarity from utils.biz_utils import total_currency_list from utils.logger import logger import requests from bs4 import BeautifulSoup import dotenv # loads .env file with your OPENAI_API_KEY dotenv.load_dotenv() class PDFUtil: def __init__(self, pdf_file: str) -> None: self.pdf_file = pdf_file self.simple_pdf_file = os.path.basename(self.pdf_file) self.is_valid_path() self.similarity = Similarity() def is_valid_path(self): """ Validates the path inputted and checks whether it is a file path or a folder path """ if not self.pdf_file: raise ValueError(f"Invalid Path") if os.path.isfile(self.pdf_file) and self.pdf_file.endswith(".pdf"): return True else: raise ValueError( f"Invalid Path {self.pdf_file}, please input the correct pdf file path." ) def extract_info(self) -> Tuple[bool, dict]: """ Extracts file info """ logger.info(f"Extracting file info from {self.pdf_file}") # Open the PDF pdf_doc = fitz.open(self.pdf_file) try: pdf_encrypted = pdf_doc.isEncrypted except: pdf_encrypted = pdf_doc.is_encrypted output = { "File": self.pdf_file, "Encrypted": ("True" if pdf_encrypted else "False"), } # If PDF is encrypted the file metadata cannot be extracted if not pdf_encrypted: for key, value in pdf_doc.metadata.items(): output[key] = value # To Display File Info logger.info( "## File Information ##################################################" ) logger.info("\n".join("{}:{}".format(i, j) for i, j in output.items())) logger.info( "######################################################################" ) pdf_doc.close() return True, output def extract_text(self, output_folder: str = None) -> Tuple[bool, str, dict]: """ Extracts text from PDF """ # Extract text try: logger.info(f"Extracting text from {self.pdf_file}") text = "" page_text_dict = {} pdf_doc = fitz.open(self.pdf_file) try: pdf_encrypted = pdf_doc.isEncrypted except: pdf_encrypted = pdf_doc.is_encrypted if pdf_encrypted: pdf_doc.authenticate("") for page in pdf_doc: page_text = page.get_text() text += page_text + "\n" page_text_dict[page.number] = page_text # To Display Extracted Text # logger.info( # "## Extracted Text ####################################################" # ) # logger.info(text) # logger.info( # "######################################################################" # ) # Save to file if output_folder: txt_output_folder = os.path.join(output_folder, 'pdf_text/') os.makedirs(txt_output_folder, exist_ok=True) txt_file = os.path.join(txt_output_folder, self.simple_pdf_file.replace(".pdf", ".txt")) with open(txt_file, "w", encoding="utf-8") as file: file.write(text.strip()) json_output_folder = os.path.join(output_folder, 'pdf_json/') os.makedirs(json_output_folder, exist_ok=True) json_file = os.path.join(json_output_folder, self.simple_pdf_file.replace(".pdf", ".json")) with open(json_file, "w", encoding="utf-8") as file: json.dump(page_text_dict, file, indent=4) pdf_doc.close() return True, text, page_text_dict except Exception as e: logger.error(f"Error extracting text: {e}") print_exc() return False, str(e), {} def extract_images(self, zoom:float = 2.0, pdf_page_index_list: list = None, output_folder: str = None): try: pdf_doc = fitz.open(self.pdf_file) try: pdf_encrypted = pdf_doc.isEncrypted except: pdf_encrypted = pdf_doc.is_encrypted if pdf_encrypted: pdf_doc.authenticate("") if pdf_page_index_list is None or len(pdf_page_index_list) == 0: pdf_page_index_list = range(pdf_doc.page_count) pdf_base_name = os.path.basename(self.pdf_file).replace(".pdf", "") mat = fitz.Matrix(zoom, zoom) output_data = {} for page_num in tqdm(pdf_page_index_list, disable=False): page = pdf_doc[page_num] pix = page.get_pixmap(matrix=mat) img_buffer = pix.tobytes(output='png') output_data[page_num] = {} img_base64 = base64.b64encode(img_buffer).decode('utf-8') if output_folder and len(output_folder) > 0: os.makedirs(output_folder, exist_ok=True) image_file = os.path.join(output_folder, f"{pdf_base_name}_{page_num}.png") pix.save(image_file) output_data[page_num]["img_file"] = image_file output_data[page_num]["img_base64"] = img_base64 return output_data except Exception as e: logger.error(f"Error extracting images: {e}") print_exc() return {} def extract_image_from_page(self, page_index: int, zoom:float = 2.0, output_folder: str = None): try: pdf_doc = fitz.open(self.pdf_file) try: pdf_encrypted = pdf_doc.isEncrypted except: pdf_encrypted = pdf_doc.is_encrypted if pdf_encrypted: pdf_doc.authenticate("") pdf_base_name = os.path.basename(self.pdf_file).replace(".pdf", "") mat = fitz.Matrix(zoom, zoom) page = pdf_doc[page_index] pix = page.get_pixmap(matrix=mat) img_buffer = pix.tobytes(output='png') img_base64 = base64.b64encode(img_buffer).decode('utf-8') if output_folder and len(output_folder) > 0: os.makedirs(output_folder, exist_ok=True) image_file = os.path.join(output_folder, f"{pdf_base_name}_{page_index}.png") pix.save(image_file) pdf_doc.close() return img_base64 except Exception as e: logger.error(f"Error extracting image from page: {e}") print_exc() return None def parse_blocks_page(self, page: fitz.Page): blocks = page.get_text("blocks") list_of_blocks = [] for block in blocks: x0, y0, x1, y1, lines_in_the_block, block_no, block_type = block list_of_blocks.append( { "bbox": [x0, y0, x1, y1], "lines_in_the_block": lines_in_the_block, "block_no": block_no, "block_type": block_type, } ) return list_of_blocks def parse_all_blocks(self): pdf_doc = fitz.open(self.pdf_file) try: pdf_encrypted = pdf_doc.isEncrypted except: pdf_encrypted = pdf_doc.is_encrypted if pdf_encrypted: pdf_doc.authenticate("") pdf_blocks = {} for page_num in tqdm(range(pdf_doc.page_count), disable=False): page = pdf_doc[page_num] blocks = self.parse_blocks_page(page) pdf_blocks[page_num] = blocks return pdf_blocks def search_for_text(self, page_text, search_str): """ Search for the search string within the document lines """ # Find all matches within one line result_iter = re.finditer(search_str, page_text, re.IGNORECASE) results = [ result.group() for result in result_iter if result.group().strip() != "" ] # In case multiple matches within one line return results def redact_matching_data(self, page, matched_value): """ Redacts matching values """ logger.info(f"Redacting matching values in {self.pdf_file}") matching_val_area = page.search_for(matched_value) # Redact matching values [ page.add_redact_annot(area, text=" ", fill=(0, 0, 0)) for area in matching_val_area ] # Apply the redaction page.apply_redactions() return matching_val_area def frame_matching_data(self, page, matched_value): """ frames matching values """ matching_val_area = page.search_for(matched_value) for area in matching_val_area: if isinstance(area, fitz.fitz.Rect): # Draw a rectangle around matched values annot = page.add_redact_annot(area) # , fill = fitz.utils.getColor('black') annot.setColors(stroke=fitz.utils.getColor("red")) # If you want to remove matched data # page.addFreetextAnnot(area, ' ') annot.update() return matching_val_area def highlight_rectangle(self, pdf_doc: fitz.Document, page_index: int, bbox: list, title: str = "", content: dict = {}): """ Highlight rectangle """ rectangle = fitz.Rect(bbox[0], bbox[1], bbox[2], bbox[3]) page = pdf_doc[page_index] highlight = page.add_highlight_annot([rectangle]) content_text = json.dumps(content) highlight.set_info(content=content_text, title=title) highlight.update() def batch_drilldown(self, drilldown_data_list: list, output_pdf_folder: str = None): pdf_doc = fitz.open(self.pdf_file) annotation_list = [] for drilldown_data in drilldown_data_list: page_index = drilldown_data["page_index"] data_point = drilldown_data["data_point"] if isinstance(data_point, list): data_point = ", ".join(data_point) parent_text_block = drilldown_data.get("parent_text_block", None) highlight_value = drilldown_data["value"] annotation_attribute = drilldown_data.get("annotation_attribute", {}) if isinstance(highlight_value, str): annotation_list.append(self.highlight_pdf_doc( pdf_doc=pdf_doc, page_index=page_index, highlight_value=highlight_value, parent_text_block=parent_text_block, data_point=data_point, annotation_attribute=annotation_attribute )) elif isinstance(highlight_value, list): for value in highlight_value: annotation_list.append(self.highlight_pdf_doc( pdf_doc=pdf_doc, page_index=page_index, highlight_value=value, parent_text_block=parent_text_block, data_point=data_point, annotation_attribute=annotation_attribute )) elif isinstance(highlight_value, dict): for key, value in highlight_value.items(): annotation_list.append(self.highlight_pdf_doc( pdf_doc=pdf_doc, page_index=page_index, highlight_value=value, parent_text_block=parent_text_block, data_point=f"{data_point}, {key}", annotation_attribute=annotation_attribute )) else: highlight_value = str(highlight_value) annotation_list.append(self.highlight_pdf_doc( pdf_doc=pdf_doc, page_index=page_index, highlight_value=highlight_value, parent_text_block=parent_text_block, data_point=data_point, annotation_attribute=annotation_attribute )) if output_pdf_folder is not None and len(output_pdf_folder) > 0: os.makedirs(output_pdf_folder, exist_ok=True) pdf_file_path = self.save_annotated_pdf(pdf_doc=pdf_doc, output_pdf_folder=output_pdf_folder) result = {"drilldown_pdf_doc": pdf_doc, "annotation_list": annotation_list} return result def save_annotated_pdf(self, pdf_doc: fitz.Document, output_pdf_folder: str): try: if output_pdf_folder is None or len(output_pdf_folder) == 0 or not os.path.exists(output_pdf_folder): return if pdf_doc is None and pdf_doc.is_closed: return pdf_file_name = os.path.basename(self.pdf_file) pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf") output_pdf_dir = os.path.join(output_pdf_folder, "pdf/") os.makedirs(output_pdf_dir, exist_ok=True) pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name) output_buffer = BytesIO() pdf_doc.save(output_buffer) # Save the output buffer to the output file with open(pdf_file_path, mode="wb") as f: f.write(output_buffer.getbuffer()) pdf_doc.close() logger.info(f"File saved to {pdf_file_path}") return pdf_file_path except Exception as e: print_exc() logger.error(f"Error when save output file: {e}") def highlight_pdf_doc(self, pdf_doc: fitz.Document, page_index: int, highlight_value: str, parent_text_block: str = None, data_point: str = None, annotation_attribute: dict = {}): page = pdf_doc[page_index] page_text = page.get_text() parent_text_block_search_text = None if parent_text_block is not None: parent_text_block_regex = self.add_slash_to_text_as_regex(parent_text_block) parent_text_block_search = re.search(parent_text_block_regex, page_text) parent_text_block_search_text = None if parent_text_block_search is not None: parent_text_block_search_text = parent_text_block_search.group() highlight_value_search_text = "" if highlight_value is not None and len(highlight_value.strip()) > 0: pure_highlight_value = highlight_value.strip() highlight_value_search_text = None if len(pure_highlight_value.split()) == 1 and \ (len(pure_highlight_value) < 3 or pure_highlight_value[0].upper() == pure_highlight_value[0]): highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value) highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, page_text, ignore_case=False) else: highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value, match_special_char_after_space=False) highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, page_text, ignore_case=True) if highlight_value_search_text is None: highlight_value_regex = self.add_slash_to_text_as_regex(pure_highlight_value, match_special_char_after_space=True) highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, page_text, ignore_case=False) if highlight_value_search_text is None: pure_highlight_value_splits = pure_highlight_value.split() if pure_highlight_value_splits[-1].upper() in total_currency_list: highlight_value_regex = self.add_slash_to_text_as_regex(' '.join(pure_highlight_value_splits[0:-1]), match_special_char_after_space=False) highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, page_text, ignore_case=False) if highlight_value_search_text is None: # If still can't find the highlight value, search in the previous page previous_page_index = page_index - 1 if previous_page_index >= 0: previous_page = pdf_doc[previous_page_index] previous_page_text = previous_page.get_text() highlight_value_search_text = self.get_proper_search_text(pure_highlight_value, highlight_value_regex, previous_page_text, ignore_case=False) if highlight_value_search_text is not None: page_index = previous_page_index page = previous_page annotation_data = {"pdf_file": self.simple_pdf_file, "page_index": page_index, "data_point": data_point, "value": highlight_value, "matching_val_area": [], "normalized_bbox": []} if highlight_value_search_text is not None: content = { "data_point": data_point, "data_value": highlight_value } # append annotation_attribute to content content.update(annotation_attribute) if len(highlight_value_search_text.strip().split()) > 3: merge_nearby_lines = True else: merge_nearby_lines = False if parent_text_block_search_text is not None: matching_val_area = self.highlight_matching_data( page=page, text_block=parent_text_block_search_text, highlight_text_inside_block=highlight_value_search_text, content=content, title=data_point, only_hightlight_first=False, merge_nearby_lines=merge_nearby_lines ) else: matching_val_area = self.highlight_matching_data( page=page, text_block=highlight_value_search_text, content=content, title=data_point, only_hightlight_first=False, merge_nearby_lines=merge_nearby_lines ) bbox_list = [] for area in matching_val_area: bbox = [area.x0, area.y0, area.x1, area.y1] bbox_list.append(bbox) # order bbox_list by y0, x0, y1, x1 bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2])) annotation_data["matching_val_area"] = bbox_list if len(bbox_list) > 0: annotation_data["normalized_bbox"] = self.get_bbox_normalized(page, bbox_list) else: annotation_data["normalized_bbox"] = [] return annotation_data def get_proper_search_text(self, raw_value: str, highlight_value_regex: str, page_text: str, ignore_case: bool = True): if ignore_case: highlight_value_search_iter = re.finditer(highlight_value_regex, page_text, re.IGNORECASE) else: highlight_value_search_iter = re.finditer(highlight_value_regex, page_text) highlight_value_search_text = None for highlight_value_search in highlight_value_search_iter: highlight_value_search_text = highlight_value_search.group().strip() if highlight_value_search_text == raw_value: return highlight_value_search_text return highlight_value_search_text def add_slash_to_text_as_regex(self, text: str, match_special_char_after_space: bool = True): if text is None or len(text) == 0: return text special_char_iter = re.finditer("\W", text) for special_iter in special_char_iter: if len(special_iter.group().strip()) == 0: continue replace = r"\{0}".format(special_iter.group()) if replace not in text: special_iter_text = special_iter.group() if special_iter_text == ")" and text.strip()[-1] == ")" and \ text.strip().count(")") == 1: text = text.replace(")", r"\)") else: text = re.sub(replace, r"\\W", text) text = re.sub(r"( ){2,}", " ", text) if match_special_char_after_space: text = text.replace(" ", r"\s*\W*") else: text = text.replace(" ", r"\s*") return text def highlight_matching_data( self, page, text_block, within_bbox: list = None, highlight_text_inside_block: str = None, content: dict = {}, title: str = "", only_hightlight_first: bool = False, exact_match: bool = False, merge_nearby_lines: bool = False, ): """ Highlight matching values page: page object in fitz.Document within_bbox: bounding box to search for the text text_block: text to search for in page text highlight_text_inside_block: text to highlight inside parameter: text_block content: content as JSON format to add to the highlight annotation, please customize according to relevant business logic title: title of the highlight annotation only_hightlight_first: only highlight the first match exact_match: exact match or not merge_nearby_lines: merge nearby lines or not """ # logger.info(f"Highlighting matching values in {self.pdf_file}") if text_block is None or len(text_block.strip()) == 0: return [] if within_bbox is not None: matching_val_area = page.search_for( text_block, clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3]) ) if len(matching_val_area) == 0: matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''), clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])) if len(matching_val_area) == 0: matching_val_area = page.search_for(text_block.replace('-\n', ''), clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])) if len(matching_val_area) == 0: matching_val_area = page.search_for(text_block) else: matching_val_area = page.search_for(text_block) if len(matching_val_area) == 0: matching_val_area = page.search_for(text_block.strip()) if len(matching_val_area) == 0: matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', '')) if len(matching_val_area) == 0: matching_val_area = page.search_for(text_block.replace('-\n', '')) if len(matching_val_area) > 0 and len(text_block.strip().split()) == 1: new_matching_val_area = [] for area in matching_val_area: # get text by text_bbox pure_text_block = text_block.strip() raw_area_text = page.get_text("text", clip=area).strip() if len(text_block.strip()) < 3 and text_block.strip() != raw_area_text.strip(): continue copy_area = deepcopy(area) copy_area.x0 -= 15 copy_area.x1 += 15 text = page.get_text("text", clip=copy_area).strip() if text == pure_text_block: new_matching_val_area.append(area) else: # get start and end index of the pure_text_block in text start_index = text.find(pure_text_block) if start_index == -1: continue if start_index > 0: previous_char = text[start_index - 1].strip() if previous_char not in ["", " ", "("]: continue end_index = start_index + len(pure_text_block) if end_index < len(text): next_char = text[end_index].strip() if next_char not in ["", " ", "%", ")", "0"]: continue new_matching_val_area.append(area) matching_val_area = new_matching_val_area if ( highlight_text_inside_block is not None and len(highlight_text_inside_block) > 0 and len(matching_val_area) > 0 ): highlight_bbox_list = [] merged_matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines) pure_number_regex = re.compile(r"^\d+$") for area in merged_matching_val_area: text_bbox_area = page.search_for( highlight_text_inside_block, clip=[area.x0, area.y0, area.x1, area.y1], ) if text_bbox_area is not None and len(text_bbox_area) > 0: if only_hightlight_first: highlight_bbox_list.append(text_bbox_area[0]) break else: pure_number_match = pure_number_regex.match(highlight_text_inside_block) if pure_number_match is not None and pure_number_match.group() == highlight_text_inside_block: for area in text_bbox_area: # get text by text_bbox copy_area = deepcopy(area) copy_area.x0 -= 15 copy_area.x1 += 15 text = page.get_text("text", clip=copy_area).strip() if text == highlight_text_inside_block: highlight_bbox_list.append(area) else: # get start and end index of the highlight_text_inside_block in text start_index = text.find(highlight_text_inside_block) if start_index > 0: previous_char = text[start_index - 1].strip() if previous_char not in ["", " ", "("]: continue end_index = start_index + len(highlight_text_inside_block) if end_index < len(text): next_char = text[end_index].strip() if next_char not in ["", " ", "%", ")"]: continue highlight_bbox_list.append(area) else: highlight_bbox_list.extend(text_bbox_area) if len(highlight_bbox_list) == 0 and len(highlight_text_inside_block.strip().split()) > 2: highlight_bbox_list = text_bbox_area = page.search_for( highlight_text_inside_block ) matching_val_area = highlight_bbox_list else: if only_hightlight_first: matching_val_area = [matching_val_area[0]] if matching_val_area is not None and len(matching_val_area) > 0: if (highlight_text_inside_block is not None and len(highlight_text_inside_block.strip().split()) > 1) or \ (highlight_text_inside_block is None and len(text_block.strip().split()) > 1): matching_val_area = self.merge_matching_val_area(matching_val_area, merge_nearby_lines) if exact_match: matching_val_area = self.get_exact_match_area(page, matching_val_area, text_block) # matching_val_area = self.merge_matching_val_area(matching_val_area) # get annotation list from current page xrefs = [annot.xref for annot in page.annots()] annotated_list = [] for xref in xrefs: annotated = page.load_annot(xref) real_bbox_tuple = annotated.vertices[0] + annotated.vertices[3] bbox = fitz.Rect(real_bbox_tuple) annotated_list.append(bbox) for area in matching_val_area: if area in annotated_list: continue highlight = page.add_highlight_annot([area]) bbox_list = [area.x0, area.y0, area.x1, area.y1] content["bbox"] = bbox_list normalized_bbox = self.get_bbox_normalized(page, [bbox_list]) if len(normalized_bbox) > 0: content["normalized_bbox"] = normalized_bbox[0] else: content["normalized_bbox"] = [] content_text = json.dumps(content) highlight.set_info(content=content_text, title=title) highlight.update() return matching_val_area def get_exact_match_area(self, page, matching_val_area, search_text): results = [] for area in matching_val_area: area_text = page.get_text("text", clip=area).strip() area_text_list = area_text.split() search_text_list = search_text.split() capital_not_match = False any_word_match = False for search_split in search_text_list: if search_split in area_text_list: any_word_match = True search_split_lower = search_split.lower() if search_split_lower in area_text_list and \ search_split not in area_text_list: capital_not_match = True break if capital_not_match: continue elif any_word_match: results.append(area) else: pass return results def merge_matching_val_area(self, matching_val_area, merge_nearby_lines=False): """ Merge the matching val areas which with same y0 and y1, the x0 is the min x0, x1 is the max x1 """ if matching_val_area is None or len(matching_val_area) == 0: return matching_val_area if len(matching_val_area) == 1: return matching_val_area # unify the y0 and y1 which are close to each other (less than 5 pixels) y0_list = [] y1_list = [] for area in matching_val_area: y0 = area.y0 y1 = area.y1 if len(y0_list) == 0: y0_list.append(y0) y1_list.append(y1) else: for t_y0 in y0_list: if abs(t_y0 - y0) < 5: area.y0 = t_y0 else: if y0 not in y0_list: y0_list.append(y0) for t_y1 in y1_list: if abs(t_y1 - y1) < 5: area.y1 = t_y1 else: if y1 not in y1_list: y1_list.append(y1) # get area list which with same y0 and y1 y0_y1_list = list(set([(area.y0, area.y1) for area in matching_val_area])) new_matching_val_area = [] for y0_y1 in y0_y1_list: y0 = y0_y1[0] y1 = y0_y1[1] x0_list = [area.x0 for area in matching_val_area if area.y0 == y0 and area.y1 == y1] x1_list = [area.x1 for area in matching_val_area if area.y0 == y0 and area.y1 == y1] min_x0 = min(x0_list) max_x1 = max(x1_list) new_matching_val_area.append(fitz.Rect(min_x0, y0, max_x1, y1)) if merge_nearby_lines and len(new_matching_val_area) > 1: new_matching_val_area = self.merge_nearby_lines(new_matching_val_area) # merge again if len(new_matching_val_area) > 1: new_matching_val_area = self.merge_nearby_lines(new_matching_val_area) elif len(new_matching_val_area) > 1: new_matching_val_area = self.remove_small_pitches(new_matching_val_area) else: pass return new_matching_val_area def remove_small_pitches(self, matching_val_area): x_mini_threshold = 5 new_matching_val_area = [] for area in matching_val_area: if area.x1 - area.x0 > x_mini_threshold: new_matching_val_area.append(area) return new_matching_val_area def merge_nearby_lines(self, matching_val_area): bbox_list = [] for bbox in matching_val_area: bbox = [bbox.x0, bbox.y0, bbox.x1, bbox.y1] bbox_list.append(bbox) # order bbox_list by y0, x0, y1, x1 bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2])) new_matching_val_area = [] last_x0 = None last_x1 = None last_y0 = None last_y1 = None x_mini_threshold = 5 y_threshold = 15 x_threshold = 10 for index, bbox in enumerate(bbox_list): if bbox[2] - bbox[0] <= x_mini_threshold: continue if index == 0 or last_x0 is None: last_x0 = bbox[0] last_y0 = bbox[1] last_x1 = bbox[2] last_y1 = bbox[3] continue x0 = bbox[0] y0 = bbox[1] x1 = bbox[2] y1 = bbox[3] last_x0_x1_range = [i for i in range(int(last_x0), int(last_x1))] x0_x1_range = [i for i in range(int(x0), int(x1))] x_intersection = list(set(last_x0_x1_range).intersection(set(x0_x1_range))) # abs(y0 - last_y1) <= y_threshold and (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold) # exist nearby line as vertical direction, # the horizontal coordinates are intersected or the horizontal coordinates are close to each other # abs(y0 - last_y0) <= y_threshold and abs(x0 - last_x1) <= x_threshold # exist nearby line as horizontal direction, # last sentence is the begin of the current sentence # abs(y1 - last_y1) <= y_threshold and (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold) # last sentence and current sentence are in the same horizontal line # the horizontal coordinates of are last sentence and current sentence intersected # or the horizontal coordinates are close to each other if (abs(y0 - last_y1) <= y_threshold and (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)) or \ (abs(y0 - last_y0) <= y_threshold and abs(x0 - last_x1) <= x_threshold) or \ (abs(y1 - last_y1) <= y_threshold and (len(x_intersection) > 0 or abs(x0 - last_x1) <= x_threshold)): last_x0 = min(last_x0, x0) last_x1 = max(last_x1, x1) last_y0 = min(last_y0, y0) last_y1 = max(last_y1, y1) else: new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1)) last_x0 = x0 last_x1 = x1 last_y0 = y0 last_y1 = y1 new_matching_val_area.append(fitz.Rect(last_x0, last_y0, last_x1, last_y1)) return new_matching_val_area def highlight_matching_paragraph_text( self, pdf_doc: fitz.Document, page_index: int, search_paragraph_text: str, sibling_paragraph_text_list: list = [], next_page_found_lines: list = [], content: dict = {}, title: str = "", ): page = pdf_doc[page_index] page_text = page.get_text("text") page_lines = [ line for line in page_text.split("\n") if len(line.strip()) > 0 ] matching_val_area = [] find_begin = False search_paragraph_text_words = [ word.strip() for word in search_paragraph_text.lower().split() if len(word.strip()) > 0 ] found_words = [] found_lines = [] jacard_similarity = 0 found_matched = False found_lines_dict = {} for index, line in enumerate(page_lines): if len(next_page_found_lines) > 0: if line in next_page_found_lines: continue words = [ word.strip() for word in line.lower().split() if len(word.strip()) > 0 ] if len(words) == 0: continue if find_begin: found_words.extend(words) new_jacard_similarity = self.similarity.jaccard_similarity( search_paragraph_text_words, found_words ) if new_jacard_similarity > jacard_similarity: jacard_similarity = new_jacard_similarity found_lines.append(line) else: if jacard_similarity > 0.4: found_matched = True break else: if search_paragraph_text_words[0].lower() in line.lower() and \ search_paragraph_text_words[1].lower() in line.lower() and \ search_paragraph_text_words[2].lower() in line.lower(): jacard_similarity = self.similarity.jaccard_similarity( search_paragraph_text_words, words ) if jacard_similarity > 0.05: find_begin = True found_words.extend(words) found_lines.append(line) if jacard_similarity > 0.4: found_matched = True if found_matched and len(found_lines) > 0: total_matching_val_area = [] for line in found_lines: matching_val_area = page.search_for(line) if len(matching_val_area) == 0: matching_val_area = page.search_for(line.strip()) if len(matching_val_area) == 0: continue elif len(matching_val_area) == 1: total_matching_val_area.extend(matching_val_area) else: y1_list = [area.y1 for area in matching_val_area] if len(total_matching_val_area) == 0: y1_min = max(y1_list) y1_min_index = y1_list.index(y1_min) total_matching_val_area.append(matching_val_area[y1_min_index]) else: last_y1 = total_matching_val_area[-1].y1 latest_bigger_y1_list = max( [y1 for y1 in y1_list if y1 > last_y1] ) latest_bigger_y1_index = y1_list.index(latest_bigger_y1_list) total_matching_val_area.append( matching_val_area[latest_bigger_y1_index] ) # get min x0, min y0, max x1, max y1 from total_matching_val_area x0_list = [area.x0 for area in total_matching_val_area] y0_list = [area.y0 for area in total_matching_val_area] x1_list = [area.x1 for area in total_matching_val_area] y1_list = [area.y1 for area in total_matching_val_area] min_x0 = min(x0_list) min_y0 = min(y0_list) max_x1 = max(x1_list) max_y1 = max(y1_list) matching_val_area = [fitz.Rect(min_x0, min_y0, max_x1, max_y1)] highlight = page.add_highlight_annot(matching_val_area) bbox_list = [[min_x0, min_y0, max_x1, max_y1]] content["bbox"] = bbox_list content_text = json.dumps(content) highlight.set_info(content=content_text, title=title) highlight.update() found_lines_dict = { page_index: {"bbox_list": bbox_list, "found_lines": found_lines} } # jacard_similarity is between 0.4 and 0.9, # perhaps there are left lines in next page, so we need to check the next page. if jacard_similarity > 0.4 and jacard_similarity < 0.9: next_page_index = page_index + 1 if next_page_index < pdf_doc.page_count: next_page = pdf_doc[next_page_index] next_page_text = next_page.get_text("text") next_page_lines = [ line for line in next_page_text.split("\n") if len(line.strip()) > 0 ] found_line_index = -1 for i in range(10): if len(next_page_lines) < i + 1: break next_page_line = next_page_lines[i] words = [ word.strip() for word in next_page_line.lower().split() if len(word.strip()) > 0 ] if len(words) == 0: continue temp_found_words = found_words + words new_jacard_similarity = self.similarity.jaccard_similarity( search_paragraph_text_words, temp_found_words ) if new_jacard_similarity > jacard_similarity: found_line_index = i break if found_line_index != -1: new_found_words = found_words new_found_lines = [] found_matched = False for index, line in enumerate(next_page_lines): if index < found_line_index: continue words = [ word.strip() for word in line.lower().split() if len(word.strip()) > 0 ] if len(words) == 0: continue new_found_words.extend(words) new_jacard_similarity = ( self.similarity.jaccard_similarity( search_paragraph_text_words, new_found_words ) ) if new_jacard_similarity > jacard_similarity: jacard_similarity = new_jacard_similarity new_found_lines.append(line) else: break if len(new_found_lines) > 0: total_matching_val_area = [] for line in new_found_lines: matching_val_area = next_page.search_for(line) if len(matching_val_area) == 0: matching_val_area = page.search_for(line.strip()) if len(matching_val_area) == 0: continue elif len(matching_val_area) == 1: total_matching_val_area.extend(matching_val_area) else: y1_list = [area.y1 for area in matching_val_area] if len(total_matching_val_area) == 0: y1_min = max(y1_list) y1_min_index = y1_list.index(y1_min) total_matching_val_area.append( matching_val_area[y1_min_index] ) else: last_y1 = total_matching_val_area[-1].y1 latest_bigger_y1_list = max( [y1 for y1 in y1_list if y1 > last_y1] ) latest_bigger_y1_index = y1_list.index( latest_bigger_y1_list ) total_matching_val_area.append( matching_val_area[latest_bigger_y1_index] ) # get min x0, min y0, max x1, max y1 from total_matching_val_area x0_list = [area.x0 for area in total_matching_val_area] y0_list = [area.y0 for area in total_matching_val_area] x1_list = [area.x1 for area in total_matching_val_area] y1_list = [area.y1 for area in total_matching_val_area] min_x0 = min(x0_list) min_y0 = min(y0_list) max_x1 = max(x1_list) max_y1 = max(y1_list) matching_val_area = [ fitz.Rect(min_x0, min_y0, max_x1, max_y1) ] highlight = next_page.add_highlight_annot(matching_val_area) new_bbox_list = [[min_x0, min_y0, max_x1, max_y1]] content["found_page"] = next_page_index content["bbox"] = new_bbox_list content_text = json.dumps(content) highlight.set_info(content=content_text, title=title) highlight.update() found_lines_dict[next_page_index] = { "bbox_list": new_bbox_list, "found_lines": new_found_lines, } found_lines_dict_keys = list(found_lines_dict.keys()) exact_match = True exact_match_search_paragraph_text = search_paragraph_text if len(found_lines_dict_keys) > 0 and len(sibling_paragraph_text_list) > 0: found_line_list = [] for key in found_lines_dict_keys: found_line_list.extend(found_lines_dict[key]["found_lines"]) found_line_words = [] for line in found_line_list: words = [ word.strip() for word in line.lower().split() if len(word.strip()) > 0 ] found_line_words.extend(words) max_sibling_jacard_similarity = 0 max_sibling_jacard_similarity_index = -1 for index, sibling_paragraph_text in enumerate( sibling_paragraph_text_list ): sibling_paragraph_text_words = [ word.strip() for word in sibling_paragraph_text.lower().split() if len(word.strip()) > 0 ] sibling_jacard_similarity = self.similarity.jaccard_similarity( sibling_paragraph_text_words, found_line_words ) if sibling_jacard_similarity > max_sibling_jacard_similarity: max_sibling_jacard_similarity = sibling_jacard_similarity max_sibling_jacard_similarity_index = index if max_sibling_jacard_similarity > jacard_similarity: exact_match = False exact_match_search_paragraph_text = sibling_paragraph_text_list[ max_sibling_jacard_similarity_index ] return { "found_lines_dict": found_lines_dict, "exact_match": exact_match, "exact_match_search_paragraph_text": exact_match_search_paragraph_text, } def get_page_range_by_keywords( self, pdf_doc, start_keywords, end_keywords, return_page_text_list=False, ): """ Get page range by keywords pdf_doc: pdf document page_range_start_keywords: list of start keywords page_range_end_keywords: list of end keywords return_page_text_list: return page text list or not """ start_page = -1 end_page = -1 if len(start_keywords) == 0 or len(end_keywords) == 0: start_page = 0 if len(start_keywords) == 0 and len(end_keywords) == 0: end_page = pdf_doc.page_count - 1 search_start = 0 # avoid to search the TOC part if pdf_doc.page_count > 20: search_start = 8 for page_index in range(search_start, pdf_doc.page_count): if start_page >= 0 and end_page >= 0: break page = pdf_doc[page_index] page_text = page.get_text("text").strip() page_text_list = [ split.strip() for split in page_text.split("\n") if len(split.strip()) > 0 ] if start_page == -1: find = self.find_keywords_in_text_list(page_text_list, start_keywords) if find: start_page = page_index if start_page >= 0 and end_page == -1: find = self.find_keywords_in_text_list(page_text_list, end_keywords) if find: end_page = page_index break # return page_list which starts from start_page and ends at end_page page_text_list = [] if start_page >= 0 and end_page >= 0: page_list = [i for i in range(start_page, end_page)] if return_page_text_list: for page_index in page_list: page = pdf_doc[page_index] page_text = page.get_text("text").strip() page_text_list.append(page_text) else: page_list = [] return page_list, page_text_list def exist_keywords_in_text_list(self, page_text, keywords): page_text_list = [ split.strip() for split in page_text.split("\n") if len(split.strip()) > 0 ] find = self.find_keywords_in_text_list(page_text_list, keywords) return find def find_keywords_in_text_list(self, text_list, keywords): """ Find keywords in text list """ find = False for keyword in keywords: for index, line in enumerate(text_list): if line.lower().startswith(keyword.lower()): lower_case_begin_words_count = ( self.get_lower_case_begin_words_count(line) ) if lower_case_begin_words_count > 3: continue if line.upper() == line: find = True break if index != 0: lower_case_begin_words_count = ( self.get_lower_case_begin_words_count(text_list[index - 1]) ) if lower_case_begin_words_count > 3: continue if index > 5: if "." in line or "," in line: continue find = True break if find: break return find def get_lower_case_begin_words_count(self, text): count = 0 for word in text.split(): if word[0].islower(): count += 1 return count def process_data( self, output_file: str, dp_Value_info: dict, pages: Tuple = None, action: str = "Highlight", ): """ Process the pages of the PDF File 1. Open the input file. 2. Create a memory buffer for storing temporarily the output file. 3. Initialize a variable for storing the total number of matches of the string we were searching for. 4. Iterate throughout the selected pages of the input file and split the current page into lines. 5. Search for the string within the page. 6. Apply the corresponding action (i.e "Redact", "Frame", "Highlight", etc.) 7. Display a message signaling the status of the search process. 8. Save and close the input file. 9. Save the memory buffer to the output file. output_file: The path of the PDF file to generate after processing. dp_Value_info: The information for data points. pages: The pages to consider while processing the PDF file. action: The action to perform on the PDF file. """ logger.info(f"Processing {self.pdf_file}") data_list = [] try: # Save the generated PDF to memory buffer pdf_doc = fitz.open(self.pdf_file) try: pdf_encrypted = pdf_doc.isEncrypted except: pdf_encrypted = pdf_doc.is_encrypted if pdf_encrypted: pdf_doc.authenticate("") output_buffer = BytesIO() find_value_dp_list = [] matching_val_area_list = [] page_list = [i for i in range(pdf_doc.page_count)] dp_range_page_list = {} for dp_name, dp_detail in dp_Value_info.items(): if not isinstance(dp_detail, dict): continue page_range_start_keywords = dp_detail.get( "page_range_start_keywords", [] ) page_range_end_keywords = dp_detail.get("page_range_end_keywords", []) if ( len(page_range_start_keywords) > 0 and len(page_range_end_keywords) > 0 ): page_list, page_text_list = self.get_page_range_by_keywords( pdf_doc, page_range_start_keywords, page_range_end_keywords, return_page_text_list=False, ) dp_range_page_list[dp_name] = page_list # Iterate through pages next_page_found_lines = [] for page_index in page_list: # If required for specific pages if pages: if page_index not in pages: continue # Select the page page = pdf_doc[page_index] # Get Matching Data # Split page by lines page_text = page.get_text("text") # if page_index in [24, 25]: # print(page_text) for dp_name, dp_detail in dp_Value_info.items(): if not isinstance(dp_detail, dict): continue dp_biz_name = dp_detail.get("biz_name", "") dp_level = dp_detail.get("level", "") dp_value = dp_detail.get("value", "") value_text_type = dp_detail.get("value_text_type", "string") if value_text_type == "string": dp_value_text = dp_detail.get("value_text", "") elif value_text_type == "list": dp_value_text = dp_detail.get("value_text", []) else: dp_value_text = dp_detail.get("value_text", "") value_text_structure = dp_detail.get("value_text_structure", "word") inner_context_regex = dp_detail.get("inner_context_regex", "") text_value_dict = dp_detail.get("text_value_dict", {}) search_str_list = dp_detail.get("regex_list", []) only_hightlight_value_text = dp_detail.get( "only_hightlight_value_text", False ) only_hightlight_first = dp_detail.get( "only_hightlight_first", False ) # logger.info(f"Processing Data Point: {dp_name}") page_range_start_keywords = dp_detail.get( "page_range_start_keywords", [] ) page_range_end_keywords = dp_detail.get( "page_range_end_keywords", [] ) if ( len(page_range_start_keywords) > 0 and len(page_range_end_keywords) > 0 ): if not page_index in dp_range_page_list.get(dp_name, []): continue # find_value = False if value_text_structure == "paragraph": found = dp_detail.get("found", False) if found: continue if ( dp_detail.get("matched_page", -1) != -1 and len(dp_detail.get("bbox_list", [])) > 0 ): continue sibling_paragraph_text_list = dp_detail.get( "sibling_paragraph_text_list", [] ) content = { "data_point": dp_biz_name, "data_point_db_name": dp_name, "data_point_level": dp_level, "found_page": page_index, "bbox": None, } found_dict = self.highlight_matching_paragraph_text( pdf_doc=pdf_doc, page_index=page_index, search_paragraph_text=dp_value_text, sibling_paragraph_text_list=sibling_paragraph_text_list, next_page_found_lines=next_page_found_lines, content=content, title=dp_biz_name, ) found_lines_dict = found_dict.get("found_lines_dict", {}) exact_match = found_dict.get("exact_match", True) exact_match_search_paragraph_text = found_dict.get( "exact_match_search_paragraph_text", dp_value_text ) found_lines_dict_keys = list(found_lines_dict.keys()) if len(found_lines_dict_keys) > 0: found_next_page_lines = False if exact_match: dp_detail["found"] = True for found_page_index, found_lines_info in found_lines_dict.items(): bbox_list = found_lines_info.get("bbox_list", []) bbox_normalized_list = self.get_bbox_normalized( page, bbox_list ) found_lines = found_lines_info.get("found_lines", []) if found_page_index == page_index + 1: next_page_found_lines = found_lines found_next_page_lines = True found_text = " ".join(found_lines).strip() data = { "pdf_file": self.simple_pdf_file, "dp_name": dp_name, "dp_biz_name": dp_biz_name, "dp_level": dp_level, "ground_truth": dp_value_text, "ground_truth_text": dp_value_text, "search_str": "", "found_page": found_page_index, "found_value": found_text, "found_value_context": found_text, "found_bbox": bbox_list, "found_bbox_normalized": bbox_normalized_list, "output_file": output_file, "action": action, "comment": "found page number is page number, as page number starts from 0.", } if dp_name not in find_value_dp_list: find_value_dp_list.append(dp_name) data_list.append(data) else: for dp_name, dp_detail in dp_Value_info.items(): value_text_structure = dp_detail.get("value_text_structure", "word") if value_text_structure == "paragraph": dp_value_text = dp_detail.get("value_text", "") if dp_value_text == exact_match_search_paragraph_text: dp_detail["found"] = True break for found_page_index, found_lines_info in found_lines_dict.items(): bbox_list = found_lines_info.get("bbox_list", []) bbox_normalized_list = self.get_bbox_normalized( page, bbox_list ) found_lines = found_lines_info.get("found_lines", []) if found_page_index == page_index + 1: next_page_found_lines = found_lines found_next_page_lines = True found_text = " ".join(found_lines).strip() data = { "pdf_file": self.simple_pdf_file, "dp_name": dp_name, "dp_biz_name": dp_biz_name, "dp_level": dp_level, "ground_truth": exact_match_search_paragraph_text, "ground_truth_text": exact_match_search_paragraph_text, "search_str": "", "found_page": found_page_index, "found_value": found_text, "found_value_context": found_text, "found_bbox": bbox_list, "found_bbox_normalized": bbox_normalized_list, "output_file": output_file, "action": action, "comment": "found page number is page number, as page number starts from 0.", } if dp_name not in find_value_dp_list: find_value_dp_list.append(dp_name) data_list.append(data) if not found_next_page_lines: next_page_found_lines = [] else: if search_str_list is not None and len(search_str_list) > 0: matched_blocks = [] for search_str in search_str_list: found_blocks = self.search_for_text( page_text, search_str ) if found_blocks: matched_blocks.extend(found_blocks) else: # get matched blocks by similarity for dp_value_text # the data point value is string, not totally same as the value in the database. # such as, dp_name is FundName or ShareClassName or Advisor or Strategy matched_blocks = [] if matched_blocks: for matched_block in matched_blocks: dp_value = "" if inner_context_regex != "": dp_value_text_search = re.search(inner_context_regex, matched_block, re.IGNORECASE) if dp_value_text_search is not None: dp_value_text = dp_value_text_search.group(0).strip() # remove special characters dp_value_text = re.sub(r"\W+", " ", dp_value_text).strip() if dp_value_text == "may": continue if dp_value_text != "" and len(text_value_dict.keys()) > 0: dp_value = text_value_dict.get(dp_value_text.lower(), "") else: dp_value_text = matched_block dp_value = matched_block else: if dp_value_text == "": dp_value_text = matched_block dp_value = matched_block content = { "data_point_name": dp_biz_name, "data_point_db_name": dp_name, "data_point_level": dp_level, "page_index": page_index, "dp_value_text": dp_value_text, "dp_value": dp_value, "bbox": None, } if only_hightlight_value_text and dp_value_text != "" and dp_value_text != matched_block: matching_val_area = self.highlight_matching_data( page=page, text_block=matched_block, highlight_text_inside_block=dp_value_text, content=content, title=dp_biz_name, only_hightlight_first=only_hightlight_first, ) else: matching_val_area = self.highlight_matching_data( page=page, text_block=matched_block, highlight_text_inside_block=None, content=content, title=dp_biz_name, only_hightlight_first=only_hightlight_first, ) if len(matching_val_area) > 0: matching_val_area_list.extend(matching_val_area) if len(matching_val_area) > 0: bbox_list = [] for area in matching_val_area: bbox_list.append( [area.x0, area.y0, area.x1, area.y1] ) bbox_normalized_list = self.get_bbox_normalized( page, bbox_list ) data = { "pdf_file": self.simple_pdf_file, "dp_name": dp_name, "dp_biz_name": dp_biz_name, "dp_level": dp_level, "ground_truth": dp_value, "ground_truth_text": dp_value_text, "search_str": search_str, "found_page": page_index, "found_value_text": dp_value_text, "found_value": dp_value, "found_value_context": matched_block.strip(), "found_bbox": bbox_list, "found_bbox_normalized": bbox_normalized_list, "output_file": output_file, "action": action, "comment": "found page number is page number, as page number starts from 0.", } if dp_name not in find_value_dp_list: find_value_dp_list.append(dp_name) data_list.append(data) # find_value = True if only_hightlight_first: break if len(find_value_dp_list) == 0: output_file = "" for dp_name, dp_detail in dp_Value_info.items(): if dp_name not in find_value_dp_list: dp_biz_name = dp_detail.get("biz_name", "") dp_level = dp_detail.get("level", "") dp_value = dp_detail.get("value", "") dp_value_text = dp_detail.get("value_text", "") data = { "pdf_file": self.simple_pdf_file, "dp_name": dp_name, "dp_biz_name": dp_biz_name, "dp_level": dp_level, "ground_truth": dp_value, "ground_truth_text": dp_value_text, "search_str": "", "found_page": -1, "found_value_text": "", "found_value": -1, "found_value_context": "", "found_bbox": [], "found_bbox_normalized": [], "output_file": output_file, "action": action, "comment": f"Not found {dp_biz_name} in the document.", } data_list.append(data) logger.info( f"{len(matching_val_area_list)} Match(es) In Input File: {self.pdf_file}" ) # Save to output pdf_doc.save(output_buffer) pdf_doc.close() if len(find_value_dp_list) > 0 and \ output_file is not None and \ output_file != "": # Save the output buffer to the output file with open(output_file, mode="wb") as f: f.write(output_buffer.getbuffer()) logger.info(f"File saved to {output_file}") except Exception as e: logger.error(f"Error processing file: {e}") print_exc() if len(data_list) == 0: data = { "pdf_file": self.simple_pdf_file, "dp_name": "", "dp_biz_name": "", "dp_level": "", "ground_truth": "", "ground_truth_text": "", "search_str": "", "found_page": -1, "found_value_text": "", "found_value": -1, "found_value_context": "", "found_bbox": [], "found_bbox_normalized": [], "output_file": output_file, "action": action, "comment": "", } data_list.append(data) return data_list def get_bbox_normalized(self, page, bbox): page_width = page.rect.width page_height = page.rect.height bbox_normalized = [] for box in bbox: x0 = box[0] / page_width y0 = box[1] / page_height x1 = box[2] / page_width y1 = box[3] / page_height bbox_normalized.append([x0, y0, x1, y1]) return bbox_normalized def find_value_by_regex(self, page_text, search_str: str): pass def get_high_similarity_text( self, page_text, search_str: str, threshold: float = 0.8 ): matched_values = [] page_text_list = page_text.split("\n") return matched_values def remove_highlght(self, output_file: str, pages: Tuple = None): """ 1. Open the input file. 2. Create a memory buffer for storing temporarily the output file. 3. Iterate throughout the pages of the input file and checks if annotations are found. 4. Delete these annotations. 5. Display a message signaling the status of this process. 6. Close the input file. 7. Save the memory buffer to the output file. """ logger.info(f"Removing Highlights from {self.pdf_file}") try: # Save the generated PDF to memory buffer pdf_doc = fitz.open(self.pdf_file) try: pdf_encrypted = pdf_doc.isEncrypted except: pdf_encrypted = pdf_doc.is_encrypted if pdf_encrypted: pdf_doc.authenticate("") output_buffer = BytesIO() # Initialize a counter for annotations annot_found = 0 # Iterate through pages for pg in range(pdf_doc.page_count): # If required for specific pages if pages: if str(pg) not in pages: continue # Select the page page = pdf_doc[pg] annot = page.first_annot while annot: annot_found += 1 page.delete_annot(annot) annot = annot.next if annot_found >= 0: print(f"Annotation(s) Found In The Input File: {self.pdf_file}") # Save to output pdf_doc.save(output_buffer) pdf_doc.close() # Save the output buffer to the output file with open(output_file, mode="wb") as f: f.write(output_buffer.getbuffer()) except Exception as e: logger.error(f"Error removing highlights: {e}") print_exc() def process_file( self, output_file: str, dp_Value_info: dict = None, pages: Tuple = None, action: str = "Highlight", ): """ To process one single file Redact, Frame, Highlight... one PDF File Remove Highlights from a single PDF File action: Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove """ logger.info(f"Processing {self.pdf_file}") if output_file is None: output_file = self.pdf_file data_list = [] # Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove if action == "Remove": # Remove the Highlights except Redactions self.remove_highlght(output_file=output_file, pages=pages) else: data_list = self.process_data( output_file=output_file, dp_Value_info=dp_Value_info, pages=pages, action=action, ) return data_list def pdf_to_html_with_docid(doc_id, para): headers = { 'user': 'visitor', 'Accept': 'application/json', } args = { 'docId': doc_id, 'parameters': json.dumps(para) } pdf2html_url = os.getenv("pdf2html_url") response = requests.post(pdf2html_url, data=args, headers=headers) response.encoding = 'utf-8' text = response.text return text def pdf_to_html(pdf_path, para): headers = { "user": "visitor", "Accept": "application/json", } args = { "parameters": json.dumps(para) } with open(pdf_path, mode='rb') as f: file_bytes = f.read() files = {"file": ("tempName.pdf", file_bytes)} pdf2html_url = os.getenv("pdf2html_url") response = requests.post(pdf2html_url, data=args, files=files, headers=headers) response.encoding = 'utf-8' text = response.text return text def get_pdf_pages_by_html(pdf_info: str, pdf_info_type: str="doc_id"): # Convert pdf to html para = { "detectTable": True } if pdf_info_type == "doc_id": html = pdf_to_html_with_docid(pdf_info, para) else: html = pdf_to_html(pdf_info, para) html = BeautifulSoup(html, 'html.parser') pages = html.find_all('div', attrs={'page-idx': True}) page_text_dict = {} for index, page in enumerate(pages): page_text_dict[index] = page.get_text().strip() return page_text_dict