import os
import json
import pandas as pd
from utils.biz_utils import get_most_similar_name, remove_common_word
from utils.sql_query_util import (
    query_document_fund_mapping,
    query_investment_by_provider,
)
from utils.logger import logger


class DataMapping:
    """Map raw extracted fund/share-class rows of one document to database
    investment ids (FundId / SecId).

    Name matching runs against two universes: the document-level mapping
    (``query_document_fund_mapping``) and the provider-level universe
    (``query_investment_by_provider``).  Results are written to a JSON file
    and an Excel workbook under ``output_data_folder``.
    """

    def __init__(
        self,
        doc_id,
        datapoints: list,
        raw_document_data_list: list,
        document_mapping_info_df: pd.DataFrame,
        output_data_folder: str,
        doc_source: str = "emea_ar",
    ):
        """Prepare output folders and load the document/provider mappings.

        Args:
            doc_id: document identifier used for DB lookups and file names.
            datapoints: datapoint keys to pull out of each raw record.
            raw_document_data_list: per-page raw extraction results.
            document_mapping_info_df: pre-fetched document mapping; when
                None/empty it is queried from the database instead.
            output_data_folder: root output folder; falls back to the
                default EMEA AR path when None/empty.
            doc_source: source tag of the document (default "emea_ar").
        """
        self.doc_id = doc_id
        self.datapoints = datapoints
        self.doc_source = doc_source
        self.raw_document_data_list = raw_document_data_list
        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
            self.document_mapping_info_df = query_document_fund_mapping(
                doc_id, rerun=False
            )
        else:
            self.document_mapping_info_df = document_mapping_info_df
        if output_data_folder is None or len(output_data_folder) == 0:
            output_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
        os.makedirs(output_data_folder, exist_ok=True)
        self.output_data_json_folder = os.path.join(output_data_folder, "json/")
        os.makedirs(self.output_data_json_folder, exist_ok=True)
        self.output_data_excel_folder = os.path.join(output_data_folder, "excel/")
        os.makedirs(self.output_data_excel_folder, exist_ok=True)
        self.set_mapping_data_by_db(self.document_mapping_info_df)

    def set_mapping_data_by_db(self, document_mapping_info_df: pd.DataFrame):
        """Build the name lists and id-mapping frames for both the document
        universe and the provider universe.

        Populates: ``doc_fund_name_list``, ``doc_share_name_list``,
        ``doc_fund_mapping``, ``doc_fund_class_mapping`` and the four
        ``provider_*`` counterparts (empty lists/frames when no mapping
        rows exist).
        """
        logger.info("Setting document mapping data")
        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
            self.document_mapping_info_df = query_document_fund_mapping(
                self.doc_id, rerun=False
            )
        else:
            self.document_mapping_info_df = document_mapping_info_df
        if len(self.document_mapping_info_df) == 0:
            self.doc_fund_name_list = []
            self.doc_share_name_list = []
            self.doc_fund_mapping = pd.DataFrame()
            self.doc_fund_class_mapping = pd.DataFrame()
        else:
            self.doc_fund_name_list = (
                self.document_mapping_info_df["FundName"].unique().tolist()
            )
            self.doc_share_name_list = (
                self.document_mapping_info_df["ShareClassName"].unique().tolist()
            )
            self.doc_fund_mapping = self.document_mapping_info_df[
                ["FundId", "FundName"]
            ].drop_duplicates()
            self.doc_fund_class_mapping = self.document_mapping_info_df[
                ["FundId", "SecId", "ShareClassName", "CurrencyId"]
            ].drop_duplicates()
        logger.info("Setting provider mapping data")
        self.provider_mapping_df = self.get_provider_mapping()
        if len(self.provider_mapping_df) == 0:
            self.provider_fund_name_list = []
            self.provider_share_name_list = []
            self.provider_fund_mapping = pd.DataFrame()
            self.provider_fund_class_mapping = pd.DataFrame()
        else:
            self.provider_fund_name_list = (
                self.provider_mapping_df["FundName"].unique().tolist()
            )
            self.provider_share_name_list = (
                self.provider_mapping_df["ShareClassName"].unique().tolist()
            )
            self.provider_fund_mapping = self.provider_mapping_df[
                ["FundId", "FundName"]
            ].drop_duplicates()
            self.provider_fund_class_mapping = self.provider_mapping_df[
                ["FundId", "SecId", "ShareClassName", "CurrencyId"]
            ].drop_duplicates()

    def get_provider_mapping(self):
        """Query and concatenate the investment universe of every provider
        referenced by the document mapping.

        Returns:
            A de-duplicated, re-indexed DataFrame; empty DataFrame when the
            document mapping itself is empty.
        """
        if len(self.document_mapping_info_df) == 0:
            return pd.DataFrame()
        provider_id_list = (
            self.document_mapping_info_df["ProviderId"].unique().tolist()
        )
        provider_mapping_list = [
            query_investment_by_provider(provider_id, rerun=False)
            for provider_id in provider_id_list
        ]
        provider_mapping_df = pd.concat(provider_mapping_list)
        provider_mapping_df = provider_mapping_df.drop_duplicates()
        provider_mapping_df.reset_index(drop=True, inplace=True)
        return provider_mapping_df

    def mapping_raw_data(self):
        """Map every raw extracted record to an investment id.

        Each emitted record carries: doc_id, page_index, raw_fund_name,
        raw_share_name, raw_name, datapoint, value, investment_type
        (33 = fund, 1 = share class, -1 = unidentifiable), investment_id,
        investment_name and (where computed) similarity.

        Side effects: writes ``<doc_id>.json`` and ``<doc_id>.xlsx`` to the
        configured output folders.

        Returns:
            The list of mapped record dicts.
        """
        logger.info(f"Mapping raw data for document {self.doc_id}")
        mapped_data_list = []
        mapped_fund_cache = {}   # raw fund name -> fund match result
        mapped_share_cache = {}  # integrated share name -> share match result
        process_cache = {}       # shared scratch cache for name matching
        for page_data in self.raw_document_data_list:
            doc_id = page_data.get("doc_id", "")
            page_index = page_data.get("page_index", "")
            raw_data_list = page_data.get("extract_data", {}).get("data", [])
            for raw_data in raw_data_list:
                raw_fund_name = raw_data.get("fund_name", "")
                if raw_fund_name is None or len(raw_fund_name) == 0:
                    continue
                raw_share_name = raw_data.get("share_name", "")
                if (
                    len(self.doc_fund_name_list) == 0
                    and len(self.provider_fund_name_list) == 0
                ):
                    # No database universe at all: emit unmapped records,
                    # typed by whether a share name is present.
                    # BUGFIX: guard against raw_share_name being None
                    # (len(None) raised TypeError before).
                    if raw_share_name is not None and len(raw_share_name) > 0:
                        integrated_share_name = self.integrate_share_name(
                            raw_fund_name, raw_share_name
                        )
                        raw_data_keys = list(raw_data.keys())
                        for datapoint in self.datapoints:
                            if datapoint in raw_data_keys:
                                mapped_data = {
                                    "doc_id": doc_id,
                                    "page_index": page_index,
                                    "raw_fund_name": raw_fund_name,
                                    "raw_share_name": raw_share_name,
                                    "raw_name": integrated_share_name,
                                    "datapoint": datapoint,
                                    "value": raw_data[datapoint],
                                    "investment_type": 1,
                                    "investment_id": "",
                                    "investment_name": "",
                                    "similarity": 0,
                                }
                                mapped_data_list.append(mapped_data)
                    else:
                        raw_data_keys = list(raw_data.keys())
                        for datapoint in self.datapoints:
                            if datapoint in raw_data_keys:
                                mapped_data = {
                                    "doc_id": doc_id,
                                    "page_index": page_index,
                                    "raw_fund_name": raw_fund_name,
                                    "raw_share_name": "",
                                    "raw_name": raw_fund_name,
                                    "datapoint": datapoint,
                                    "value": raw_data[datapoint],
                                    "investment_type": 33,
                                    "investment_id": "",
                                    "investment_name": "",
                                    # CONSISTENCY FIX: every record now
                                    # carries a similarity key.
                                    "similarity": 0,
                                }
                                mapped_data_list.append(mapped_data)
                else:
                    raw_name = ""
                    if raw_share_name is not None and len(raw_share_name) > 0:
                        # Share-class record: match the fund first (its id
                        # narrows the share-class candidates), then the share.
                        raw_name = self.integrate_share_name(
                            raw_fund_name, raw_share_name
                        )
                        if mapped_share_cache.get(raw_name) is not None:
                            investment_info = mapped_share_cache[raw_name]
                        else:
                            if mapped_fund_cache.get(raw_fund_name) is not None:
                                fund_info = mapped_fund_cache[raw_fund_name]
                                fund_id = fund_info["id"]
                            else:
                                fund_info = self.matching_with_database(
                                    raw_name=raw_fund_name, matching_type="fund"
                                )
                                fund_id = fund_info["id"]
                                mapped_fund_cache[raw_fund_name] = fund_info
                            investment_info = self.matching_with_database(
                                raw_name=raw_name,
                                raw_share_name=raw_share_name,
                                raw_fund_name=raw_fund_name,
                                parent_id=fund_id,
                                matching_type="share",
                                process_cache=process_cache,
                            )
                            mapped_share_cache[raw_name] = investment_info
                    elif raw_fund_name is not None and len(raw_fund_name) > 0:
                        raw_name = raw_fund_name
                        if mapped_fund_cache.get(raw_fund_name) is not None:
                            investment_info = mapped_fund_cache[raw_fund_name]
                        else:
                            investment_info = self.matching_with_database(
                                raw_name=raw_fund_name, matching_type="fund"
                            )
                            mapped_fund_cache[raw_fund_name] = investment_info
                    else:
                        raw_name = ""
                        investment_info = {
                            "id": "",
                            "legal_name": "",
                            "investment_type": -1,
                            "similarity": 0,
                        }
                    raw_data_keys = list(raw_data.keys())
                    for datapoint in self.datapoints:
                        if datapoint in raw_data_keys:
                            mapped_data = {
                                "doc_id": doc_id,
                                "page_index": page_index,
                                "raw_fund_name": raw_fund_name,
                                "raw_share_name": raw_share_name,
                                "raw_name": raw_name,
                                "datapoint": datapoint,
                                "value": raw_data[datapoint],
                                "investment_type": investment_info[
                                    "investment_type"
                                ],
                                "investment_id": investment_info["id"],
                                "investment_name": investment_info["legal_name"],
                                "similarity": investment_info["similarity"],
                            }
                            mapped_data_list.append(mapped_data)
        self._save_mapping_outputs(mapped_data_list)
        return mapped_data_list

    def _save_mapping_outputs(self, mapped_data_list: list):
        """Persist mapping results as JSON and (best-effort) Excel."""
        json_data_file = os.path.join(
            self.output_data_json_folder, f"{self.doc_id}.json"
        )
        with open(json_data_file, "w", encoding="utf-8") as f:
            json.dump(mapped_data_list, f, ensure_ascii=False, indent=4)
        extract_data_df = pd.DataFrame(self.raw_document_data_list)
        extract_data_df.reset_index(drop=True, inplace=True)
        mapping_data_df = pd.DataFrame(mapped_data_list)
        mapping_data_df.reset_index(drop=True, inplace=True)
        excel_data_file = os.path.join(
            self.output_data_excel_folder, f"{self.doc_id}.xlsx"
        )
        # Excel export is best-effort only: a failure (e.g. file locked by
        # a viewer) must not lose the already-written JSON result.
        try:
            with pd.ExcelWriter(excel_data_file) as writer:
                mapping_data_df.to_excel(
                    writer, sheet_name="mapping_data", index=False
                )
                extract_data_df.to_excel(
                    writer, sheet_name="extract_data", index=False
                )
        except Exception as e:
            logger.error(f"Failed to save excel file: {e}")

    def integrate_share_name(self, raw_fund_name: str, raw_share_name: str):
        """Combine fund and share names into one comparable name.

        Share names are often too short to match on their own, so every
        word of the share name that is not already in the fund name is
        appended to the fund name's words.

        Returns:
            The combined name, or "" when the share name is None/empty.
        """
        raw_name = ""
        if raw_share_name is not None and len(raw_share_name) > 0:
            raw_name = raw_share_name
            raw_name_splits = raw_name.split()
            raw_fund_name_splits = raw_fund_name.split()
            for split in raw_name_splits:
                if split not in raw_fund_name_splits:
                    raw_fund_name_splits.append(split)
            raw_name = " ".join(raw_fund_name_splits)
        return raw_name

    def matching_with_database(
        self,
        raw_name: str,
        raw_share_name: str = None,
        raw_fund_name: str = None,
        parent_id: str = None,
        matching_type: str = "fund",
        process_cache: dict = None,
    ):
        """Match a raw name against the document universe first, then fall
        back to the provider universe.

        Args:
            raw_name: the name to match (integrated share name or fund name).
            raw_share_name / raw_fund_name: original components, forwarded
                to the similarity helper.
            parent_id: FundId used to narrow share-class candidates.
            matching_type: "fund" or "share".
            process_cache: optional cross-call scratch cache for the
                similarity helper.

        Returns:
            dict with keys: name, id, legal_name, investment_type
            (33 = fund, 1 = share), similarity.
        """
        # BUGFIX: was a mutable default argument ({}), shared across calls.
        if process_cache is None:
            process_cache = {}
        # BUGFIX: data_info was referenced before assignment in the early
        # return below (NameError when both universes are empty).
        data_info = {"name": raw_name}
        if (
            len(self.doc_fund_name_list) == 0
            and len(self.provider_fund_name_list) == 0
        ):
            data_info["id"] = ""
            data_info["legal_name"] = ""
            data_info["investment_type"] = 33 if matching_type == "fund" else 1
            data_info["similarity"] = 0
            return data_info
        if matching_type == "fund":
            doc_compare_name_list = self.doc_fund_name_list
            doc_compare_mapping = self.doc_fund_mapping
            provider_compare_name_list = self.provider_fund_name_list
            provider_compare_mapping = self.provider_fund_mapping
            compare_name_dp = "FundName"
            compare_id_dp = "FundId"
            investment_type = 33
        else:
            if parent_id is not None and len(parent_id) > 0:
                # Narrow share-class candidates to the matched parent fund.
                doc_compare_mapping = self.doc_fund_class_mapping[
                    self.doc_fund_class_mapping["FundId"] == parent_id
                ]
                provider_compare_mapping = self.provider_fund_class_mapping[
                    self.provider_fund_class_mapping["FundId"] == parent_id
                ]
                if len(doc_compare_mapping) == 0:
                    if len(provider_compare_mapping) == 0:
                        # Parent fund unknown in both universes: fall back
                        # to every share class.
                        doc_compare_name_list = self.doc_share_name_list
                        doc_compare_mapping = self.doc_fund_class_mapping
                        provider_compare_name_list = self.provider_share_name_list
                        provider_compare_mapping = self.provider_fund_class_mapping
                    else:
                        provider_compare_name_list = (
                            provider_compare_mapping["ShareClassName"]
                            .unique()
                            .tolist()
                        )
                        doc_compare_name_list = []
                        doc_compare_mapping = pd.DataFrame()
                else:
                    doc_compare_name_list = (
                        doc_compare_mapping["ShareClassName"].unique().tolist()
                    )
                    if (
                        len(provider_compare_mapping) == 0
                        or len(provider_compare_mapping) < len(doc_compare_mapping)
                    ):
                        # Provider universe is missing/smaller: reuse the
                        # document candidates on the provider pass too.
                        provider_compare_name_list = doc_compare_name_list
                        provider_compare_mapping = doc_compare_mapping
                    else:
                        provider_compare_name_list = (
                            provider_compare_mapping["ShareClassName"]
                            .unique()
                            .tolist()
                        )
            else:
                doc_compare_name_list = self.doc_share_name_list
                doc_compare_mapping = self.doc_fund_class_mapping
                provider_compare_name_list = self.provider_share_name_list
                provider_compare_mapping = self.provider_fund_class_mapping
            compare_name_dp = "ShareClassName"
            compare_id_dp = "SecId"
            investment_type = 1
        if len(provider_compare_name_list) > 0:
            # Common words of the document names are reused on the provider
            # pass: document and provider mappings for the same fund can
            # differ, and sharing the common-word list improves similarity.
            pre_common_word_list = []
            if doc_compare_name_list is not None and len(doc_compare_name_list) > 0:
                _, pre_common_word_list = remove_common_word(doc_compare_name_list)
            max_similarity_name, max_similarity = get_most_similar_name(
                raw_name,
                doc_compare_name_list,
                share_name=raw_share_name,
                fund_name=raw_fund_name,
                matching_type=matching_type,
                process_cache=process_cache,
            )
            # Document pass is strict: share names need a higher score.
            threshold = 0.7 if matching_type == "fund" else 0.9
            if max_similarity is not None and max_similarity >= threshold:
                data_info["id"] = doc_compare_mapping[
                    doc_compare_mapping[compare_name_dp] == max_similarity_name
                ][compare_id_dp].values[0]
                data_info["legal_name"] = max_similarity_name
                data_info["similarity"] = max_similarity
            if (
                data_info.get("id", None) is None
                or len(data_info.get("id", "")) == 0
            ):
                # Provider fallback pass with relaxed thresholds.
                # NOTE(review): when the provider list is empty but the
                # document list is not, this whole branch is skipped and no
                # document-only match is attempted — confirm intended.
                max_similarity_name, max_similarity = get_most_similar_name(
                    raw_name,
                    provider_compare_name_list,
                    share_name=raw_share_name,
                    fund_name=raw_fund_name,
                    matching_type=matching_type,
                    pre_common_word_list=pre_common_word_list,
                    process_cache=process_cache,
                )
                threshold = 0.7
                if matching_type == "share":
                    threshold = 0.5
                # Compare on a 1-decimal rounding so near-threshold scores
                # (e.g. 0.65 for fund) still qualify.
                round_similarity = 0
                if max_similarity is not None and isinstance(max_similarity, float):
                    round_similarity = round(max_similarity, 1)
                if round_similarity is not None and round_similarity >= threshold:
                    data_info["id"] = provider_compare_mapping[
                        provider_compare_mapping[compare_name_dp]
                        == max_similarity_name
                    ][compare_id_dp].values[0]
                    data_info["legal_name"] = max_similarity_name
                    data_info["similarity"] = max_similarity
                else:
                    if len(doc_compare_name_list) == 1:
                        # Single document candidate: accept it outright.
                        data_info["id"] = doc_compare_mapping[
                            doc_compare_mapping[compare_name_dp]
                            == doc_compare_name_list[0]
                        ][compare_id_dp].values[0]
                        data_info["legal_name"] = doc_compare_name_list[0]
                        data_info["similarity"] = 1
                    else:
                        data_info["id"] = ""
                        data_info["legal_name"] = ""
                        data_info["similarity"] = 0
            data_info["investment_type"] = investment_type
        else:
            data_info["id"] = ""
            data_info["legal_name"] = ""
            data_info["investment_type"] = investment_type
            data_info["similarity"] = 0
        return data_info