import os import json import json_repair import re import fitz import pandas as pd from traceback import print_exc from utils.gpt_utils import chat from utils.pdf_util import PDFUtil from utils.sql_query_util import query_document_fund_mapping, query_investment_by_provider from utils.logger import logger from utils.biz_utils import add_slash_to_text_as_regex, clean_text, \ get_most_similar_name, remove_abundant_data, replace_special_table_header class DataExtraction: def __init__( self, doc_source: str, doc_id: str, pdf_file: str, output_data_folder: str, page_text_dict: dict, datapoint_page_info: dict, datapoints: list, document_mapping_info_df: pd.DataFrame, extract_way: str = "text", output_image_folder: str = None, ) -> None: self.doc_source = doc_source self.doc_id = doc_id self.pdf_file = pdf_file self.configuration_folder = f"./configuration/{doc_source}/" self.instruction_folder = f"./instructions/{doc_source}/" if output_data_folder is None or len(output_data_folder) == 0: output_data_folder = r"/data/emea_ar/output/extract_data/docs/" os.makedirs(output_data_folder, exist_ok=True) self.output_data_json_folder = os.path.join(output_data_folder, "json/") os.makedirs(self.output_data_json_folder, exist_ok=True) self.output_data_excel_folder = os.path.join(output_data_folder, "excel/") os.makedirs(self.output_data_excel_folder, exist_ok=True) if page_text_dict is None or len(page_text_dict.keys()) == 0: self.page_text_dict = self.get_pdf_page_text_dict() else: self.page_text_dict = page_text_dict if document_mapping_info_df is None or len(document_mapping_info_df) == 0: self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False) else: self.document_mapping_info_df = document_mapping_info_df self.fund_name_list = self.document_mapping_info_df["FundName"].unique().tolist() # get document type by DocumentType in self.document_mapping_info_df self.document_type = int(self.document_mapping_info_df["DocumentType"].iloc[0]) 
self.investment_objective_pages = [] if self.document_type == 1: self.investment_objective_pages = self.get_investment_objective_pages() self.provider_mapping_df = self.get_provider_mapping() if len(self.provider_mapping_df) == 0: self.provider_fund_name_list = [] else: self.provider_fund_name_list = ( self.provider_mapping_df["FundName"].unique().tolist() ) self.document_category, self.document_production = self.get_document_category_production() self.datapoint_page_info = self.get_datapoint_page_info(datapoint_page_info) self.page_nums_with_datapoints = self.get_page_nums_from_datapoint_page_info() self.datapoints = datapoints self.instructions_config = self.get_instructions_config() self.datapoint_level_config = self.get_datapoint_level() self.datapoint_type_config = self.get_datapoint_type() self.datapoint_name_config = self.get_datapoint_name() self.replace_table_header_config = self.get_replace_table_header_config() self.datapoint_reported_name_config, self.non_english_reported_name_config = \ self.get_datapoint_reported_name() self.extract_way = extract_way self.output_image_folder = output_image_folder def get_document_category_production(self): document_category = None document_production = None if self.doc_source == "aus_prospectus": first_4_page_text = "" for page_index, page_text in self.page_text_dict.items(): if page_index > 3: break first_4_page_text += page_text + "\n" first_4_page_text = clean_text(first_4_page_text) document_category_prompt_file = os.path.join(self.instruction_folder, "document_category_prompts.json") with open(document_category_prompt_file, "r", encoding="utf-8") as f: document_category_prompt = "\n".join(json.load(f).get("prompts", [])) if len(document_category_prompt) > 0: prompts = f"Context: \n{first_4_page_text}\n\Instructions: \n{document_category_prompt}" result, with_error = chat( prompt=prompts, response_format={"type": "json_object"}, max_tokens=1000 ) response = result.get("response", "") if not with_error: try: data = 
json.loads(response) document_category = data.get("document_category", None) document_production = data.get("document_production", None) except: pass return document_category, document_production def get_datapoint_page_info(self, datapoint_page_info: dict) -> dict: """ If document source is aus_propectus and document category is MIS then remove the administration_fee from datapoint_page_info """ if self.doc_source == "aus_prospectus" and self.document_category.upper() == "MIS": if "administration_fees" in list(datapoint_page_info.keys()): datapoint_page_info.pop("administration_fees") if "total_annual_dollar_based_charges" in list(datapoint_page_info.keys()): datapoint_page_info.pop("total_annual_dollar_based_charges") return datapoint_page_info def get_investment_objective_pages(self): investment_objective_pages = [] if self.document_type == 1: objective_strategy_regex_config_file = os.path.join(self.configuration_folder, "objective_strategy_regex.json") with open(objective_strategy_regex_config_file, "r", encoding="utf-8") as f: objective_strategy_regex_config = json.load(f) objective_start_regex = objective_strategy_regex_config.get("objective_strategy", {}).get("start", "") if objective_start_regex is not None and len(objective_start_regex) > 0: for page_index, text in self.page_text_dict.items(): if re.search(objective_start_regex, text, re.I): investment_objective_pages.append(page_index) if len(investment_objective_pages) > 0: investment_objective_pages.sort() return investment_objective_pages def get_datapoint_reported_name(self): language_config_file = os.path.join(self.configuration_folder, "language.json") self.language_config = {} with open(language_config_file, "r", encoding="utf-8") as file: self.language_config = json.load(file) self.language_id = self.document_mapping_info_df["Language"].iloc[0] self.language = self.language_config.get(self.language_id, None) datapoint_reported_name_config_file = os.path.join(self.configuration_folder, 
"datapoint_reported_name.json") all_datapoint_reported_name = {} with open(datapoint_reported_name_config_file, "r", encoding="utf-8") as file: all_datapoint_reported_name = json.load(file) non_english_reported_name_config = {} datapoint_reported_name_config = {} common_language = "english" for datapoint, language_reported_name in all_datapoint_reported_name.items(): reported_name_list = language_reported_name.get(common_language, []) if self.language != "english": reported_name_list.extend(language_reported_name.get(self.language, [])) non_english_reported_name_config[datapoint] = language_reported_name.get(self.language, []) # remove duplicate reported name reported_name_list = list(set(reported_name_list)) # sort the reported name reported_name_list.sort() datapoint_reported_name_config[datapoint] = reported_name_list return datapoint_reported_name_config, non_english_reported_name_config def get_provider_mapping(self): if len(self.document_mapping_info_df) == 0: return pd.DataFrame() provider_id_list = ( self.document_mapping_info_df["ProviderId"].unique().tolist() ) provider_mapping_list = [] for provider_id in provider_id_list: provider_mapping_list.append(query_investment_by_provider(provider_id, rerun=False)) provider_mapping_df = pd.concat(provider_mapping_list) provider_mapping_df = provider_mapping_df.drop_duplicates() provider_mapping_df.reset_index(drop=True, inplace=True) return provider_mapping_df def get_pdf_image_base64(self, page_index: int) -> dict: pdf_util = PDFUtil(self.pdf_file) return pdf_util.extract_image_from_page(page_index=page_index, output_folder=self.output_image_folder) def get_instructions_config(self) -> dict: instructions_config_file = os.path.join(self.instruction_folder, "data_extraction_prompts_config.json") with open(instructions_config_file, "r", encoding="utf-8") as f: instructions_config = json.load(f) return instructions_config def get_datapoint_level(self) -> dict: datapoint_level_file = 
os.path.join(self.configuration_folder, "datapoint_level.json") with open(datapoint_level_file, "r", encoding="utf-8") as f: datapoint_level = json.load(f) return datapoint_level def get_datapoint_type(self) -> dict: datapoint_type_file = os.path.join(self.configuration_folder, "datapoint_type.json") with open(datapoint_type_file, "r", encoding="utf-8") as f: datapoint_type = json.load(f) return datapoint_type def get_datapoint_name(self) -> dict: datapoint_name_file = os.path.join(self.configuration_folder, "datapoint_name.json") with open(datapoint_name_file, "r", encoding="utf-8") as f: datapoint_name = json.load(f) return datapoint_name def get_replace_table_header_config(self) -> str: replace_table_header_file = os.path.join(self.configuration_folder, "replace_table_header.json") if os.path.exists(replace_table_header_file): with open(replace_table_header_file, "r", encoding="utf-8") as f: replace_table_header_config = json.load(f).get("details", []) return replace_table_header_config else: return [] def get_pdf_page_text_dict(self) -> dict: pdf_util = PDFUtil(self.pdf_file) success, text, page_text_dict = pdf_util.extract_text() return page_text_dict def get_page_nums_from_datapoint_page_info(self) -> list: page_nums_with_datapoints = [] for datapoint, page_nums in self.datapoint_page_info.items(): if datapoint == "doc_id": continue page_nums_with_datapoints.extend(page_nums) page_nums_with_datapoints = list(set(page_nums_with_datapoints)) # sort the page numbers page_nums_with_datapoints.sort() return page_nums_with_datapoints def extract_data(self) -> dict: logger.info(f"Extracting data from document {self.doc_id}, extract way: {self.extract_way}") if self.extract_way == "text": data_list = self.extract_data_by_text() elif self.extract_way == "image": data_list = self.extract_data_by_image() else: data_list = self.extract_data_by_text() if self.doc_source == "aus_prospectus": data_list = self.post_supplement_data(data_list) # data_list = 
remove_abundant_data(data_list) self.output_data_to_file(data_list) return data_list def post_supplement_data(self, data_list: list) -> list: """ data_dict = {"doc_id": self.doc_id} data_dict["page_index"] = page_num data_dict["datapoints"] = ", ".join(page_datapoints) data_dict["page_text"] = page_text data_dict["instructions"] = instructions data_dict["raw_answer"] = response data_dict["extract_data"] = data data_dict["extract_way"] = original_way data_dict["prompt_token"] = result.get("prompt_token", 0) data_dict["completion_token"] = result.get("completion_token", 0) data_dict["total_token"] = result.get("total_token", 0) """ data_list = self.supplement_ttr_pension(data_list) data_list = self.align_fund_share_name(data_list) data_list = self.supplement_minimum_initial_investment(data_list) data_list, datapoint_list_with_production_name = self.post_adjust_for_value_with_production_name(data_list) data_list = self.remove_duplicate_data(data_list) if "management_fee" not in datapoint_list_with_production_name and "management_fee_and_costs" not in datapoint_list_with_production_name: data_list = self.post_adjust_management_fee_costs(data_list) data_list = self.check_administration_fees(data_list) data_list = self.check_benchmark(data_list) return data_list def check_benchmark(self, data_list: list): """ Remove illegal benchmark data e.g. annual growth in dividends received from the underlying companies The fund's composite benchmark is shown on page 11 benchmark A range of published indices composite benchmark fund’s composite benchmark The rules are: 1. If starts with alphabet and not starts with upper case, then remove it 2. If starts with "A range", then remove it 3. 
If starts with "The fund", then remove it """ for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) remove_items = [] for data_item in data: keys = list(data_item.keys()) if "benchmark_name" not in keys: continue benchmark_name = data_item.get("benchmark_name", "") if benchmark_name.startswith("A range") or benchmark_name.startswith("The fund"): data_item.pop("benchmark_name") elif benchmark_name[0].isalpha() and not benchmark_name[0].isupper(): data_item.pop("benchmark_name") else: pass keys = [key for key in keys if key not in ["fund_name", "share_name"]] if len(keys) == 0: remove_items.append(data_item) for remove_item in remove_items: if remove_item in extract_data["data"]: extract_data["data"].remove(remove_item) def align_fund_share_name(self, data_list: list): """ Align the fund name and share name to be the same format """ if self.document_production is None or len(self.document_production) == 0: return data_list fund_name_list = [] for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) for data_item in data: fund_name = data_item.get("fund_name", "") if len(fund_name) == 0: continue if fund_name not in fund_name_list: fund_name_list.append(fund_name) for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) for data_item in data: fund_name = data_item.get("fund_name", "") share_name = data_item.get("share_name", "") if len(fund_name) == 0: continue for c_fund_name in fund_name_list: if c_fund_name == fund_name: continue if len(fund_name) < len(c_fund_name) and c_fund_name.endswith(fund_name): if c_fund_name.replace(fund_name, "").strip() in self.document_production: data_item["fund_name"] = c_fund_name if share_name == fund_name: data_item["share_name"] = c_fund_name break return data_list def supplement_ttr_pension(self, data_list: list): """ If with fund name ends with "TTR" and 
"Pension", and exist same fund name without "TTR" or "Pension", Supplement the data of "TTR" and "Pension" to the same fund name without "TTR" or "Pension" """ ttr_fund_name_list = [] pension_fund_name_list = [] exist_ttr = False exist_pension = False for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) for data_item in data: keys = list(data_item.keys()) fund_name = data_item.get("fund_name", "") if len(fund_name) == 0: continue fund_name_splits = fund_name.split() if fund_name_splits[-1] == "TTR": ttr_fund_name_list.append(fund_name) exist_ttr = True if fund_name_splits[-1] == "Pension": pension_fund_name_list.append(fund_name) exist_pension = True if exist_ttr and exist_pension: for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) new_item_list = [] remove_item_list = [] for data_item in data: fund_name = data_item.get("fund_name", "") share_name = data_item.get("share_name", "") if len(fund_name) == 0: continue fund_name_splits = fund_name.split() if fund_name_splits[-1] == "TTR" or fund_name_splits[-1] == "Pension": continue keys = [key for key in list(data_item.keys()) if key not in ["fund_name", "share_name"]] for ttr_fund_name in ttr_fund_name_list: ttr_pure_fund_name = ttr_fund_name.replace(" TTR", "") if fund_name == ttr_pure_fund_name: new_fund_name = f"{fund_name} TTR" if share_name == fund_name: share_name = new_fund_name new_item = {"fund_name": new_fund_name, "share_name": share_name} for key in keys: new_item[key] = data_item.get(key, "") new_item_list.append(new_item) if data_item not in remove_item_list: remove_item_list.append(data_item) break for pension_fund_name in pension_fund_name_list: pernsion_pure_fund_name = pension_fund_name.replace(" Pension", "") if fund_name == pernsion_pure_fund_name: new_fund_name = f"{fund_name} Pension" if share_name == fund_name: share_name = new_fund_name new_item = {"fund_name": 
new_fund_name, "share_name": share_name} for key in keys: new_item[key] = data_item.get(key, "") new_item_list.append(new_item) if data_item not in remove_item_list: remove_item_list.append(data_item) break for remove_item in remove_item_list: if remove_item in data: data.remove(remove_item) data.extend(new_item_list) return data_list def check_administration_fees(self, data_list: list): """ If document source is aus_prospectus and document category is MIS, then remove the administration fees from data_list """ if self.doc_source == "aus_prospectus" and self.document_category.upper() == "MIS": for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) remove_items = [] for data_item in data: keys = list(data_item.keys()) if "administration_fees" in keys: data_item.pop("administration_fees") if "total_annual_dollar_based_charges" in keys: data_item.pop("total_annual_dollar_based_charges") keys = [key for key in list(data_item.keys()) if key not in ["fund_name", "share_name"]] if len(keys) == 0: remove_items.append(data_item) for remove_item in remove_items: try: if remove_item in extract_data["data"]: extract_data["data"].remove(remove_item) except: pass return data_list def post_adjust_for_value_with_production_name(self, data_list: list): """ If some datapoint with production name, then each fund/ share class in the same document for the datapoint should be with same value. 
""" raw_name_dict = self.get_raw_name_dict(data_list) raw_name_list = list(raw_name_dict.keys()) raw_name_as_production_name = None for raw_name in raw_name_list: if raw_name.lower() in self.document_production.lower(): raw_name_as_production_name = raw_name break datapoint_list_with_production_name = [] if raw_name_as_production_name is None: return data_list, datapoint_list_with_production_name raw_name_dict.pop(raw_name_as_production_name) for data_dict in data_list: # if data_dict.get("page_index", -1) > 9: # break extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) remove_item_list = [] new_dp_item_list = [] for data_item in data: keys = list(data_item.keys()) fund_name = data_item.get("fund_name", "") share_name = data_item.get("share_name", "") raw_name = self.get_raw_name(fund_name, share_name) if raw_name.lower() in self.document_production.lower(): dp_keys = [key for key in keys if key not in ["fund_name", "share_name", "management_fee_and_costs", "management_fee", "buy_spread", "sell_spread"]] for dp_key in dp_keys: if dp_key not in datapoint_list_with_production_name: datapoint_list_with_production_name.append(dp_key) remove_item_list.append(data_item) for v_raw_name, v_dict in raw_name_dict.items(): v_fund_name = v_dict.get("fund_name", "") v_share_name = v_dict.get("share_name", "") if len(share_name) > 0: new_dp_item = {"fund_name": v_fund_name, "share_name": v_share_name} else: new_dp_item = {"fund_name": v_fund_name} for dp_key in dp_keys: new_dp_item[dp_key] = data_item.get(dp_key, "") new_dp_item["source"] = "from_production_name" new_dp_item_list.append(new_dp_item) for remove_item in remove_item_list: if remove_item in extract_data["data"]: extract_data["data"].remove(remove_item) if len(new_dp_item_list) > 0: extract_data["data"].extend(new_dp_item_list) if len(datapoint_list_with_production_name) == 0: return data_list, datapoint_list_with_production_name for data_dict in data_list: extract_data = 
data_dict.get("extract_data", {}) data = extract_data.get("data", []) remove_item_list = [] for data_item in data: source = data_item.get("source", "") if source == "from_production_name": continue keys = list(data_item.keys()) dp_keys = [key for key in keys if key not in ["fund_name", "share_name"]] for dp_key in dp_keys: if dp_key in datapoint_list_with_production_name: data_item.pop(dp_key) keys = list(data_item.keys()) dp_keys = [key for key in keys if key not in ["fund_name", "share_name"]] if len(dp_keys) == 0: remove_item_list.append(data_item) for remove_item in remove_item_list: if remove_item in extract_data["data"]: extract_data["data"].remove(remove_item) return data_list, datapoint_list_with_production_name def remove_duplicate_data(self, data_list: list): """ The purpose is to remove duplicate data in the different pages. Reason: 1. Some pdf documents have multiple pages for the same data 2. Usually, the first data is the latest data, and the others is the older data. 3. That's why we need to remove the duplicate data in the different pages. 
""" handled_data_dict_list = [] for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) for data_item in data: keys = list(data_item.keys()) fund_name = data_item.get("fund_name", "") share_name = data_item.get("share_name", "") raw_name = self.get_raw_name(fund_name, share_name) dp_keys = [key for key in keys if key not in ["fund_name", "share_name"]] # sort the keys dp_keys.sort() additional_dp_keys = [dp_key for dp_key in dp_keys if dp_key not in ["management_fee", "management_fee_and_costs"]] if len(additional_dp_keys) == 0: continue for c_data_dict in data_list: if c_data_dict in handled_data_dict_list: continue if c_data_dict == data_dict: continue c_extract_data = c_data_dict.get("extract_data", {}) c_data = c_extract_data.get("data", []) remove_c_items = [] for c_data_item in c_data: c_keys = list(c_data_item.keys()) c_fund_name = c_data_item.get("fund_name", "") c_share_name = c_data_item.get("share_name", "") c_raw_name = self.get_raw_name(c_fund_name, c_share_name) if raw_name != c_raw_name: continue c_dp_keys = [key for key in c_keys if key not in ["fund_name", "share_name"]] c_dp_keys.sort() if dp_keys == c_dp_keys: remove_c_items.append(c_data_item) for remove_c_item in remove_c_items: if remove_c_item in c_data: c_data.remove(remove_c_item) handled_data_dict_list.append(data_dict) return data_list def get_raw_name(self, fund_name: str, share_name: str) -> str: raw_name = "" if len(fund_name) == 0: return raw_name if fund_name == share_name: raw_name = fund_name elif len(share_name) > 0 and share_name.startswith(fund_name): raw_name = share_name else: raw_name = f"{fund_name} {share_name}".strip() return raw_name def get_raw_name_dict(self, data_list: list) -> dict: raw_name_dict = {} for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) for data_item in data: fund_name = data_item.get("fund_name", "") share_name = 
data_item.get("share_name", "") raw_name = self.get_raw_name(fund_name, share_name) if len(raw_name) == 0: continue # if isinstance(self.document_production, str) and \ # raw_name.lower() in self.document_production.lower(): # continue if raw_name not in list(raw_name_dict.keys()): raw_name_dict[raw_name] = {"fund_name": fund_name, "share_name": share_name} return raw_name_dict def post_adjust_management_fee_costs(self, data_list: list): """ Adjust the management fee and management fee and costs Because maybe the management fee and costs disclose in the first pages, and the management fee disclose in the next pages. According to biz rule, if can't find management fee when found management fee and costs, the management fee should be same as management fee and costs. if can't find management fee and costs when found management fee, the management fee and costs should be same as management fee. This function is to adjust the management fee and management fee and costs according to this case. 
""" management_fee_costs_list = [] management_fee_list = [] complex_rule_keywords = "Recoverable expenses \nEstimated other indirect costs" for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) exist_complex_rule_keywords = False page_text = data_dict.get("page_text", "") if complex_rule_keywords in page_text: exist_complex_rule_keywords = True data = extract_data.get("data", []) for data_item in data: keys = list(data_item.keys()) fund_name = data_item.get("fund_name", "") share_name = data_item.get("share_name", "") if fund_name == "" or share_name == "": continue if "management_fee" in keys: management_fee = data_item.get("management_fee", -1) if management_fee != -1: found = False for mf in management_fee_list: mf_fund_name = mf.get("fund_name", "") mf_share_name = mf.get("share_name", "") if (mf_fund_name == fund_name and mf_share_name == share_name) or \ (len(mf_fund_name) > 0 and len(mf_share_name) > 0 and mf_fund_name == mf_share_name and (mf_share_name.endswith(share_name) or share_name.endswith(mf_share_name))): if exist_complex_rule_keywords and \ ("interposed_vehicle_performance_fee_cost" in keys or "recoverable_expenses" in keys): mfc["management_fee"] = management_fee found = True break else: mf_value = mf.get("management_fee", -1) if mf_value != -1 and mf_value >= management_fee: mf["management_fee"] = management_fee found = True break if not found: management_fee_list.append({"fund_name": fund_name, "share_name": share_name, "management_fee": management_fee}) if "management_fee_and_costs" in keys: management_fee_costs = data_item.get("management_fee_and_costs", -1) if management_fee_costs != -1: found = False for mfc in management_fee_costs_list: mfc_fund_name = mfc.get("fund_name", "") mfc_share_name = mfc.get("share_name", "") if (mfc_fund_name == fund_name and mfc_share_name == share_name) or \ (len(mfc_fund_name) > 0 and len(mfc_share_name) > 0 and mfc_fund_name == mfc_share_name and (mfc_share_name.endswith(share_name) or 
share_name.endswith(mfc_share_name))): if exist_complex_rule_keywords and \ ("interposed_vehicle_performance_fee_cost" in keys or "recoverable_expenses" in keys): mfc["management_fee_and_costs"] = management_fee_costs found = True break else: mfc_value = mfc.get("management_fee_and_costs", -1) if mfc_value != -1 and mfc_value <= management_fee_costs: mfc["management_fee_and_costs"] = management_fee_costs found = True break if not found: management_fee_costs_list.append({"fund_name": fund_name, "share_name": share_name, "management_fee_and_costs": management_fee_costs}) for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) for data_item in data: keys = list(data_item.keys()) fund_name = data_item.get("fund_name", "") share_name = data_item.get("share_name", "") if fund_name == "" or share_name == "": continue if "management_fee" in keys: for mf in management_fee_list: if mf.get("fund_name", "") == fund_name and mf.get("share_name", "") == share_name: data_item["management_fee"] = mf.get("management_fee", -1) break if "management_fee_and_costs" in keys: for mfc in management_fee_costs_list: if mfc.get("fund_name", "") == fund_name and mfc.get("share_name", "") == share_name: data_item["management_fee_and_costs"] = mfc.get("management_fee_and_costs", -1) break return data_list def supplement_minimum_initial_investment(self, data_list: list): """ Minimum initial investment should be same as from every fund/ share class in the same document. This function is to supplement the minimum initial investment to each fund/ share class in the same document. 
""" exist_minimum_initial_investment = False minimum_initial_investment = -1 mii_dict = None for data_dict in data_list: extract_data = data_dict.get("extract_data", {}) data = extract_data.get("data", []) remove_items = [] for data_item in data: keys = list(data_item.keys()) if "minimum_initial_investment" in keys: exist_minimum_initial_investment = True if minimum_initial_investment == -1: minimum_initial_investment = data_item.get("minimum_initial_investment", -1) if mii_dict is None: mii_dict = data_dict remove_items.append(data_item) for data_item in remove_items: try: data_dict["extract_data"]["data"].remove(data_item) except: pass if exist_minimum_initial_investment and minimum_initial_investment != -1: # get all of funds in data_list fund_name_list = [] for iter_data_dict in data_list: extract_data = iter_data_dict.get("extract_data", {}) data = extract_data.get("data", []) for data_item in data: keys = list(data_item.keys()) if "fund_name" in keys: fund_name = data_item.get("fund_name", "") if len(fund_name) > 0 and fund_name not in fund_name_list: fund_name_list.append(fund_name) # rewrite mii_dict, set each fund name with same minimum_initial_investment value new_mii_data_list = [] for fund_name in fund_name_list: new_data_dict = {"fund_name": fund_name, "minimum_initial_investment": minimum_initial_investment} new_mii_data_list.append(new_data_dict) mii_dict["extract_data"]["data"].extend(new_mii_data_list) return data_list def extract_data_by_text(self) -> dict: """ keys are doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name, raw_share_name, share_id, share_name """ data_list = [] pdf_page_count = len(self.page_text_dict.keys()) handled_page_num_list = [] previous_page_num = -1 previous_page_datapoints = [] previous_page_fund_name = None for page_num, page_text in self.page_text_dict.items(): # if page_num not in [74]: # continue if page_num in handled_page_num_list: continue page_datapoints = 
self.get_datapoints_by_page_num(page_num) if len(page_datapoints) == 0: continue if previous_page_num == page_num - 1 and \ previous_page_datapoints == page_datapoints and \ previous_page_fund_name is not None: # Transfer previous page fund name to be the pre-fix of page text # The purpose is to get fund name if the first records without fund name # example document: 431073795, page index 1727 to 1728 logger.info(f"Transfer previous page fund name: {previous_page_fund_name} to be the pre-fix of page text") page_text = f"\nThe last fund name of previous PDF page: {previous_page_fund_name}\n{page_text}" else: previous_page_fund_name = None page_text = replace_special_table_header(self.replace_table_header_config, page_text) extract_data = self.extract_data_by_page( page_num, page_text, page_datapoints, need_exclude=False, exclude_data=None, previous_page_last_fund=previous_page_fund_name ) data_list.append(extract_data) page_data_list = extract_data.get("extract_data", {}).get("data", []) if len(page_data_list) > 0: previous_page_num = page_num previous_page_fund_name = page_data_list[-1].get("fund_name", "") previous_page_datapoints = page_datapoints extract_way = extract_data.get("extract_way", "text") current_page_data_count = len(page_data_list) if current_page_data_count > 0: count = 1 # some pdf documents have multiple pages for the same data # and the next page may without table header with data point keywords. 
# the purpose is try to get data from the next page if extract_way == "text": current_text = page_text else: current_text = "" while True: try: next_page_num = page_num + count if next_page_num >= pdf_page_count: break if self.document_type == 1 and next_page_num in self.investment_objective_pages: break next_datapoints = page_datapoints if next_page_num in self.page_nums_with_datapoints: should_continue = False next_datapoints = self.get_datapoints_by_page_num(next_page_num) if len(next_datapoints) == 0: should_continue = True else: for next_datapoint in next_datapoints: if self.doc_source == "aus_prospectus": if next_datapoint in page_datapoints: should_continue = False break else: if next_datapoint not in page_datapoints: should_continue = True break if should_continue: next_datapoints.extend(page_datapoints) # remove duplicate datapoints next_datapoints = list(set(next_datapoints)) if not should_continue: break if extract_way == "text": next_page_text = self.page_text_dict.get(next_page_num, "") with_same_structure_table = self.is_next_page_with_same_structure_table( current_text, next_page_text ) if not with_same_structure_table: break next_page_text = replace_special_table_header(self.replace_table_header_config, next_page_text) target_text = current_text + next_page_text else: target_text = "" # try to get data by current page_datapoints logger.info(f"Try to get data from next page {next_page_num}") next_page_extract_data = self.extract_data_by_page( next_page_num, target_text, next_datapoints, need_exclude=True, exclude_data=page_data_list, previous_page_last_fund=previous_page_fund_name ) next_page_data_list = next_page_extract_data.get( "extract_data", {} ).get("data", []) # print(f"next_page_data_list: {next_page_num}") # print(next_page_data_list) if next_page_data_list is not None and len(next_page_data_list) > 0: for current_page_data in page_data_list: if current_page_data in next_page_data_list: next_page_data_list.remove(current_page_data) if 
def is_next_page_with_same_structure_table(self, current_page_text: str, next_page_text: str) -> bool:
    """
    Ask the chat model whether the next page continues the current page's table.

    The prompt lines are read from "compare_table_structure_prompts.json" in
    self.instruction_folder. The reply is parsed as a JSON object and its
    "answer" field is compared with "yes" (case-insensitive).

    Args:
        current_page_text: text of the page that already produced data.
        next_page_text: text of the candidate continuation page.

    Returns:
        True only when the model answered "yes". False when no prompts are
        configured, when the chat call reports an error, or when the reply
        cannot be interpreted.

    Raises:
        OSError / json.JSONDecodeError: if the prompt file is missing or invalid.
    """
    compare_table_structure_prompts_file = os.path.join(
        self.instruction_folder, "compare_table_structure_prompts.json"
    )
    with open(compare_table_structure_prompts_file, "r", encoding="utf-8") as f:
        compare_table_structure_prompts = "\n".join(json.load(f).get("prompts", []))
    if len(compare_table_structure_prompts) == 0:
        return False

    prompts = f"Context: \ncurrent page contents:\n{current_page_text}\nnext page contents:\n{next_page_text}\nInstructions:\n{compare_table_structure_prompts}\n"
    result, with_error = chat(
        prompt=prompts, response_format={"type": "json_object"}, max_tokens=100
    )
    response = result.get("response", "")
    if with_error:
        return False
    try:
        answer = json.loads(response).get("answer", "No")
        return answer.lower() == "yes"
    except (ValueError, TypeError, AttributeError) as e:
        # Best effort: an unreadable reply means "not the same table", but only
        # parsing/shape problems are swallowed (not KeyboardInterrupt etc.).
        logger.warning(f"Can't interpret table structure comparison answer: {e}")
        return False
def extract_data_by_image(self) -> list:
    """
    Extract data from every page that has datapoints, using the page image.

    For each page that yields data, up to two following pages are also tried,
    because a table may continue on pages that lack the header keywords.

    Returns:
        A list with one result dict per processed page, as returned by
        extract_data_by_page_image. Each dict holds its rows under
        ["extract_data"]["data"].
    """
    data_list = []
    pdf_page_count = len(self.page_text_dict)
    # Pages already consumed as a continuation of an earlier page.
    handled_pages = set()
    for page_num in self.page_text_dict:
        if page_num in handled_pages:
            continue
        page_datapoints = self.get_datapoints_by_page_num(page_num)
        if len(page_datapoints) == 0:
            continue
        extract_data = self.extract_data_by_page_image(
            page_num=page_num, page_datapoints=page_datapoints
        )
        data_list.append(extract_data)

        page_data_list = extract_data.get("extract_data", {}).get("data", [])
        if not page_data_list:
            continue

        # Look ahead at most two pages.
        for offset in (1, 2):
            try:
                next_page_num = page_num + offset
                if next_page_num >= pdf_page_count:
                    break
                next_datapoints = page_datapoints
                if next_page_num in self.page_nums_with_datapoints:
                    own_datapoints = self.get_datapoints_by_page_num(next_page_num)
                    brings_new_datapoint = any(
                        datapoint not in page_datapoints for datapoint in own_datapoints
                    )
                    if len(own_datapoints) > 0 and not brings_new_datapoint:
                        # The page only has datapoints we already asked for; it
                        # is handled on its own turn in the outer loop.
                        break
                    # Order-preserving union (set() would make the datapoint
                    # order, and so the prompt, vary between runs). A next page
                    # with no datapoints of its own falls back to the current
                    # page's datapoints.
                    next_datapoints = list(
                        dict.fromkeys(list(own_datapoints) + list(page_datapoints))
                    )
                # try to get data by current page_datapoints
                next_page_extract_data = self.extract_data_by_page_image(
                    page_num=next_page_num,
                    page_datapoints=next_datapoints,
                    need_extract_text=False,
                )
                next_page_data_list = next_page_extract_data.get(
                    "extract_data", {}
                ).get("data", [])
                if not next_page_data_list:
                    break
                data_list.append(next_page_extract_data)
                handled_pages.add(next_page_num)
                # Stop once the next page no longer carries any datapoint of
                # the current page.
                still_same_datapoints = any(
                    page_datapoint in next_page_data
                    for next_page_data in next_page_data_list
                    for page_datapoint in page_datapoints
                )
                if not still_same_datapoints:
                    break
            except Exception as e:
                logger.error(f"Error in extracting data from next page: {e}")
                break

    return data_list
def output_data_to_file(self, data_list: list) -> None:
    """
    Write data_list to "<doc_id>.json" and "<doc_id>.xlsx".

    The JSON file goes to self.output_data_json_folder, the Excel file (sheet
    "extract_data") to self.output_data_excel_folder.
    """
    json_path = os.path.join(self.output_data_json_folder, f"{self.doc_id}.json")
    with open(json_path, "w", encoding="utf-8") as json_file:
        json.dump(data_list, json_file, ensure_ascii=False, indent=4)

    frame = pd.DataFrame(data_list)
    frame.reset_index(drop=True, inplace=True)
    excel_path = os.path.join(self.output_data_excel_folder, f"{self.doc_id}.xlsx")
    with pd.ExcelWriter(excel_path) as writer:
        frame.to_excel(writer, sheet_name="extract_data", index=False)


def extract_data_by_page(
    self,
    page_num: int,
    page_text: str,
    page_datapoints: list,
    need_exclude: bool = False,
    exclude_data: list = None,
    previous_page_last_fund: str = None) -> dict:
    """
    Route one page to text-based or image-based extraction.

    The image path is taken when no requested datapoint is of type "text" and
    the page text has no decimal number (e.g. 1.25 or 3,88), has fewer than
    three lines, or contains more than 100 control characters (\\x10-\\x1f).
    The image path is always called with need_exclude=False and
    exclude_data=None.

    Returns:
        The result dict of extract_data_by_page_image or
        extract_data_by_page_text.
    """
    wants_text_value = any(
        self.datapoint_type_config.get(datapoint, "") == "text"
        for datapoint in page_datapoints
    )
    if not wants_text_value:
        special_code_regex = r"\x10|\x11|\x12|\x13|\x14|\x15|\x16|\x17|\x18|\x19|\x1a|\x1b|\x1c|\x1d|\x1e|\x1f"
        control_codes = [
            code for code in re.findall(special_code_regex, page_text) if code != "\n"
        ]
        line_count = len(page_text.split("\n"))
        numeric_regex = r"\d+(\.|\,)\d+"
        has_numeric_value = re.search(numeric_regex, page_text) is not None
        if not has_numeric_value or line_count < 3 or len(control_codes) > 100:
            logger.info(f"Can't find numberic value in page {page_num}, apply Vision ChatGPT to extract data")
            return self.extract_data_by_page_image(
                page_num=page_num,
                page_datapoints=page_datapoints,
                need_exclude=False,
                exclude_data=None,
                previous_page_last_fund=previous_page_last_fund,
                need_extract_text=False)

    return self.extract_data_by_page_text(
        page_num=page_num,
        page_text=page_text,
        page_datapoints=page_datapoints,
        need_exclude=need_exclude,
        exclude_data=exclude_data,
        previous_page_last_fund=previous_page_last_fund
    )
def extract_data_by_page_text(
    self,
    page_num: int,
    page_text: str,
    page_datapoints: list,
    need_exclude: bool = False,
    exclude_data: list = None,
    previous_page_last_fund: str = None,
    original_way: str = "text"
) -> dict:
    """
    Extract datapoint values from one page's text through the chat model.

    For document type 1, the text of the nearest investment objective page at
    or before this page (less than 5 pages back) is moved in front of the page
    text as extra context.

    Args:
        page_num: index of the page being processed.
        page_text: text sent to the model as context.
        page_datapoints: datapoints to look for on this page.
        need_exclude: whether exclude_data is added to the prompt.
        exclude_data: rows already extracted that the model should not repeat.
        previous_page_last_fund: last fund name of the previous page, passed on
            to validate_data.
        original_way: value stored under "extract_way" ("text", or "image" when
            the text came from an image).

    Returns:
        dict with keys doc_id, page_index, datapoints, page_text, instructions,
        raw_answer, extract_data, extract_way, prompt_token, completion_token,
        total_token. extract_data is always a dict; it is {"data": []} when the
        chat call failed or the answer could not be parsed.
    """
    logger.info(f"Extracting data from page {page_num}")
    if self.document_type == 1:
        pre_context = ""
        if len(self.investment_objective_pages) > 0:
            # Distance to each investment objective page at or before this page.
            diff_pages = [
                page_num - investment_objective_page
                for investment_objective_page in self.investment_objective_pages
                if investment_objective_page <= page_num
            ]
            if len(diff_pages) > 0 and diff_pages[-1] < 5:
                nearest_page = self.investment_objective_pages[len(diff_pages) - 1]
                nearest_text = self.page_text_dict.get(nearest_page, "")
                if nearest_text in page_text:
                    page_text = page_text.replace(nearest_text, "").strip()
                pre_context = f"\nThe most recent investment objective page text which maybe with fund name is: \n{nearest_text}.\n"
        page_text = f"{pre_context}\n{page_text}".strip()

    instructions = self.get_instructions_by_datapoints(
        page_text,
        page_datapoints,
        need_exclude,
        exclude_data,
        extract_way="text"
    )
    result, with_error = chat(
        prompt=instructions, response_format={"type": "json_object"}
    )
    response = result.get("response", "")

    def build_result(extract_data: dict) -> dict:
        # Single place that shapes the per-page result record.
        return {
            "doc_id": self.doc_id,
            "page_index": page_num,
            "datapoints": ", ".join(page_datapoints),
            "page_text": page_text,
            "instructions": instructions,
            "raw_answer": response,
            "extract_data": extract_data,
            "extract_way": original_way,
            "prompt_token": result.get("prompt_token", 0),
            "completion_token": result.get("completion_token", 0),
            "total_token": result.get("total_token", 0),
        }

    if with_error:
        logger.error("Error in extracting tables from page")
        return build_result({"data": []})

    try:
        data = json.loads(response)
    except (ValueError, TypeError):
        # The answer is not valid JSON (e.g. truncated output): try to repair
        # it. Previously this branch read the still-unbound `data`, so the
        # repair was never attempted.
        try:
            data = json_repair.loads(response)
        except Exception:
            data = {"data": []}
    if not isinstance(data, dict):
        # Callers read result["extract_data"].get("data"), so keep a dict.
        data = {"data": []}

    try:
        data = self.validate_data(
            extract_data_info=data,
            page_text=page_text,
            previous_page_last_fund=previous_page_last_fund
        )
    except Exception as e:
        # Best effort: keep the unvalidated data, but make the failure visible.
        logger.error(f"Error in validating data of page {page_num}: {e}")

    return build_result(data)
def extract_data_by_page_image(
    self,
    page_num: int,
    page_datapoints: list,
    need_exclude: bool = False,
    exclude_data: list = None,
    previous_page_last_fund: str = None,
    need_extract_text: bool = False
) -> dict:
    """
    Extract data from a page image, in one step or two.

    With need_extract_text=False the image goes straight to
    extract_data_by_pure_image. With need_extract_text=True the text is first
    read from the image (get_image_text) and then handled by
    extract_data_by_page_text with original_way="image". If no text could be
    read, an empty result record is returned.

    Returns:
        dict with keys doc_id, page_index, datapoints, page_text, instructions,
        raw_answer, extract_data, extract_way, prompt_token, completion_token,
        total_token.
    """
    if not need_extract_text:
        logger.info(f"Extracting data from page {page_num} without extracting text as single step.")
        return self.extract_data_by_pure_image(
            page_num=page_num,
            page_datapoints=page_datapoints,
            need_exclude=need_exclude,
            exclude_data=exclude_data,
            previous_page_last_fund=previous_page_last_fund
        )

    logger.info(f"Extracting data from page {page_num} with extracting text as single step.")
    image_text = self.get_image_text(page_num)
    if image_text is None or len(image_text) == 0:
        # Nothing readable on the image: report an empty extraction.
        return {
            "doc_id": self.doc_id,
            "page_index": page_num,
            "datapoints": ", ".join(page_datapoints),
            "page_text": "",
            "instructions": "",
            "raw_answer": "",
            "extract_data": {"data": []},
            "extract_way": "image",
            "prompt_token": 0,
            "completion_token": 0,
            "total_token": 0,
        }

    if previous_page_last_fund is not None and len(previous_page_last_fund) > 0:
        logger.info(f"Transfer previous page fund name: {previous_page_last_fund} to be the pre-fix of page text")
        image_text = f"\nThe last fund name of previous PDF page: {previous_page_last_fund}\n{image_text}"
    return self.extract_data_by_page_text(
        page_num=page_num,
        page_text=image_text,
        page_datapoints=page_datapoints,
        need_exclude=need_exclude,
        exclude_data=exclude_data,
        previous_page_last_fund=previous_page_last_fund,
        original_way="image"
    )
def extract_data_by_pure_image(
    self,
    page_num: int,
    page_datapoints: list,
    need_exclude: bool = False,
    exclude_data: list = None,
    previous_page_last_fund: str = None
) -> dict:
    """
    Extract datapoint values directly from the page image through the chat model.

    previous_page_last_fund is passed to get_instructions_by_datapoints in the
    page_text position; with extract_way="image" that method adds it to the
    summary as the last fund name of the previous page.

    Returns:
        dict with keys doc_id, page_index, datapoints, page_text (always ""),
        instructions, raw_answer, extract_data, extract_way ("image"),
        prompt_token, completion_token, total_token. extract_data is always a
        dict; it is {"data": []} when the chat call failed or the answer could
        not be parsed.
    """
    image_base64 = self.get_pdf_image_base64(page_num)
    instructions = self.get_instructions_by_datapoints(
        previous_page_last_fund,
        page_datapoints,
        need_exclude=need_exclude,
        exclude_data=exclude_data,
        extract_way="image"
    )
    result, with_error = chat(
        prompt=instructions, response_format={"type": "json_object"}, image_base64=image_base64
    )
    response = result.get("response", "")

    def build_result(extract_data: dict) -> dict:
        # Single place that shapes the per-page result record.
        return {
            "doc_id": self.doc_id,
            "page_index": page_num,
            "datapoints": ", ".join(page_datapoints),
            "page_text": "",
            "instructions": instructions,
            "raw_answer": response,
            "extract_data": extract_data,
            "extract_way": "image",
            "prompt_token": result.get("prompt_token", 0),
            "completion_token": result.get("completion_token", 0),
            "total_token": result.get("total_token", 0),
        }

    if with_error:
        logger.error("Error in extracting tables from page")
        return build_result({"data": []})

    try:
        data = json.loads(response)
    except (ValueError, TypeError):
        try:
            data = json_repair.loads(response)
        except Exception:
            data = {"data": []}
    if not isinstance(data, dict):
        # Callers read result["extract_data"].get("data"), so keep a dict.
        data = {"data": []}

    try:
        data = self.validate_data(data, None, previous_page_last_fund)
    except Exception as e:
        # Best effort: keep the unvalidated data, but make the failure visible.
        logger.error(f"Error in validating data of page {page_num}: {e}")

    return build_result(data)


def get_image_text(self, page_num: int) -> str:
    """
    Ask the chat model to transcribe the text of a page image.

    The prompt comes from instructions_config["get_image_text"]; the reply is
    parsed as a JSON object and its "text" field is returned.

    Returns:
        The value of "text" in the reply, or "" when the reply cannot be
        parsed into a JSON object (previously this case raised on an unbound
        variable).
    """
    image_base64 = self.get_pdf_image_base64(page_num)
    instructions = self.instructions_config.get("get_image_text", "\n")
    logger.info(f"Get text from image of page {page_num}")
    result, with_error = chat(
        prompt=instructions, response_format={"type": "json_object"}, image_base64=image_base64
    )
    response = result.get("response", "")
    if with_error:
        logger.error("Can't get text from current image")
    # The response is still parsed after an error, as before: it may be usable.
    try:
        data = json.loads(response)
    except (ValueError, TypeError):
        try:
            data = json_repair.loads(response)
        except Exception:
            data = {}
    if not isinstance(data, dict):
        return ""
    return data.get("text", "")
def validate_data(self, extract_data_info: dict, page_text: str, previous_page_last_fund: str=None) -> dict:
    """
    Validate and normalise the rows the model returned for one page.

    Rules applied to extract_data_info["data"] (rows are modified in place):
    1. Each data should be with fund name
    2. For share level data, it should be with share name
    3. Implausible "performance_fee" values are dropped.
    4. Rows left without any datapoint are dropped.
    5. Keys "fund name" / "share name" are renamed to "fund_name" /
       "share_name", and a multi-share header is expanded to one row per share.

    Args:
        extract_data_info: parsed model answer; rows are read from its "data" key.
        page_text: page text used for keyword checks; may be None or empty.
        previous_page_last_fund: last fund name of the previous page; used to
            strip it when it was prepended to a fund name on this page.

    Returns:
        extract_data_info, with "data" replaced by the normalised rows. It is
        returned unchanged when "data" is empty.

    NOTE(review): nesting below is reconstructed from whitespace-collapsed
    source; confirm the indentation of the share-level block against the
    original file.
    """
    data_list = extract_data_info.get("data", [])
    if len(data_list) == 0:
        return extract_data_info
    remove_list = []
    # Phrases that announce a performance fee, and phrases that tie it to the NAV.
    performance_fee_regex = r"Amount\s+of\s+the\s+performance\s+fees|Performance\s+Fees\s+amounts|Performance\s+fees\s+amounts|Commissioni\s+di\s+performance|Performance\s+Fee\s+|Performance\s+fees\s+charged"
    nav_regex = r"based\s+on\s+(the\s+)?NAV|on\s+the\s+Share\s+Class\s+NAV|NAV\s+of\s+performance\s+fee|of\s+the\s+average\s+Net\s+Asset\s+Value|Attivi\s+in\s+gestione|Performance\s+Fee\s+of\s+NAV\s+in|share\s+class\s+dealing\s+NAV"
    if page_text is not None and len(page_text) > 0:
        performance_fee_search = re.search(performance_fee_regex, page_text)
        nav_search = re.search(nav_regex, page_text)
    else:
        performance_fee_search = None
        nav_search = None
    for data in data_list:
        # Drop performance_fee when the page mentions both a performance-fee
        # phrase and a NAV phrase.
        if (data.get("performance_fee", None) is not None and
            performance_fee_search is not None and
            nav_search is not None):
            data.pop("performance_fee")
        # A row carrying only names has no datapoint left: remove it.
        keys = [key for key in list(data.keys())
                if key not in ["fund name", "share name"]]
        if len(keys) == 0:
            remove_list.append(data)
            continue
        fund_name = data.get("fund name", "").strip()
        if fund_name == "":
            remove_list.append(data)
            continue

        # Clean fund name start
        if previous_page_last_fund is not None and len(previous_page_last_fund) > 0:
            previous_page_last_fund = previous_page_last_fund.strip()
            # Strip the previous page's fund name when it prefixes this one,
            # but only if more than one word remains afterwards.
            if fund_name.startswith(previous_page_last_fund) and fund_name != previous_page_last_fund:
                modified_fund_name = fund_name.replace(previous_page_last_fund, "").strip()
                if len(modified_fund_name.split()) > 1:
                    fund_name = modified_fund_name
        fund_name = self.get_fund_name(fund_name, "Fund")
        fund_name = self.get_fund_name(fund_name, "Bond")

        # Section headings that can end up in front of the fund name.
        remove_prefix_list = ["Market Specific Equity Sub-Funds",
                              "International and Regional Equity Sub-Funds",
                              "Equity Sub-Funds"]
        for remove_item in remove_prefix_list:
            if fund_name.startswith(remove_item):
                # NOTE(review): replace() removes every occurrence, not only the prefix.
                fund_name = fund_name.replace(remove_item, "").strip()

        data["fund name"] = fund_name
        # Clean fund name end

        keys = list(data.keys())
        for key in keys:
            if self.datapoint_level_config.get(key, "") == "share_level":
                if data.get("share name", "") == "":
                    # A share-level value without share name is kept when the
                    # page shows a "TER in %" / "TER %" header (ter only), or
                    # when the fund name matches a known share class name. The
                    # fund name is then used as share name.
                    include_key_words = False
                    if key == "ter" and page_text is not None and len(page_text) > 0:
                        ter_regex = r"TER\s+in\s+\%|TER\s*\%"
                        ter_search = re.search(ter_regex, page_text)
                        if ter_search is not None:
                            include_key_words = True
                    if not include_key_words:
                        is_share_name = self.check_fund_name_as_share(fund_name)
                        if not is_share_name:
                            remove_list.append(data)
                            break
                    data["share name"] = fund_name
            # Drop keys whose value is the empty string.
            if data.get(key, "") == "":
                data.pop(key)
    for remove_data in remove_list:
        if remove_data in data_list:
            data_list.remove(remove_data)
    # check performance_fee
    for data in data_list:
        performance_fee = data.get("performance_fee", None)
        if performance_fee is not None:
            try:
                performance_fee = float(performance_fee)
                # Dropped when above 3 and a multiple of 2.5, or above 10
                # (presumably a fee rate rather than the charged fee - TODO confirm).
                if (performance_fee > 3 and performance_fee % 2.5 == 0) or \
                    performance_fee > 10:
                    data.pop("performance_fee")
            except:
                # Value is not numeric: drop it.
                data.pop("performance_fee")
    # Second pass: rows may have lost their last datapoint above.
    remove_list = []
    for data in data_list:
        keys = [key for key in list(data.keys())
                if key not in ["fund name", "share name"]]
        if len(keys) == 0:
            remove_list.append(data)
    for remove_data in remove_list:
        if remove_data in data_list:
            data_list.remove(remove_data)
    # update "fund name" to be "fund_name"
    # update "share name" to be "share_name"
    new_data_list = []
    # Three or more comma-separated upper-case tokens, e.g. "A, B, E, ".
    multi_over_3_share_regex = r"([A-Z]{1,}\,\s){3,}"
    exist_multi_over_3_share = False
    for data in data_list:
        fund_name = data.get("fund name", "").strip()
        if len(fund_name) == 0:
            continue
        raw_share_name = data.get("share name", "")
        # Sticky flag: once one row has a multi-share name, every following
        # row is also passed to split_multi_share_name.
        if not exist_multi_over_3_share:
            multi_over_3_share_search = re.search(multi_over_3_share_regex, raw_share_name)
            if multi_over_3_share_search is not None:
                exist_multi_over_3_share = True
        if exist_multi_over_3_share:
            share_name_list = self.split_multi_share_name(raw_share_name)
        else:
            share_name_list = [raw_share_name]
        if len(share_name_list) > 0:
            # One output row per share name, all sharing the same values.
            for share_name in share_name_list:
                new_data = {}
                new_data["fund_name"] = fund_name
                if share_name != "":
                    new_data["share_name"] = share_name
                ter = data.get("ter", None)
                if ter is not None:
                    new_data["ter"] = ter
                # NOTE(review): this reads "performance fees" (with a space),
                # while the checks above use "performance_fee"; a
                # "performance_fee" key is carried over by the generic copy below.
                performance_fee = data.get("performance fees", None)
                if performance_fee is not None:
                    new_data["performance_fee"] = performance_fee
                for key, value in data.items():
                    if key not in ["fund name", "share name", "ter", "performance fees"]:
                        new_data[key] = value
                new_data_list.append(new_data)

    extract_data_info["data"] = new_data_list
    return extract_data_info
def split_multi_share_name(self, raw_share_name: str) -> list:
    """
    Split a combined share-class header into individual share names.

    Some document, e.g. 481482392
    Exist multi share name as table header, e.g. "Class A, B, E, M, N, P, R, U"
    For this case, need split the share name to be
    ["Class A", "Class B", "Class E", "Class M", "Class N", "Class P", "Class R", "Class U"]

    Args:
        raw_share_name: share name as reported in the document.

    Returns:
        The list of share names. When the first item has exactly two words,
        its first word is used as prefix for the other items. A name that is
        empty or is not a multi-share list (fewer than two "X, " tokens in a
        row) is returned unchanged as a single-item list.
    """
    multi_over_2_share_regex = r"([A-Z]{1,}\,\s){2,}"
    if not raw_share_name or re.search(multi_over_2_share_regex, raw_share_name) is None:
        # Not a multi-share header: nothing to split.
        return [raw_share_name]

    multi_share_splits = [
        share_name.strip()
        for share_name in raw_share_name.split(",")
        if len(share_name.strip()) > 0
    ]
    first_share_name = multi_share_splits[0]
    first_share_name_split = first_share_name.split()
    if len(first_share_name_split) != 2:
        # No "<prefix> <letter>" shape to borrow a prefix from.
        return multi_share_splits

    share_name_prefix = first_share_name_split[0]
    return [
        split if split == first_share_name else f"{share_name_prefix} {split}"
        for split in multi_share_splits
    ]
def get_fund_name(self, fund_name: str, fund_feature: str):
    """
    Keep only the last fund when several names ending in fund_feature are glued together.

    Example: get_fund_name("AAA Fund BBB Fund", "Fund") returns "BBB Fund".

    Args:
        fund_name: fund name as extracted.
        fund_feature: trailing word that marks the end of a fund name
            (callers pass "Fund" and "Bond").

    Returns:
        The part after the last "<fund_feature> " separator, or fund_name
        unchanged when it does not end with fund_feature or holds no separator.
    """
    if not fund_name.endswith(fund_feature):
        return fund_name
    # Split on the feature followed by a space, to avoid splitting "Funds"
    # into "Fund" + "s".
    fund_name_split = fund_name.split(fund_feature + " ")
    if len(fund_name_split) > 1:
        # The last piece still ends with fund_feature (it has no trailing
        # space), so it is already the complete last fund name. Appending the
        # feature again produced names like "BBB Fund Fund ".
        last_fund = fund_name_split[-1].strip()
        if len(last_fund) > 0:
            return last_fund
    return fund_name


def check_fund_name_as_share(self, fund_name: str) -> bool:
    """
    Check if the fund name is the same as share name

    The name is compared with the "ShareClassName" values of
    self.document_mapping_info_df through get_most_similar_name.

    Returns:
        True when the best similarity is at least 0.8; False otherwise, and
        also when fund_name is empty or no share class names are known.
    """
    if len(fund_name) == 0:
        return False
    share_name_list = self.document_mapping_info_df["ShareClassName"].unique().tolist()
    if len(share_name_list) == 0:
        return False
    _, max_similarity = get_most_similar_name(
        text=fund_name,
        name_list=share_name_list,
        share_name=None,
        fund_name=None,
        matching_type="share",
        process_cache=None)
    return bool(max_similarity >= 0.8)


def get_datapoints_by_page_num(self, page_num: int) -> list:
    """
    Return the datapoints of self.datapoints whose page list contains page_num.

    The page lists are read from self.datapoint_page_info; the order of
    self.datapoints is kept.
    """
    return [
        datapoint
        for datapoint in self.datapoints
        if page_num in self.datapoint_page_info.get(datapoint, [])
    ]
def get_instructions_by_datapoints(
    self,
    page_text: str,
    datapoints: list,
    need_exclude: bool = False,
    exclude_data: list = None,
    extract_way: str = "text",
) -> str:
    """
    Get instructions to extract data from the page by the datapoints

    Below is the instructions sections:
    summary: string
    reported_name by datapoints: dict
    multilingual reported name examples (only when self.language is not "english")
    data_business_features: dict
        common: list
        investment_level by datapoints: dict
        data_value_range by datapoints: dict
        special_rule by datapoints: dict
        sepcial_rule_by_keywords by datapoints: dict (rules added only when
            one of their keywords is found in page_text)
    special_cases: dict
        common: list
            title
            contents
        special_case by datapoints: list
            title
            contents
    output_requirement
        common: list
        fund_level: list
        share_level: dict
            fund_name: list
            share_name: list
            ogc_value: list
            ter_value: list
            performance_fee_value: list
    end

    Args:
        page_text: page text used as context (extract_way="text"). With
            extract_way="image" the caller passes the last fund name of the
            previous page here and it is added to the summary.
        datapoints: datapoints to extract.
        need_exclude: when True and exclude_data is a list, the prompt asks the
            model to leave these rows out.
        exclude_data: rows that were already extracted.
        extract_way: "text" or "image".

    Returns:
        The complete prompt text, ending with "Answer:\\n".
    """
    instructions = []
    if extract_way == "text":
        instructions = [f"Context:\n{page_text}\n\nInstructions:\n"]

    datapoint_name_list = [
        self.datapoint_name_config.get(datapoint, "") for datapoint in datapoints
    ]

    if extract_way == "image":
        summary = self.instructions_config.get("summary_image", "\n")
        if page_text is not None and len(page_text) > 0:
            logger.info(f"Transfer previous page fund name: {page_text} to be the pre-fix of page text")
            summary += f"\nThe last fund name of previous PDF page: {page_text}\n"
    else:
        summary = self.instructions_config.get("summary", "\n")

    instructions.append(summary.format(", ".join(datapoint_name_list)))
    instructions.append("\n")

    if extract_way == "image":
        image_features = self.instructions_config.get("image_features", [])
        instructions.extend(image_features)
        instructions.append("\n")

    # ---- reported names -------------------------------------------------
    instructions.append("Datapoints Reported name:\n")
    instructions.append("Please look for relevant reported names and similar variations in the context.\n")
    reported_name_info_in_instructions = self.instructions_config.get("reported_name", {})
    for datapoint in datapoints:
        reported_name_list = self.datapoint_reported_name_config.get(datapoint, [])
        if len(reported_name_list) == 0:
            reported_name = reported_name_info_in_instructions.get(datapoint, "")
        else:
            joined_reported_name = ", ".join(reported_name_list)
            if datapoint == "performance_fee":
                datapoint_name = "performance fees"
            else:
                datapoint_name = self.datapoint_name_config.get(datapoint, "")
                if len(datapoint_name) == 0:
                    datapoint_name = datapoint.upper()
            reported_name = f"The {datapoint_name} reported name could be:\n{joined_reported_name}"
        instructions.append(reported_name)
        instructions.append("\n")
    instructions.append("\n")

    # ---- multilingual reported name examples ----------------------------
    if self.language != "english":
        # instructions_config["multilingual_reported_name"] provides:
        #   describe, regular_example_template, special_example_template_none,
        #   value_examples (e.g. ["1,98", "3.25"]), fund_example, share_example
        multilingual_reported_name_config = self.instructions_config.get("multilingual_reported_name", {})
        describe = multilingual_reported_name_config.get("describe", "")
        regular_example_template = multilingual_reported_name_config.get("regular_example_template", "")
        special_example_template_none = multilingual_reported_name_config.get("special_example_template_none", "")
        value_examples = multilingual_reported_name_config.get("value_examples", [])
        fund_example = multilingual_reported_name_config.get("fund_example", "")
        share_example = multilingual_reported_name_config.get("share_example", "")
        instructions.append("Multilingual reported name:\n")
        instructions.append(f"{describe}\n")

        # set language the first char to be upper
        language = self.language[0].upper() + self.language[1:]
        for datapoint in datapoints:
            mul_reported_name_list = self.non_english_reported_name_config.get(datapoint, [])
            # shuffle the reported name list
            mul_reported_name_list = list(set(mul_reported_name_list))
            if len(mul_reported_name_list) == 0:
                continue
            if datapoint == "performance_fee":
                datapoint_name = "performance fees"
            else:
                datapoint_name = datapoint.upper()
            example_count = 1
            none_value_example_count = 0
            for mul_reported_name in mul_reported_name_list:
                # ter / performance_fee get at most two examples.
                if datapoint in ["ter", "performance_fee"] and example_count >= 3:
                    break
                value = value_examples[example_count % len(value_examples)]
                answer = json.dumps(
                    {
                        "fund name": fund_example,
                        "share name": share_example,
                        datapoint: float(value.replace(",", ".")),
                    },
                    ensure_ascii=False,
                )
                example = regular_example_template.format(
                    datapoint=datapoint_name,
                    number=example_count,
                    language=language,
                    fund_name=fund_example,
                    share_name=share_example,
                    reported_name=mul_reported_name,
                    value=value,
                    answer=answer,
                )
                instructions.append(example)
                instructions.append("\n")
                instructions.append("\n")

                example_count += 1
                # Multi-word reported names also get a "value is missing"
                # example, at most twice per datapoint.
                if len(mul_reported_name.split()) > 1 and none_value_example_count != 2:
                    none_value_example = special_example_template_none.format(
                        datapoint=datapoint_name,
                        number=example_count,
                        language=language,
                        fund_name=fund_example,
                        share_name=share_example,
                        reported_name=mul_reported_name,
                        answer=json.dumps({}, ensure_ascii=False)
                    )
                    instructions.append(none_value_example)
                    instructions.append("\n")
                    instructions.append("\n")
                    example_count += 1
                    none_value_example_count += 1

    # ---- data business features -----------------------------------------
    instructions.append("\n")
    instructions.append("Data business features:\n")
    data_business_features = self.instructions_config.get(
        "data_business_features", {}
    )
    common = "\n".join(data_business_features.get("common", []))
    instructions.append(common)
    instructions.append("\n")

    instructions.append("Datapoints investment level:\n")
    investment_level_info = data_business_features.get("investment_level", {})
    for datapoint in datapoints:
        instructions.append(investment_level_info.get(datapoint, ""))
        instructions.append("\n")
    instructions.append("\n")

    instructions.append("Datapoints value range:\n")
    data_value_range_info = data_business_features.get("data_value_range", {})
    for datapoint in datapoints:
        instructions.append(data_value_range_info.get(datapoint, ""))
        instructions.append("\n")
    instructions.append("\n")

    # ---- special rules ---------------------------------------------------
    special_rule_info = data_business_features.get("special_rule", {})
    # The reason why apply special_rule_by_keywords is:
    # 1. The special rule is very complex, prompts are very long.
    # 2. To load it by keywords, is to avoid for simple case, the prompts are too long.
    # The config key is spelled "sepcial_rule_by_keywords"; it is read as-is.
    rules_by_keywords = data_business_features.get("sepcial_rule_by_keywords", {})
    if not isinstance(rules_by_keywords, dict):
        rules_by_keywords = {}
    with_special_rule_title = False
    has_page_text = page_text is not None and len(page_text) > 0
    for datapoint in datapoints:
        find_complex_special_rule = False
        if has_page_text:
            # A distinct loop variable keeps rules_by_keywords intact for the
            # next datapoint (it used to be overwritten by the loop variable).
            for keyword_rule in rules_by_keywords.get(datapoint, []):
                complex_keywords = keyword_rule.get("keywords", [])
                if len(complex_keywords) == 0:
                    continue
                exist_keywords = any(
                    special_keywords in page_text
                    or re.search(add_slash_to_text_as_regex(special_keywords), page_text) is not None
                    for special_keywords in complex_keywords
                )
                if not exist_keywords:
                    continue
                complex_prompts_list = keyword_rule.get("prompts", [])
                if len(complex_prompts_list) > 0:
                    if not with_special_rule_title:
                        instructions.append("Special rule:\n")
                        with_special_rule_title = True
                    instructions.append("\n".join(complex_prompts_list))
                    instructions.append("\n\n")
                    find_complex_special_rule = True
        if find_complex_special_rule:
            # A keyword rule replaces the generic special rule of the datapoint.
            continue
        special_rule_list = special_rule_info.get(datapoint, [])
        if len(special_rule_list) > 0:
            if not with_special_rule_title:
                instructions.append("Special rule:\n")
                with_special_rule_title = True
            instructions.append("\n".join(special_rule_list))
            instructions.append("\n\n")

    instructions.append("\n")

    # ---- special cases ---------------------------------------------------
    instructions.append("Special cases:\n")
    special_cases = self.instructions_config.get("special_cases", {})
    special_cases_number = 1
    for special_cases_common in special_cases.get("common", []):
        title = special_cases_common.get("title", "")
        instructions.append(f"{special_cases_number}. {title} ")
        special_cases_number += 1
        instructions.append("\n")
        instructions.append("\n".join(special_cases_common.get("contents", [])))
        instructions.append("\n\n")

    for datapoint in datapoints:
        for special_case in special_cases.get(datapoint, []):
            title = special_case.get("title", "")
            instructions.append(f"{special_cases_number}. {title} ")
            special_cases_number += 1
            instructions.append("\n")
            instructions.append("\n".join(special_case.get("contents", [])))
            instructions.append("\n")
    instructions.append("\n")

    # ---- output requirement and example ----------------------------------
    instructions.append("Output requirement:\n")
    output_requirement = self.instructions_config.get("output_requirement", {})
    instructions.append("\n".join(output_requirement.get("common", [])))
    instructions.append("\n")

    fund_level_config = output_requirement.get("fund_level", {})
    share_level_config = output_requirement.get("share_level", {})
    dp_reported_name_config = output_requirement.get("dp_reported_name", {})
    fund_datapoint_value_example = {}
    share_datapoint_value_example = {}
    dp_reported_name = {}
    for datapoint in datapoints:
        investment_level = self.datapoint_level_config.get(datapoint, "")
        if investment_level == "fund_level":
            fund_datapoint_value_example[datapoint] = fund_level_config.get(
                f"{datapoint}_value", []
            )
        elif investment_level == "share_level":
            share_datapoint_value_example[datapoint] = share_level_config.get(
                f"{datapoint}_value", []
            )
        dp_reported_name[datapoint] = dp_reported_name_config.get(datapoint, "")

    instructions.append("Example:\n")

    example_list = []
    if len(fund_datapoint_value_example) > 0:
        for index, example_fund_name in enumerate(fund_level_config.get("fund_name", [])):
            example_dict = {"fund name": example_fund_name}
            for fund_datapoint, fund_datapoint_values in fund_datapoint_value_example.items():
                if index < len(fund_datapoint_values):
                    example_dict[fund_datapoint] = fund_datapoint_values[index]
            example_list.append(example_dict)

    if len(share_datapoint_value_example) > 0:
        share_name_example_list = share_level_config.get("share_name", [])
        for index, example_fund_name in enumerate(share_level_config.get("fund_name", [])):
            example_dict = {
                "fund name": example_fund_name,
                "share name": share_name_example_list[index],
            }
            for share_datapoint, share_datapoint_values in share_datapoint_value_example.items():
                if index < len(share_datapoint_values):
                    example_dict[share_datapoint] = share_datapoint_values[index]
            example_list.append(example_dict)

    example_data = {"data": example_list, "dp_reported_name": dp_reported_name}
    instructions.append(json.dumps(example_data, ensure_ascii=False, indent=4))
    instructions.append("\n")
    instructions.append("\n")

    instructions.append("\n".join(self.instructions_config.get("end", [])))
    instructions.append("\n")

    if need_exclude and exclude_data is not None and isinstance(exclude_data, list):
        instructions.append("Please exclude below data from output:\n")
        instructions.append(json.dumps(exclude_data, ensure_ascii=False, indent=4))
        instructions.append("\n")
    instructions.append("\n")
    instructions.append("Answer:\n")

    return "".join(instructions)
an error occurs, split the context into two parts and try to get data from both parts # Relevant document: 503194284, page index 147 # """ # try: # logger.info(f"Split context to get data to fix issue which output length is over 4K tokens") # split_context = re.split(r"\n", page_text) # split_context = [text.strip() for text in split_context # if len(text.strip()) > 0] # if len(split_context) < 10: # return {"data": []} # split_context_len = len(split_context) # top_10_context = split_context[:10] # rest_context = split_context[10:] # header = "\n".join(top_10_context) # half_len = split_context_len // 2 # # the member of half_len should not start with number # # reverse iterate the list by half_len # half_len_list = [i for i in range(half_len)] # fund_name_line = "" # half_line = rest_context[half_len].strip() # max_similarity_fund_name, max_similarity = get_most_similar_name( # half_line, self.provider_fund_name_list, matching_type="fund" # ) # if max_similarity < 0.2: # # get the fund name line text from the first half # for index in reversed(half_len_list): # line_text = rest_context[index].strip() # if len(line_text) == 0: # continue # line_text_split = line_text.split() # if len(line_text_split) < 3: # continue # first_word = line_text_split[0] # if first_word.lower() == "class": # continue # max_similarity_fund_name, max_similarity = get_most_similar_name( # line_text, self.provider_fund_name_list, matching_type="fund" # ) # if max_similarity >= 0.2: # fund_name_line = line_text # break # else: # fund_name_line = half_line # half_len += 1 # if fund_name_line == "": # return {"data": []} # logger.info(f"Split first part from 0 to {half_len}") # split_first_part = "\n".join(split_context[:half_len]) # first_part = '\n'.join(split_first_part) # first_instructions = self.get_instructions_by_datapoints( # first_part, page_datapoints, need_exclude, exclude_data, extract_way="text" # ) # response, with_error = chat( # first_instructions, response_format={"type":
"json_object"} # ) # first_part_data = {"data": []} # if not with_error: # try: # first_part_data = json.loads(response) # except: # first_part_data = json_repair.loads(response) # logger.info(f"Split second part from {half_len} to {split_context_len}") # split_second_part = "\n".join(split_context[half_len:]) # second_part = header + "\n" + fund_name_line + "\n" + split_second_part # second_instructions = self.get_instructions_by_datapoints( # second_part, page_datapoints, need_exclude, exclude_data, extract_way="text" # ) # response, with_error = chat( # second_instructions, response_format={"type": "json_object"} # ) # second_part_data = {"data": []} # if not with_error: # try: # second_part_data = json.loads(response) # except: # second_part_data = json_repair.loads(response) # first_part_data_list = first_part_data.get("data", []) # logger.info(f"First part data count: {len(first_part_data_list)}") # second_part_data_list = second_part_data.get("data", []) # logger.info(f"Second part data count: {len(second_part_data_list)}") # for first_data in first_part_data_list: # if first_data in second_part_data_list: # second_part_data_list.remove(first_data) # else: # # if the first part data is with same fund name and share name, # # remove the second part data # first_data_dp = [key for key in list(first_data.keys()) # if key not in ["fund name", "share name"]] # # order the data points # first_data_dp.sort() # first_fund_name = first_data.get("fund name", "") # first_share_name = first_data.get("share name", "") # if len(first_fund_name) > 0 and len(first_share_name) > 0: # remove_second_list = [] # for second_data in second_part_data_list: # second_fund_name = second_data.get("fund name", "") # second_share_name = second_data.get("share name", "") # if first_fund_name == second_fund_name and \ # first_share_name == second_share_name: # second_data_dp = [key for key in list(second_data.keys()) # if key not in ["fund name", "share name"]] # second_data_dp.sort() # 
if first_data_dp == second_data_dp: # remove_second_list.append(second_data) # for remove_second in remove_second_list: # if remove_second in second_part_data_list: # second_part_data_list.remove(remove_second) # data_list = first_part_data_list + second_part_data_list # extract_data = {"data": data_list} # return extract_data # except Exception as e: # logger.error(f"Error in split context: {e}") # return {"data": []}