From 0f14bf4a7a045125297053d7f31da9f017186d87 Mon Sep 17 00:00:00 2001
From: Blade He
Date: Mon, 23 Sep 2024 17:21:02 -0500
Subject: [PATCH] 1. Get document/provider mapping data. 2. Optimize the
 metrics algorithm. 3. Expand the max token length after switching ChatGPT-4o
 to the 2024-08-06 version.

---
 core/data_extraction.py |  10 ++--
 core/data_mapping.py    |  12 +++--
 core/metrics.py         |  19 +++++--
 core/page_filter.py     |   2 +-
 main.py                 |  15 +++---
 prepare_data.py         | 115 +++++++++++++++++++++++++++++++++++++---
 utils/gpt_utils.py      |   5 +-
 utils/sql_query_util.py |  54 +++++++++++++------
 8 files changed, 191 insertions(+), 41 deletions(-)

diff --git a/core/data_extraction.py b/core/data_extraction.py
index fba6bb1..2679d20 100644
--- a/core/data_extraction.py
+++ b/core/data_extraction.py
@@ -68,7 +68,7 @@ class DataExtraction:
         )
         provider_mapping_list = []
         for provider_id in provider_id_list:
-            provider_mapping_list.append(query_investment_by_provider(provider_id))
+            provider_mapping_list.append(query_investment_by_provider(provider_id, rerun=False))
         provider_mapping_df = pd.concat(provider_mapping_list)
         provider_mapping_df = provider_mapping_df.drop_duplicates()
         provider_mapping_df.reset_index(drop=True, inplace=True)
@@ -349,9 +349,11 @@ class DataExtraction:
         try:
             # if an error occurs, perhaps the output length is over 4K tokens:
             # split the context into two parts and try to get data from each part
-            data = self.chat_by_split_context(
-                page_text, page_datapoints, need_exclude, exclude_data
-            )
+            # Attention: after deploying the ChatGPT-4o 2024-08-06 version, the max token length is 16K,
+            # so there is no need to split the context.
+            # data = self.chat_by_split_context(
+            #     page_text, page_datapoints, need_exclude, exclude_data
+            # )
             if len(data.get("data", [])) == 0:
                 data = json_repair.loads(response)
         except:
diff --git a/core/data_mapping.py b/core/data_mapping.py
index 155a9a0..4d0b55e 100644
--- a/core/data_mapping.py
+++ b/core/data_mapping.py
@@ -22,7 +22,7 @@ class DataMapping:
         self.datapoints = datapoints
         self.raw_document_data_list = raw_document_data_list
         if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
-            self.document_mapping_info_df = query_document_fund_mapping(doc_id)
+            self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
         else:
             self.document_mapping_info_df = document_mapping_info_df
 
@@ -41,7 +41,7 @@ class DataMapping:
     def set_mapping_data_by_db(self, document_mapping_info_df: pd.DataFrame):
         logger.info("Setting document mapping data")
         if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
-            self.document_mapping_info_df = query_document_fund_mapping(self.doc_id)
+            self.document_mapping_info_df = query_document_fund_mapping(self.doc_id, rerun=False)
         else:
             self.document_mapping_info_df = document_mapping_info_df
         if len(self.document_mapping_info_df) == 0:
@@ -92,7 +92,7 @@ class DataMapping:
         )
         provider_mapping_list = []
         for provider_id in provider_id_list:
-            provider_mapping_list.append(query_investment_by_provider(provider_id))
+            provider_mapping_list.append(query_investment_by_provider(provider_id, rerun=False))
         provider_mapping_df = pd.concat(provider_mapping_list)
         provider_mapping_df = provider_mapping_df.drop_duplicates()
         provider_mapping_df.reset_index(drop=True, inplace=True)
@@ -126,6 +126,8 @@ class DataMapping:
                     mapped_data = {
                         "doc_id": doc_id,
                         "page_index": page_index,
+                        "raw_fund_name": raw_fund_name,
+                        "raw_share_name": raw_share_name,
                         "raw_name": integrated_share_name,
                         "datapoint": datapoint,
                         "value": raw_data[datapoint],
@@ -142,6 +144,8 @@ class DataMapping:
                     mapped_data = {
                         "doc_id": doc_id,
                         "page_index": page_index,
+                        "raw_fund_name": raw_fund_name,
+                        "raw_share_name": "",
                         "raw_name": raw_fund_name,
                         "datapoint": datapoint,
                         "value": raw_data[datapoint],
@@ -194,6 +198,8 @@ class DataMapping:
                     mapped_data = {
                         "doc_id": doc_id,
                         "page_index": page_index,
+                        "raw_fund_name": raw_fund_name,
+                        "raw_share_name": raw_share_name,
                         "raw_name": raw_name,
                         "datapoint": datapoint,
                         "value": raw_data[datapoint],
diff --git a/core/metrics.py b/core/metrics.py
index 1711d26..f49e40a 100644
--- a/core/metrics.py
+++ b/core/metrics.py
@@ -3,7 +3,7 @@ import pandas as pd
 import time
 import json
 from sklearn.metrics import precision_score, recall_score, f1_score
-from utils.biz_utils import get_unique_words_text, get_beginning_common_words
+from utils.biz_utils import get_unique_words_text, get_beginning_common_words, remove_special_characters
 from utils.logger import logger
 
 
@@ -73,7 +73,15 @@ class Metrics:
         ground_truth_df = pd.read_excel(
             self.ground_truth_file, sheet_name=self.ground_truth_sheet_name
         )
-        ground_truth_df = ground_truth_df[ground_truth_df["Checked"] == 1]
+        if self.data_type == "page_filter":
+            ground_truth_df = ground_truth_df[ground_truth_df["Checked"] == 1]
+        elif self.data_type == "data_extraction":
+            ground_truth_df = ground_truth_df[ground_truth_df["rawname_checked"] == 1]
+        elif self.data_type == "investment_mapping":
+            ground_truth_df = ground_truth_df[ground_truth_df["mapping_checked"] == 1]
+        else:
+            logger.error(f"Invalid data type: {self.data_type}")
+            return [], []
 
         tor_true = []
         tor_pred = []
@@ -116,7 +124,7 @@
                 performance_fee_true.extend(true_data)
                 performance_fee_pred.extend(pred_data)
                 missing_error_list.append(missing_error_data)
-        else:
+        elif self.data_type == "data_extraction":
             prediction_doc_id_list = prediction_df["doc_id"].unique().tolist()
             ground_truth_doc_id_list = ground_truth_df["doc_id"].unique().tolist()
             # get intersection of doc_id_list
@@ -148,6 +156,8 @@
                 performance_fee_true.extend(true_data)
                 performance_fee_pred.extend(pred_data)
                 missing_error_list.extend(missing_error_data)
+        elif self.data_type == "investment_mapping":
+            pass
 
         metrics_list = []
         for data_point in data_point_list:
@@ -500,8 +510,9 @@
             simple_raw_name = raw_name
         else:
             simple_raw_name = raw_name
+        simple_raw_name = remove_special_characters(simple_raw_name)
         temp_splits = [word for word in simple_raw_name.split()
-                       if word.lower() not in ["class", "usd"]]
+                       if word.lower() not in ["class"]]
         if len(temp_splits) > 0:
             simple_raw_name = " ".join(
                 word
diff --git a/core/page_filter.py b/core/page_filter.py
index 07328c3..6fd847e 100644
--- a/core/page_filter.py
+++ b/core/page_filter.py
@@ -17,7 +17,7 @@ class FilterPages:
         self.pdf_file = pdf_file
         self.page_text_dict = self.get_pdf_page_text_dict()
         if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
-            self.document_mapping_info_df = query_document_fund_mapping(doc_id)
+            self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
         else:
             self.document_mapping_info_df = document_mapping_info_df
         self.get_configuration_from_file()
diff --git a/main.py b/main.py
index 92ded42..d299a96 100644
--- a/main.py
+++ b/main.py
@@ -26,7 +26,7 @@ class EMEA_AR_Parsing:
         self.pdf_folder = pdf_folder
         os.makedirs(self.pdf_folder, exist_ok=True)
         self.pdf_file = self.download_pdf()
-        self.document_mapping_info_df = query_document_fund_mapping(doc_id)
+        self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
         if extract_way is None or len(extract_way) == 0:
             extract_way = "text"
 
@@ -426,7 +426,7 @@ def test_auto_generate_instructions():
     """
     doc_id = "402397014"
     pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
-    document_mapping_info_df = query_document_fund_mapping(doc_id)
+    document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
     filter_pages = FilterPages(doc_id, pdf_file, document_mapping_info_df)
     page_text_dict = filter_pages.page_text_dict
     datapoint_page_info, datapoint_page_info_details = filter_pages.start_job()
@@ -522,9 +522,9 @@ def test_auto_generate_instructions():
 
 def test_data_extraction_metrics():
     data_type = "data_extraction"
-    prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx"
-    # prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_text_20240920153730.xlsx"
-    # prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/469138353.xlsx"
+    # prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx"
+    prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_text_20240922152517.xlsx"
+    # prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx"
     prediction_sheet_name = "mapping_data"
     ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
     ground_truth_sheet_name = "mapping_data"
@@ -560,7 +560,7 @@ if __name__ == "__main__":
     output_extract_data_child_folder = r"/data/emea_ar/output/extract_data/docs/"
     output_extract_data_total_folder = r"/data/emea_ar/output/extract_data/total/"
 
-    re_run_extract_data = True
+
     # batch_extract_data(
     #     pdf_folder,
    #     page_filter_ground_truth_file,
@@ -582,7 +582,8 @@ if __name__ == "__main__":
     special_doc_id_list = []
     output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
     output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"
-    re_run_mapping_data = True
+    re_run_extract_data = False
+    re_run_mapping_data = False
     force_save_total_data = False
     extract_ways = ["text"]
 
diff --git a/prepare_data.py b/prepare_data.py
index 95a7cf5..72d69b3 100644
--- a/prepare_data.py
+++ b/prepare_data.py
@@ -878,6 +878,100 @@ def compare_records_count_by_document_id():
     )
 
 
+def get_document_extracted_share_diff_by_db():
+    db_data_file = r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document_from_DocumentAcquisition.xlsx"
+    extract_data_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
+
+    doc_mapping_folder = r"/data/emea_ar/output/mapping/document/"
+    db_data = pd.read_excel(db_data_file, sheet_name="Sheet1")
+    extract_data = pd.read_excel(extract_data_file, sheet_name="mapping_data")
+    # only keep rows whose investment_type is 1
+    extract_data = extract_data[extract_data["investment_type"] == 1]
+    extract_data.reset_index(drop=True, inplace=True)
+    unique_doc_id = extract_data["doc_id"].unique().tolist()
+
+    status_info = {
+        1: "WIP",
+        5: "Junked",
+        3: "AutoSignoff",
+        2: "Signoffed",
+        10: "Complete",
+        20: "AutoDetect",
+        21: "Checkduplicate",
+        22: "Mapping",
+        33: "Not Matched",
+        99: "Unknown",
+    }
+    document_extract_db_compare = []
+    for doc_id in unique_doc_id:
+        doc_mapping_file = os.path.join(doc_mapping_folder, f"{doc_id}.xlsx")
+        if not os.path.exists(doc_mapping_file):
logger.error(f"Invalid mapping_file: {doc_mapping_file}") + doc_mapping_share_class_id_df = pd.DataFrame() + else: + doc_mapping_data = pd.read_excel(doc_mapping_file) + doc_mapping_share_class_id_df = doc_mapping_data[["SecId"]].drop_duplicates() + + ar_db_data_doc = db_data[db_data["DocumentId"] == doc_id] + try: + masterProcess_status = ar_db_data_doc["MasterProcess_Status"].values[0] + except Exception as e: + logger.error(f"Error: {e}") + masterProcess_status = 99 + masterProcess_status = int(masterProcess_status) + masterProcess_status_defination = status_info.get(masterProcess_status, "Unknown") + # get data from ar_db_data_doc which noTor == 0 or share_noOgc == 0 or share_noPerfFee == 0 + ar_db_data_doc = ar_db_data_doc[ + (ar_db_data_doc["noTor"] == 0) + | (ar_db_data_doc["share_noTer"] == 0) + | (ar_db_data_doc["share_noOgc"] == 0) + | (ar_db_data_doc["share_noPerfFee"] == 0) + ] + + extract_data_doc = extract_data[extract_data["doc_id"] == doc_id] + # unique raw_name in extract_data_doc + unique_raw_name = extract_data_doc["raw_name"].unique().tolist() + + doc_mapping_share_class_count = len(doc_mapping_share_class_id_df) + extract_share_class_count = len(unique_raw_name) + extract_vs_doc_share_count_diff = extract_share_class_count - doc_mapping_share_class_count + db_share_class_count = len(ar_db_data_doc) + extract_vs_ar_db_share_count_diff = extract_share_class_count - db_share_class_count + document_extract_db_compare.append({ + "DocumentId": doc_id, + "status": masterProcess_status, + "status_defination": masterProcess_status_defination, + "extract_share_count": extract_share_class_count, + "doc_share_count": doc_mapping_share_class_count, + "extract_vs_doc_share_count_diff": extract_vs_doc_share_count_diff, + "ar_db_share_count": db_share_class_count, + "extract_vs_ar_db_share_count_diff": extract_vs_ar_db_share_count_diff, + }) + document_extract_db_compare_df = pd.DataFrame(document_extract_db_compare) + # output to excel + document_extract_db_compare_file = ( + r"/data/emea_ar/basic_information/English/document_extract_db_compare.xlsx" + ) + with pd.ExcelWriter(document_extract_db_compare_file) as writer: + document_extract_db_compare_df.to_excel( + writer, sheet_name="document_extract_db_compare", index=False + ) + + +def concat_mapping(mapping_folder: str, + output_file: str): + excel_files = glob(os.path.join(mapping_folder, "*.xlsx")) + logger.info(f"Total {len(excel_files)} excel files found in {mapping_folder}") + all_data_list = [] + for excel_file in excel_files: + doc_mapping_data = pd.read_excel(excel_file) + all_data_list.append(doc_mapping_data) + all_data = pd.concat(all_data_list) + all_data.reset_index(drop=True, inplace=True) + with open(output_file, "wb") as f: + all_data.to_excel(f, index=False) + + if __name__ == "__main__": doc_provider_file_path = ( r"/data/emea_ar/basic_information/English/latest_provider_ar_document.xlsx" @@ -913,12 +1007,12 @@ if __name__ == "__main__": # pdf_folder) output_data_folder = r"/data/emea_ar/basic_information/English/top_100_provider_latest_document_most_mapping/" - statistics_document(pdf_folder=pdf_folder, - doc_mapping_file_path=latest_top_100_provider_ar_data_file, - sheet_name="latest_doc_ar_data", - output_folder=output_data_folder, - output_file="latest_doc_ar_mapping_statistics.xlsx") - + # statistics_document(pdf_folder=pdf_folder, + # doc_mapping_file_path=latest_top_100_provider_ar_data_file, + # sheet_name="latest_doc_ar_data", + # output_folder=output_data_folder, + # 
output_file="latest_doc_ar_mapping_statistics.xlsx") + # get_document_extracted_share_diff_by_db() # statistics_provider_mapping( # provider_mapping_data_file=provider_mapping_data_file, # output_folder=basic_info_folder, @@ -926,3 +1020,12 @@ if __name__ == "__main__": # statistics_document_fund_share_count(doc_mapping_from_top_100_provider_file) # pickup_document_from_top_100_providers() # compare_records_count_by_document_id() + + document_mapping_folder = r"/data/emea_ar/output/mapping/document/" + all_data_file = r"/data/emea_ar/output/mapping/all_document_mapping.xlsx" + concat_mapping(document_mapping_folder, all_data_file) + + provider_mapping_folder = r"/data/emea_ar/output/mapping/provider/" + all_data_file = r"/data/emea_ar/output/mapping/all_provider_mapping.xlsx" + concat_mapping(provider_mapping_folder, all_data_file) + diff --git a/utils/gpt_utils.py b/utils/gpt_utils.py index b8d0e9f..253a179 100644 --- a/utils/gpt_utils.py +++ b/utils/gpt_utils.py @@ -69,10 +69,14 @@ def chat( api_key=os.getenv("OPENAI_API_KEY_GPT4o"), api_version=os.getenv("OPENAI_API_VERSION_GPT4o"), temperature: float = 0.0, + max_tokens = 10240, response_format: dict = None, image_file: str = None, image_base64: str = None, ): + if engine != "gpt-4o-2024-08-06-research": + max_tokens = 4096 + client = AzureOpenAI( azure_endpoint=azure_endpoint, api_key=api_key, api_version=api_version ) @@ -103,7 +107,6 @@ def chat( count = 0 error = "" - max_tokens = 4096 request_timeout = 120 while count < 8: try: diff --git a/utils/sql_query_util.py b/utils/sql_query_util.py index 0e8cddf..1ed64c1 100644 --- a/utils/sql_query_util.py +++ b/utils/sql_query_util.py @@ -8,18 +8,29 @@ import dotenv dotenv.load_dotenv() -def query_document_fund_mapping(doc_id): +def query_document_fund_mapping(doc_id, rerun=True, output_folder=r"/data/emea_ar/output/mapping/document/"): count = 1 while True: try: - document_mapping_info_df = query_data_by_biz_type( - biztype="getFundInfoByDocId", para=doc_id, return_df=True - ).drop_duplicates() + document_mapping_info_df = pd.DataFrame() + if rerun is False and output_folder is not None and len(output_folder) > 0 and os.path.exists(output_folder): + output_file = os.path.join(output_folder, f"{doc_id}.xlsx") + if os.path.exists(output_file): + document_mapping_info_df = pd.read_excel(output_file) if len(document_mapping_info_df) == 0: - return document_mapping_info_df - document_mapping_info_df = document_mapping_info_df.sort_values( - by=["FundName", "ShareClassName"] - ).reset_index(drop=True) + document_mapping_info_df = query_data_by_biz_type( + biztype="getFundInfoByDocId", para=doc_id, return_df=True + ).drop_duplicates() + if len(document_mapping_info_df) == 0: + return document_mapping_info_df + document_mapping_info_df = document_mapping_info_df.sort_values( + by=["FundName", "ShareClassName"] + ).reset_index(drop=True) + if output_folder is not None and len(output_folder) > 0: + os.makedirs(output_folder, exist_ok=True) + output_file = os.path.join(output_folder, f"{doc_id}.xlsx") + with pd.ExcelWriter(output_file) as writer: + document_mapping_info_df.to_excel(writer, index=False) return document_mapping_info_df except Exception as e: print(e) @@ -29,16 +40,29 @@ def query_document_fund_mapping(doc_id): count += 1 -def query_investment_by_provider(company_id: str): +def query_investment_by_provider(company_id: str, rerun=True, output_folder=r"/data/emea_ar/output/mapping/provider/"): count = 1 while True: try: - investment_by_provider_df = 
-                                                               para=company_id,
-                                                               return_df=True).drop_duplicates()
-            investment_by_provider_df = investment_by_provider_df \
-                .sort_values(by=['FundName', 'ShareClassName']) \
-                .reset_index(drop=True)
+            investment_by_provider_df = pd.DataFrame()
+            if rerun is False and output_folder is not None and len(output_folder) > 0 and os.path.exists(output_folder):
+                output_file = os.path.join(output_folder, f"{company_id}.xlsx")
+                if os.path.exists(output_file):
+                    investment_by_provider_df = pd.read_excel(output_file)
+            if len(investment_by_provider_df) == 0:
+                investment_by_provider_df = query_data_by_biz_type(biztype='getInvestmentByProvider',
+                                                                   para=company_id,
+                                                                   return_df=True).drop_duplicates()
+                if len(investment_by_provider_df) == 0:
+                    return investment_by_provider_df
+                investment_by_provider_df = investment_by_provider_df \
+                    .sort_values(by=['FundName', 'ShareClassName']) \
+                    .reset_index(drop=True)
+                if output_folder is not None and len(output_folder) > 0:
+                    os.makedirs(output_folder, exist_ok=True)
+                    output_file = os.path.join(output_folder, f"{company_id}.xlsx")
+                    with pd.ExcelWriter(output_file) as writer:
+                        investment_by_provider_df.to_excel(writer, index=False)
             return investment_by_provider_df
         except Exception as e:
             print(e)
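
Note on the mapping-data caching introduced above: query_document_fund_mapping and query_investment_by_provider now take rerun and output_folder arguments; with rerun=False they reuse a per-id Excel file from output_folder and only hit the database (and refresh the file) when no usable cache exists. The standalone sketch below illustrates that cache-or-query flow under those assumptions; fetch_from_db and query_with_cache are illustrative names, with fetch_from_db standing in for the project's query_data_by_biz_type helper.

    import os
    import pandas as pd

    def fetch_from_db(doc_id: str) -> pd.DataFrame:
        # Hypothetical stand-in for query_data_by_biz_type(biztype="getFundInfoByDocId", ...).
        return pd.DataFrame({"FundName": ["Fund A"], "ShareClassName": ["Class I"], "DocId": [doc_id]})

    def query_with_cache(doc_id: str, rerun: bool = True,
                         output_folder: str = "./mapping_cache/") -> pd.DataFrame:
        # Return mapping data, reusing a per-document Excel cache when rerun is False.
        cache_file = os.path.join(output_folder, f"{doc_id}.xlsx")
        df = pd.DataFrame()
        # With rerun=False, try the cached Excel file first.
        if not rerun and os.path.exists(cache_file):
            df = pd.read_excel(cache_file)
        # Fall back to the database when the cache is missing or empty.
        if len(df) == 0:
            df = fetch_from_db(doc_id).drop_duplicates()
            if len(df) == 0:
                return df
            df = df.sort_values(by=["FundName", "ShareClassName"]).reset_index(drop=True)
            # Persist the fresh result so later runs with rerun=False can skip the query.
            os.makedirs(output_folder, exist_ok=True)
            with pd.ExcelWriter(cache_file) as writer:
                df.to_excel(writer, index=False)
        return df

    # Example: the first call populates the cache, the second reads it back instead of querying.
    df_first = query_with_cache("402397014", rerun=True)
    df_cached = query_with_cache("402397014", rerun=False)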
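The token-limit change in utils/gpt_utils.py exposes max_tokens as a parameter defaulting to 10240 and clamps it back to 4096 for every deployment other than gpt-4o-2024-08-06-research. A minimal sketch of how that cap would be applied in an Azure OpenAI call is shown below; the endpoint environment variable name and the prompt text are assumptions for illustration, while the API key/version variable names follow the patch.

    import os
    from openai import AzureOpenAI

    def capped_max_tokens(engine: str, requested: int = 10240) -> int:
        # Only the 2024-08-06 GPT-4o deployment keeps the expanded output budget;
        # every other engine falls back to the previous 4K cap, mirroring the patch.
        return requested if engine == "gpt-4o-2024-08-06-research" else 4096

    client = AzureOpenAI(
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT_GPT4o"),  # assumed variable name
        api_key=os.getenv("OPENAI_API_KEY_GPT4o"),
        api_version=os.getenv("OPENAI_API_VERSION_GPT4o"),
    )
    engine = "gpt-4o-2024-08-06-research"
    response = client.chat.completions.create(
        model=engine,
        messages=[{"role": "user", "content": "Extract the fee table as JSON."}],
        temperature=0.0,
        max_tokens=capped_max_tokens(engine),
    )
    print(response.choices[0].message.content)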