diff --git a/main.py b/main.py index 2cbd4c3..84a4b65 100644 --- a/main.py +++ b/main.py @@ -137,8 +137,7 @@ class EMEA_AR_Parsing: def drilldown_pdf_document(self, data_from_gpt: list) -> list: logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}") pdf_util = PDFUtil(self.pdf_file) - pdf_doc = self.get_pdf_doc(self.pdf_file) - highlight_annotation = False + drilldown_data_list = [] for data in data_from_gpt: page_index = data.get("page_index", -1) if page_index == -1: @@ -152,104 +151,32 @@ class EMEA_AR_Parsing: continue if data_point in ["ter", "ogc", "performance_fee"]: continue - drilldown_data = self.highlight_pdf_doc(pdf_doc=pdf_doc, - page_index=page_index, - highlight_value=value, - data_point=data_point, - pdf_util=pdf_util) - if len(drilldown_data.get("matching_val_area", [])) > 0: - highlight_annotation = True + drilldown_data = { + "page_index": page_index, + "data_point": data_point, + "parent_text_block": None, + "value": value, + "annotation_attribute": {} + } + drilldown_data_list.append(drilldown_data) highlighted_value_list.append(value) for data_point, reported_name in dp_reported_name_dict.items(): if reported_name in highlighted_value_list: continue data_point = f"{data_point}_reported_name" - drilldown_data = self.highlight_pdf_doc(pdf_doc=pdf_doc, - page_index=page_index, - highlight_value=reported_name, - data_point=data_point, - pdf_util=pdf_util) - if len(drilldown_data.get("matching_val_area", [])) > 0: - highlight_annotation = True + drilldown_data = { + "page_index": page_index, + "data_point": data_point, + "parent_text_block": None, + "value": reported_name, + "annotation_attribute": {} + } + drilldown_data_list.append(drilldown_data) highlighted_value_list.append(reported_name) - if highlight_annotation: - annotated_pdf_file = self.save_annotated_pdf(pdf_doc) - return annotated_pdf_file - - def highlight_pdf_doc(self, - pdf_doc: fitz.Document, - page_index: int, - highlight_value: str, - data_point: str = None, - pdf_util: PDFUtil = None,): - page = pdf_doc[page_index] - page_text = page.get_text() - highlight_value = str(highlight_value) - highlight_value_regex = add_slash_to_text_as_regex(highlight_value) - highlight_value_search = re.search(highlight_value_regex, page_text) - highlight_value_search_text = None - if highlight_value_search is not None: - highlight_value_search_text = highlight_value_search.group() - drilldown_data = {"DocumentId": self.doc_id, - "page_index": page_index, - "data_point": data_point, - "value": highlight_value, - "matching_val_area": []} - if highlight_value_search_text is not None: - content = { - "data_point": data_point, - "data_value": highlight_value - } - matching_val_area = pdf_util.highlight_matching_data( - page=page, - text_block=highlight_value_search_text, - content=content, - title=data_point, - only_hightlight_first=False, - merge_nearby_lines=False - ) - - bbox_list = [] - for area in matching_val_area: - bbox = [area.x0, area.y0, area.x1, area.y1] - bbox_list.append(bbox) - # order bbox_list by y0, x0, y1, x1 - bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2])) - drilldown_data["matching_val_area"] = bbox_list - return drilldown_data - - def get_pdf_doc(self, pdf_file): - pdf_doc = fitz.open(pdf_file) - try: - pdf_encrypted = pdf_doc.isEncrypted - except: - pdf_encrypted = pdf_doc.is_encrypted - if pdf_encrypted: - pdf_doc.authenticate("") - return pdf_doc - - def save_annotated_pdf(self, pdf_doc: fitz.Document): - try: - if pdf_doc is None and pdf_doc.is_closed: - return - pdf_file_name = os.path.basename(self.pdf_file) - pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf") - output_pdf_dir = os.path.join(self.drilldown_folder, "pdf/") - os.makedirs(output_pdf_dir, exist_ok=True) - pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name) - output_buffer = BytesIO() - pdf_doc.save(output_buffer) - - # Save the output buffer to the output file - with open(pdf_file_path, mode="wb") as f: - f.write(output_buffer.getbuffer()) - pdf_doc.close() - logger.info(f"File saved to {pdf_file_path}") - return pdf_file_path - except Exception as e: - print_exc() - logger.error(f"Error when save output file: {e}") + + drilldown_result = pdf_util.batch_drilldown(drilldown_data_list=drilldown_data_list, + output_pdf_folder=self.drilldown_folder) def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list: if not re_run: @@ -1183,6 +1110,7 @@ if __name__ == "__main__": "546046730", "546919329" ] + special_doc_id_list = ["501380497"] output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/" output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/" re_run_extract_data = False diff --git a/utils/pdf_util.py b/utils/pdf_util.py index d643d40..05780c8 100644 --- a/utils/pdf_util.py +++ b/utils/pdf_util.py @@ -266,6 +266,170 @@ class PDFUtil: content_text = json.dumps(content) highlight.set_info(content=content_text, title=title) highlight.update() + + def batch_drilldown(self, + drilldown_data_list: list, + output_pdf_folder: str = None): + pdf_doc = fitz.open(self.pdf_file) + annotation_list = [] + for drilldown_data in drilldown_data_list: + page_index = drilldown_data["page_index"] + data_point = drilldown_data["data_point"] + if isinstance(data_point, list): + data_point = ", ".join(data_point) + parent_text_block = drilldown_data.get("parent_text_block", None) + highlight_value = drilldown_data["value"] + annotation_attribute = drilldown_data.get("annotation_attribute", {}) + if isinstance(highlight_value, str): + annotation_list.append(self.highlight_pdf_doc( + pdf_doc=pdf_doc, + page_index=page_index, + highlight_value=highlight_value, + parent_text_block=parent_text_block, + data_point=data_point, + annotation_attribute=annotation_attribute + )) + elif isinstance(highlight_value, list): + for value in highlight_value: + annotation_list.append(self.highlight_pdf_doc( + pdf_doc=pdf_doc, + page_index=page_index, + highlight_value=value, + parent_text_block=parent_text_block, + data_point=data_point, + annotation_attribute=annotation_attribute + )) + elif isinstance(highlight_value, dict): + for key, value in highlight_value.items(): + annotation_list.append(self.highlight_pdf_doc( + pdf_doc=pdf_doc, + page_index=page_index, + highlight_value=value, + parent_text_block=parent_text_block, + data_point=f"{data_point}, {key}", + annotation_attribute=annotation_attribute + )) + else: + pass + if output_pdf_folder is not None and len(output_pdf_folder) > 0: + os.makedirs(output_pdf_folder, exist_ok=True) + pdf_file_path = self.save_annotated_pdf(pdf_doc=pdf_doc, + output_pdf_folder=output_pdf_folder) + result = {"drilldown_pdf_doc": pdf_doc, + "annotation_list": annotation_list} + return result + + def save_annotated_pdf(self, pdf_doc: fitz.Document, output_pdf_folder: str): + try: + if output_pdf_folder is None or len(output_pdf_folder) == 0 or not os.path.exists(output_pdf_folder): + return + if pdf_doc is None and pdf_doc.is_closed: + return + pdf_file_name = os.path.basename(self.pdf_file) + pdf_file_name = pdf_file_name.replace(".pdf", "_annotated.pdf") + output_pdf_dir = os.path.join(output_pdf_folder, "pdf/") + os.makedirs(output_pdf_dir, exist_ok=True) + pdf_file_path = os.path.join(output_pdf_dir, pdf_file_name) + output_buffer = BytesIO() + pdf_doc.save(output_buffer) + + # Save the output buffer to the output file + with open(pdf_file_path, mode="wb") as f: + f.write(output_buffer.getbuffer()) + pdf_doc.close() + logger.info(f"File saved to {pdf_file_path}") + return pdf_file_path + except Exception as e: + print_exc() + logger.error(f"Error when save output file: {e}") + + def highlight_pdf_doc(self, + pdf_doc: fitz.Document, + page_index: int, + highlight_value: str, + parent_text_block: str = None, + data_point: str = None, + annotation_attribute: dict = {}): + page = pdf_doc[page_index] + page_text = page.get_text() + + parent_text_block_search_text = None + if parent_text_block is not None: + parent_text_block_regex = self.add_slash_to_text_as_regex(parent_text_block) + parent_text_block_search = re.search(parent_text_block_regex, page_text) + parent_text_block_search_text = None + if parent_text_block_search is not None: + parent_text_block_search_text = parent_text_block_search.group() + + highlight_value_search_text = "" + if highlight_value is not None: + highlight_value_regex = self.add_slash_to_text_as_regex(highlight_value) + if len(highlight_value.strip().split()) == 1 and len(highlight_value.strip()) < 3: + highlight_value_search = re.search(highlight_value_regex, page_text) + else: + highlight_value_search = re.search(highlight_value_regex, page_text, re.IGNORECASE) + if highlight_value_search is not None: + highlight_value_search_text = highlight_value_search.group() + + annotation_data = {"pdf_file": self.simple_pdf_file, + "page_index": page_index, + "data_point": data_point, + "value": highlight_value, + "matching_val_area": []} + if highlight_value_search_text is not None: + content = { + "data_point": data_point, + "data_value": highlight_value + } + # append annotation_attribute to content + content.update(annotation_attribute) + + if len(highlight_value_search_text.strip().split()) > 3: + merge_nearby_lines = True + else: + merge_nearby_lines = False + if parent_text_block_search_text is not None: + matching_val_area = self.highlight_matching_data( + page=page, + text_block=parent_text_block_search_text, + highlight_text_inside_block=highlight_value_search_text, + content=content, + title=data_point, + only_hightlight_first=False, + merge_nearby_lines=merge_nearby_lines + ) + else: + matching_val_area = self.highlight_matching_data( + page=page, + text_block=highlight_value_search_text, + content=content, + title=data_point, + only_hightlight_first=False, + merge_nearby_lines=merge_nearby_lines + ) + + bbox_list = [] + for area in matching_val_area: + bbox = [area.x0, area.y0, area.x1, area.y1] + bbox_list.append(bbox) + # order bbox_list by y0, x0, y1, x1 + bbox_list = sorted(bbox_list, key=lambda x: (x[1], x[0], x[3], x[2])) + annotation_data["matching_val_area"] = bbox_list + return annotation_data + + def add_slash_to_text_as_regex(self, text: str): + if text is None or len(text) == 0: + return text + special_char_iter = re.finditer("\W", text) + for special_iter in special_char_iter: + if len(special_iter.group().strip()) == 0: + continue + replace = r"\{0}".format(special_iter.group()) + if replace not in text: + text = re.sub(replace, r"\\W", text) + text = re.sub(r"( ){2,}", " ", text) + text = text.replace(" ", r"\s*\W*") + return text def highlight_matching_data( self, @@ -293,6 +457,8 @@ class PDFUtil: merge_nearby_lines: merge nearby lines or not """ # logger.info(f"Highlighting matching values in {self.pdf_file}") + if text_block is None or len(text_block.strip()) == 0: + return [] if within_bbox is not None: matching_val_area = page.search_for( text_block, clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3]) @@ -307,6 +473,7 @@ class PDFUtil: matching_val_area = page.search_for(text_block) else: matching_val_area = page.search_for(text_block) + if len(matching_val_area) == 0: matching_val_area = page.search_for(text_block.strip()) if len(matching_val_area) == 0: @@ -318,15 +485,20 @@ class PDFUtil: for area in matching_val_area: # get text by text_bbox pure_text_block = text_block.strip() + raw_area_text = page.get_text("text", clip=area).strip() + if len(text_block.strip()) < 3 and text_block.strip() != raw_area_text.strip(): + continue copy_area = deepcopy(area) - copy_area.x0 -= 10 - copy_area.x1 += 10 + copy_area.x0 -= 15 + copy_area.x1 += 15 text = page.get_text("text", clip=copy_area).strip() if text == pure_text_block: new_matching_val_area.append(area) else: # get start and end index of the pure_text_block in text start_index = text.find(pure_text_block) + if start_index == -1: + continue if start_index > 0: previous_char = text[start_index - 1] if previous_char not in [" ", "("]: @@ -361,8 +533,8 @@ class PDFUtil: for area in text_bbox_area: # get text by text_bbox copy_area = deepcopy(area) - copy_area.x0 -= 10 - copy_area.x1 += 10 + copy_area.x0 -= 15 + copy_area.x1 += 15 text = page.get_text("text", clip=copy_area).strip() if text == highlight_text_inside_block: highlight_bbox_list.append(area)