import os
import json
from copy import deepcopy

import pandas as pd

from utils.biz_utils import get_most_similar_name, remove_common_word
from utils.sql_query_util import (
    query_document_fund_mapping,
    query_investment_by_provider,
)
from utils.logger import logger
from core.auz_nz.hybrid_solution_script import final_function_to_match


class DataMapping:
    def __init__(
        self,
        doc_id,
        datapoints: list,
        raw_document_data_list: list,
        document_mapping_info_df: pd.DataFrame,
        output_data_folder: str,
        doc_source: str = "emea_ar",
        compare_with_provider: bool = True,
    ):
        self.doc_id = doc_id
        self.datapoints = datapoints
        self.doc_source = doc_source
        self.compare_with_provider = compare_with_provider
        self.raw_document_data_list = raw_document_data_list
        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
            self.document_mapping_info_df = query_document_fund_mapping(
                doc_id, rerun=False
            )
        else:
            self.document_mapping_info_df = document_mapping_info_df
        if output_data_folder is None or len(output_data_folder) == 0:
            output_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
        os.makedirs(output_data_folder, exist_ok=True)
        self.output_data_json_folder = os.path.join(output_data_folder, "json/")
        os.makedirs(self.output_data_json_folder, exist_ok=True)
        self.output_data_excel_folder = os.path.join(output_data_folder, "excel/")
        os.makedirs(self.output_data_excel_folder, exist_ok=True)
        self.set_mapping_data_by_db(self.document_mapping_info_df)

    def set_mapping_data_by_db(self, document_mapping_info_df: pd.DataFrame):
        logger.info("Setting document mapping data")
        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
            self.document_mapping_info_df = query_document_fund_mapping(
                self.doc_id, rerun=False
            )
        else:
            self.document_mapping_info_df = document_mapping_info_df
        if len(self.document_mapping_info_df) == 0:
            self.doc_fund_name_list = []
            self.doc_share_name_list = []
            self.doc_fund_mapping = pd.DataFrame()
            self.doc_fund_class_mapping = pd.DataFrame()
            self.provider_name = ""
        else:
            self.doc_fund_name_list = (
                self.document_mapping_info_df["FundName"].unique().tolist()
            )
            self.doc_share_name_list = (
                self.document_mapping_info_df["ShareClassName"].unique().tolist()
            )
            self.doc_fund_mapping = self.document_mapping_info_df[
                ["FundId", "FundName"]
            ].drop_duplicates()
            self.doc_fund_class_mapping = self.document_mapping_info_df[
                ["FundId", "SecId", "ShareClassName", "CurrencyId"]
            ].drop_duplicates()
            self.provider_name = self.document_mapping_info_df["ProviderName"].values[0]
        logger.info("Setting provider mapping data")
        self.provider_mapping_df = self.get_provider_mapping()
        if len(self.provider_mapping_df) == 0:
            self.provider_fund_name_list = []
            self.provider_share_name_list = []
            self.provider_fund_mapping = pd.DataFrame()
            self.provider_fund_class_mapping = pd.DataFrame()
        else:
            self.provider_fund_name_list = (
                self.provider_mapping_df["FundName"].unique().tolist()
            )
            self.provider_share_name_list = (
                self.provider_mapping_df["ShareClassName"].unique().tolist()
            )
            self.provider_fund_mapping = self.provider_mapping_df[
                ["FundId", "FundName"]
            ].drop_duplicates()
            self.provider_fund_class_mapping = self.provider_mapping_df[
                ["FundId", "SecId", "ShareClassName", "CurrencyId"]
            ].drop_duplicates()

    def get_provider_mapping(self):
        if len(self.document_mapping_info_df) == 0:
            return pd.DataFrame()
        provider_id_list = self.document_mapping_info_df["ProviderId"].unique().tolist()
        provider_mapping_list = []
        for provider_id in provider_id_list:
            provider_mapping_list.append(
                query_investment_by_provider(provider_id, rerun=False)
            )
        provider_mapping_df = pd.concat(provider_mapping_list)
        provider_mapping_df = provider_mapping_df.drop_duplicates()
        provider_mapping_df.reset_index(drop=True, inplace=True)
        return provider_mapping_df

    def mapping_raw_data_entrance(self):
        if self.doc_source == "emea_ar":
            return self.mapping_raw_data()
        elif self.doc_source == "aus_prospectus":
            return self.mapping_raw_data_generic()
        else:
            return self.mapping_raw_data()
            # return self.mapping_raw_data_generic()

    def mapping_raw_data_generic(self):
        logger.info(f"Mapping raw data for AUS Prospectus document {self.doc_id}")
        mapped_data_list = []
        # Generate raw name based on fund name and share name by integrate_share_name
        fund_raw_name_list = []
        share_raw_name_list = []
        for page_data in self.raw_document_data_list:
            doc_id = page_data.get("doc_id", "")
            page_index = page_data.get("page_index", "")
            raw_data_list = page_data.get("extract_data", {}).get("data", [])
            for raw_data in raw_data_list:
                raw_fund_name = raw_data.get("fund_name", "")
                if raw_fund_name is None or len(raw_fund_name) == 0:
                    continue
                raw_share_name = raw_data.get("share_name", "")
                raw_data_keys = list(raw_data.keys())
                if len(raw_share_name) > 0:
                    integrated_share_name = self.integrate_share_name(
                        raw_fund_name, raw_share_name
                    )
                    if integrated_share_name not in share_raw_name_list:
                        share_raw_name_list.append(integrated_share_name)
                    for datapoint in self.datapoints:
                        if datapoint in raw_data_keys:
                            mapped_data = {
                                "doc_id": doc_id,
                                "page_index": page_index,
                                "raw_fund_name": raw_fund_name,
                                "raw_share_name": raw_share_name,
                                "raw_name": integrated_share_name,
                                "datapoint": datapoint,
                                "value": raw_data[datapoint],
                                "investment_type": 1,
                                "investment_id": "",
                                "investment_name": "",
                                "similarity": 0,
                            }
                            mapped_data_list.append(mapped_data)
                else:
                    if raw_fund_name not in fund_raw_name_list:
                        fund_raw_name_list.append(raw_fund_name)
                    for datapoint in self.datapoints:
                        if datapoint in raw_data_keys:
                            mapped_data = {
                                "doc_id": doc_id,
                                "page_index": page_index,
                                "raw_fund_name": raw_fund_name,
                                "raw_share_name": "",
                                "raw_name": raw_fund_name,
                                "datapoint": datapoint,
                                "value": raw_data[datapoint],
                                "investment_type": 33,
                                "investment_id": "",
                                "investment_name": "",
                            }
                            mapped_data_list.append(mapped_data)
        # Mapping raw data with database
        iter_count = 60
        fund_match_result = {}
        if len(fund_raw_name_list) > 0:
            fund_match_result = self.get_raw_name_db_match_result(
                fund_raw_name_list, "fund", iter_count
            )
            # logger.info(f"Fund match result: \n{fund_match_result}")
        share_match_result = {}
        if len(share_raw_name_list) > 0:
            share_match_result = self.get_raw_name_db_match_result(
                share_raw_name_list, "share", iter_count
            )
            # logger.info(f"Share match result: \n{share_match_result}")
        for mapped_data in mapped_data_list:
            investment_type = mapped_data["investment_type"]
            raw_name = mapped_data["raw_name"]
            if investment_type == 33:
                if fund_match_result.get(raw_name) is not None:
                    matched_db_fund_name = fund_match_result[raw_name]
                    if (
                        matched_db_fund_name is not None
                        and len(matched_db_fund_name) > 0
                    ):
                        # get FundId from self.doc_fund_mapping
                        find_fund_df = self.doc_fund_mapping[
                            self.doc_fund_mapping["FundName"] == matched_db_fund_name
                        ]
                        if find_fund_df is not None and len(find_fund_df) > 0:
                            fund_id = find_fund_df["FundId"].values[0]
                            mapped_data["investment_id"] = fund_id
                            mapped_data["investment_name"] = matched_db_fund_name
                            mapped_data["similarity"] = 1
            if investment_type == 1:
                if share_match_result.get(raw_name) is not None:
                    matched_db_share_name = share_match_result[raw_name]
                    if (
                        matched_db_share_name is not None
                        and len(matched_db_share_name) > 0
                    ):
                        # get SecId from self.doc_fund_class_mapping
                        find_share_df = self.doc_fund_class_mapping[
                            self.doc_fund_class_mapping["ShareClassName"]
                            == matched_db_share_name
                        ]
                        if find_share_df is not None and len(find_share_df) > 0:
                            share_id = find_share_df["SecId"].values[0]
                            mapped_data["investment_id"] = share_id
                            mapped_data["investment_name"] = matched_db_share_name
                            mapped_data["similarity"] = 1
        self.output_mapping_file(mapped_data_list)
        if self.doc_source == "aus_prospectus":
            output_data_folder_splits = self.output_data_excel_folder.split("output")
            if len(output_data_folder_splits) == 2:
                merged_data_folder = (
                    f"{output_data_folder_splits[0]}output/merged_data/docs/"
                )
                os.makedirs(merged_data_folder, exist_ok=True)
                merged_data_json_folder = os.path.join(merged_data_folder, "json/")
                os.makedirs(merged_data_json_folder, exist_ok=True)
                merged_data_excel_folder = os.path.join(merged_data_folder, "excel/")
                os.makedirs(merged_data_excel_folder, exist_ok=True)
                merged_data_list = self.merge_output_data_aus_prospectus(
                    mapped_data_list, merged_data_json_folder, merged_data_excel_folder
                )
                return merged_data_list
            else:
                return mapped_data_list

    def merge_output_data_aus_prospectus(
        self,
        mapped_data_list: list,
        merged_data_json_folder: str,
        merged_data_excel_folder: str,
    ):
        # TODO: merge output data for aus prospectus, planned to be implemented on 2025-01-16
        if mapped_data_list is None or len(mapped_data_list) == 0:
            return
        if merged_data_json_folder is None or len(merged_data_json_folder) == 0:
            return
        if merged_data_excel_folder is None or len(merged_data_excel_folder) == 0:
            return
        mapping_data_df = pd.DataFrame(mapped_data_list)
        mapping_data_df.reset_index(drop=True, inplace=True)
        mapping_data_df.fillna("", inplace=True)
        document_mapping_df = self.document_mapping_info_df
        document_mapping_df.fillna("", inplace=True)
        datapoint_keyword_config_file = (
            f"./configuration/{self.doc_source}/datapoint_name.json"
        )
        with open(datapoint_keyword_config_file, "r", encoding="utf-8") as f:
            datapoint_keyword_config = json.load(f)
        datapoint_name_list = list(datapoint_keyword_config.keys())
        total_data_list = []
        doc_date = str(document_mapping_df["EffectiveDate"].values[0])[0:10]
        share_doc_data_df = mapping_data_df[mapping_data_df["investment_type"] == 1]
        exist_raw_name_list = []
        for index, row in share_doc_data_df.iterrows():
            doc_id = str(row["doc_id"])
            page_index = int(row["page_index"])
            raw_fund_name = str(row["raw_fund_name"])
            raw_share_name = str(row["raw_share_name"])
            raw_name = str(row["raw_name"])
            datapoint = str(row["datapoint"])
            value = row["value"]
            investment_type = row["investment_type"]
            share_class_id = row["investment_id"]
            share_class_legal_name = row["investment_name"]
            fund_id = ""
            fund_legal_name = ""
            if share_class_id != "":
                record_row = document_mapping_df[
                    document_mapping_df["SecId"] == share_class_id
                ]
                if len(record_row) > 0:
                    fund_id = record_row["FundId"].values[0]
                    fund_legal_name = record_row["FundName"].values[0]
            exist = False
            for exist_raw_name_info in exist_raw_name_list:
                exist_raw_name = exist_raw_name_info["raw_name"]
                exist_investment_type = exist_raw_name_info["investment_type"]
                exist_investment_id = exist_raw_name_info["investment_id"]
                if (
                    exist_raw_name == raw_name
                    and exist_investment_type == investment_type
                ) or (
                    len(exist_investment_id) > 0
                    and exist_investment_id == share_class_id
                ):
                    exist = True
                    break
            if not exist:
                data = {
                    "DocumentId": doc_id,
                    "raw_fund_name": raw_fund_name,
                    "raw_share_name": raw_share_name,
                    "raw_name": raw_name,
"fund_id": fund_id, "fund_name": fund_legal_name, "sec_id": share_class_id, "sec_name": share_class_legal_name, "EffectiveDate": doc_date, "page_index": [], "RawName": raw_name, } for datapoint_name in datapoint_name_list: data[datapoint_name] = "" exist_raw_name_list.append( {"raw_name": raw_name, "investment_type": investment_type, "investment_id": share_class_id} ) total_data_list.append(data) # find data from total_data_list by raw_name for data in total_data_list: if data["raw_name"] == raw_name: update_key = datapoint data[update_key] = value if page_index not in data["page_index"]: data["page_index"].append(page_index) break if len(share_class_id) > 0 and data["sec_id"] == share_class_id: update_key = datapoint if len(str(data[update_key])) == 0: data[update_key] = value if page_index not in data["page_index"]: data["page_index"].append(page_index) break fund_doc_data_df = mapping_data_df[(mapping_data_df["investment_type"] == 33)] fund_doc_data_df.fillna("", inplace=True) for index, row in fund_doc_data_df.iterrows(): doc_id = str(row["doc_id"]) page_index = int(row["page_index"]) raw_fund_name = str(row["raw_fund_name"]) raw_share_name = "" raw_name = str(row["raw_name"]) datapoint = str(row["datapoint"]) value = row["value"] fund_id = row["investment_id"] fund_legal_name = row["investment_name"] exist = False if fund_id != "": for data in total_data_list: if (fund_id != "" and data["fund_id"] == fund_id) or ( data["raw_fund_name"] == raw_fund_name ): update_key = datapoint data[update_key] = value if page_index not in data["page_index"]: data["page_index"].append(page_index) exist = True else: for data in total_data_list: if data["raw_name"] == raw_name: update_key = datapoint data[update_key] = value if page_index not in data["page_index"]: data["page_index"].append(page_index) exist = True if not exist: data = { "DocumentId": doc_id, "raw_fund_name": raw_fund_name, "raw_share_name": "", "raw_name": raw_name, "fund_id": fund_id, "fund_name": fund_legal_name, "sec_id": "", "sec_name": "", "EffectiveDate": doc_date, "page_index": [page_index], "RawName": raw_name, } for datapoint_name in datapoint_name_list: data[datapoint_name] = "" data[datapoint] = value total_data_list.append(data) total_data_df = pd.DataFrame(total_data_list) total_data_df.fillna("", inplace=True) merged_data_excel_file = os.path.join(merged_data_excel_folder, f"merged_{self.doc_id}.xlsx") with pd.ExcelWriter(merged_data_excel_file) as writer: total_data_df.to_excel(writer, index=False, sheet_name="merged_data") merged_data_json_file = os.path.join(merged_data_json_folder, f"merged_{self.doc_id}.json") with open(merged_data_json_file, "w", encoding="utf-8") as f: json.dump(total_data_list, f, ensure_ascii=False, indent=4) return total_data_list def get_raw_name_db_match_result( self, raw_name_list, investment_type: str, iter_count: int = 30 ): # split raw_name_list into several parts which each part is with 30 elements # The reason to split is to avoid invoke token limitation issues from CahtGPT raw_name_list_parts = [ raw_name_list[i : i + iter_count] for i in range(0, len(raw_name_list), iter_count) ] all_match_result = {} doc_fund_name_list = deepcopy(self.doc_fund_name_list) doc_share_name_list = deepcopy(self.doc_share_name_list) for raw_name_list in raw_name_list_parts: if investment_type == "fund": match_result, doc_fund_name_list = self.get_final_function_to_match( raw_name_list, doc_fund_name_list ) else: match_result, doc_share_name_list = self.get_final_function_to_match( raw_name_list, 
            all_match_result.update(match_result)
        return all_match_result

    def get_final_function_to_match(self, raw_name_list, db_name_list):
        if len(db_name_list) == 0:
            match_result = {}
            for raw_name in raw_name_list:
                match_result[raw_name] = ""
        else:
            match_result = final_function_to_match(
                doc_id=self.doc_id,
                pred_list=raw_name_list,
                db_list=db_name_list,
                provider_name=self.provider_name,
                doc_source=self.doc_source,
            )
            matched_name_list = list(match_result.values())
            db_name_list = self.remove_matched_names(db_name_list, matched_name_list)
        return match_result, db_name_list

    def remove_matched_names(self, target_name_list: list, matched_name_list: list):
        if len(matched_name_list) == 0:
            return target_name_list
        matched_name_list = list(set(matched_name_list))
        matched_name_list = [
            value for value in matched_name_list if value is not None and len(value) > 0
        ]
        for matched_name in matched_name_list:
            if (
                matched_name is not None
                and len(matched_name) > 0
                and matched_name in target_name_list
            ):
                target_name_list.remove(matched_name)
        return target_name_list

    def mapping_raw_data(self):
        """
        doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name,
        raw_share_name, share_id, share_name
        """
        logger.info(f"Mapping raw data for document {self.doc_id}")
        mapped_data_list = []
        mapped_fund_cache = {}
        mapped_share_cache = {}
        process_cache = {}
        for page_data in self.raw_document_data_list:
            doc_id = page_data.get("doc_id", "")
            page_index = page_data.get("page_index", "")
            raw_data_list = page_data.get("extract_data", {}).get("data", [])
            for raw_data in raw_data_list:
                raw_fund_name = raw_data.get("fund_name", "")
                if raw_fund_name is None or len(raw_fund_name) == 0:
                    continue
                raw_share_name = raw_data.get("share_name", "")
                if (
                    len(self.doc_fund_name_list) == 0
                    and len(self.provider_fund_name_list) == 0
                ):
                    if len(raw_share_name) > 0:
                        integrated_share_name = self.integrate_share_name(
                            raw_fund_name, raw_share_name
                        )
                        raw_data_keys = list(raw_data.keys())
                        for datapoint in self.datapoints:
                            if datapoint in raw_data_keys:
                                mapped_data = {
                                    "doc_id": doc_id,
                                    "page_index": page_index,
                                    "raw_fund_name": raw_fund_name,
                                    "raw_share_name": raw_share_name,
                                    "raw_name": integrated_share_name,
                                    "datapoint": datapoint,
                                    "value": raw_data[datapoint],
                                    "investment_type": 1,
                                    "investment_id": "",
                                    "investment_name": "",
                                    "similarity": 0,
                                }
                                mapped_data_list.append(mapped_data)
                    else:
                        raw_data_keys = list(raw_data.keys())
                        for datapoint in self.datapoints:
                            if datapoint in raw_data_keys:
                                mapped_data = {
                                    "doc_id": doc_id,
                                    "page_index": page_index,
                                    "raw_fund_name": raw_fund_name,
                                    "raw_share_name": "",
                                    "raw_name": raw_fund_name,
                                    "datapoint": datapoint,
                                    "value": raw_data[datapoint],
                                    "investment_type": 33,
                                    "investment_id": "",
                                    "investment_name": "",
                                }
                                mapped_data_list.append(mapped_data)
                else:
                    raw_name = ""
                    if raw_share_name is not None and len(raw_share_name) > 0:
                        raw_name = self.integrate_share_name(
                            raw_fund_name, raw_share_name
                        )
                        if mapped_share_cache.get(raw_name) is not None:
                            investment_info = mapped_share_cache[raw_name]
                        else:
                            if mapped_fund_cache.get(raw_fund_name) is not None:
                                fund_info = mapped_fund_cache[raw_fund_name]
                                fund_id = fund_info["id"]
                            else:
                                fund_info = self.matching_with_database(
                                    raw_name=raw_fund_name, matching_type="fund"
                                )
                                fund_id = fund_info["id"]
                                mapped_fund_cache[raw_fund_name] = fund_info
                            investment_info = {}
                            if len(fund_id) > 0:
                                investment_info = self.mapping_unique_raw_data(
                                    fund_id=fund_id,
                                    raw_fund_name=raw_fund_name,
                                    raw_data_list=raw_data_list,
                                )
                            if (
                                investment_info.get("id", None) is None
                                or len(investment_info.get("id", "")) == 0
                            ):
                                investment_info = self.matching_with_database(
                                    raw_name=raw_name,
                                    raw_share_name=raw_share_name,
                                    raw_fund_name=raw_fund_name,
                                    parent_id=fund_id,
                                    matching_type="share",
                                    process_cache=process_cache,
                                )
                            mapped_share_cache[raw_name] = investment_info
                    elif raw_fund_name is not None and len(raw_fund_name) > 0:
                        raw_name = raw_fund_name
                        if mapped_fund_cache.get(raw_fund_name) is not None:
                            investment_info = mapped_fund_cache[raw_fund_name]
                        else:
                            investment_info = self.matching_with_database(
                                raw_name=raw_fund_name, matching_type="fund"
                            )
                            mapped_fund_cache[raw_fund_name] = investment_info
                    else:
                        raw_name = ""
                        investment_info = {
                            "id": "",
                            "legal_name": "",
                            "investment_type": -1,
                            "similarity": 0,
                        }
                    raw_data_keys = list(raw_data.keys())
                    for datapoint in self.datapoints:
                        if datapoint in raw_data_keys:
                            mapped_data = {
                                "doc_id": doc_id,
                                "page_index": page_index,
                                "raw_fund_name": raw_fund_name,
                                "raw_share_name": raw_share_name,
                                "raw_name": raw_name,
                                "datapoint": datapoint,
                                "value": raw_data[datapoint],
                                "investment_type": investment_info["investment_type"],
                                "investment_id": investment_info["id"],
                                "investment_name": investment_info["legal_name"],
                                "similarity": investment_info["similarity"],
                            }
                            mapped_data_list.append(mapped_data)
        self.output_mapping_file(mapped_data_list)
        return mapped_data_list

    def mapping_unique_raw_data(
        self, fund_id: str, raw_fund_name: str, raw_data_list: list
    ):
        share_count = 0
        for raw_data in raw_data_list:
            fund_name = raw_data.get("fund_name", "")
            share_name = raw_data.get("share_name", "")
            if (
                fund_name == raw_fund_name
                and share_name is not None
                and len(share_name) > 0
            ):
                share_count += 1
            if share_count > 1:
                break
        data_info = {}
        if share_count == 1:
            doc_compare_mapping = self.doc_fund_class_mapping[
                self.doc_fund_class_mapping["FundId"] == fund_id
            ]
            if len(doc_compare_mapping) == 1:
                data_info["id"] = doc_compare_mapping["SecId"].values[0]
                data_info["legal_name"] = doc_compare_mapping["ShareClassName"].values[0]
                data_info["investment_type"] = 1
                data_info["similarity"] = 1
        return data_info

    def output_mapping_file(self, mapped_data_list: list):
        json_data_file = os.path.join(
            self.output_data_json_folder, f"{self.doc_id}.json"
        )
        with open(json_data_file, "w", encoding="utf-8") as f:
            json.dump(mapped_data_list, f, ensure_ascii=False, indent=4)
        extract_data_df = pd.DataFrame(self.raw_document_data_list)
        extract_data_df.reset_index(drop=True, inplace=True)
        mapping_data_df = pd.DataFrame(mapped_data_list)
        mapping_data_df.reset_index(drop=True, inplace=True)
        excel_data_file = os.path.join(
            self.output_data_excel_folder, f"{self.doc_id}.xlsx"
        )
        try:
            with pd.ExcelWriter(excel_data_file) as writer:
                mapping_data_df.to_excel(
                    writer, sheet_name="mapping_data", index=False
                )
                extract_data_df.to_excel(
                    writer, sheet_name="extract_data", index=False
                )
        except Exception as e:
            logger.error(f"Failed to save excel file: {e}")

    def integrate_share_name(self, raw_fund_name: str, raw_share_name: str):
        raw_name = ""
        if raw_share_name is not None and len(raw_share_name) > 0:
            raw_name = raw_share_name
            # some share names are very short,
            # so we need to combine with fund name
            raw_name_splits = raw_name.split()
            raw_fund_name_splits = raw_fund_name.split()
            for split in raw_name_splits:
                if split not in raw_fund_name_splits:
                    raw_fund_name_splits.append(split)
            raw_name = " ".join(raw_fund_name_splits)
        return raw_name

    def matching_with_database(
        self,
        raw_name: str,
        raw_share_name: str = None,
        raw_fund_name: str = None,
        parent_id: str = None,
        matching_type: str = "fund",
        process_cache: dict = {},
    ):
        if len(self.doc_fund_name_list) == 0 and len(self.provider_fund_name_list) == 0:
            data_info = {}
            data_info["id"] = ""
            data_info["legal_name"] = ""
            if matching_type == "fund":
                investment_type = 33
            else:
                investment_type = 1
            data_info["investment_type"] = investment_type
            data_info["similarity"] = 0
            return data_info
        if matching_type == "fund":
            doc_compare_name_list = self.doc_fund_name_list
            doc_compare_mapping = self.doc_fund_mapping
            provider_compare_name_list = self.provider_fund_name_list
            provider_compare_mapping = self.provider_fund_mapping
            compare_name_dp = "FundName"
            compare_id_dp = "FundId"
            investment_type = 33
        else:
            if parent_id is not None and len(parent_id) > 0:
                # filter self.doc_fund_class_mapping by parent_id as FundId
                doc_compare_mapping = self.doc_fund_class_mapping[
                    self.doc_fund_class_mapping["FundId"] == parent_id
                ]
                provider_compare_mapping = self.provider_fund_class_mapping[
                    self.provider_fund_class_mapping["FundId"] == parent_id
                ]
                if len(doc_compare_mapping) == 0:
                    if len(provider_compare_mapping) == 0:
                        doc_compare_name_list = self.doc_share_name_list
                        doc_compare_mapping = self.doc_fund_class_mapping
                        provider_compare_name_list = self.provider_share_name_list
                        provider_compare_mapping = self.provider_fund_class_mapping
                    else:
                        provider_compare_name_list = (
                            provider_compare_mapping["ShareClassName"].unique().tolist()
                        )
                        doc_compare_name_list = []
                        doc_compare_mapping = pd.DataFrame()
                else:
                    doc_compare_name_list = (
                        doc_compare_mapping["ShareClassName"].unique().tolist()
                    )
                    if len(provider_compare_mapping) == 0 or len(
                        provider_compare_mapping
                    ) < len(doc_compare_mapping):
                        provider_compare_name_list = doc_compare_name_list
                        provider_compare_mapping = doc_compare_mapping
                    else:
                        provider_compare_name_list = (
                            provider_compare_mapping["ShareClassName"].unique().tolist()
                        )
            else:
                doc_compare_name_list = self.doc_share_name_list
                doc_compare_mapping = self.doc_fund_class_mapping
                provider_compare_name_list = self.provider_share_name_list
                provider_compare_mapping = self.provider_fund_class_mapping
            compare_name_dp = "ShareClassName"
            compare_id_dp = "SecId"
            investment_type = 1
        data_info = {"name": raw_name}
        if len(provider_compare_name_list) > 0:
            pre_common_word_list = []
            if doc_compare_name_list is not None and len(doc_compare_name_list) > 0:
                _, pre_common_word_list = remove_common_word(doc_compare_name_list)
            max_similarity_name, max_similarity = get_most_similar_name(
                raw_name,
                doc_compare_name_list,
                share_name=raw_share_name,
                fund_name=raw_fund_name,
                matching_type=matching_type,
                process_cache=process_cache,
            )
            if matching_type == "fund":
                threshold = 0.7
            else:
                if self.compare_with_provider:
                    threshold = 0.9
                else:
                    threshold = 0.6
            if max_similarity is not None and max_similarity >= threshold:
                data_info["id"] = doc_compare_mapping[
                    doc_compare_mapping[compare_name_dp] == max_similarity_name
                ][compare_id_dp].values[0]
                data_info["legal_name"] = max_similarity_name
                data_info["similarity"] = max_similarity
            if data_info.get("id", None) is None or len(data_info.get("id", "")) == 0:
                # Use pre_common_word_list here because the document mapping for the
                # same fund may differ from the provider mapping; the common word list
                # helps improve the similarity.
                if self.compare_with_provider:
                    max_similarity_name, max_similarity = get_most_similar_name(
                        raw_name,
                        provider_compare_name_list,
                        share_name=raw_share_name,
                        fund_name=raw_fund_name,
                        matching_type=matching_type,
                        pre_common_word_list=pre_common_word_list,
                        process_cache=process_cache,
                    )
                    threshold = 0.7
                    if matching_type == "share":
                        threshold = 0.5
                    round_similarity = 0
                    if max_similarity is not None and isinstance(max_similarity, float):
                        round_similarity = round(max_similarity, 1)
                    if round_similarity is not None and round_similarity >= threshold:
                        data_info["id"] = provider_compare_mapping[
                            provider_compare_mapping[compare_name_dp]
                            == max_similarity_name
                        ][compare_id_dp].values[0]
                        data_info["legal_name"] = max_similarity_name
                        data_info["similarity"] = max_similarity
                    else:
                        if len(doc_compare_name_list) == 1:
                            data_info["id"] = doc_compare_mapping[
                                doc_compare_mapping[compare_name_dp]
                                == doc_compare_name_list[0]
                            ][compare_id_dp].values[0]
                            data_info["legal_name"] = doc_compare_name_list[0]
                            data_info["similarity"] = 1
                        else:
                            data_info["id"] = ""
                            data_info["legal_name"] = ""
                            data_info["similarity"] = 0
                else:
                    data_info["id"] = ""
                    data_info["legal_name"] = ""
                    data_info["similarity"] = 0
            data_info["investment_type"] = investment_type
        else:
            data_info["id"] = ""
            data_info["legal_name"] = ""
            data_info["investment_type"] = investment_type
            data_info["similarity"] = 0
        return data_info
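

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): the doc_id, datapoint name and
# extracted values below are hypothetical placeholders, and the raw page
# payload would normally come from the upstream extraction step rather than
# be built by hand. Passing document_mapping_info_df=None makes the
# constructor fall back to query_document_fund_mapping, so running this
# sketch assumes database access is available.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_raw_document_data = [
        {
            "doc_id": "123456",  # hypothetical document id
            "page_index": 5,
            "extract_data": {
                "data": [
                    {
                        "fund_name": "Example Global Equity Fund",  # hypothetical
                        "share_name": "Class A USD",  # hypothetical
                        "ter": "1.25",  # hypothetical datapoint value
                    }
                ]
            },
        }
    ]
    mapper = DataMapping(
        doc_id="123456",
        datapoints=["ter"],  # hypothetical datapoint list
        raw_document_data_list=sample_raw_document_data,
        document_mapping_info_df=None,  # fall back to the document mapping query
        output_data_folder="",  # fall back to the default output folder
        doc_source="emea_ar",
    )
    mapped_rows = mapper.mapping_raw_data_entrance()
    logger.info(f"Mapped {len(mapped_rows)} rows for document {mapper.doc_id}")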