From 75ea5e70de0aa0fc48488e74e2249100b5bfdaa5 Mon Sep 17 00:00:00 2001 From: Blade He Date: Mon, 9 Dec 2024 17:47:42 -0600 Subject: [PATCH] 1. Support fetching data from messy-code pages via the ChatGPT4o Vision function. 2. Add multilingual share feature configuration --- app_emea_ar.py | 33 ++-- core/data_extraction.py | 148 ++++++++++++------ .../data_extraction_prompts_config.json | 4 +- main.py | 90 ++++++++--- utils/biz_utils.py | 12 +- utils/gpt_utils.py | 2 +- 6 files changed, 200 insertions(+), 89 deletions(-) diff --git a/app_emea_ar.py b/app_emea_ar.py index f51929d..8460a3a 100644 --- a/app_emea_ar.py +++ b/app_emea_ar.py @@ -58,19 +58,26 @@ def us_ar_data_extract(): re_run_extract_data = False re_run_mapping_data = False - emea_ar_parsing = EMEA_AR_Parsing(doc_id=doc_id, - pdf_folder=pdf_folder, - output_extract_data_folder=output_extract_data_folder, - output_mapping_data_folder=output_mapping_data_folder, - extract_way=extract_way, - drilldown_folder=drilldown_folder) - doc_data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run=re_run_extract_data) - doc_mapping_data = emea_ar_parsing.mapping_data( - data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data - ) - results = {"extract_data": doc_mapping_data, - "annotation_data": annotation_list} - return jsonify(results) + try: + emea_ar_parsing = EMEA_AR_Parsing(doc_id=doc_id, + pdf_folder=pdf_folder, + output_extract_data_folder=output_extract_data_folder, + output_mapping_data_folder=output_mapping_data_folder, + extract_way=extract_way, + drilldown_folder=drilldown_folder) + doc_data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run=re_run_extract_data) + doc_mapping_data = emea_ar_parsing.mapping_data( + data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data + ) + results = {"extract_data": doc_mapping_data, + "annotation_data": annotation_list} + return jsonify(results) + except Exception as e: + logger.error(f"Error: {e}") + results = {"extract_data": [], + "annotation_data": [], + "error": str(e)} + return jsonify(results) if __name__ == '__main__': diff --git a/core/data_extraction.py b/core/data_extraction.py index b8c9dba..7e326e0 100644 --- a/core/data_extraction.py +++ b/core/data_extraction.py @@ -171,6 +171,8 @@ class DataExtraction: previous_page_datapoints = [] previous_page_fund_name = None for page_num, page_text in self.page_text_dict.items(): + if page_num > 160: + break if page_num in handled_page_num_list: continue page_datapoints = self.get_datapoints_by_page_num(page_num) @@ -184,7 +186,9 @@ class DataExtraction: # example document: 431073795, page index 1727 to 1728 logger.info(f"Transfer previous page fund name: {previous_page_fund_name} to be the prefix of page text") page_text = f"\nThe last fund name of previous PDF page: {previous_page_fund_name}\n{page_text}" - + else: + previous_page_fund_name = None + extract_data = self.extract_data_by_page( page_num, page_text, @@ -201,16 +205,19 @@ class DataExtraction: previous_page_num = page_num previous_page_fund_name = page_data_list[-1].get("fund_name", "") previous_page_datapoints = page_datapoints - + extract_way = extract_data.get("extract_way", "text") current_page_data_count = len(page_data_list) if current_page_data_count > 0: count = 1 # some pdf documents have multiple pages for the same data # and the next page may lack a table header with data point keywords.
# the purpose is to try to get data from the next page - current_text = page_text + if extract_way == "text": + current_text = page_text + else: + current_text = "" - while count < 3: + while True: try: next_page_num = page_num + count if next_page_num >= pdf_page_count: @@ -231,9 +238,13 @@ class DataExtraction: next_datapoints = list(set(next_datapoints)) if not should_continue: break - next_page_text = self.page_text_dict.get(next_page_num, "") - target_text = current_text + next_page_text + if extract_way == "text": + next_page_text = self.page_text_dict.get(next_page_num, "") + target_text = current_text + next_page_text + else: + target_text = "" # try to get data by current page_datapoints + logger.info(f"Try to get data from next page {next_page_num}") next_page_extract_data = self.extract_data_by_page( next_page_num, target_text, @@ -245,7 +256,8 @@ class DataExtraction: next_page_data_list = next_page_extract_data.get( "extract_data", {} ).get("data", []) - + # print(f"next_page_data_list: {next_page_num}") + # print(next_page_data_list) if next_page_data_list is not None and len(next_page_data_list) > 0: for current_page_data in page_data_list: if current_page_data in next_page_data_list: @@ -254,6 +266,8 @@ class DataExtraction: "data" ] = next_page_data_list data_list.append(next_page_extract_data) + previous_page_num = next_page_num + previous_page_fund_name = next_page_data_list[-1].get("fund_name", "") handled_page_num_list.append(next_page_num) exist_current_page_datapoint = False for next_page_data in next_page_data_list: @@ -379,11 +393,19 @@ class DataExtraction: previous_page_last_fund: str = None) -> dict: # If can't find numeric value, e.g. 1.25 or 3,88 # apply Vision ChatGPT to extract data + special_code_regex = r"\x00|\x01|\x02|\x03|\x04|\x05|\x06|\x07|\x08|\x09|\x0a|\x0b|\x0c|\x0d|\x0e|\x0f|\x10|\x11|\x12|\x13|\x14|\x15|\x16|\x17|\x18|\x19|\x1a|\x1b|\x1c|\x1d|\x1e|\x1f" + special_code_all = [code for code in re.findall(special_code_regex, page_text) + if code != "\n"] + page_text_line_count = len(page_text.split("\n")) numeric_regex = r"\d+(\.|\,)\d+" - if not re.search(numeric_regex, page_text): + if not re.search(numeric_regex, page_text) or page_text_line_count < 3 or len(special_code_all) > 100: logger.info(f"Can't find numeric value in page {page_num}, apply Vision ChatGPT to extract data") return self.extract_data_by_page_image( - page_num, page_datapoints, need_exclude, exclude_data) + page_num=page_num, + page_datapoints=page_datapoints, + need_exclude=False, + exclude_data=None, + previous_page_last_fund=previous_page_last_fund) else: return self.extract_data_by_page_text( page_num=page_num, @@ -401,7 +423,8 @@ class DataExtraction: page_datapoints: list, need_exclude: bool = False, exclude_data: list = None, - previous_page_last_fund: str = None + previous_page_last_fund: str = None, + original_way: str = "text" ) -> dict: """ keys are @@ -427,7 +450,7 @@ class DataExtraction: data_dict["instructions"] = instructions data_dict["raw_answer"] = response data_dict["extract_data"] = {"data": []} - data_dict["extract_way"] = "text" + data_dict["extract_way"] = original_way return data_dict try: data = json.loads(response) @@ -444,7 +467,9 @@ class DataExtraction: data = json_repair.loads(response) except: data = {"data": []} - data = self.validate_data(data, previous_page_last_fund) + data = self.validate_data(extract_data_info=data, + page_text=page_text, + previous_page_last_fund=previous_page_last_fund) data_dict = {"doc_id": self.doc_id}
data_dict["page_index"] = page_num @@ -453,7 +478,7 @@ class DataExtraction: data_dict["instructions"] = instructions data_dict["raw_answer"] = response data_dict["extract_data"] = data - data_dict["extract_way"] = "text" + data_dict["extract_way"] = original_way return data_dict def extract_data_by_page_image( @@ -462,54 +487,63 @@ class DataExtraction: page_datapoints: list, need_exclude: bool = False, exclude_data: list = None, + previous_page_last_fund: str = None ) -> dict: """ keys are doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name, raw_share_name, share_id, share_name """ logger.info(f"Extracting data from page {page_num}") - image_base64 = self.get_pdf_image_base64(page_num) - instructions = self.get_instructions_by_datapoints( - "", - page_datapoints, - need_exclude=need_exclude, - exclude_data=exclude_data, - extract_way="image" - ) - response, with_error = chat( - instructions, response_format={"type": "json_object"}, image_base64=image_base64 - ) - if with_error: - logger.error(f"Error in extracting tables from page") + # image_base64 = self.get_pdf_image_base64(page_num) + page_text = self.get_image_text(page_num) + if page_text is None or len(page_text) == 0: data_dict = {"doc_id": self.doc_id} data_dict["page_index"] = page_num data_dict["datapoints"] = ", ".join(page_datapoints) data_dict["page_text"] = "" - data_dict["instructions"] = instructions - data_dict["raw_answer"] = response + data_dict["instructions"] = "" + data_dict["raw_answer"] = "" data_dict["extract_data"] = {"data": []} data_dict["extract_way"] = "image" return data_dict + else: + if previous_page_last_fund is not None and len(previous_page_last_fund) > 0: + logger.info(f"Transfer previous page fund name: {previous_page_last_fund} to be the pre-fix of page text") + page_text = f"\nThe last fund name of previous PDF page: {previous_page_last_fund}\n{page_text}" + return self.extract_data_by_page_text( + page_num=page_num, + page_text=page_text, + page_datapoints=page_datapoints, + need_exclude=need_exclude, + exclude_data=exclude_data, + previous_page_last_fund=previous_page_last_fund, + original_way="image" + ) + + def get_image_text(self, page_num: int) -> str: + image_base64 = self.get_pdf_image_base64(page_num) + instructions = self.instructions_config.get("get_image_text", "\n") + logger.info(f"Get text from image of page {page_num}") + response, with_error = chat( + instructions, response_format={"type": "json_object"}, image_base64=image_base64 + ) + text = "" + if with_error: + logger.error(f"Can't get text from current image") try: data = json.loads(response) except: try: data = json_repair.loads(response) except: - data = {"data": []} - data = self.validate_data(data, None) + pass + text = data.get("text", "") + return text - data_dict = {"doc_id": self.doc_id} - data_dict["page_index"] = page_num - data_dict["datapoints"] = ", ".join(page_datapoints) - data_dict["page_text"] = "" - data_dict["instructions"] = instructions - data_dict["raw_answer"] = response - data_dict["extract_data"] = data - data_dict["extract_way"] = "image" - return data_dict - - def validate_data(self, extract_data_info: dict, previous_page_last_fund: str=None) -> dict: + def validate_data(self, + extract_data_info: dict, + page_text: str, + previous_page_last_fund: str=None) -> dict: """ Validate data by the rules 1. 
Each data should be with fund name @@ -519,11 +553,29 @@ class DataExtraction: if len(data_list) == 0: return extract_data_info remove_list = [] + performance_fee_regex = r"Amount\s+of\s+the\s+performance\s+fees|Performance\s+Fees\s+amounts|Performance\s+fees\s+amounts|Commissioni\s+di\s+performance|Performance\s+Fee\s+" + nav_regex = r"based\s+on\s+(the\s+)?NAV|on\s+the\s+Share\s+Class\s+NAV|NAV\s+of\s+performance\s+fee|of\s+the\s+average\s+Net\s+Asset\s+Value|Attivi\s+in\s+gestione|Performance\s+Fee\s+of\s+NAV\s+in" + if page_text is not None and len(page_text) > 0: + performance_fee_search = re.search(performance_fee_regex, page_text) + nav_search = re.search(nav_regex, page_text) + else: + performance_fee_search = None + nav_search = None for data in data_list: + if (data.get("performance_fee", None) is not None and + performance_fee_search is not None and + nav_search is not None): + data.pop("performance_fee") + keys = [key for key in list(data.keys()) + if key not in ["fund name", "share name"]] + if len(keys) == 0: + remove_list.append(data) + continue fund_name = data.get("fund name", "").strip() if fund_name == "": remove_list.append(data) - + continue + # Clean fund name start if previous_page_last_fund is not None and len(previous_page_last_fund) > 0: previous_page_last_fund = previous_page_last_fund.strip() @@ -534,10 +586,10 @@ class DataExtraction: fund_name = self.get_fund_name(fund_name, "Fund") fund_name = self.get_fund_name(fund_name, "Bond") - remove_list = ["Market Specific Equity Sub-Funds", - "International and Regional Equity Sub-Funds", - "Equity Sub-Funds"] - for remove_item in remove_list: + remove_prefix_list = ["Market Specific Equity Sub-Funds", + "International and Regional Equity Sub-Funds", + "Equity Sub-Funds"] + for remove_item in remove_prefix_list: if fund_name.startswith(remove_item): fund_name = fund_name.replace(remove_item, "").strip() @@ -617,7 +669,7 @@ class DataExtraction: extract_data_info["data"] = new_data_list return extract_data_info - + def split_multi_share_name(self, raw_share_name: str) -> list: """ Some document, e.g. 481482392 @@ -739,6 +791,8 @@ class DataExtraction: summary = self.instructions_config.get("summary", "\n") elif extract_way == "image": summary = self.instructions_config.get("summary_image", "\n") + if page_text is not None and len(page_text) > 0: + summary += f"\nThe last fund name of previous PDF page: {page_text}\n" else: summary = self.instructions_config.get("summary", "\n") diff --git a/instructions/data_extraction_prompts_config.json b/instructions/data_extraction_prompts_config.json index c2a676a..2e4eaa7 100644 --- a/instructions/data_extraction_prompts_config.json +++ b/instructions/data_extraction_prompts_config.json @@ -1,6 +1,7 @@ { "summary": "Read the context carefully.\nMaybe exists {} data in the context.\n", "summary_image": "Read the image carefully.\nMaybe exists {} data in the image.\n", + "get_image_text": "Instructions: Please extract the text from the image. output the result as a JSON, the JSON format is like below example(s): {\"text\": \"Text from image\"} \n\nAnswer:\n", "image_features": [ "1. Identify the text in the PDF page image.", @@ -93,7 +94,8 @@ "- With \"Ongoing Charges inkl. Performance-Fee in % **)\" and \"Ongoing Charges inkl. Performance-Fee in % (inkl. Zielfonds)\", pick up the values from \"Ongoing Charges inkl. Performance-Fee in % **)\"." ], "performance_fee": [ - "The performance fees should not be the presence of the rates at which the performance fees are calculated." 
+ "The performance fees should not be the presence of the rates at which the performance fees are calculated.", + "The reported of performance fees should not be \"% based on the NAV at the end of the accounting period\"" ] } }, diff --git a/main.py b/main.py index 8b152bc..4065458 100644 --- a/main.py +++ b/main.py @@ -131,7 +131,11 @@ class EMEA_AR_Parsing: data_from_gpt = {"data": []} # Drilldown data to relevant PDF document - annotation_list = self.drilldown_pdf_document(data_from_gpt) + annotation_list = [] + try: + annotation_list = self.drilldown_pdf_document(data_from_gpt) + except Exception as e: + logger.error(f"Error: {e}") return data_from_gpt, annotation_list def drilldown_pdf_document(self, data_from_gpt: list) -> list: @@ -195,8 +199,12 @@ class EMEA_AR_Parsing: annotation_list_df = pd.DataFrame(annotation_list) # set drilldown_result_df column order as doc_id, pdf_file, page_index, # data_point, value, matching_val_area, normalized_bbox - annotation_list_df = annotation_list_df[["doc_id", "pdf_file", "page_index", - "data_point", "value", "matching_val_area", "normalized_bbox"]] + try: + annotation_list_df = annotation_list_df[["doc_id", "pdf_file", "page_index", + "data_point", "value", "matching_val_area", + "normalized_bbox"]] + except Exception as e: + logger.error(f"Error: {e}") logger.info(f"Writing drilldown data to {drilldown_file}") try: with pd.ExcelWriter(drilldown_file) as writer: @@ -353,6 +361,7 @@ def batch_start_job( re_run_mapping_data: bool = False, force_save_total_data: bool = False, calculate_metrics: bool = False, + total_data_prefix: str = None ): pdf_files = glob(pdf_folder + "*.pdf") doc_list = [] @@ -376,17 +385,21 @@ def batch_start_job( result_extract_data_list = [] result_mapping_data_list = [] for doc_id in tqdm(doc_list): - doc_data_from_gpt, annotation_list, doc_mapping_data_list = mapping_data( - doc_id=doc_id, - pdf_folder=pdf_folder, - output_extract_data_folder=output_extract_data_child_folder, - output_mapping_folder=output_mapping_child_folder, - extract_way=extract_way, - re_run_extract_data=re_run_extract_data, - re_run_mapping_data=re_run_mapping_data, - ) - result_extract_data_list.extend(doc_data_from_gpt) - result_mapping_data_list.extend(doc_mapping_data_list) + try: + doc_data_from_gpt, annotation_list, doc_mapping_data_list = mapping_data( + doc_id=doc_id, + pdf_folder=pdf_folder, + output_extract_data_folder=output_extract_data_child_folder, + output_mapping_folder=output_mapping_child_folder, + extract_way=extract_way, + re_run_extract_data=re_run_extract_data, + re_run_mapping_data=re_run_mapping_data, + ) + result_extract_data_list.extend(doc_data_from_gpt) + result_mapping_data_list.extend(doc_mapping_data_list) + except Exception as e: + logger.error(f"Document: {doc_id} met error: {e}") + print_exc() if force_save_total_data or ( special_doc_id_list is None or len(special_doc_id_list) == 0 @@ -401,10 +414,10 @@ def batch_start_job( unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist() os.makedirs(output_extract_data_total_folder, exist_ok=True) time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime()) - output_file = os.path.join( - output_extract_data_total_folder, - f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx", - ) + file_name = f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx" + if total_data_prefix is not None and len(total_data_prefix) > 0: + file_name = f"{total_data_prefix}_{file_name}" + output_file = 
os.path.join(output_extract_data_total_folder, file_name) with pd.ExcelWriter(output_file) as writer: result_extract_data_df.to_excel( writer, index=False, sheet_name="extract_data_info" @@ -414,10 +427,10 @@ def batch_start_job( unique_doc_ids = result_mappingdata_df["doc_id"].unique().tolist() os.makedirs(output_mapping_total_folder, exist_ok=True) time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime()) - output_file = os.path.join( - output_mapping_total_folder, - f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx", - ) + file_name = f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx" + if total_data_prefix is not None and len(total_data_prefix) > 0: + file_name = f"{total_data_prefix}_{file_name}" + output_file = os.path.join(output_mapping_total_folder, file_name) doc_mapping_data_in_db = only_output_mapping_data_in_db(result_mappingdata_df) with pd.ExcelWriter(output_file) as writer: @@ -431,7 +444,6 @@ def batch_start_job( writer, index=False, sheet_name="extract_data" ) - if calculate_metrics: prediction_sheet_name = "total_mapping_data" ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx" @@ -1149,8 +1161,37 @@ def batch_run_documents(): "527525440", "534535767" ] + # total_data_prefix = "complex_doc_" + # documents from EMEA Case 1.docx + check_db_mapping_doc_id_list = [ + "510483464", + "525412280", + "528208797", + "435128656", + "425480144", + "466528487", + "434902020", + "440029306", + "431073795", + "430240853", + "427637151", + "434924914", + "467595142", + "466859621", + "429564034", + "424976833", + "466860852", + "466371135", + "470515549", + "434851173", + "434710819", + "429950833", + "467788879" + ] + total_data_prefix = "complex_doc_from_word" special_doc_id_list = check_db_mapping_doc_id_list - special_doc_id_list = ["481482392"] + special_doc_id_list = ["334584772", "435128656"] + special_doc_id_list = ["514213638"] pdf_folder = r"/data/emea_ar/pdf/" page_filter_ground_truth_file = ( r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx" @@ -1179,6 +1220,7 @@ def batch_run_documents(): re_run_mapping_data, force_save_total_data=force_save_total_data, calculate_metrics=calculate_metrics, + total_data_prefix=total_data_prefix ) diff --git a/utils/biz_utils.py b/utils/biz_utils.py index d45e9d6..59248a3 100644 --- a/utils/biz_utils.py +++ b/utils/biz_utils.py @@ -380,7 +380,8 @@ def replace_share_name_for_multilingual(text: str, share_name: str): "Kategorie Anteile", "Kategorie anteile", "Clase de participaciones", "Aandelenklasse", "aandelenklasse", "Anteilklasse", "anteilklasse", - "Aktien", "Aktienklasse", "aktien", "aktienklasse"] + "Aktien", "Aktienklasse", "aktien", "aktienklasse", + "Klasse"] for multilingual_share in multilingual_share_list: if multilingual_share in text: text = text.replace(multilingual_share, "Class") @@ -879,11 +880,16 @@ def replace_abbrevation(text: str): text_splits = text.split() new_text_splits = [] for split in text_splits: - if split.lower() in ['acc', 'acc.', 'accumulating']: + if split.lower() in ['acc', 'acc.', 'accumulating', + 'thesaurierende', 'thes.', 'accumulazione', + 'akkumulation', 'acumulación', + 'accumulatie']: new_text_splits.append('Accumulation') elif split.lower() in ['inc', 'inc.']: new_text_splits.append('Income') - elif split.lower() in ['dist', 'dist.', 'dis', 'dis.', "distributing"]: + elif split.lower() in ['dist', 'dist.', 'dis', + 'dis.', 'distributing', 
'ausschüttende', + 'aussch.', 'distribuzione']: new_text_splits.append('Distribution') elif split.lower() in ['inv', 'inv.']: new_text_splits.append('Investor') diff --git a/utils/gpt_utils.py b/utils/gpt_utils.py index 70cf148..f5aa511 100644 --- a/utils/gpt_utils.py +++ b/utils/gpt_utils.py @@ -107,7 +107,7 @@ def chat( count = 0 error = "" - request_timeout = 120 + request_timeout = 600 while count < 8: try: if count > 0:
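Note (reviewer sketch): the Vision fallback added to extract_data_by_page in core/data_extraction.py fires when any of three signals suggests a messy-code page: no decimal number matching \d+(\.|\,)\d+ in the page text, fewer than three text lines, or more than 100 control characters (newlines excluded). The snippet below is a minimal, self-contained sketch of that heuristic, not code from the repository; the function name should_use_vision_fallback is illustrative, and the character class [\x00-\x1f] is shorthand for the patch's explicit \x00|\x01|...|\x1f alternation.

import re

# Control characters \x00-\x1f; newlines are filtered out below, as in the patch.
SPECIAL_CODE_REGEX = r"[\x00-\x1f]"
# A decimal value such as 1.25 or 3,88 (comma decimal separators are common in EMEA reports).
NUMERIC_REGEX = r"\d+(\.|\,)\d+"

def should_use_vision_fallback(page_text: str) -> bool:
    """Return True when the extracted page text looks too garbled for the text prompt."""
    special_codes = [c for c in re.findall(SPECIAL_CODE_REGEX, page_text) if c != "\n"]
    line_count = len(page_text.split("\n"))
    has_numeric_value = re.search(NUMERIC_REGEX, page_text) is not None
    return (not has_numeric_value) or line_count < 3 or len(special_codes) > 100

if __name__ == "__main__":
    garbled_page = "\x02\x03" * 60 + "Fund name with no readable figures"
    clean_page = "Fund A\nClass B Acc\nOngoing charges 1.25\nPerformance fee 0,88"
    print(should_use_vision_fallback(garbled_page))  # True -> route the page to the Vision path
    print(should_use_vision_fallback(clean_page))    # False -> keep the plain text path

When the fallback triggers, the patch first calls the new get_image_text helper, which asks the Vision model to return the page text as {"text": "..."} JSON, and then reuses the normal text-extraction prompt on that recovered text, reporting extract_way as "image".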