import os
import json
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm
import time
import fitz
import re
from io import BytesIO
from traceback import print_exc
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from utils.pdf_util import PDFUtil
from utils.biz_utils import add_slash_to_text_as_regex
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.auz_nz.hybrid_solution_script import api_for_fund_matching_call
from core.metrics import Metrics


class EMEA_AR_Parsing:
    def __init__(
        self,
        doc_id: str,
        doc_source: str = "emea_ar",
        pdf_folder: str = r"/data/emea_ar/pdf/",
        output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
        output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
        output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
        extract_way: str = "text",
        drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
        compare_with_provider: bool = True,
    ) -> None:
        self.doc_id = doc_id
        self.doc_source = doc_source
        self.pdf_folder = pdf_folder
        os.makedirs(self.pdf_folder, exist_ok=True)
        self.compare_with_provider = compare_with_provider
        self.pdf_file = self.download_pdf()
        self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
        if extract_way is None or len(extract_way) == 0:
            extract_way = "text"
        self.extract_way = extract_way
        self.output_extract_image_folder = None
        if self.extract_way == "image":
            self.output_extract_image_folder = (
                r"/data/emea_ar/output/extract_data/images/"
            )
            os.makedirs(self.output_extract_image_folder, exist_ok=True)
        if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
            output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
        if not output_extract_data_folder.endswith("/"):
            output_extract_data_folder = f"{output_extract_data_folder}/"
        if extract_way is not None and len(extract_way) > 0:
            output_extract_data_folder = (
                f"{output_extract_data_folder}by_{extract_way}/"
            )
        self.output_extract_data_folder = output_extract_data_folder
        os.makedirs(self.output_extract_data_folder, exist_ok=True)
        if output_mapping_data_folder is None or len(output_mapping_data_folder) == 0:
            output_mapping_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
        if not output_mapping_data_folder.endswith("/"):
            output_mapping_data_folder = f"{output_mapping_data_folder}/"
        if extract_way is not None and len(extract_way) > 0:
            output_mapping_data_folder = (
                f"{output_mapping_data_folder}by_{extract_way}/"
            )
        self.output_mapping_data_folder = output_mapping_data_folder
        os.makedirs(self.output_mapping_data_folder, exist_ok=True)
        self.filter_pages = FilterPages(
            self.doc_id,
            self.pdf_file,
            self.document_mapping_info_df,
            self.doc_source,
            output_pdf_text_folder,
        )
        self.page_text_dict = self.filter_pages.page_text_dict
        self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
        self.datapoints = self.get_datapoints_from_datapoint_page_info()
        if drilldown_folder is None or len(drilldown_folder) == 0:
            drilldown_folder = r"/data/emea_ar/output/drilldown/"
        os.makedirs(drilldown_folder, exist_ok=True)
        self.drilldown_folder = drilldown_folder
        misc_config_file = os.path.join(
            f"./configuration/{doc_source}/", "misc_config.json"
        )
        if os.path.exists(misc_config_file):
            with open(misc_config_file, "r", encoding="utf-8") as f:
                misc_config = json.load(f)
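            # apply_drilldown is an opt-in flag read from the per-doc_source
            # misc_config.json; when the file (or the key) is missing, drilldown
            # annotation stays disabled via the else branch below.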
self.apply_drilldown = misc_config.get("apply_drilldown", False) else: self.apply_drilldown = False def download_pdf(self) -> str: pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id) return pdf_file def get_datapoint_page_info(self) -> tuple: datapoint_page_info, result_details = self.filter_pages.start_job() return datapoint_page_info, result_details def get_datapoints_from_datapoint_page_info(self) -> list: datapoints = list(self.datapoint_page_info.keys()) if "doc_id" in datapoints: datapoints.remove("doc_id") return datapoints def extract_data( self, re_run: bool = False, ) -> list: found_data = False if not re_run: output_data_json_folder = os.path.join( self.output_extract_data_folder, "json/" ) os.makedirs(output_data_json_folder, exist_ok=True) json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json") if os.path.exists(json_file): logger.info( f"The document: {self.doc_id} has been parsed, loading data from {json_file}" ) with open(json_file, "r", encoding="utf-8") as f: data_from_gpt = json.load(f) found_data = True if not found_data: try: data_extraction = DataExtraction( self.doc_source, self.doc_id, self.pdf_file, self.output_extract_data_folder, self.page_text_dict, self.datapoint_page_info, self.datapoints, self.document_mapping_info_df, extract_way=self.extract_way, output_image_folder=self.output_extract_image_folder, ) data_from_gpt = data_extraction.extract_data() except Exception as e: logger.error(f"Error: {e}") print_exc() data_from_gpt = {"data": []} # Drilldown data to relevant PDF document annotation_list = [] if self.apply_drilldown: try: annotation_list = self.drilldown_pdf_document(data_from_gpt) except Exception as e: logger.error(f"Error: {e}") return data_from_gpt, annotation_list def drilldown_pdf_document(self, data_from_gpt: list) -> list: logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}") pdf_util = PDFUtil(self.pdf_file) drilldown_data_list = [] for data in data_from_gpt: doc_id = str(data.get("doc_id", "")) page_index = data.get("page_index", -1) if page_index == -1: continue extract_data_list = data.get("extract_data", {}).get("data", []) dp_reported_name_dict = data.get("extract_data", {}).get( "dp_reported_name", {} ) highlighted_value_list = [] for extract_data in extract_data_list: for data_point, value in extract_data.items(): if value in highlighted_value_list: continue if data_point in ["ter", "ogc", "performance_fee"]: continue drilldown_data = { "doc_id": doc_id, "page_index": page_index, "data_point": data_point, "parent_text_block": None, "value": value, "annotation_attribute": {}, } drilldown_data_list.append(drilldown_data) highlighted_value_list.append(value) for data_point, reported_name in dp_reported_name_dict.items(): if reported_name in highlighted_value_list: continue data_point = f"{data_point}_reported_name" drilldown_data = { "doc_id": doc_id, "page_index": page_index, "data_point": data_point, "parent_text_block": None, "value": reported_name, "annotation_attribute": {}, } drilldown_data_list.append(drilldown_data) highlighted_value_list.append(reported_name) drilldown_result = pdf_util.batch_drilldown( drilldown_data_list=drilldown_data_list, output_pdf_folder=self.drilldown_folder, ) annotation_list = [] if len(drilldown_result) > 0: logger.info(f"Drilldown PDF document for doc_id: {doc_id} successfully") annotation_list = drilldown_result.get("annotation_list", []) for annotation in annotation_list: annotation["doc_id"] = doc_id if self.drilldown_folder is not None and 
len(self.drilldown_folder) > 0: drilldown_data_folder = os.path.join(self.drilldown_folder, "data/") os.makedirs(drilldown_data_folder, exist_ok=True) drilldown_file = os.path.join( drilldown_data_folder, f"{doc_id}_drilldown.xlsx" ) drilldown_source_df = pd.DataFrame(drilldown_data_list) annotation_list_df = pd.DataFrame(annotation_list) # set drilldown_result_df column order as doc_id, pdf_file, page_index, # data_point, value, matching_val_area, normalized_bbox try: annotation_list_df = annotation_list_df[ [ "doc_id", "pdf_file", "page_index", "data_point", "value", "matching_val_area", "normalized_bbox", ] ] except Exception as e: logger.error(f"Error: {e}") logger.info(f"Writing drilldown data to {drilldown_file}") try: with pd.ExcelWriter(drilldown_file) as writer: drilldown_source_df.to_excel( writer, index=False, sheet_name="source_data" ) annotation_list_df.to_excel( writer, index=False, sheet_name="drilldown_data" ) except Exception as e: logger.error(f"Error: {e}") annotation_list = annotation_list_df.to_dict(orient="records") try: drilldown_json_file = os.path.join( drilldown_data_folder, f"{doc_id}_drilldown.json" ) with open(drilldown_json_file, "w", encoding="utf-8") as f: json.dump(annotation_list, f, ensure_ascii=False, indent=4) except Exception as e: logger.error(f"Error: {e}") return annotation_list def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list: if not re_run: output_data_json_folder = os.path.join( self.output_mapping_data_folder, "json/" ) os.makedirs(output_data_json_folder, exist_ok=True) json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json") if os.path.exists(json_file): logger.info( f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {json_file}" ) with open(json_file, "r", encoding="utf-8") as f: doc_mapping_data = json.load(f) return doc_mapping_data """ doc_id, datapoints: list, raw_document_data_list: list, document_mapping_info_df: pd.DataFrame, output_data_folder: str, """ data_mapping = DataMapping( self.doc_id, self.datapoints, data_from_gpt, self.document_mapping_info_df, self.output_mapping_data_folder, self.doc_source, compare_with_provider=self.compare_with_provider ) return data_mapping.mapping_raw_data_entrance() def filter_pages(doc_id: str, pdf_folder: str, doc_source: str) -> None: logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}") emea_ar_parsing = EMEA_AR_Parsing( doc_id, doc_source=doc_source, pdf_folder=pdf_folder ) datapoint_page_info, result_details = emea_ar_parsing.get_datapoint_page_info() return datapoint_page_info, result_details def extract_data( doc_id: str, doc_source: str, pdf_folder: str, output_data_folder: str, extract_way: str = "text", re_run: bool = False, ) -> None: logger.info(f"Extract EMEA AR data for doc_id: {doc_id}") emea_ar_parsing = EMEA_AR_Parsing( doc_id, doc_source=doc_source, pdf_folder=pdf_folder, output_extract_data_folder=output_data_folder, extract_way=extract_way, ) data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run) return data_from_gpt, annotation_list def mapping_data( doc_id: str, pdf_folder: str, output_pdf_text_folder: str, output_extract_data_folder: str, output_mapping_folder: str, doc_source: str = "emea_ar", extract_way: str = "text", drilldown_folder: str = r"/data/emea_ar/output/drilldown/", re_run_extract_data: bool = False, re_run_mapping_data: bool = False, ) -> None: logger.info(f"Extract EMEA AR data for doc_id: {doc_id}") emea_ar_parsing = EMEA_AR_Parsing( doc_id, 
        doc_source=doc_source,
        pdf_folder=pdf_folder,
        output_pdf_text_folder=output_pdf_text_folder,
        output_extract_data_folder=output_extract_data_folder,
        output_mapping_data_folder=output_mapping_folder,
        extract_way=extract_way,
        drilldown_folder=drilldown_folder,
        compare_with_provider=False,
    )
    doc_data_from_gpt, annotation_list = emea_ar_parsing.extract_data(
        re_run=re_run_extract_data
    )
    doc_mapping_data = emea_ar_parsing.mapping_data(
        data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
    )
    return doc_data_from_gpt, annotation_list, doc_mapping_data


def batch_extract_data(
    pdf_folder: str,
    doc_source: str = "emea_ar",
    doc_data_excel_file: str = None,
    output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run: bool = False,
) -> None:
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0 and
        doc_data_excel_file is not None and
        len(doc_data_excel_file) > 0 and
        os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if doc_list is not None and doc_id not in doc_list:
            continue
        # extract_data returns (data_from_gpt, annotation_list); only the extracted
        # records are accumulated into the total result here.
        data_from_gpt, _annotation_list = extract_data(
            doc_id=doc_id,
            doc_source=doc_source,
            pdf_folder=pdf_folder,
            output_data_folder=output_child_folder,
            extract_way=extract_way,
            re_run=re_run,
        )
        result_list.extend(data_from_gpt)
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        result_df = pd.DataFrame(result_list)
        result_df.reset_index(drop=True, inplace=True)
        logger.info(f"Saving the result to {output_total_folder}")
        os.makedirs(output_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_total_folder,
            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_df.to_excel(writer, index=False, sheet_name="extract_data_info")


def batch_start_job(
    doc_source: str = "emea_ar",
    pdf_folder: str = "/data/emea_ar/pdf/",
    output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
    doc_data_excel_file: str = None,
    document_mapping_file: str = None,
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
    extract_way: str = "text",
    drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
    special_doc_id_list: list = None,
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
    force_save_total_data: bool = False,
    calculate_metrics: bool = False,
    total_data_prefix: str = None,
):
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        doc_list.append(doc_id)
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0 and
        doc_data_excel_file is not None and
        len(doc_data_excel_file) > 0 and
os.path.exists(doc_data_excel_file) ): doc_data_df = pd.read_excel(doc_data_excel_file) doc_data_df = doc_data_df[doc_data_df["Checked"] == 1] doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()] result_extract_data_list = [] result_mapping_data_list = [] for doc_id in tqdm(doc_list): try: doc_data_from_gpt, annotation_list, doc_mapping_data_list = mapping_data( doc_id=doc_id, pdf_folder=pdf_folder, output_pdf_text_folder=output_pdf_text_folder, output_extract_data_folder=output_extract_data_child_folder, output_mapping_folder=output_mapping_child_folder, doc_source=doc_source, extract_way=extract_way, drilldown_folder=drilldown_folder, re_run_extract_data=re_run_extract_data, re_run_mapping_data=re_run_mapping_data, ) result_extract_data_list.extend(doc_data_from_gpt) result_mapping_data_list.extend(doc_mapping_data_list) except Exception as e: logger.error(f"Document: {doc_id} met error: {e}") print_exc() if force_save_total_data or ( special_doc_id_list is None or len(special_doc_id_list) == 0 ): result_extract_data_df = pd.DataFrame(result_extract_data_list) result_extract_data_df.reset_index(drop=True, inplace=True) result_mappingdata_df = pd.DataFrame(result_mapping_data_list) result_mappingdata_df.reset_index(drop=True, inplace=True) logger.info(f"Saving extract data to {output_extract_data_total_folder}") unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist() os.makedirs(output_extract_data_total_folder, exist_ok=True) time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime()) file_name = f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx" if total_data_prefix is not None and len(total_data_prefix) > 0: file_name = f"{total_data_prefix}_{file_name}" output_file = os.path.join(output_extract_data_total_folder, file_name) with pd.ExcelWriter(output_file) as writer: result_extract_data_df.to_excel( writer, index=False, sheet_name="extract_data_info" ) logger.info(f"Saving mapping data to {output_mapping_total_folder}") unique_doc_ids = result_mappingdata_df["doc_id"].unique().tolist() os.makedirs(output_mapping_total_folder, exist_ok=True) time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime()) file_name = f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx" if total_data_prefix is not None and len(total_data_prefix) > 0: file_name = f"{total_data_prefix}_{file_name}" output_file = os.path.join(output_mapping_total_folder, file_name) doc_mapping_data_in_db = only_output_mapping_data_in_db(result_mappingdata_df) with pd.ExcelWriter(output_file) as writer: doc_mapping_data_in_db.to_excel( writer, index=False, sheet_name="data_in_doc_mapping" ) result_mappingdata_df.to_excel( writer, index=False, sheet_name="total_mapping_data" ) result_extract_data_df.to_excel( writer, index=False, sheet_name="extract_data" ) if ( doc_source == "aus_prospectus" and document_mapping_file is not None and len(document_mapping_file) > 0 and os.path.exists(document_mapping_file) ): try: merged_total_data_folder = os.path.join( output_mapping_total_folder, "merged/" ) os.makedirs(merged_total_data_folder, exist_ok=True) data_file_base_name = os.path.basename(output_file) output_merged_data_file_path = os.path.join( merged_total_data_folder, "merged_" + data_file_base_name ) merge_output_data_aus_prospectus( output_file, document_mapping_file, output_merged_data_file_path ) except Exception as e: logger.error(f"Error: {e}") if calculate_metrics: prediction_sheet_name = "data_in_doc_mapping" 
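        # The predictions just written to output_file (sheet "data_in_doc_mapping")
        # are scored against the fixed EMEA AR ground-truth workbook configured below.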
ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx" ground_truth_sheet_name = "mapping_data" metrics_output_folder = r"/data/emea_ar/output/metrics/" # logger.info(f"Calculating metrics for data extraction") # missing_error_list, metrics_list, metrics_file = get_metrics( # "data_extraction", # output_file, # prediction_sheet_name, # ground_truth_file, # ground_truth_sheet_name, # metrics_output_folder, # ) # logger.info(f"Calculating metrics for investment mapping by actual document mapping") # missing_error_list, metrics_list, metrics_file = get_metrics( # "investment_mapping", # output_file, # prediction_sheet_name, # ground_truth_file, # ground_truth_sheet_name, # metrics_output_folder, # ) logger.info( f"Calculating metrics for investment mapping by database document mapping" ) missing_error_list, metrics_list, metrics_file = get_metrics( "document_mapping_in_db", output_file, prediction_sheet_name, ground_truth_file, ground_truth_sheet_name, metrics_output_folder, ) def only_output_mapping_data_in_db(mapping_data: pd.DataFrame) -> None: doc_id_list = mapping_data["doc_id"].unique().tolist() data_in_mapping_df_list = [] for doc_id in doc_id_list: doc_mapping_data = mapping_data[mapping_data["doc_id"] == doc_id] document_mapping = query_document_fund_mapping(doc_id, rerun=False) fund_id_list = document_mapping["FundId"].unique().tolist() sec_id_list = document_mapping["SecId"].unique().tolist() id_list = fund_id_list + sec_id_list # filter doc_mapping_data by id_list or empty id filter_doc_mapping_data = doc_mapping_data[ (doc_mapping_data["investment_id"].isin(id_list)) | (doc_mapping_data["investment_id"] == "") ] data_in_mapping_df_list.append(filter_doc_mapping_data) result_mapping_data_df = pd.concat(data_in_mapping_df_list) result_mapping_data_df.reset_index(drop=True, inplace=True) return result_mapping_data_df def batch_filter_pdf_files( pdf_folder: str, doc_source: str = "emea_ar", doc_data_excel_file: str = None, output_folder: str = r"/data/emea_ar/output/filter_pages/", special_doc_id_list: list = None, ) -> None: pdf_files = glob(pdf_folder + "*.pdf") doc_list = [] if special_doc_id_list is not None and len(special_doc_id_list) > 0: doc_list = special_doc_id_list if ( len(doc_list) == 0 and doc_data_excel_file is not None and len(doc_data_excel_file) > 0 and os.path.exists(doc_data_excel_file) ): doc_data_df = pd.read_excel(doc_data_excel_file) doc_data_df = doc_data_df[doc_data_df["Checked"] == 1] doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()] result_list = [] result_details = [] for pdf_file in tqdm(pdf_files): pdf_base_name = os.path.basename(pdf_file) doc_id = pdf_base_name.split(".")[0] if doc_list is not None and doc_id not in doc_list: continue doc_datapoint_page_info, doc_result_details = filter_pages( doc_id=doc_id, pdf_folder=pdf_folder, doc_source=doc_source ) result_list.append(doc_datapoint_page_info) result_details.extend(doc_result_details) result_df = pd.DataFrame(result_list) result_df.reset_index(drop=True, inplace=True) result_details_df = pd.DataFrame(result_details) result_details_df.reset_index(drop=True, inplace=True) logger.info(f"Saving the result to {output_folder}") os.makedirs(output_folder, exist_ok=True) time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime()) output_file = os.path.join( output_folder, f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx", ) with pd.ExcelWriter(output_file) as writer: result_df.to_excel(writer, index=False, 
sheet_name="dp_page_info") result_details_df.to_excel( writer, index=False, sheet_name="dp_page_info_details" ) if len(special_doc_id_list) == 0: logger.info(f"Calculating metrics for {output_file}") metrics_output_folder = r"/data/emea_ar/output/metrics/" missing_error_list, metrics_list, metrics_file = get_metrics( data_type="page_filter", prediction_file=output_file, prediction_sheet_name="dp_page_info", ground_truth_file=doc_data_excel_file, output_folder=metrics_output_folder, ) return missing_error_list, metrics_list, metrics_file def get_metrics( data_type: str, prediction_file: str, prediction_sheet_name: str, ground_truth_file: str, ground_truth_sheet_name: str = None, output_folder: str = None, ) -> None: metrics = Metrics( data_type=data_type, prediction_file=prediction_file, prediction_sheet_name=prediction_sheet_name, ground_truth_file=ground_truth_file, ground_truth_sheet_name=ground_truth_sheet_name, output_folder=output_folder, ) missing_error_list, metrics_list, metrics_file = metrics.get_metrics( strict_model=False ) return missing_error_list, metrics_list, metrics_file def test_auto_generate_instructions(): """ doc_id: str, pdf_file: str, page_text_dict: dict, datapoint_page_info: dict, document_mapping_info_df: pd.DataFrame """ doc_id = "402397014" pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf" document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False) filter_pages = FilterPages(doc_id, pdf_file, document_mapping_info_df) page_text_dict = filter_pages.page_text_dict datapoint_page_info, datapoint_page_info_details = filter_pages.start_job() datapoint_list = list(datapoint_page_info.keys()) datapoint_list.remove("doc_id") data_extraction = DataExtraction( "emear_ar", doc_id, pdf_file, page_text_dict, datapoint_page_info, document_mapping_info_df, ) page_index_list = list(page_text_dict.keys()) if len(page_index_list) > 0: page_text = "" for datapoint in datapoint_list: if len(datapoint_page_info[datapoint]) > 0: page_index_list = datapoint_page_info[datapoint] page_text = page_text_dict[page_index_list[0]] break output_folder = ( r"/data/emea_ar/basic_information/prompts_example/generate_by_config/" ) os.makedirs(output_folder, exist_ok=True) tor_instructions_text = data_extraction.get_instructions_by_datapoints( page_text, ["tor"] ) with open( os.path.join(output_folder, "tor_instructions.txt"), "w", encoding="utf-8" ) as f: f.write(tor_instructions_text) ter_instructions_text = data_extraction.get_instructions_by_datapoints( page_text, ["ter"] ) with open( os.path.join(output_folder, "ter_instructions.txt"), "w", encoding="utf-8" ) as f: f.write(ter_instructions_text) ogc_instructions_text = data_extraction.get_instructions_by_datapoints( page_text, ["ogc"] ) with open( os.path.join(output_folder, "ogc_instructions.txt"), "w", encoding="utf-8" ) as f: f.write(ogc_instructions_text) performance_fee_instructions_text = ( data_extraction.get_instructions_by_datapoints( page_text, ["performance_fee"] ) ) with open( os.path.join(output_folder, "performance_fee_instructions.txt"), "w", encoding="utf-8", ) as f: f.write(performance_fee_instructions_text) ter_ogc_instructions_text = data_extraction.get_instructions_by_datapoints( page_text, ["ter", "ogc"] ) with open( os.path.join(output_folder, "ter_ogc_instructions.txt"), "w", encoding="utf-8", ) as f: f.write(ter_ogc_instructions_text) ter_performance_fee_instructions_text = ( data_extraction.get_instructions_by_datapoints( page_text, ["ter", "performance_fee"] ) ) with open( os.path.join(output_folder, 
"ter_performance_fee_instructions.txt"), "w", encoding="utf-8", ) as f: f.write(ter_performance_fee_instructions_text) ogc_ter_performance_fee_instructions_text = ( data_extraction.get_instructions_by_datapoints( page_text, ["ogc", "ter", "performance_fee"] ) ) with open( os.path.join(output_folder, "ogc_ter_performance_fee_instructions.txt"), "w", encoding="utf-8", ) as f: f.write(ogc_ter_performance_fee_instructions_text) def test_data_extraction_metrics(): data_type = "document_mapping_in_db" # prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx" prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_51_documents_by_text_20250127104008.xlsx" # prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx" prediction_sheet_name = "data_in_doc_mapping" ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx" ground_truth_sheet_name = "mapping_data" metrics_output_folder = r"/data/emea_ar/output/metrics/" missing_error_list, metrics_list, metrics_file = get_metrics( data_type, prediction_file, prediction_sheet_name, ground_truth_file, ground_truth_sheet_name, metrics_output_folder, ) def test_mapping_raw_name(): doc_id = "337293427" # KBC Bonds Inflation-Linked Bonds Distribution Shares # KBC Bonds Inflation-Linked Bonds Institutional B Shares raw_name = "KBC Bonds Inflation-Linked Bonds Institutional B Shares" raw_share_name = "Institutional B Shares" output_folder = r"/data/emea_ar/output/mapping_data/docs/by_text/" data_mapping = DataMapping( doc_id, datapoints=None, raw_document_data_list=None, document_mapping_info_df=None, output_data_folder=output_folder, ) process_cache = {} mapping_info = data_mapping.matching_with_database( raw_name=raw_name, raw_share_name=raw_share_name, parent_id="FSGBR051XK", matching_type="share", process_cache=process_cache, ) print(mapping_info) def test_translate_pdf(): from core.data_translate import Translate_PDF pdf_file = r"/data/emea_ar/pdf/451063582.pdf" output_folder = r"/data/translate/output/" translate_pdf = Translate_PDF(pdf_file, output_folder) translate_pdf.start_job() def test_replace_abbrevation(): from utils.biz_utils import replace_abbrevation text_list = [ "M&G European Credit Investment Fund A CHFH Acc", "M&G European Credit Investment Fund A CHFHInc", "M&G European Credit Investment Fund A USDHAcc", "M&G European High Yield Credit Investment Fund E GBPHedgedAcc", "M&G Sustainable European Credit Investment Fd Cl L GBPH Acc", "M&G Sustainable Total Return Credit Investment Fd AI HGBPInc", "M&G Total Return Credit Investment Fund Class WI GBPHedgedInc", "M&G Total Return Credit Investment Fund Class W GBP HedgedInc", "M&G Total Return Credit Investment Fund Class P CHF H Acc", "M&G Total Return Credit Investment Fund P EUR Inc", ] for text in text_list: result = replace_abbrevation(text) logger.info(f"Original text: {text}, replaced text: {result}") def test_calculate_metrics(): from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx" mapping_file = r"/data/emea_ar/basic_information/English/sample_doc/emea_doc_with_all_4_dp/doc_ar_data_with_all_4_dp.xlsx" data_df = pd.read_excel(data_file, sheet_name="data_in_doc_mapping") data_df = data_df[data_df["check"].isin([0, 1])] data_df.fillna("", inplace=True) 
data_df.reset_index(drop=True, inplace=True) mapping_df = pd.read_excel(mapping_file, sheet_name="doc_ar_data_in_db") mapping_fund_id = mapping_df["FundId"].unique().tolist() mapping_share_id = mapping_df["FundClassId"].unique().tolist() mapping_id_list = mapping_fund_id + mapping_share_id # filter data_df whether investment_id in mapping_id_list filter_data_df = data_df[ (data_df["investment_id"].isin(mapping_id_list)) | (data_df["investment_id"] == "") ] # Investment mapping data mapping_metrics = get_sub_metrics(filter_data_df, "investment_mapping") logger.info(f"Investment mapping metrics: {mapping_metrics}") # tor data tor_data_df = filter_data_df[filter_data_df["datapoint"] == "tor"] tor_metrics = get_sub_metrics(tor_data_df, "tor") logger.info(f"TOR metrics: {tor_metrics}") # ter data ter_data_df = filter_data_df[filter_data_df["datapoint"] == "ter"] ter_metrics = get_sub_metrics(ter_data_df, "ter") logger.info(f"TER metrics: {ter_metrics}") # ogc data ogc_data_df = filter_data_df[filter_data_df["datapoint"] == "ogc"] ogc_metrics = get_sub_metrics(ogc_data_df, "ogc") logger.info(f"OGC metrics: {ogc_metrics}") # performance_fee data performance_fee_data_df = filter_data_df[ filter_data_df["datapoint"] == "performance_fee" ] performance_fee_metrics = get_sub_metrics( performance_fee_data_df, "performance_fee" ) logger.info(f"Performance fee metrics: {performance_fee_metrics}") metrics_df = pd.DataFrame( [ mapping_metrics, tor_metrics, ter_metrics, ogc_metrics, performance_fee_metrics, ] ) metrics_df.reset_index(drop=True, inplace=True) output_folder = r"/data/emea_ar/ground_truth/data_extraction/verify/" output_metrics_file = os.path.join( output_folder, r"mapping_data_info_30_documents_all_4_datapoints_roughly_metrics.xlsx", ) with pd.ExcelWriter(output_metrics_file) as writer: metrics_df.to_excel(writer, index=False, sheet_name="metrics") def get_sub_metrics(data_df: pd.DataFrame, data_point: str) -> dict: from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score gt_list = [1] * len(data_df) pre_list = data_df["check"].tolist() # convert pre_list member to be integer pre_list = [int(pre) for pre in pre_list] for index, row in data_df.iterrows(): if row["check"] == 0 and len(row["investment_id"].strip()) > 0: pre_list.append(1) gt_list.append(0) # calculate metrics accuracy = accuracy_score(gt_list, pre_list) precision = precision_score(gt_list, pre_list) recall = recall_score(gt_list, pre_list) f1 = f1_score(gt_list, pre_list) support = len(data_df) metrics = { "DataPoint": data_point, "F1": f1, "Precision": precision, "Recall": recall, "Accuracy": accuracy, "Support": support, } return metrics def replace_rerun_data(new_data_file: str, original_data_file: str): data_in_doc_mapping_sheet = "data_in_doc_mapping" total_mapping_data_sheet = "total_mapping_data" extract_data_sheet = "extract_data" new_data_in_doc_mapping = pd.read_excel( new_data_file, sheet_name=data_in_doc_mapping_sheet ) new_total_mapping_data = pd.read_excel( new_data_file, sheet_name=total_mapping_data_sheet ) new_extract_data = pd.read_excel(new_data_file, sheet_name=extract_data_sheet) document_list = new_data_in_doc_mapping["doc_id"].unique().tolist() original_data_in_doc_mapping = pd.read_excel( original_data_file, sheet_name=data_in_doc_mapping_sheet ) original_total_mapping_data = pd.read_excel( original_data_file, sheet_name=total_mapping_data_sheet ) original_extract_data = pd.read_excel( original_data_file, sheet_name=extract_data_sheet ) # remove data in original data by 
document_list original_data_in_doc_mapping = original_data_in_doc_mapping[ ~original_data_in_doc_mapping["doc_id"].isin(document_list) ] original_total_mapping_data = original_total_mapping_data[ ~original_total_mapping_data["doc_id"].isin(document_list) ] original_extract_data = original_extract_data[ ~original_extract_data["doc_id"].isin(document_list) ] # merge new data to original data new_data_in_doc_mapping = pd.concat( [original_data_in_doc_mapping, new_data_in_doc_mapping] ) new_data_in_doc_mapping.reset_index(drop=True, inplace=True) new_total_mapping_data = pd.concat( [original_total_mapping_data, new_total_mapping_data] ) new_total_mapping_data.reset_index(drop=True, inplace=True) new_extract_data = pd.concat([original_extract_data, new_extract_data]) new_extract_data.reset_index(drop=True, inplace=True) with pd.ExcelWriter(original_data_file) as writer: new_data_in_doc_mapping.to_excel( writer, index=False, sheet_name=data_in_doc_mapping_sheet ) new_total_mapping_data.to_excel( writer, index=False, sheet_name=total_mapping_data_sheet ) new_extract_data.to_excel(writer, index=False, sheet_name=extract_data_sheet) def batch_run_documents( doc_source: str = "emea_ar", special_doc_id_list: list = None, pdf_folder: str = r"/data/emea_ar/pdf/", document_mapping_file: str = None, output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/", output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/", output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/", output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/", output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/", drilldown_folder: str = r"/data/emea_ar/output/drilldown/", re_run_extract_data: bool = True, re_run_mapping_data: bool = True, force_save_total_data: bool = False ): sample_document_list_folder = r"./sample_documents/" document_list_files = glob(sample_document_list_folder + "*.txt") page_filter_ground_truth_file = ( r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx" ) calculate_metrics = False extract_way = "text" # special_doc_id_list = [] if special_doc_id_list is None or len(special_doc_id_list) == 0: force_save_total_data = True file_base_name_candidates = [] for document_list_file in document_list_files: file_base_name = os.path.basename(document_list_file).replace(".txt", "") if ( file_base_name_candidates is not None and len(file_base_name_candidates) > 0 and file_base_name not in file_base_name_candidates ): continue with open(document_list_file, "r", encoding="utf-8") as f: doc_id_list = f.readlines() doc_id_list = [doc_id.strip() for doc_id in doc_id_list] batch_start_job( doc_source, pdf_folder, output_pdf_text_folder, page_filter_ground_truth_file, document_mapping_file, output_extract_data_child_folder, output_mapping_child_folder, output_extract_data_total_folder, output_mapping_total_folder, extract_way, drilldown_folder, doc_id_list, re_run_extract_data, re_run_mapping_data, force_save_total_data=force_save_total_data, calculate_metrics=calculate_metrics, total_data_prefix=file_base_name, ) else: batch_start_job( doc_source, pdf_folder, output_pdf_text_folder, page_filter_ground_truth_file, document_mapping_file, output_extract_data_child_folder, output_mapping_child_folder, output_extract_data_total_folder, output_mapping_total_folder, extract_way, drilldown_folder, special_doc_id_list, re_run_extract_data, re_run_mapping_data, force_save_total_data=force_save_total_data, 
calculate_metrics=calculate_metrics, ) def batch_initial_document( sample_document_list_folder: str = r"./sample_documents/", document_list_file: str = "sample_document_complex.txt", doc_source: str = "emea_ar", pdf_folder: str = r"/data/emea_ar/pdf/", output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/", output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/", output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/", ): document_list_file_path = os.path.join( sample_document_list_folder, document_list_file ) with open(document_list_file_path, "r", encoding="utf-8") as f: doc_id_list = f.readlines() doc_id_list = [doc_id.strip() for doc_id in doc_id_list] for doc_id in tqdm(doc_id_list): logger.info(f"Start to initial document: {doc_id}") emea_ar_parsing = EMEA_AR_Parsing( doc_id=doc_id, doc_source=doc_source, pdf_folder=pdf_folder, output_pdf_text_folder=output_pdf_text_folder, output_extract_data_folder=output_extract_data_child_folder, output_mapping_data_folder=output_mapping_child_folder, ) def merge_output_data( data_file_path: str, document_mapping_file: str, output_data_file_path: str ): data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data") document_mapping_df = pd.read_excel(document_mapping_file, sheet_name="doc_date") # set doc_id to be string type data_df["doc_id"] = data_df["doc_id"].astype(str) document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str) """ doc_id page_index raw_name datapoint value raw_check comment investment_type investment_id investment_name similarity 553242368 344 Deutsche MSCI World Index Fund tor 61 33 FS0000AY1Y Xtrackers MSCI World Index Fund 0.75 553242368 344 db x-trackers EUR Liquid Corporate 12.5 UCITS ETF - Klasse 1C ter 0.35 1 F000018PY1 Xtrackers EUR Corporate Green Bond UCITS ETF 1C 0.462 """ doc_id_list = data_df["doc_id"].unique().tolist() data_point_dict = { "tor": "TurnoverRatio", "ter": "NetExpenseRatio", "ogc": "OngoingCharge", "performance_fee": "PerformanceFee", } total_data_list = [] for doc_id in tqdm(doc_id_list): doc_data_list = [] doc_data_df = data_df[data_df["doc_id"] == doc_id] doc_date = str( document_mapping_df[document_mapping_df["DocumentId"] == doc_id][ "EffectiveDate" ].values[0] )[0:10] exist_raw_name_list = [] for index, row in doc_data_df.iterrows(): doc_id = str(row["doc_id"]) page_index = int(row["page_index"]) raw_name = str(row["raw_name"]) datapoint = str(row["datapoint"]) value = row["value"] investment_type = row["investment_type"] investment_id = row["investment_id"] investment_name = row["investment_name"] exist = False for exist_raw_name_info in exist_raw_name_list: exist_raw_name = exist_raw_name_info["raw_name"] exist_investment_type = exist_raw_name_info["investment_type"] if ( exist_raw_name == raw_name and exist_investment_type == investment_type ): exist = True break if not exist: data = { "DocumentId": doc_id, "investment_type": investment_type, "investment_id": investment_id, "investment_name": investment_name, "EffectiveDate": doc_date, "page_index": [], "RawName": raw_name, "NetExpenseRatio": "", "OngoingCharge": "", "TurnoverRatio": "", "PerformanceFee": "", } exist_raw_name_list.append( {"raw_name": raw_name, "investment_type": investment_type} ) doc_data_list.append(data) # find data from total_data_list by raw_name for data in doc_data_list: if ( data["RawName"] == raw_name and data["investment_type"] == investment_type ): update_key = data_point_dict[datapoint] data[update_key] = value if 
page_index not in data["page_index"]: data["page_index"].append(page_index) break total_data_list.extend(doc_data_list) total_data_df = pd.DataFrame(total_data_list) total_data_df.fillna("", inplace=True) with pd.ExcelWriter(output_data_file_path) as writer: total_data_df.to_excel(writer, index=False, sheet_name="total_data") def merge_output_data_aus_prospectus( data_file_path: str, document_mapping_file: str, output_data_file_path: str ): # TODO: merge output data for aus prospectus, plan to realize it on 2025-01-16 data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data") data_df.fillna("", inplace=True) document_mapping_df = pd.read_excel( document_mapping_file, sheet_name="document_mapping" ) document_mapping_df.fillna("", inplace=True) # set doc_id to be string type data_df["doc_id"] = data_df["doc_id"].astype(str) document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str) doc_id_list = data_df["doc_id"].unique().tolist() datapoint_keyword_config_file = ( r"./configuration/aus_prospectus/datapoint_name.json" ) with open(datapoint_keyword_config_file, "r", encoding="utf-8") as f: datapoint_keyword_config = json.load(f) datapoint_name_list = list(datapoint_keyword_config.keys()) total_data_list = [] for doc_id in tqdm(doc_id_list): doc_data_list = [] doc_date = str( document_mapping_df[document_mapping_df["DocumentId"] == doc_id][ "EffectiveDate" ].values[0] )[0:10] share_doc_data_df = data_df[ (data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 1) ] exist_raw_name_list = [] for index, row in share_doc_data_df.iterrows(): doc_id = str(row["doc_id"]) page_index = int(row["page_index"]) raw_fund_name = str(row["raw_fund_name"]) raw_share_name = str(row["raw_share_name"]) raw_name = str(row["raw_name"]) datapoint = str(row["datapoint"]) value = row["value"] investment_type = row["investment_type"] share_class_id = row["investment_id"] share_class_legal_name = row["investment_name"] fund_id = "" fund_legal_name = "" if share_class_id != "": record_row = document_mapping_df[ document_mapping_df["FundClassId"] == share_class_id ] if len(record_row) > 0: fund_id = record_row["FundId"].values[0] fund_legal_name = record_row["FundLegalName"].values[0] exist = False for exist_raw_name_info in exist_raw_name_list: exist_raw_name = exist_raw_name_info["raw_name"] exist_investment_type = exist_raw_name_info["investment_type"] exist_investment_id = exist_raw_name_info["investment_id"] if ( exist_raw_name == raw_name and exist_investment_type == investment_type ) or (len(exist_investment_id) > 0 and exist_investment_id == share_class_id): exist = True break if not exist: data = { "DocumentId": doc_id, "raw_fund_name": raw_fund_name, "raw_share_name": raw_share_name, "raw_name": raw_name, "fund_id": fund_id, "fund_name": fund_legal_name, "sec_id": share_class_id, "sec_name": share_class_legal_name, "EffectiveDate": doc_date, "page_index": [], "RawName": raw_name, } for datapoint_name in datapoint_name_list: data[datapoint_name] = "" exist_raw_name_list.append( {"raw_name": raw_name, "investment_type": investment_type, "investment_id": share_class_id} ) doc_data_list.append(data) # find data from total_data_list by raw_name for data in doc_data_list: if data["raw_name"] == raw_name: update_key = datapoint data[update_key] = value if page_index not in data["page_index"]: data["page_index"].append(page_index) break if len(share_class_id) > 0 and data["sec_id"] == share_class_id: update_key = datapoint if len(str(data[update_key])) == 0: data[update_key] = 
value if page_index not in data["page_index"]: data["page_index"].append(page_index) break fund_doc_data_df = data_df[ (data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 33) ] fund_doc_data_df.fillna("", inplace=True) for index, row in fund_doc_data_df.iterrows(): doc_id = str(row["doc_id"]) page_index = int(row["page_index"]) raw_fund_name = str(row["raw_fund_name"]) raw_share_name = "" raw_name = str(row["raw_name"]) datapoint = str(row["datapoint"]) value = row["value"] fund_id = row["investment_id"] fund_legal_name = row["investment_name"] exist = False if fund_id != "": for data in doc_data_list: if (fund_id != "" and data["fund_id"] == fund_id) or ( data["raw_fund_name"] == raw_fund_name ): update_key = datapoint data[update_key] = value if page_index not in data["page_index"]: data["page_index"].append(page_index) exist = True else: for data in doc_data_list: if data["raw_name"] == raw_name: update_key = datapoint data[update_key] = value if page_index not in data["page_index"]: data["page_index"].append(page_index) exist = True if not exist: data = { "DocumentId": doc_id, "raw_fund_name": raw_fund_name, "raw_share_name": "", "raw_name": raw_name, "fund_id": fund_id, "fund_name": fund_legal_name, "sec_id": "", "sec_name": "", "EffectiveDate": doc_date, "page_index": [page_index], "RawName": raw_name, } for datapoint_name in datapoint_name_list: data[datapoint_name] = "" data[datapoint] = value doc_data_list.append(data) total_data_list.extend(doc_data_list) total_data_df = pd.DataFrame(total_data_list) total_data_df.fillna("", inplace=True) with pd.ExcelWriter(output_data_file_path) as writer: total_data_df.to_excel(writer, index=False, sheet_name="total_data") def get_aus_prospectus_document_category(): document_sample_file = ( r"./sample_documents/aus_prospectus_17_documents_sample.txt" ) with open(document_sample_file, "r", encoding="utf-8") as f: special_doc_id_list = [doc_id.strip() for doc_id in f.readlines()] pdf_folder: str = r"/data/aus_prospectus/pdf/" output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/" output_extract_data_child_folder: str = ( r"/data/aus_prospectus/output/extract_data/docs/" ) output_mapping_child_folder: str = ( r"/data/aus_prospectus/output/mapping_data/docs/" ) drilldown_folder = r"/data/aus_prospectus/output/drilldown/" doc_source = "aus_prospectus" extract_way = "text" document_category_dict = {} for doc_id in special_doc_id_list: emea_ar_parsing = EMEA_AR_Parsing( doc_id, doc_source=doc_source, pdf_folder=pdf_folder, output_pdf_text_folder=output_pdf_text_folder, output_extract_data_folder=output_extract_data_child_folder, output_mapping_data_folder=output_mapping_child_folder, extract_way=extract_way, drilldown_folder=drilldown_folder, compare_with_provider=False ) data_extraction = DataExtraction( doc_source=emea_ar_parsing.doc_source, doc_id=emea_ar_parsing.doc_id, pdf_file=emea_ar_parsing.pdf_file, output_data_folder=emea_ar_parsing.output_extract_data_folder, page_text_dict=emea_ar_parsing.page_text_dict, datapoint_page_info=emea_ar_parsing.datapoint_page_info, datapoints=emea_ar_parsing.datapoints, document_mapping_info_df=emea_ar_parsing.document_mapping_info_df, extract_way=extract_way ) logger.info(f"Document: {doc_id}, \ncategory: {data_extraction.document_category}, \nproduction: {data_extraction.document_production}") document_category_dict[doc_id] = {"category": data_extraction.document_category, "production": data_extraction.document_production} output_extract_document_category_folder: str = ( 
r"/data/aus_prospectus/output/document_category/" ) os.makedirs(output_extract_document_category_folder, exist_ok=True) document_sample_file_base_name = os.path.basename(document_sample_file).replace(".txt", "").replace("aus_prospectus_", "") output_file = os.path.join(output_extract_document_category_folder, f"{document_sample_file_base_name}_category_production.json") with open(output_file, "w", encoding="utf-8") as f: json.dump(document_category_dict, f, ensure_ascii=False, indent=4) logger.info(f"Document category and production: {document_category_dict}") def test_post_adjust_extract_data(): doc_id = "454036250" pdf_folder: str = r"/data/aus_prospectus/pdf/" output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/" output_extract_data_child_folder: str = ( r"/data/aus_prospectus/output/extract_data/docs/" ) output_mapping_child_folder: str = ( r"/data/aus_prospectus/output/mapping_data/docs/" ) drilldown_folder = r"/data/aus_prospectus/output/drilldown/" doc_source = "aus_prospectus" extract_way = "text" emea_ar_parsing = EMEA_AR_Parsing( doc_id, doc_source=doc_source, pdf_folder=pdf_folder, output_pdf_text_folder=output_pdf_text_folder, output_extract_data_folder=output_extract_data_child_folder, output_mapping_data_folder=output_mapping_child_folder, extract_way=extract_way, drilldown_folder=drilldown_folder, compare_with_provider=False ) data_extraction = DataExtraction(doc_source=emea_ar_parsing.doc_source, doc_id=emea_ar_parsing.doc_id, pdf_file=emea_ar_parsing.pdf_file, output_data_folder=emea_ar_parsing.output_extract_data_folder, page_text_dict=emea_ar_parsing.page_text_dict, datapoint_page_info=emea_ar_parsing.datapoint_page_info, datapoints=emea_ar_parsing.datapoints, document_mapping_info_df=emea_ar_parsing.document_mapping_info_df, extract_way=extract_way) data_folder = r"/data/aus_prospectus/output/extract_data/docs/by_text/json/" data_file = f"{doc_id}.json" data_file_path = os.path.join(data_folder, data_file) with open(data_file_path, "r", encoding="utf-8") as f: data_list = json.load(f) # data_list = data_extraction.remove_duplicate_data(data_list) data_list = data_extraction.post_adjust_for_value_with_production_name(data_list) if __name__ == "__main__": # test_post_adjust_extract_data() # get_aus_prospectus_document_category() # test_data_extraction_metrics() # data_file_path = r"/data/aus_prospectus/output/mapping_data/total/mapping_data_info_1_documents_by_text_20250226155259.xlsx" # document_mapping_file_path = r"/data/aus_prospectus/basic_information/biz_rule/phase1_document_mapping.xlsx" # merged_total_data_folder = r'/data/aus_prospectus/output/mapping_data/total/merged/' # os.makedirs(merged_total_data_folder, exist_ok=True) # data_file_base_name = os.path.basename(data_file_path) # output_data_file_path = os.path.join(merged_total_data_folder, "merged_" + data_file_base_name) # merge_output_data_aus_prospectus(data_file_path, document_mapping_file_path, output_data_file_path) doc_source = "aus_prospectus" sample_document_list_folder: str = r'./sample_documents/' document_list_file: str = "aus_prospectus_29_documents_sample.txt" pdf_folder: str = r"/data/aus_prospectus/pdf/" output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/" output_extract_data_child_folder: str = r"/data/aus_prospectus/output/extract_data/docs/" output_mapping_child_folder: str = r"/data/aus_prospectus/output/mapping_data/docs/" # batch_initial_document(sample_document_list_folder=sample_document_list_folder, # document_list_file=document_list_file, # 
doc_source=doc_source, # pdf_folder=pdf_folder, # output_pdf_text_folder=output_pdf_text_folder, # output_extract_data_child_folder=output_extract_data_child_folder, # output_mapping_child_folder=output_mapping_child_folder) # get_aus_prospectus_document_category() # special_doc_id_list = ["553242411"] re_run_extract_data = True re_run_mapping_data = True force_save_total_data = False doc_source = "aus_prospectus" # doc_source = "emea_ar" if doc_source == "aus_prospectus": # document_sample_file = ( # r"./sample_documents/aus_prospectus_100_documents_multi_fund_sample.txt" # ) # document_sample_file = ( # r"./sample_documents/aus_prospectus_17_documents_sample.txt" # ) document_sample_file = ( r"./sample_documents/aus_prospectus_46_documents_sample.txt" ) with open(document_sample_file, "r", encoding="utf-8") as f: special_doc_id_list = [doc_id.strip() for doc_id in f.readlines()] # document_mapping_file = r"/data/aus_prospectus/basic_information/from_2024_documents/aus_100_document_prospectus_multi_fund.xlsx" # document_mapping_file = r"/data/aus_prospectus/basic_information/biz_rule/phase1_document_mapping.xlsx" # document_mapping_file = r"/data/aus_prospectus/basic_information/17_documents/aus_prospectus_17_documents_mapping.xlsx" document_mapping_file = r"/data/aus_prospectus/basic_information/46_documents/aus_prospectus_46_documents_mapping.xlsx" # special_doc_id_list: list = ["539261734"] # special_doc_id_list: list = ["401212184"] pdf_folder: str = r"/data/aus_prospectus/pdf/" output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/" output_extract_data_child_folder: str = ( r"/data/aus_prospectus/output/extract_data/docs/" ) output_extract_data_total_folder: str = ( r"/data/aus_prospectus/output/extract_data/total/" ) output_mapping_child_folder: str = ( r"/data/aus_prospectus/output/mapping_data/docs/" ) output_mapping_total_folder: str = ( r"/data/aus_prospectus/output/mapping_data/total/" ) drilldown_folder = r"/data/aus_prospectus/output/drilldown/" batch_run_documents( doc_source=doc_source, special_doc_id_list=special_doc_id_list, pdf_folder=pdf_folder, document_mapping_file=document_mapping_file, output_pdf_text_folder=output_pdf_text_folder, output_extract_data_child_folder=output_extract_data_child_folder, output_extract_data_total_folder=output_extract_data_total_folder, output_mapping_child_folder=output_mapping_child_folder, output_mapping_total_folder=output_mapping_total_folder, drilldown_folder=drilldown_folder, re_run_extract_data=re_run_extract_data, re_run_mapping_data=re_run_mapping_data, force_save_total_data=force_save_total_data ) elif doc_source == "emea_ar": special_doc_id_list = ["321733631"] batch_run_documents( doc_source=doc_source, special_doc_id_list=special_doc_id_list, re_run_extract_data=re_run_extract_data, re_run_mapping_data=re_run_mapping_data, force_save_total_data=force_save_total_data ) # new_data_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_15_documents_by_text_20241121154243.xlsx" # original_data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx" # replace_rerun_data(new_data_file, original_data_file) # test_calculate_metrics() # test_replace_abbrevation() # test_translate_pdf() # test_mapping_raw_name() # test_data_extraction_metrics() # batch_filter_pdf_files( # pdf_folder, page_filter_ground_truth_file, prediction_output_folder, special_doc_id_list # ) # data_type = "page_filter" # prediction_file = 
r"/data/emea_ar/output/filter_pages/datapoint_page_info_73_documents_20240903145002.xlsx" # missing_error_list, metrics_list, metrics_file = get_metrics( # data_type, prediction_file, page_filter_ground_truth_file, metrics_output_folder # ) # test_auto_generate_instructions() # batch_extract_data( # pdf_folder, # page_filter_ground_truth_file, # output_extract_data_child_folder, # output_extract_data_total_folder, # special_doc_id_list, # re_run, # ) # doc_id = "476492237" # extract_way = "image" # extract_data(doc_id, # pdf_folder, # output_extract_data_child_folder, # extract_way, # re_run_extract_data)