import os
import json
import pandas as pd
from glob import glob
from tqdm import tqdm
import time
import fitz
import re
from io import BytesIO
from traceback import print_exc
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from utils.pdf_util import PDFUtil
from utils.biz_utils import add_slash_to_text_as_regex
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.auz_nz.hybrid_solution_script import api_for_fund_matching_call
from core.metrics import Metrics


class EMEA_AR_Parsing:
    def __init__(
        self,
        doc_id: str,
        doc_source: str = "emea_ar",
        pdf_folder: str = r"/data/emea_ar/pdf/",
        output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
        output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
        output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
        extract_way: str = "text",
        drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
    ) -> None:
        self.doc_id = doc_id
        self.doc_source = doc_source
        self.pdf_folder = pdf_folder
        os.makedirs(self.pdf_folder, exist_ok=True)
        self.pdf_file = self.download_pdf()
        self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
        if extract_way is None or len(extract_way) == 0:
            extract_way = "text"
        self.extract_way = extract_way
        self.output_extract_image_folder = None
        if self.extract_way == "image":
            self.output_extract_image_folder = (
                r"/data/emea_ar/output/extract_data/images/"
            )
            os.makedirs(self.output_extract_image_folder, exist_ok=True)
        if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
            output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
        if not output_extract_data_folder.endswith("/"):
            output_extract_data_folder = f"{output_extract_data_folder}/"
        if extract_way is not None and len(extract_way) > 0:
            output_extract_data_folder = (
                f"{output_extract_data_folder}by_{extract_way}/"
            )
        self.output_extract_data_folder = output_extract_data_folder
        os.makedirs(self.output_extract_data_folder, exist_ok=True)
        if output_mapping_data_folder is None or len(output_mapping_data_folder) == 0:
            output_mapping_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
        if not output_mapping_data_folder.endswith("/"):
            output_mapping_data_folder = f"{output_mapping_data_folder}/"
        if extract_way is not None and len(extract_way) > 0:
            output_mapping_data_folder = (
                f"{output_mapping_data_folder}by_{extract_way}/"
            )
        self.output_mapping_data_folder = output_mapping_data_folder
        os.makedirs(self.output_mapping_data_folder, exist_ok=True)
        self.filter_pages = FilterPages(
            self.doc_id,
            self.pdf_file,
            self.document_mapping_info_df,
            self.doc_source,
            output_pdf_text_folder,
        )
        self.page_text_dict = self.filter_pages.page_text_dict
        self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
        self.datapoints = self.get_datapoints_from_datapoint_page_info()
        if drilldown_folder is None or len(drilldown_folder) == 0:
            drilldown_folder = r"/data/emea_ar/output/drilldown/"
        os.makedirs(drilldown_folder, exist_ok=True)
        self.drilldown_folder = drilldown_folder
        misc_config_file = os.path.join(
            f"./configuration/{doc_source}/", "misc_config.json"
        )
        if os.path.exists(misc_config_file):
            with open(misc_config_file, "r", encoding="utf-8") as f:
                misc_config = json.load(f)
            self.apply_drilldown = misc_config.get("apply_drilldown", False)
        else:
            self.apply_drilldown = False
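
    # Per-document pipeline implemented by this class (in call order):
    #   1. download_pdf()            - fetch the PDF from the documents warehouse
    #   2. FilterPages.start_job()   - locate candidate pages per datapoint
    #      (tor / ter / ogc / performance_fee) and cache page text
    #   3. extract_data()            - run DataExtraction over the filtered pages
    #      ("text" or "image" mode), with results cached as JSON per doc_id
    #   4. drilldown_pdf_document()  - optionally annotate extracted values back onto
    #      the PDF when apply_drilldown is enabled in misc_config.json
    #   5. mapping_data()            - map raw fund/share names to database
    #      investments via DataMapping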

    def download_pdf(self) -> str:
        pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
        return pdf_file

    def get_datapoint_page_info(self) -> tuple:
        datapoint_page_info, result_details = self.filter_pages.start_job()
        return datapoint_page_info, result_details

    def get_datapoints_from_datapoint_page_info(self) -> list:
        datapoints = list(self.datapoint_page_info.keys())
        if "doc_id" in datapoints:
            datapoints.remove("doc_id")
        return datapoints

    def extract_data(
        self,
        re_run: bool = False,
    ) -> tuple:
        found_data = False
        if not re_run:
            output_data_json_folder = os.path.join(
                self.output_extract_data_folder, "json/"
            )
            os.makedirs(output_data_json_folder, exist_ok=True)
            json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
            if os.path.exists(json_file):
                logger.info(
                    f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as f:
                    data_from_gpt = json.load(f)
                found_data = True
        if not found_data:
            try:
                data_extraction = DataExtraction(
                    self.doc_source,
                    self.doc_id,
                    self.pdf_file,
                    self.output_extract_data_folder,
                    self.page_text_dict,
                    self.datapoint_page_info,
                    self.datapoints,
                    self.document_mapping_info_df,
                    extract_way=self.extract_way,
                    output_image_folder=self.output_extract_image_folder,
                )
                data_from_gpt = data_extraction.extract_data()
            except Exception as e:
                logger.error(f"Error: {e}")
                data_from_gpt = {"data": []}
        # Drilldown data to relevant PDF document
        annotation_list = []
        if self.apply_drilldown:
            try:
                annotation_list = self.drilldown_pdf_document(data_from_gpt)
            except Exception as e:
                logger.error(f"Error: {e}")
        return data_from_gpt, annotation_list

    def drilldown_pdf_document(self, data_from_gpt: list) -> list:
        logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
        pdf_util = PDFUtil(self.pdf_file)
        drilldown_data_list = []
        for data in data_from_gpt:
            doc_id = str(data.get("doc_id", ""))
            page_index = data.get("page_index", -1)
            if page_index == -1:
                continue
            extract_data_list = data.get("extract_data", {}).get("data", [])
            dp_reported_name_dict = data.get("extract_data", {}).get(
                "dp_reported_name", {}
            )
            highlighted_value_list = []
            for extract_data in extract_data_list:
                for data_point, value in extract_data.items():
                    if value in highlighted_value_list:
                        continue
                    if data_point in ["ter", "ogc", "performance_fee"]:
                        continue
                    drilldown_data = {
                        "doc_id": doc_id,
                        "page_index": page_index,
                        "data_point": data_point,
                        "parent_text_block": None,
                        "value": value,
                        "annotation_attribute": {},
                    }
                    drilldown_data_list.append(drilldown_data)
                    highlighted_value_list.append(value)
            for data_point, reported_name in dp_reported_name_dict.items():
                if reported_name in highlighted_value_list:
                    continue
                data_point = f"{data_point}_reported_name"
                drilldown_data = {
                    "doc_id": doc_id,
                    "page_index": page_index,
                    "data_point": data_point,
                    "parent_text_block": None,
                    "value": reported_name,
                    "annotation_attribute": {},
                }
                drilldown_data_list.append(drilldown_data)
                highlighted_value_list.append(reported_name)
        drilldown_result = pdf_util.batch_drilldown(
            drilldown_data_list=drilldown_data_list,
            output_pdf_folder=self.drilldown_folder,
        )
        annotation_list = []
        if len(drilldown_result) > 0:
            logger.info(f"Drilldown PDF document for doc_id: {doc_id} successfully")
            annotation_list = drilldown_result.get("annotation_list", [])
            for annotation in annotation_list:
                annotation["doc_id"] = doc_id
            if self.drilldown_folder is not None and len(self.drilldown_folder) > 0:
                drilldown_data_folder = os.path.join(self.drilldown_folder, "data/")
                os.makedirs(drilldown_data_folder, exist_ok=True)
                drilldown_file = os.path.join(
                    drilldown_data_folder, f"{doc_id}_drilldown.xlsx"
                )
                drilldown_source_df = pd.DataFrame(drilldown_data_list)
                annotation_list_df = pd.DataFrame(annotation_list)
                # set drilldown_result_df column order as doc_id, pdf_file, page_index,
                # data_point, value, matching_val_area, normalized_bbox
                try:
                    annotation_list_df = annotation_list_df[
                        [
                            "doc_id",
                            "pdf_file",
                            "page_index",
                            "data_point",
                            "value",
                            "matching_val_area",
                            "normalized_bbox",
                        ]
                    ]
                except Exception as e:
                    logger.error(f"Error: {e}")
                logger.info(f"Writing drilldown data to {drilldown_file}")
                try:
                    with pd.ExcelWriter(drilldown_file) as writer:
                        drilldown_source_df.to_excel(
                            writer, index=False, sheet_name="source_data"
                        )
                        annotation_list_df.to_excel(
                            writer, index=False, sheet_name="drilldown_data"
                        )
                except Exception as e:
                    logger.error(f"Error: {e}")
                annotation_list = annotation_list_df.to_dict(orient="records")
        return annotation_list

    def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
        if not re_run:
            output_data_json_folder = os.path.join(
                self.output_mapping_data_folder, "json/"
            )
            os.makedirs(output_data_json_folder, exist_ok=True)
            json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
            if os.path.exists(json_file):
                logger.info(
                    f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as f:
                    doc_mapping_data = json.load(f)
                return doc_mapping_data
        """
        doc_id,
        datapoints: list,
        raw_document_data_list: list,
        document_mapping_info_df: pd.DataFrame,
        output_data_folder: str,
        """
        data_mapping = DataMapping(
            self.doc_id,
            self.datapoints,
            data_from_gpt,
            self.document_mapping_info_df,
            self.output_mapping_data_folder,
            self.doc_source,
        )
        return data_mapping.mapping_raw_data_entrance()


def filter_pages(doc_id: str, pdf_folder: str, doc_source: str) -> tuple:
    logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id, doc_source=doc_source, pdf_folder=pdf_folder
    )
    datapoint_page_info, result_details = emea_ar_parsing.get_datapoint_page_info()
    return datapoint_page_info, result_details


def extract_data(
    doc_id: str,
    doc_source: str,
    pdf_folder: str,
    output_data_folder: str,
    extract_way: str = "text",
    re_run: bool = False,
) -> tuple:
    logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id,
        doc_source=doc_source,
        pdf_folder=pdf_folder,
        output_extract_data_folder=output_data_folder,
        extract_way=extract_way,
    )
    data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run)
    return data_from_gpt, annotation_list


def mapping_data(
    doc_id: str,
    pdf_folder: str,
    output_pdf_text_folder: str,
    output_extract_data_folder: str,
    output_mapping_folder: str,
    doc_source: str = "emea_ar",
    extract_way: str = "text",
    drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
) -> tuple:
    logger.info(f"Map EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id,
        doc_source=doc_source,
        pdf_folder=pdf_folder,
        output_pdf_text_folder=output_pdf_text_folder,
        output_extract_data_folder=output_extract_data_folder,
        output_mapping_data_folder=output_mapping_folder,
        extract_way=extract_way,
        drilldown_folder=drilldown_folder,
    )
    doc_data_from_gpt, annotation_list = emea_ar_parsing.extract_data(
        re_run=re_run_extract_data
    )
    doc_mapping_data = emea_ar_parsing.mapping_data(
        data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
    )
    return doc_data_from_gpt, annotation_list, doc_mapping_data


def batch_extract_data(
    pdf_folder: str,
    doc_source: str = "emea_ar",
    doc_data_excel_file: str = None,
    output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run: bool = False,
) -> None:
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if doc_list is not None and doc_id not in doc_list:
            continue
        data_from_gpt, annotation_list = extract_data(
            doc_id=doc_id,
            doc_source=doc_source,
            pdf_folder=pdf_folder,
            output_data_folder=output_child_folder,
            extract_way=extract_way,
            re_run=re_run,
        )
        result_list.extend(data_from_gpt)
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        result_df = pd.DataFrame(result_list)
        result_df.reset_index(drop=True, inplace=True)
        logger.info(f"Saving the result to {output_total_folder}")
        os.makedirs(output_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_total_folder,
            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_df.to_excel(writer, index=False, sheet_name="extract_data_info")


def batch_start_job(
    doc_source: str = "emea_ar",
    pdf_folder: str = "/data/emea_ar/pdf/",
    output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
    doc_data_excel_file: str = None,
    document_mapping_file: str = None,
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
    extract_way: str = "text",
    drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
    special_doc_id_list: list = None,
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
    force_save_total_data: bool = False,
    calculate_metrics: bool = False,
    total_data_prefix: str = None,
):
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        doc_list.append(doc_id)
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_extract_data_list = []
    result_mapping_data_list = []
    for doc_id in tqdm(doc_list):
        try:
            doc_data_from_gpt, annotation_list, doc_mapping_data_list = mapping_data(
                doc_id=doc_id,
                pdf_folder=pdf_folder,
                output_pdf_text_folder=output_pdf_text_folder,
                output_extract_data_folder=output_extract_data_child_folder,
                output_mapping_folder=output_mapping_child_folder,
                doc_source=doc_source,
                extract_way=extract_way,
                drilldown_folder=drilldown_folder,
                re_run_extract_data=re_run_extract_data,
                re_run_mapping_data=re_run_mapping_data,
            )
            result_extract_data_list.extend(doc_data_from_gpt)
            result_mapping_data_list.extend(doc_mapping_data_list)
        except Exception as e:
            logger.error(f"Document: {doc_id} met error: {e}")
            print_exc()
    if force_save_total_data or (
        special_doc_id_list is None or len(special_doc_id_list) == 0
    ):
        result_extract_data_df = pd.DataFrame(result_extract_data_list)
        result_extract_data_df.reset_index(drop=True, inplace=True)
        result_mappingdata_df = pd.DataFrame(result_mapping_data_list)
        result_mappingdata_df.reset_index(drop=True, inplace=True)
        logger.info(f"Saving extract data to {output_extract_data_total_folder}")
        unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist()
        os.makedirs(output_extract_data_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        file_name = f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx"
        if total_data_prefix is not None and len(total_data_prefix) > 0:
            file_name = f"{total_data_prefix}_{file_name}"
        output_file = os.path.join(output_extract_data_total_folder, file_name)
        with pd.ExcelWriter(output_file) as writer:
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data_info"
            )
        logger.info(f"Saving mapping data to {output_mapping_total_folder}")
        unique_doc_ids = result_mappingdata_df["doc_id"].unique().tolist()
        os.makedirs(output_mapping_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        file_name = f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx"
        if total_data_prefix is not None and len(total_data_prefix) > 0:
            file_name = f"{total_data_prefix}_{file_name}"
        output_file = os.path.join(output_mapping_total_folder, file_name)
        doc_mapping_data_in_db = only_output_mapping_data_in_db(result_mappingdata_df)
        with pd.ExcelWriter(output_file) as writer:
            doc_mapping_data_in_db.to_excel(
                writer, index=False, sheet_name="data_in_doc_mapping"
            )
            result_mappingdata_df.to_excel(
                writer, index=False, sheet_name="total_mapping_data"
            )
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data"
            )
        if (
            document_mapping_file is not None
            and len(document_mapping_file) > 0
            and os.path.exists(document_mapping_file)
        ):
            try:
                merged_total_data_folder = os.path.join(
                    output_mapping_total_folder, "merged/"
                )
                os.makedirs(merged_total_data_folder, exist_ok=True)
                data_file_base_name = os.path.basename(output_file)
                output_merged_data_file_path = os.path.join(
                    merged_total_data_folder, "merged_" + data_file_base_name
                )
                merge_output_data_aus_prospectus(
                    output_file, document_mapping_file, output_merged_data_file_path
                )
            except Exception as e:
                logger.error(f"Error: {e}")
        if calculate_metrics:
            prediction_sheet_name = "total_mapping_data"
            ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
            ground_truth_sheet_name = "mapping_data"
            metrics_output_folder = r"/data/emea_ar/output/metrics/"
            # logger.info(f"Calculating metrics for data extraction")
            # missing_error_list, metrics_list, metrics_file = get_metrics(
            #     "data_extraction",
            #     output_file,
            #     prediction_sheet_name,
            #     ground_truth_file,
            #     ground_truth_sheet_name,
            #     metrics_output_folder,
            # )
            # logger.info(f"Calculating metrics for investment mapping by actual document mapping")
            # missing_error_list, metrics_list, metrics_file = get_metrics(
            #     "investment_mapping",
            #     output_file,
            #     prediction_sheet_name,
            #     ground_truth_file,
            #     ground_truth_sheet_name,
            #     metrics_output_folder,
            # )
            logger.info(
                f"Calculating metrics for investment mapping by database document mapping"
            )
            missing_error_list, metrics_list, metrics_file = get_metrics(
                "document_mapping_in_db",
                output_file,
                prediction_sheet_name,
                ground_truth_file,
                ground_truth_sheet_name,
                metrics_output_folder,
            )


def only_output_mapping_data_in_db(mapping_data: pd.DataFrame) -> pd.DataFrame:
    doc_id_list = mapping_data["doc_id"].unique().tolist()
    data_in_mapping_df_list = []
    for doc_id in doc_id_list:
        doc_mapping_data = mapping_data[mapping_data["doc_id"] == doc_id]
        document_mapping = query_document_fund_mapping(doc_id, rerun=False)
        fund_id_list = document_mapping["FundId"].unique().tolist()
        sec_id_list = document_mapping["SecId"].unique().tolist()
        id_list = fund_id_list + sec_id_list
        # filter doc_mapping_data by id_list or empty id
        filter_doc_mapping_data = doc_mapping_data[
            (doc_mapping_data["investment_id"].isin(id_list))
            | (doc_mapping_data["investment_id"] == "")
        ]
        data_in_mapping_df_list.append(filter_doc_mapping_data)
    result_mapping_data_df = pd.concat(data_in_mapping_df_list)
    result_mapping_data_df.reset_index(drop=True, inplace=True)
    return result_mapping_data_df


def batch_filter_pdf_files(
    pdf_folder: str,
    doc_source: str = "emea_ar",
    doc_data_excel_file: str = None,
    output_folder: str = r"/data/emea_ar/output/filter_pages/",
    special_doc_id_list: list = None,
) -> None:
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_list = []
    result_details = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if doc_list is not None and doc_id not in doc_list:
            continue
        doc_datapoint_page_info, doc_result_details = filter_pages(
            doc_id=doc_id, pdf_folder=pdf_folder, doc_source=doc_source
        )
        result_list.append(doc_datapoint_page_info)
        result_details.extend(doc_result_details)
    result_df = pd.DataFrame(result_list)
    result_df.reset_index(drop=True, inplace=True)
    result_details_df = pd.DataFrame(result_details)
    result_details_df.reset_index(drop=True, inplace=True)
    logger.info(f"Saving the result to {output_folder}")
    os.makedirs(output_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_folder,
        f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
    )
    with pd.ExcelWriter(output_file) as writer:
        result_df.to_excel(writer, index=False, sheet_name="dp_page_info")
        result_details_df.to_excel(
            writer, index=False, sheet_name="dp_page_info_details"
        )
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        logger.info(f"Calculating metrics for {output_file}")
        metrics_output_folder = r"/data/emea_ar/output/metrics/"
        missing_error_list, metrics_list, metrics_file = get_metrics(
            data_type="page_filter",
            prediction_file=output_file,
            prediction_sheet_name="dp_page_info",
            ground_truth_file=doc_data_excel_file,
            output_folder=metrics_output_folder,
        )
        return missing_error_list, metrics_list, metrics_file


def get_metrics(
    data_type: str,
    prediction_file: str,
    prediction_sheet_name: str,
    ground_truth_file: str,
    ground_truth_sheet_name: str = None,
    output_folder: str = None,
) -> tuple:
    metrics = Metrics(
        data_type=data_type,
        prediction_file=prediction_file,
        prediction_sheet_name=prediction_sheet_name,
        ground_truth_file=ground_truth_file,
        ground_truth_sheet_name=ground_truth_sheet_name,
        output_folder=output_folder,
    )
    missing_error_list, metrics_list, metrics_file = metrics.get_metrics(
        strict_model=False
    )
    return missing_error_list, metrics_list, metrics_file


def test_auto_generate_instructions():
    """
    doc_id: str,
    pdf_file: str,
    page_text_dict: dict,
    datapoint_page_info: dict,
    document_mapping_info_df: pd.DataFrame
    """
    doc_id = "402397014"
    pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
    document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
    filter_pages = FilterPages(doc_id, pdf_file, document_mapping_info_df)
    page_text_dict = filter_pages.page_text_dict
    datapoint_page_info, datapoint_page_info_details = filter_pages.start_job()
    datapoint_list = list(datapoint_page_info.keys())
    datapoint_list.remove("doc_id")
    data_extraction = DataExtraction(
        "emea_ar",
        doc_id,
        pdf_file,
        page_text_dict,
        datapoint_page_info,
        document_mapping_info_df,
    )
    page_index_list = list(page_text_dict.keys())
    if len(page_index_list) > 0:
        page_text = ""
        for datapoint in datapoint_list:
            if len(datapoint_page_info[datapoint]) > 0:
                page_index_list = datapoint_page_info[datapoint]
                page_text = page_text_dict[page_index_list[0]]
                break
        output_folder = (
            r"/data/emea_ar/basic_information/prompts_example/generate_by_config/"
        )
        os.makedirs(output_folder, exist_ok=True)
        tor_instructions_text = data_extraction.get_instructions_by_datapoints(
            page_text, ["tor"]
        )
        with open(
            os.path.join(output_folder, "tor_instructions.txt"), "w", encoding="utf-8"
        ) as f:
            f.write(tor_instructions_text)
        ter_instructions_text = data_extraction.get_instructions_by_datapoints(
            page_text, ["ter"]
        )
        with open(
            os.path.join(output_folder, "ter_instructions.txt"), "w", encoding="utf-8"
        ) as f:
            f.write(ter_instructions_text)
        ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
            page_text, ["ogc"]
        )
        with open(
            os.path.join(output_folder, "ogc_instructions.txt"), "w", encoding="utf-8"
        ) as f:
            f.write(ogc_instructions_text)
        performance_fee_instructions_text = (
            data_extraction.get_instructions_by_datapoints(
                page_text, ["performance_fee"]
            )
        )
        with open(
            os.path.join(output_folder, "performance_fee_instructions.txt"),
            "w",
            encoding="utf-8",
        ) as f:
            f.write(performance_fee_instructions_text)
        ter_ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
            page_text, ["ter", "ogc"]
        )
        with open(
            os.path.join(output_folder, "ter_ogc_instructions.txt"),
            "w",
            encoding="utf-8",
        ) as f:
            f.write(ter_ogc_instructions_text)
        ter_performance_fee_instructions_text = (
            data_extraction.get_instructions_by_datapoints(
                page_text, ["ter", "performance_fee"]
            )
        )
        with open(
            os.path.join(output_folder, "ter_performance_fee_instructions.txt"),
            "w",
            encoding="utf-8",
        ) as f:
            f.write(ter_performance_fee_instructions_text)
        ogc_ter_performance_fee_instructions_text = (
            data_extraction.get_instructions_by_datapoints(
                page_text, ["ogc", "ter", "performance_fee"]
            )
        )
        with open(
            os.path.join(output_folder, "ogc_ter_performance_fee_instructions.txt"),
            "w",
            encoding="utf-8",
        ) as f:
            f.write(ogc_ter_performance_fee_instructions_text)
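
# A minimal sketch of scoring the page-filter stage with get_metrics() above; the
# prediction file name is a placeholder (batch_filter_pdf_files writes it with a
# document count and timestamp in the name), the other paths mirror defaults used
# elsewhere in this module.
# missing_error_list, metrics_list, metrics_file = get_metrics(
#     data_type="page_filter",
#     prediction_file=r"/data/emea_ar/output/filter_pages/<datapoint_page_info_..._documents_...>.xlsx",
#     prediction_sheet_name="dp_page_info",
#     ground_truth_file=r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx",
#     output_folder=r"/data/emea_ar/output/metrics/",
# )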


def test_data_extraction_metrics():
    data_type = "data_extraction"
    # prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx"
    prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_text_20240922152517.xlsx"
    # prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx"
    prediction_sheet_name = "mapping_data"
    ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
    ground_truth_sheet_name = "mapping_data"
    metrics_output_folder = r"/data/emea_ar/output/metrics/"
    missing_error_list, metrics_list, metrics_file = get_metrics(
        data_type,
        prediction_file,
        prediction_sheet_name,
        ground_truth_file,
        ground_truth_sheet_name,
        metrics_output_folder,
    )


def test_mapping_raw_name():
    doc_id = "337293427"
    # KBC Bonds Inflation-Linked Bonds Distribution Shares
    # KBC Bonds Inflation-Linked Bonds Institutional B Shares
    raw_name = "KBC Bonds Inflation-Linked Bonds Institutional B Shares"
    raw_share_name = "Institutional B Shares"
    output_folder = r"/data/emea_ar/output/mapping_data/docs/by_text/"
    data_mapping = DataMapping(
        doc_id,
        datapoints=None,
        raw_document_data_list=None,
        document_mapping_info_df=None,
        output_data_folder=output_folder,
    )
    process_cache = {}
    mapping_info = data_mapping.matching_with_database(
        raw_name=raw_name,
        raw_share_name=raw_share_name,
        parent_id="FSGBR051XK",
        matching_type="share",
        process_cache=process_cache,
    )
    print(mapping_info)


def test_translate_pdf():
    from core.data_translate import Translate_PDF

    pdf_file = r"/data/emea_ar/pdf/451063582.pdf"
    output_folder = r"/data/translate/output/"
    translate_pdf = Translate_PDF(pdf_file, output_folder)
    translate_pdf.start_job()


def test_replace_abbrevation():
    from utils.biz_utils import replace_abbrevation

    text_list = [
        "M&G European Credit Investment Fund A CHFH Acc",
        "M&G European Credit Investment Fund A CHFHInc",
        "M&G European Credit Investment Fund A USDHAcc",
        "M&G European High Yield Credit Investment Fund E GBPHedgedAcc",
        "M&G Sustainable European Credit Investment Fd Cl L GBPH Acc",
        "M&G Sustainable Total Return Credit Investment Fd AI HGBPInc",
        "M&G Total Return Credit Investment Fund Class WI GBPHedgedInc",
        "M&G Total Return Credit Investment Fund Class W GBP HedgedInc",
        "M&G Total Return Credit Investment Fund Class P CHF H Acc",
        "M&G Total Return Credit Investment Fund P EUR Inc",
    ]
    for text in text_list:
        result = replace_abbrevation(text)
        logger.info(f"Original text: {text}, replaced text: {result}")


def test_calculate_metrics():
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
    mapping_file = r"/data/emea_ar/basic_information/English/sample_doc/emea_doc_with_all_4_dp/doc_ar_data_with_all_4_dp.xlsx"
    data_df = pd.read_excel(data_file, sheet_name="data_in_doc_mapping")
    data_df = data_df[data_df["check"].isin([0, 1])]
    data_df.fillna("", inplace=True)
    data_df.reset_index(drop=True, inplace=True)
    mapping_df = pd.read_excel(mapping_file, sheet_name="doc_ar_data_in_db")
    mapping_fund_id = mapping_df["FundId"].unique().tolist()
    mapping_share_id = mapping_df["FundClassId"].unique().tolist()
    mapping_id_list = mapping_fund_id + mapping_share_id
    # filter data_df whether investment_id in mapping_id_list
    filter_data_df = data_df[
        (data_df["investment_id"].isin(mapping_id_list))
        | (data_df["investment_id"] == "")
    ]
    # Investment mapping data
    mapping_metrics = get_sub_metrics(filter_data_df, "investment_mapping")
    logger.info(f"Investment mapping metrics: {mapping_metrics}")
    # tor data
    tor_data_df = filter_data_df[filter_data_df["datapoint"] == "tor"]
    tor_metrics = get_sub_metrics(tor_data_df, "tor")
    logger.info(f"TOR metrics: {tor_metrics}")
    # ter data
    ter_data_df = filter_data_df[filter_data_df["datapoint"] == "ter"]
    ter_metrics = get_sub_metrics(ter_data_df, "ter")
    logger.info(f"TER metrics: {ter_metrics}")
    # ogc data
    ogc_data_df = filter_data_df[filter_data_df["datapoint"] == "ogc"]
    ogc_metrics = get_sub_metrics(ogc_data_df, "ogc")
    logger.info(f"OGC metrics: {ogc_metrics}")
    # performance_fee data
    performance_fee_data_df = filter_data_df[
        filter_data_df["datapoint"] == "performance_fee"
    ]
    performance_fee_metrics = get_sub_metrics(
        performance_fee_data_df, "performance_fee"
    )
    logger.info(f"Performance fee metrics: {performance_fee_metrics}")
    metrics_df = pd.DataFrame(
        [
            mapping_metrics,
            tor_metrics,
            ter_metrics,
            ogc_metrics,
            performance_fee_metrics,
        ]
    )
    metrics_df.reset_index(drop=True, inplace=True)
    output_folder = r"/data/emea_ar/ground_truth/data_extraction/verify/"
    output_metrics_file = os.path.join(
        output_folder,
        r"mapping_data_info_30_documents_all_4_datapoints_roughly_metrics.xlsx",
    )
    with pd.ExcelWriter(output_metrics_file) as writer:
        metrics_df.to_excel(writer, index=False, sheet_name="metrics")


def get_sub_metrics(data_df: pd.DataFrame, data_point: str) -> dict:
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    gt_list = [1] * len(data_df)
    pre_list = data_df["check"].tolist()
    # convert pre_list member to be integer
    pre_list = [int(pre) for pre in pre_list]
    for index, row in data_df.iterrows():
        if row["check"] == 0 and len(row["investment_id"].strip()) > 0:
            pre_list.append(1)
            gt_list.append(0)
    # calculate metrics
    accuracy = accuracy_score(gt_list, pre_list)
    precision = precision_score(gt_list, pre_list)
    recall = recall_score(gt_list, pre_list)
    f1 = f1_score(gt_list, pre_list)
    support = len(data_df)
    metrics = {
        "DataPoint": data_point,
        "F1": f1,
        "Precision": precision,
        "Recall": recall,
        "Accuracy": accuracy,
        "Support": support,
    }
    return metrics


def replace_rerun_data(new_data_file: str, original_data_file: str):
    data_in_doc_mapping_sheet = "data_in_doc_mapping"
    total_mapping_data_sheet = "total_mapping_data"
    extract_data_sheet = "extract_data"
    new_data_in_doc_mapping = pd.read_excel(
        new_data_file, sheet_name=data_in_doc_mapping_sheet
    )
    new_total_mapping_data = pd.read_excel(
        new_data_file, sheet_name=total_mapping_data_sheet
    )
    new_extract_data = pd.read_excel(new_data_file, sheet_name=extract_data_sheet)
    document_list = new_data_in_doc_mapping["doc_id"].unique().tolist()
    original_data_in_doc_mapping = pd.read_excel(
        original_data_file, sheet_name=data_in_doc_mapping_sheet
    )
    original_total_mapping_data = pd.read_excel(
        original_data_file, sheet_name=total_mapping_data_sheet
    )
    original_extract_data = pd.read_excel(
        original_data_file, sheet_name=extract_data_sheet
    )
    # remove data in original data by document_list
    original_data_in_doc_mapping = original_data_in_doc_mapping[
        ~original_data_in_doc_mapping["doc_id"].isin(document_list)
    ]
    original_total_mapping_data = original_total_mapping_data[
        ~original_total_mapping_data["doc_id"].isin(document_list)
    ]
    original_extract_data = original_extract_data[
        ~original_extract_data["doc_id"].isin(document_list)
    ]
    # merge new data to original data
    new_data_in_doc_mapping = pd.concat(
        [original_data_in_doc_mapping, new_data_in_doc_mapping]
    )
    new_data_in_doc_mapping.reset_index(drop=True, inplace=True)
    new_total_mapping_data = pd.concat(
        [original_total_mapping_data, new_total_mapping_data]
    )
    new_total_mapping_data.reset_index(drop=True, inplace=True)
    new_extract_data = pd.concat([original_extract_data, new_extract_data])
    new_extract_data.reset_index(drop=True, inplace=True)
    with pd.ExcelWriter(original_data_file) as writer:
        new_data_in_doc_mapping.to_excel(
            writer, index=False, sheet_name=data_in_doc_mapping_sheet
        )
        new_total_mapping_data.to_excel(
            writer, index=False, sheet_name=total_mapping_data_sheet
        )
        new_extract_data.to_excel(writer, index=False, sheet_name=extract_data_sheet)


def batch_run_documents(
    doc_source: str = "emea_ar",
    special_doc_id_list: list = None,
    pdf_folder: str = r"/data/emea_ar/pdf/",
    document_mapping_file: str = None,
    output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
    drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
):
    sample_document_list_folder = r"./sample_documents/"
    document_list_files = glob(sample_document_list_folder + "*.txt")
    page_filter_ground_truth_file = (
        r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx"
    )
    re_run_extract_data = True
    re_run_mapping_data = True
    force_save_total_data = False
    calculate_metrics = False
    extract_way = "text"
    # special_doc_id_list = []
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        force_save_total_data = True
        file_base_name_candidates = []
        for document_list_file in document_list_files:
            file_base_name = os.path.basename(document_list_file).replace(".txt", "")
            if (
                file_base_name_candidates is not None
                and len(file_base_name_candidates) > 0
                and file_base_name not in file_base_name_candidates
            ):
                continue
            with open(document_list_file, "r", encoding="utf-8") as f:
                doc_id_list = f.readlines()
            doc_id_list = [doc_id.strip() for doc_id in doc_id_list]
            batch_start_job(
                doc_source,
                pdf_folder,
                output_pdf_text_folder,
                page_filter_ground_truth_file,
                document_mapping_file,
                output_extract_data_child_folder,
                output_mapping_child_folder,
                output_extract_data_total_folder,
                output_mapping_total_folder,
                extract_way,
                drilldown_folder,
                doc_id_list,
                re_run_extract_data,
                re_run_mapping_data,
                force_save_total_data=force_save_total_data,
                calculate_metrics=calculate_metrics,
                total_data_prefix=file_base_name,
            )
    else:
        batch_start_job(
            doc_source,
            pdf_folder,
            output_pdf_text_folder,
            page_filter_ground_truth_file,
            document_mapping_file,
            output_extract_data_child_folder,
            output_mapping_child_folder,
            output_extract_data_total_folder,
            output_mapping_total_folder,
            extract_way,
            drilldown_folder,
            special_doc_id_list,
            re_run_extract_data,
            re_run_mapping_data,
            force_save_total_data=force_save_total_data,
            calculate_metrics=calculate_metrics,
        )


def batch_initial_document(
    sample_document_list_folder: str = r"./sample_documents/",
    document_list_file: str = "sample_document_complex.txt",
    doc_source: str = "emea_ar",
    pdf_folder: str = r"/data/emea_ar/pdf/",
    output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
):
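    """Warm the per-document caches for a list of sample documents.

    For each doc_id in the list file this simply constructs EMEA_AR_Parsing, which
    downloads the PDF, builds the page-text cache and runs the page filter, so that
    later extraction/mapping runs can reuse those artifacts.
    """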
    document_list_file_path = os.path.join(
        sample_document_list_folder, document_list_file
    )
    with open(document_list_file_path, "r", encoding="utf-8") as f:
        doc_id_list = f.readlines()
    doc_id_list = [doc_id.strip() for doc_id in doc_id_list]
    for doc_id in tqdm(doc_id_list):
        logger.info(f"Start to initial document: {doc_id}")
        emea_ar_parsing = EMEA_AR_Parsing(
            doc_id=doc_id,
            doc_source=doc_source,
            pdf_folder=pdf_folder,
            output_pdf_text_folder=output_pdf_text_folder,
            output_extract_data_folder=output_extract_data_child_folder,
            output_mapping_data_folder=output_mapping_child_folder,
        )


def merge_output_data(
    data_file_path: str, document_mapping_file: str, output_data_file_path: str
):
    data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data")
    document_mapping_df = pd.read_excel(document_mapping_file, sheet_name="doc_date")
    # set doc_id to be string type
    data_df["doc_id"] = data_df["doc_id"].astype(str)
    document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str)
    """
    Example rows of the "total_mapping_data" sheet:
    doc_id    | page_index | raw_name                                                      | datapoint | value | raw_check | comment | investment_type | investment_id | investment_name                                 | similarity
    553242368 | 344        | Deutsche MSCI World Index Fund                                | tor       | 61    |           |         | 33              | FS0000AY1Y    | Xtrackers MSCI World Index Fund                 | 0.75
    553242368 | 344        | db x-trackers EUR Liquid Corporate 12.5 UCITS ETF - Klasse 1C | ter       | 0.35  |           |         | 1               | F000018PY1    | Xtrackers EUR Corporate Green Bond UCITS ETF 1C | 0.462
    """
    doc_id_list = data_df["doc_id"].unique().tolist()
    data_point_dict = {
        "tor": "TurnoverRatio",
        "ter": "NetExpenseRatio",
        "ogc": "OngoingCharge",
        "performance_fee": "PerformanceFee",
    }
    total_data_list = []
    for doc_id in tqdm(doc_id_list):
        doc_data_list = []
        doc_data_df = data_df[data_df["doc_id"] == doc_id]
        doc_date = str(
            document_mapping_df[document_mapping_df["DocumentId"] == doc_id][
                "EffectiveDate"
            ].values[0]
        )[0:10]
        exist_raw_name_list = []
        for index, row in doc_data_df.iterrows():
            doc_id = str(row["doc_id"])
            page_index = int(row["page_index"])
            raw_name = str(row["raw_name"])
            datapoint = str(row["datapoint"])
            value = row["value"]
            investment_type = row["investment_type"]
            investment_id = row["investment_id"]
            investment_name = row["investment_name"]
            exist = False
            for exist_raw_name_info in exist_raw_name_list:
                exist_raw_name = exist_raw_name_info["raw_name"]
                exist_investment_type = exist_raw_name_info["investment_type"]
                if (
                    exist_raw_name == raw_name
                    and exist_investment_type == investment_type
                ):
                    exist = True
                    break
            if not exist:
                data = {
                    "DocumentId": doc_id,
                    "investment_type": investment_type,
                    "investment_id": investment_id,
                    "investment_name": investment_name,
                    "EffectiveDate": doc_date,
                    "page_index": [],
                    "RawName": raw_name,
                    "NetExpenseRatio": "",
                    "OngoingCharge": "",
                    "TurnoverRatio": "",
                    "PerformanceFee": "",
                }
                exist_raw_name_list.append(
                    {"raw_name": raw_name, "investment_type": investment_type}
                )
                doc_data_list.append(data)
            # find data from total_data_list by raw_name
            for data in doc_data_list:
                if (
                    data["RawName"] == raw_name
                    and data["investment_type"] == investment_type
                ):
                    update_key = data_point_dict[datapoint]
                    data[update_key] = value
                    if page_index not in data["page_index"]:
                        data["page_index"].append(page_index)
                    break
        total_data_list.extend(doc_data_list)
    total_data_df = pd.DataFrame(total_data_list)
    total_data_df.fillna("", inplace=True)
    with pd.ExcelWriter(output_data_file_path) as writer:
        total_data_df.to_excel(writer, index=False, sheet_name="total_data")


def merge_output_data_aus_prospectus(
    data_file_path: str, document_mapping_file: str, output_data_file_path: str
):
    # TODO: merge output data for aus prospectus, plan to realize it on 2025-01-16
    data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data")
    document_mapping_df = pd.read_excel(
        document_mapping_file, sheet_name="document_mapping"
    )
    # set doc_id to be string type
    data_df["doc_id"] = data_df["doc_id"].astype(str)
    document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str)
    doc_id_list = data_df["doc_id"].unique().tolist()
    datapoint_keyword_config_file = r"./configuration/aus_prospectus/datapoint_name.json"
    with open(datapoint_keyword_config_file, "r", encoding="utf-8") as f:
        datapoint_keyword_config = json.load(f)
    datapoint_name_list = list(datapoint_keyword_config.keys())
    total_data_list = []
    for doc_id in tqdm(doc_id_list):
        doc_data_list = []
        doc_date = str(
            document_mapping_df[document_mapping_df["DocumentId"] == doc_id][
                "EffectiveDate"
            ].values[0]
        )[0:10]
        share_doc_data_df = data_df[
            (data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 1)
        ]
        exist_raw_name_list = []
        for index, row in share_doc_data_df.iterrows():
            doc_id = str(row["doc_id"])
            page_index = int(row["page_index"])
            raw_fund_name = str(row["raw_fund_name"])
            raw_share_name = str(row["raw_share_name"])
            raw_name = str(row["raw_name"])
            datapoint = str(row["datapoint"])
            value = row["value"]
            investment_type = row["investment_type"]
            share_class_id = row["investment_id"]
            share_class_legal_name = row["investment_name"]
            fund_id = ""
            fund_legal_name = ""
            if share_class_id != "":
                record_row = document_mapping_df[
                    document_mapping_df["FundClassId"] == share_class_id
                ]
                if len(record_row) > 0:
                    fund_id = record_row["FundId"].values[0]
                    fund_legal_name = record_row["FundLegalName"].values[0]
            exist = False
            for exist_raw_name_info in exist_raw_name_list:
                exist_raw_name = exist_raw_name_info["raw_name"]
                exist_investment_type = exist_raw_name_info["investment_type"]
                if (
                    exist_raw_name == raw_name
                    and exist_investment_type == investment_type
                ):
                    exist = True
                    break
            if not exist:
                data = {
                    "DocumentId": doc_id,
                    "raw_fund_name": raw_fund_name,
                    "raw_share_name": raw_share_name,
                    "raw_name": raw_name,
                    "fund_id": fund_id,
                    "fund_name": fund_legal_name,
                    "sec_id": share_class_id,
                    "sec_name": share_class_legal_name,
                    "EffectiveDate": doc_date,
                    "page_index": [],
                    "RawName": raw_name,
                }
                for datapoint_name in datapoint_name_list:
                    data[datapoint_name] = ""
                exist_raw_name_list.append(
                    {"raw_name": raw_name, "investment_type": investment_type}
                )
                doc_data_list.append(data)
            # find data from total_data_list by raw_name
            for data in doc_data_list:
                if data["raw_name"] == raw_name:
                    update_key = datapoint
                    data[update_key] = value
                    if page_index not in data["page_index"]:
                        data["page_index"].append(page_index)
                    break
        fund_doc_data_df = data_df[
            (data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 33)
        ]
        for index, row in fund_doc_data_df.iterrows():
            doc_id = str(row["doc_id"])
            page_index = int(row["page_index"])
            raw_fund_name = str(row["raw_fund_name"])
            raw_share_name = ""
            raw_name = str(row["raw_name"])
            datapoint = str(row["datapoint"])
            value = row["value"]
            fund_id = row["investment_id"]
            fund_legal_name = row["investment_name"]
            exist = False
            if fund_id != "":
                for data in doc_data_list:
                    if (fund_id != "" and data["fund_id"] == fund_id) or (
                        data["raw_fund_name"] == raw_fund_name
                    ):
                        update_key = datapoint
                        data[update_key] = value
                        if page_index not in data["page_index"]:
                            data["page_index"].append(page_index)
                        exist = True
            if not exist:
                data = {
                    "DocumentId": doc_id,
                    "raw_fund_name": raw_fund_name,
                    "raw_share_name": "",
                    "raw_name": raw_name,
                    "fund_id": fund_id,
                    "fund_name": fund_legal_name,
                    "sec_id": "",
                    "sec_name": "",
                    "EffectiveDate": doc_date,
                    "page_index": [page_index],
                    "RawName": raw_name,
                }
                for datapoint_name in datapoint_name_list:
                    data[datapoint_name] = ""
                data[datapoint] = value
                doc_data_list.append(data)
        total_data_list.extend(doc_data_list)
    total_data_df = pd.DataFrame(total_data_list)
    total_data_df.fillna("", inplace=True)
    with pd.ExcelWriter(output_data_file_path) as writer:
        total_data_df.to_excel(writer, index=False, sheet_name="total_data")


if __name__ == "__main__":
    # data_file_path = r"/data/aus_prospectus/output/mapping_data/total/mapping_data_info_11_documents_by_text_20250116220811.xlsx"
    # document_mapping_file_path = r"/data/aus_prospectus/basic_information/11_documents/document_mapping.xlsx"
    # merged_total_data_folder = r'/data/aus_prospectus/output/mapping_data/total/merged/'
    # os.makedirs(merged_total_data_folder, exist_ok=True)
    # data_file_base_name = os.path.basename(data_file_path)
    # output_data_file_path = os.path.join(merged_total_data_folder, "merged_" + data_file_base_name)
    # merge_output_data_aus_prospectus(data_file_path, document_mapping_file_path, output_data_file_path)

    # doc_source = "aus_prospectus"
    # sample_document_list_folder: str = r"./sample_documents/"
    # document_list_file: str = "aus_prospectus_100_documents_multi_fund_sample.txt"
    # pdf_folder: str = r"/data/aus_prospectus/pdf/"
    # output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
    # output_extract_data_child_folder: str = r"/data/aus_prospectus/output/extract_data/docs/"
    # output_mapping_child_folder: str = r"/data/aus_prospectus/output/mapping_data/docs/"
    # batch_initial_document(sample_document_list_folder=sample_document_list_folder,
    #                        document_list_file=document_list_file,
    #                        doc_source=doc_source,
    #                        pdf_folder=pdf_folder,
    #                        output_pdf_text_folder=output_pdf_text_folder,
    #                        output_extract_data_child_folder=output_extract_data_child_folder,
    #                        output_mapping_child_folder=output_mapping_child_folder)

    # special_doc_id_list = ["553242411"]
    doc_source = "emea_ar"
    if doc_source == "aus_prospectus":
        document_sample_file = r"./sample_documents/aus_prospectus_100_documents_multi_fund_sample.txt"
        with open(document_sample_file, "r", encoding="utf-8") as f:
            special_doc_id_list = [doc_id.strip() for doc_id in f.readlines()]
        document_mapping_file = r"/data/aus_prospectus/basic_information/from_2024_documents/aus_100_document_prospectus_multi_fund.xlsx"
        # special_doc_id_list: list = [
        #     "539790009",
        #     "542300403",
        #     "542301117",
        #     "542306317",
        #     "547567013",
        #     "552505237",
        #     "552505278",
        #     "554431052",
        #     "554851189",
        #     "555377021",
        #     "555654388",
        # ]
        # special_doc_id_list: list = ["534287518"]
        pdf_folder: str = r"/data/aus_prospectus/pdf/"
        output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
        output_extract_data_child_folder: str = (
            r"/data/aus_prospectus/output/extract_data/docs/"
        )
        output_extract_data_total_folder: str = (
            r"/data/aus_prospectus/output/extract_data/total/"
        )
        output_mapping_child_folder: str = (
            r"/data/aus_prospectus/output/mapping_data/docs/"
        )
        output_mapping_total_folder: str = (
            r"/data/aus_prospectus/output/mapping_data/total/"
        )
        drilldown_folder = r"/data/aus_prospectus/output/drilldown/"
        batch_run_documents(
            doc_source=doc_source,
            special_doc_id_list=special_doc_id_list,
            pdf_folder=pdf_folder,
            document_mapping_file=document_mapping_file,
            output_pdf_text_folder=output_pdf_text_folder,
            output_extract_data_child_folder=output_extract_data_child_folder,
            output_extract_data_total_folder=output_extract_data_total_folder,
            output_mapping_child_folder=output_mapping_child_folder,
            output_mapping_total_folder=output_mapping_total_folder,
            drilldown_folder=drilldown_folder,
        )
    elif doc_source == "emea_ar":
        special_doc_id_list = ["553242408"]
        batch_run_documents(
            doc_source=doc_source, special_doc_id_list=special_doc_id_list
        )

    # new_data_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_15_documents_by_text_20241121154243.xlsx"
    # original_data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
    # replace_rerun_data(new_data_file, original_data_file)

    # test_calculate_metrics()
    # test_replace_abbrevation()
    # test_translate_pdf()
    # test_mapping_raw_name()
    # test_data_extraction_metrics()

    # batch_filter_pdf_files(
    #     pdf_folder, page_filter_ground_truth_file, prediction_output_folder, special_doc_id_list
    # )

    # data_type = "page_filter"
    # prediction_file = r"/data/emea_ar/output/filter_pages/datapoint_page_info_73_documents_20240903145002.xlsx"
    # missing_error_list, metrics_list, metrics_file = get_metrics(
    #     data_type, prediction_file, page_filter_ground_truth_file, metrics_output_folder
    # )

    # test_auto_generate_instructions()

    # batch_extract_data(
    #     pdf_folder,
    #     page_filter_ground_truth_file,
    #     output_extract_data_child_folder,
    #     output_extract_data_total_folder,
    #     special_doc_id_list,
    #     re_run,
    # )

    # doc_id = "476492237"
    # extract_way = "image"
    # extract_data(doc_id,
    #              pdf_folder,
    #              output_extract_data_child_folder,
    #              extract_way,
    #              re_run_extract_data)
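
    # Illustrative single-document run (a sketch, kept commented out like the examples
    # above; the doc_id and re-run flags are placeholders, the paths mirror the
    # module's emea_ar defaults):
    # doc_data_from_gpt, annotation_list, doc_mapping_data = mapping_data(
    #     doc_id="553242408",
    #     pdf_folder=r"/data/emea_ar/pdf/",
    #     output_pdf_text_folder=r"/data/emea_ar/output/pdf_text/",
    #     output_extract_data_folder=r"/data/emea_ar/output/extract_data/docs/",
    #     output_mapping_folder=r"/data/emea_ar/output/mapping_data/docs/",
    #     doc_source="emea_ar",
    #     extract_way="text",
    #     re_run_extract_data=True,
    #     re_run_mapping_data=True,
    # )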