import os
import json
import time
from glob import glob

import pandas as pd
from tqdm import tqdm

from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.metrics import Metrics


class EMEA_AR_Parsing:
    def __init__(
        self,
        doc_id: str,
        pdf_folder: str = r"/data/emea_ar/pdf/",
        output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
        output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
        extract_way: str = "text",
    ) -> None:
        self.doc_id = doc_id
        self.pdf_folder = pdf_folder
        os.makedirs(self.pdf_folder, exist_ok=True)
        self.pdf_file = self.download_pdf()
        self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
        if extract_way is None or len(extract_way) == 0:
            extract_way = "text"
        self.extract_way = extract_way
        self.output_extract_image_folder = None
        if self.extract_way == "image":
            self.output_extract_image_folder = (
                r"/data/emea_ar/output/extract_data/images/"
            )
            os.makedirs(self.output_extract_image_folder, exist_ok=True)
        if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
            output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
        if not output_extract_data_folder.endswith("/"):
            output_extract_data_folder = f"{output_extract_data_folder}/"
        # extract_way is normalized above, so it is always non-empty here; each
        # extraction way gets its own output sub-folder.
        output_extract_data_folder = f"{output_extract_data_folder}by_{extract_way}/"
        self.output_extract_data_folder = output_extract_data_folder
        os.makedirs(self.output_extract_data_folder, exist_ok=True)
        if output_mapping_data_folder is None or len(output_mapping_data_folder) == 0:
            output_mapping_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
        if not output_mapping_data_folder.endswith("/"):
            output_mapping_data_folder = f"{output_mapping_data_folder}/"
        output_mapping_data_folder = f"{output_mapping_data_folder}by_{extract_way}/"
        self.output_mapping_data_folder = output_mapping_data_folder
        os.makedirs(self.output_mapping_data_folder, exist_ok=True)
        self.filter_pages = FilterPages(
            self.doc_id, self.pdf_file, self.document_mapping_info_df
        )
        self.page_text_dict = self.filter_pages.page_text_dict
        self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
        self.datapoints = self.get_datapoints_from_datapoint_page_info()

    def download_pdf(self) -> str:
        pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
        return pdf_file

    def get_datapoint_page_info(self) -> tuple:
        datapoint_page_info, result_details = self.filter_pages.start_job()
        return datapoint_page_info, result_details

    def get_datapoints_from_datapoint_page_info(self) -> list:
        datapoints = list(self.datapoint_page_info.keys())
        if "doc_id" in datapoints:
            datapoints.remove("doc_id")
        return datapoints

    def extract_data(
        self,
        re_run: bool = False,
    ) -> list:
        if not re_run:
            output_data_json_folder = os.path.join(
                self.output_extract_data_folder, "json/"
            )
            os.makedirs(output_data_json_folder, exist_ok=True)
            json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
            if os.path.exists(json_file):
                logger.info(
                    f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as f:
                    data_from_gpt = json.load(f)
                return data_from_gpt
        data_extraction = DataExtraction(
            self.doc_id,
            self.pdf_file,
            self.output_extract_data_folder,
            self.page_text_dict,
            self.datapoint_page_info,
            self.datapoints,
            self.document_mapping_info_df,
            extract_way=self.extract_way,
            output_image_folder=self.output_extract_image_folder,
        )
        data_from_gpt = data_extraction.extract_data()
        return data_from_gpt

    def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
        if not re_run:
            output_data_json_folder = os.path.join(
                self.output_mapping_data_folder, "json/"
            )
            os.makedirs(output_data_json_folder, exist_ok=True)
            json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
            if os.path.exists(json_file):
                logger.info(
                    f"The fund/share of this document: {self.doc_id} has been mapped, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as f:
                    doc_mapping_data = json.load(f)
                return doc_mapping_data
        # DataMapping(doc_id, datapoints, raw_document_data_list,
        #             document_mapping_info_df, output_data_folder)
        data_mapping = DataMapping(
            self.doc_id,
            self.datapoints,
            data_from_gpt,
            self.document_mapping_info_df,
            self.output_mapping_data_folder,
        )
        return data_mapping.mapping_raw_data()

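# A minimal usage sketch of the class above (doc_id taken from the test below;
# assumes the documents warehouse and the fund-mapping database are reachable,
# since __init__ downloads the PDF and queries the document/fund mapping):
#
#     parsing = EMEA_AR_Parsing("402397014", extract_way="text")
#     raw_data = parsing.extract_data(re_run=False)   # reuses cached JSON if present
#     mapped_data = parsing.mapping_data(raw_data)    # fund/share-class mapping
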
def filter_pages(doc_id: str, pdf_folder: str) -> tuple:
    logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(doc_id, pdf_folder)
    datapoint_page_info, result_details = emea_ar_parsing.get_datapoint_page_info()
    return datapoint_page_info, result_details


def extract_data(
    doc_id: str,
    pdf_folder: str,
    output_data_folder: str,
    extract_way: str = "text",
    re_run: bool = False,
) -> list:
    logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id,
        pdf_folder,
        output_extract_data_folder=output_data_folder,
        extract_way=extract_way,
    )
    data_from_gpt = emea_ar_parsing.extract_data(re_run)
    return data_from_gpt


def mapping_data(
    doc_id: str,
    pdf_folder: str,
    output_extract_data_folder: str,
    output_mapping_folder: str,
    extract_way: str = "text",
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
) -> tuple:
    logger.info(f"Extract and map EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id,
        pdf_folder,
        output_extract_data_folder=output_extract_data_folder,
        output_mapping_data_folder=output_mapping_folder,
        extract_way=extract_way,
    )
    doc_data_from_gpt = emea_ar_parsing.extract_data(re_run=re_run_extract_data)
    doc_mapping_data = emea_ar_parsing.mapping_data(
        data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
    )
    return doc_data_from_gpt, doc_mapping_data


def batch_extract_data(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run: bool = False,
) -> None:
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        # An empty doc_list means no filtering; process every PDF.
        if doc_list and doc_id not in doc_list:
            continue
        data_from_gpt = extract_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_data_folder=output_child_folder,
            extract_way=extract_way,
            re_run=re_run,
        )
        result_list.extend(data_from_gpt)
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        result_df = pd.DataFrame(result_list)
        result_df.reset_index(drop=True, inplace=True)
        logger.info(f"Saving the result to {output_total_folder}")
        os.makedirs(output_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_total_folder,
            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_df.to_excel(writer, index=False, sheet_name="extract_data_info")

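# The optional doc_data_excel_file is expected to provide "doc_id" and
# "Checked" columns; only rows with Checked == 1 are kept, and an empty
# resulting doc list means every PDF in pdf_folder is processed. A minimal
# sketch of a batch run over such a workbook (hypothetical path):
#
#     batch_extract_data(
#         pdf_folder=r"/data/emea_ar/pdf/",
#         doc_data_excel_file=r"/data/emea_ar/ground_truth/checked_docs.xlsx",
#         extract_way="text",
#     )
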
def batch_start_job(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
    force_save_total_data: bool = False,
):
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_extract_data_list = []
    result_mapping_data_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if doc_list and doc_id not in doc_list:
            continue
        doc_data_from_gpt, doc_mapping_data_list = mapping_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_extract_data_folder=output_extract_data_child_folder,
            output_mapping_folder=output_mapping_child_folder,
            extract_way=extract_way,
            re_run_extract_data=re_run_extract_data,
            re_run_mapping_data=re_run_mapping_data,
        )
        result_extract_data_list.extend(doc_data_from_gpt)
        result_mapping_data_list.extend(doc_mapping_data_list)
    if force_save_total_data or (
        special_doc_id_list is None or len(special_doc_id_list) == 0
    ):
        result_extract_data_df = pd.DataFrame(result_extract_data_list)
        result_extract_data_df.reset_index(drop=True, inplace=True)
        result_mapping_data_df = pd.DataFrame(result_mapping_data_list)
        result_mapping_data_df.reset_index(drop=True, inplace=True)
        logger.info(f"Saving extract data to {output_extract_data_total_folder}")
        unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist()
        os.makedirs(output_extract_data_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_extract_data_total_folder,
            f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data_info"
            )
        logger.info(f"Saving mapping data to {output_mapping_total_folder}")
        unique_doc_ids = result_mapping_data_df["doc_id"].unique().tolist()
        os.makedirs(output_mapping_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_mapping_total_folder,
            f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_mapping_data_df.to_excel(
                writer, index=False, sheet_name="mapping_data"
            )
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data"
            )
        prediction_sheet_name = "mapping_data"
        ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
        ground_truth_sheet_name = "mapping_data"
        metrics_output_folder = r"/data/emea_ar/output/metrics/"
        logger.info("Calculating metrics for data extraction")
        missing_error_list, metrics_list, metrics_file = get_metrics(
            "data_extraction",
            output_file,
            prediction_sheet_name,
            ground_truth_file,
            ground_truth_sheet_name,
            metrics_output_folder,
        )
        logger.info("Calculating metrics for investment mapping")
        missing_error_list, metrics_list, metrics_file = get_metrics(
            "investment_mapping",
            output_file,
            prediction_sheet_name,
            ground_truth_file,
            ground_truth_sheet_name,
            metrics_output_folder,
        )

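# Note on the saving behaviour above: force_save_total_data writes the
# consolidated workbooks and runs both metrics passes even when
# special_doc_id_list restricts the run to a subset of documents; without it,
# totals are only written for unrestricted runs. Both metrics passes read the
# same "mapping_data" sheet and differ only in data_type.
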
def batch_filter_pdf_files(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_folder: str = r"/data/emea_ar/output/filter_pages/",
    special_doc_id_list: list = None,
) -> tuple:
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_list = []
    result_details = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if doc_list and doc_id not in doc_list:
            continue
        doc_datapoint_page_info, doc_result_details = filter_pages(
            doc_id=doc_id, pdf_folder=pdf_folder
        )
        result_list.append(doc_datapoint_page_info)
        result_details.extend(doc_result_details)
    result_df = pd.DataFrame(result_list)
    result_df.reset_index(drop=True, inplace=True)
    result_details_df = pd.DataFrame(result_details)
    result_details_df.reset_index(drop=True, inplace=True)
    logger.info(f"Saving the result to {output_folder}")
    os.makedirs(output_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_folder,
        f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
    )
    with pd.ExcelWriter(output_file) as writer:
        result_df.to_excel(writer, index=False, sheet_name="dp_page_info")
        result_details_df.to_excel(
            writer, index=False, sheet_name="dp_page_info_details"
        )
    # Guard against special_doc_id_list being None before checking its length.
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        logger.info(f"Calculating metrics for {output_file}")
        metrics_output_folder = r"/data/emea_ar/output/metrics/"
        missing_error_list, metrics_list, metrics_file = get_metrics(
            data_type="page_filter",
            prediction_file=output_file,
            prediction_sheet_name="dp_page_info",
            ground_truth_file=doc_data_excel_file,
            output_folder=metrics_output_folder,
        )
        return missing_error_list, metrics_list, metrics_file
    return None, None, None


def get_metrics(
    data_type: str,
    prediction_file: str,
    prediction_sheet_name: str,
    ground_truth_file: str,
    ground_truth_sheet_name: str = None,
    output_folder: str = None,
) -> tuple:
    metrics = Metrics(
        data_type=data_type,
        prediction_file=prediction_file,
        prediction_sheet_name=prediction_sheet_name,
        ground_truth_file=ground_truth_file,
        ground_truth_sheet_name=ground_truth_sheet_name,
        output_folder=output_folder,
    )
    missing_error_list, metrics_list, metrics_file = metrics.get_metrics()
    return missing_error_list, metrics_list, metrics_file

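# A minimal sketch of scoring a prediction workbook directly (file names are
# hypothetical; Metrics is assumed to write its report under output_folder and
# return it as metrics_file):
#
#     missing_error_list, metrics_list, metrics_file = get_metrics(
#         data_type="page_filter",
#         prediction_file=r"/data/emea_ar/output/filter_pages/<prediction>.xlsx",
#         prediction_sheet_name="dp_page_info",
#         ground_truth_file=r"/data/emea_ar/ground_truth/page_filter/<ground_truth>.xlsx",
#         output_folder=r"/data/emea_ar/output/metrics/",
#     )
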
def test_auto_generate_instructions():
    """
    DataExtraction arguments used below:
        doc_id: str,
        pdf_file: str,
        page_text_dict: dict,
        datapoint_page_info: dict,
        document_mapping_info_df: pd.DataFrame
    """
    doc_id = "402397014"
    pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
    document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
    filter_pages = FilterPages(doc_id, pdf_file, document_mapping_info_df)
    page_text_dict = filter_pages.page_text_dict
    datapoint_page_info, datapoint_page_info_details = filter_pages.start_job()
    datapoint_list = list(datapoint_page_info.keys())
    datapoint_list.remove("doc_id")
    data_extraction = DataExtraction(
        doc_id, pdf_file, page_text_dict, datapoint_page_info, document_mapping_info_df
    )
    page_index_list = list(page_text_dict.keys())
    if len(page_index_list) > 0:
        # Use the first page that carries any datapoint as the example page.
        page_text = ""
        for datapoint in datapoint_list:
            if len(datapoint_page_info[datapoint]) > 0:
                page_index_list = datapoint_page_info[datapoint]
                page_text = page_text_dict[page_index_list[0]]
                break
        output_folder = (
            r"/data/emea_ar/basic_information/prompts_example/generate_by_config/"
        )
        os.makedirs(output_folder, exist_ok=True)
        # Generate one instructions file per datapoint combination; the file
        # name mirrors the combination, e.g. "ter_ogc_instructions.txt".
        datapoint_combinations = [
            ["tor"],
            ["ter"],
            ["ogc"],
            ["performance_fee"],
            ["ter", "ogc"],
            ["ter", "performance_fee"],
            ["ogc", "ter", "performance_fee"],
        ]
        for datapoint_combination in datapoint_combinations:
            instructions_text = data_extraction.get_instructions_by_datapoints(
                page_text, datapoint_combination
            )
            instructions_file = os.path.join(
                output_folder, f"{'_'.join(datapoint_combination)}_instructions.txt"
            )
            with open(instructions_file, "w", encoding="utf-8") as f:
                f.write(instructions_text)


def test_data_extraction_metrics():
    data_type = "data_extraction"
    # prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx"
    prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_text_20240922152517.xlsx"
    # prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx"
    prediction_sheet_name = "mapping_data"
    ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
    ground_truth_sheet_name = "mapping_data"
    metrics_output_folder = r"/data/emea_ar/output/metrics/"
    missing_error_list, metrics_list, metrics_file = get_metrics(
        data_type,
        prediction_file,
        prediction_sheet_name,
        ground_truth_file,
        ground_truth_sheet_name,
        metrics_output_folder,
    )

r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx" ground_truth_sheet_name = "mapping_data" metrics_output_folder = r"/data/emea_ar/output/metrics/" missing_error_list, metrics_list, metrics_file = get_metrics( data_type, prediction_file, prediction_sheet_name, ground_truth_file, ground_truth_sheet_name, metrics_output_folder, ) def test_mapping_raw_name(): doc_id = "391456740" raw_name = "Robeco Multi Asset Sustainable D EUR" output_folder = r"/data/emea_ar/output/mapping_data/docs/by_text/" data_mapping = DataMapping( doc_id, datapoints=None, raw_document_data_list=None, document_mapping_info_df=None, output_data_folder=output_folder, ) mapping_info = data_mapping.matching_with_database( raw_name=raw_name, parent_id=None, matching_type="share" ) print(mapping_info) if __name__ == "__main__": pdf_folder = r"/data/emea_ar/small_pdf/" page_filter_ground_truth_file = ( r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx" ) prediction_output_folder = r"/data/emea_ar/output/filter_pages/" metrics_output_folder = r"/data/emea_ar/output/metrics/" special_doc_id_list = [] # batch_filter_pdf_files( # pdf_folder, page_filter_ground_truth_file, prediction_output_folder, special_doc_id_list # ) # data_type = "page_filter" # prediction_file = r"/data/emea_ar/output/filter_pages/datapoint_page_info_73_documents_20240903145002.xlsx" # missing_error_list, metrics_list, metrics_file = get_metrics( # data_type, prediction_file, page_filter_ground_truth_file, metrics_output_folder # ) # test_auto_generate_instructions() output_extract_data_child_folder = r"/data/emea_ar/output/extract_data/docs/" output_extract_data_total_folder = r"/data/emea_ar/output/extract_data/total/" # batch_extract_data( # pdf_folder, # page_filter_ground_truth_file, # output_extract_data_child_folder, # output_extract_data_total_folder, # special_doc_id_list, # re_run, # ) # doc_id = "476492237" # extract_way = "image" # extract_data(doc_id, # pdf_folder, # output_extract_data_child_folder, # extract_way, # re_run_extract_data) # special_doc_id_list = ["505174428", "510326848", "349679479"] check_mapping_doc_id_list = [ "327956364", "391456740", "391736837", "458359181", "486383912", "497497599", "529925114", "321733631", "334718372", "344636875", "362246081", "445256897", "449623976", "458291624", "478585901", "492121213", "502821436", "507967525", "481475385", "508854243", "520879048", "402181770", "463081566", "502693599", "509845549", "389171486", "323390570", "366179419", "486378555", "506559375", "479793787", "333207452" ] special_doc_id_list = check_mapping_doc_id_list # special_doc_id_list = ["333207452"] output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/" output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/" re_run_extract_data = False re_run_mapping_data = True force_save_total_data = True extract_ways = ["text"] for extract_way in extract_ways: batch_start_job( pdf_folder, page_filter_ground_truth_file, output_extract_data_child_folder, output_mapping_child_folder, output_extract_data_total_folder, output_mapping_total_folder, extract_way, special_doc_id_list, re_run_extract_data, re_run_mapping_data, force_save_total_data=force_save_total_data, ) # test_data_extraction_metrics() # test_mapping_raw_name()