import re
from copy import deepcopy
from traceback import print_exc

try:
    from utils.logger import logger
except ImportError:
    # Keep this helper module importable (e.g. in isolation or in unit tests)
    # when the project logger package is not on the path.
    import logging

    logger = logging.getLogger(__name__)

total_currency_list = [
    "USD",
    "EUR",
    "AUD",
    "JPY",
    "CHF",
    "GBP",
    "SEK",
    "CNY",
    "NZD",
    "CNH",
    "NOK",
    "SGD",
    "HKD",
    "ZAR",
    "PLN",
    "CAD",
    "CZK",
    "HUF",
    "DKK",
    "BRL",
    "SKK",
    "RON",
    "TRY",
    "BGN",
    "CUP",
    "MXN",
    "CLF",
    "XCD",
    "ISK",
    "IDR",
    "MNT",
    "AED",
    "AFN",
    "INR",
    "ESP",
    "RUB",
    "CLP",
    "KRW",
    "ETB",
    "DZD",
    "XEU",
    "XFO",
]

share_features_full_name = ['Accumulation', 'Income', 'Distribution', 'Investor', 'Institutional', 'Admin', 'Advantage']
share_features_abbrevation = ['Acc', 'Inc', 'Dist', 'Div', 'Inv', 'Inst', 'Adm', 'Adv']
lower_pre_fix_fund_share = ['fund', "funds", 'portfolio', 'bond', 'bonds', 'class', 'classes', 'share', 'shares']

# Multi-word currency wordings handled by replace_abbrevation:
# (lower-case trigger substring, regex to replace, ISO code).
_CURRENCY_PHRASE_RULES = (
    ('swiss franc', r'swiss\s+franc', 'CHF'),
    ('us dollar', r'us\s+dollar', 'USD'),
    ('singapore dollar', r'singapore\s+dollar', 'SGD'),
    ('hong kong dollar', r'hong\s+kong\s+dollar', 'HKD'),
    ('hongkong dollar', r'hongkong\s+dollar', 'HKD'),
    ('australian dollar', r'australian\s+dollar', 'AUD'),
    ('japanese yen', r'japanese\s+yen', 'JPY'),
    ('south african rand', r'South\s+African\s+rand', 'ZAR'),
    ('canadian dollar', r'canadian\s+dollar', 'CAD'),
    ('new zealand dollar', r'new\s+zealand\s+dollar', 'NZD'),
    ('norwegian krone', r'norwegian\s+krone', 'NOK'),
    ('danish krone', r'danish\s+krone', 'DKK'),
    ('swedish krona', r'swedish\s+krona', 'SEK'),
    ('swedish kronor', r'swedish\s+kronor', 'SEK'),
)

# Single-token currency wordings handled by replace_abbrevation:
# (trigger token, trigger is case-sensitive, regex to replace, ISO code).
# Word boundaries keep e.g. "euro" from being rewritten inside "European".
_CURRENCY_TOKEN_RULES = (
    ('GPB', True, r'\bGPB\b', 'GBP'),
    ('sterling', False, r'\bsterling\b', 'GBP'),
    ('euro', False, r'\beuro\b', 'EUR'),
    ('€', False, r'€', 'EUR'),
    ('$', False, r'\$', 'USD'),
    ('£', False, r'£', 'GBP'),
    ('RMB', True, r'\bRMB\b', 'CNY'),
)

# Lower-cased token -> expansion used by replace_abbrevation.
_TOKEN_EXPANSIONS = {
    'acc': 'Accumulation', 'acc.': 'Accumulation', 'accumulating': 'Accumulation',
    'inc': 'Income', 'inc.': 'Income',
    'dist': 'Distribution', 'dist.': 'Distribution', 'dis': 'Distribution',
    'dis.': 'Distribution', 'distributing': 'Distribution',
    'inv': 'Investor', 'inv.': 'Investor',
    'inst': 'Institutional', 'inst.': 'Institutional', 'institution': 'Institutional',
    'cap': 'Capitalisation', 'cap.': 'Capitalisation',
    'div': 'Dividend', 'div.': 'Dividend',
    'adm': 'Admin', 'adm.': 'Admin',
    'adv': 'Advantage', 'adv.': 'Advantage',
    # '(h)' can never match because parentheses are stripped before
    # tokenising; it is kept only to mirror the historical rule list.
    'hdg': 'Hedged', 'hgd': 'Hedged', 'hdg.': 'Hedged', 'hgd.': 'Hedged', '(h)': 'Hedged',
    'cl': 'Class', 'cl.': 'Class',
    'ser': 'Series', 'ser.': 'Series',
    'u.s.': 'US',
    'nc': 'no trail', 'nc.': 'no trail',
    'non': 'Not',
    'net': '', 'unhgd': '',
}


def add_slash_to_text_as_regex(text: str):
    """
    Turn free text into a loose regex pattern.

    Every non-word, non-whitespace character is replaced by ``\\W`` and every
    run of spaces by ``\\s*``. None / empty input is returned unchanged.
    """
    if text is None or len(text) == 0:
        return text
    # The iterator scans the original string; rebinding `text` below does not
    # affect which characters are visited.
    for special_iter in re.finditer(r"\W", text):
        if len(special_iter.group().strip()) == 0:
            continue
        replace = r"\{0}".format(special_iter.group())
        if replace not in text:
            text = re.sub(replace, r"\\W", text)
    text = re.sub(r"( ){2,}", " ", text)
    text = text.replace(" ", r"\s*")
    return text


def clean_text(text: str) -> str:
    """
    Replace literal ``\\uXXXX`` escape sequences (backslash, 'u', four
    alphanumerics, e.g. the six characters ``\\u00a0``) with a space, then
    collapse repeated spaces and trim.
    """
    text = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
    text = re.sub(r"( ){2,}", ' ', text.strip())
    return text


def _extract_share_attributes(share_text: str, confirm_text_share: bool = False):
    """Return (sorted short names or None, share feature, currency) parsed from share_text."""
    short_name_list = get_share_short_name_from_text(share_text, confirm_text_share=confirm_text_share)
    if short_name_list is not None:
        short_name_list.sort()
    return short_name_list, get_share_feature_from_text(share_text), get_currency_from_text(share_text)


def _cached_share_attributes(cache_key: str, share_text: str, process_cache, confirm_text_share: bool = False):
    """Return share attributes for share_text, reading/writing process_cache[cache_key] when a dict cache is given."""
    use_cache = isinstance(process_cache, dict)
    if use_cache:
        cached = process_cache.get(cache_key)
        if cached is not None:
            return cached.get("share_short_name"), cached.get("share_feature"), cached.get("share_currency")
    short_name_list, feature, currency = _extract_share_attributes(share_text, confirm_text_share)
    if use_cache:
        process_cache[cache_key] = {
            "share_short_name": short_name_list,
            "share_feature": feature,
            "share_currency": currency
        }
    return short_name_list, feature, currency


def get_most_similar_name(text: str,
                          name_list: list,
                          share_name: str = None,
                          fund_name: str = None,
                          matching_type="share",
                          pre_common_word_list: list = None,
                          process_cache: dict = None) -> tuple:
    """
    Pick the entry of name_list most similar to text by Jaccard similarity.

    Args:
        text: raw name to match.
        name_list: candidate full names; the winning entry is returned unmodified.
        share_name: optional share-class part of text; when given, share short
            names / feature / currency are read from it instead of text.
        fund_name: optional fund name; its upper-cased words are tolerated as
            extra short names on the candidate side.
        matching_type: "share" enables currency / feature / short-name
            filtering; "fund" enables the tie handling for equal scores.
        pre_common_word_list: extra words treated as common to all candidates.
        process_cache: optional dict used to memoise parsed share attributes,
            keyed by the normalised text / candidate name. It is mutated.

    Returns:
        (name, similarity). (None, None) when text or name_list is empty,
        (None, 0.0) on a tie between candidates or on any internal error,
        (None, score) when the best score is below 0.35.
    """
    try:
        copy_name_list = deepcopy(name_list)
        if text is None or len(text.split()) == 0 or \
                copy_name_list is None or len(copy_name_list) == 0:
            return None, None

        for i in range(len(copy_name_list)):
            copy_name = copy_name_list[i]
            if matching_type == "share":
                copy_name, _ = replace_share_name_for_multilingual(copy_name, None)
                share_part = get_share_part_list([copy_name])[0]
                if '-' in share_part:
                    copy_name = copy_name.replace('-', ' ')
            copy_name = replace_abbrevation(copy_name)
            copy_name_list[i] = copy_name

        # words shared by the candidate names
        common_word_list = []
        if len(name_list) > 1:
            _, common_word_list = remove_common_word(copy_name_list)
        if pre_common_word_list is not None and len(pre_common_word_list) > 0:
            common_word_list.extend([word for word in pre_common_word_list
                                     if word not in common_word_list])
        if len(common_word_list) > 0:
            common_word_list = [word for word in common_word_list
                                if len(word) > 1 and word.upper() not in total_currency_list]

        text = text.strip()
        text = remove_special_characters(text)
        text = replace_abbrevation(text)
        raw_fund_name_split = []
        if fund_name is not None and len(fund_name.strip()) > 0:
            fund_name = fund_name.strip()
            fund_name = remove_special_characters(fund_name)
            raw_fund_name_split = fund_name.upper().split()
        if share_name is not None:
            share_name = remove_special_characters(share_name)
            share_name = replace_abbrevation(share_name)
        text, share_name = replace_share_name_for_multilingual(text, share_name)

        text_splits = text.split()
        if len(text_splits) == 1:
            text = split_words_without_space(text)
        else:
            new_splits = []
            for split in text_splits:
                if len(split) > 1:
                    new_splits.extend(split_words_without_space(split).split())
                else:
                    new_splits.append(split)
            text = ' '.join(new_splits)

        # Derive the word list from the final text so that single-token input
        # is handled as well (the token list was previously only built for
        # multi-token text).
        lower_new_splits = [split.lower() for split in text.split()]
        for word in common_word_list:
            if word not in lower_new_splits:
                # the text does not carry this common word: drop it from every candidate
                for i in range(len(copy_name_list)):
                    temp_splits = copy_name_list[i].split()
                    copy_name_list[i] = ' '.join([split for split in temp_splits
                                                  if remove_special_characters(split).lower() != word])

        max_similarity = 0
        max_similarity_full_name = None
        text = remove_special_characters(text)
        if matching_type == "share":
            text, share_name, copy_name_list = update_for_currency(text, share_name, copy_name_list)
        text = ' '.join([split for split in text.split()
                         if split.lower() not in lower_pre_fix_fund_share])
        if share_name is not None:
            share_name = ' '.join([split for split in share_name.split()
                                   if split.lower() not in lower_pre_fix_fund_share])

        # Share parts must be extracted before "Fund"/"Portfolio"/... are
        # stripped from the candidates, because the extraction splits on them.
        copy_share_name_list = get_share_part_list(copy_name_list)
        for i in range(len(copy_name_list)):
            temp_splits = copy_name_list[i].split()
            copy_name_list[i] = ' '.join([split for split in temp_splits
                                          if remove_special_characters(split).lower()
                                          not in lower_pre_fix_fund_share])

        text_currency = None
        text_feature = None
        text_share_short_name_list = None
        if matching_type == "share" and text is not None and len(text.strip()) > 0:
            if share_name is not None and len(share_name.strip()) > 0:
                text_share_source = share_name
            else:
                text_share_source = text
            text_share_short_name_list, text_feature, text_currency = _cached_share_attributes(
                text, text_share_source, process_cache, confirm_text_share=True)

        same_max_similarity_name_list = []
        for full_name, copy_name, copy_share_name in zip(name_list, copy_name_list, copy_share_name_list):
            if not isinstance(copy_name, str) or len(copy_name.strip()) == 0:
                continue
            copy_name = remove_special_characters(copy_name)
            copy_name = split_words_without_space(copy_name)
            copy_name_short_name_list = None
            copy_name_feature = None
            copy_name_currency = None
            if matching_type == "share":
                # The short-name list is None for a candidate without share
                # part; the helper tolerates that with and without a cache.
                copy_name_short_name_list, copy_name_feature, copy_name_currency = \
                    _cached_share_attributes(copy_name, copy_share_name, process_cache)
                try:
                    if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \
                            copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0:
                        updated_text_share_short_name_list, updated_copy_name_short_name_list = \
                            compare_both_short_name(text_share_short_name_list, copy_name_short_name_list)
                        # NOTE: text is rewritten here and the rewritten value
                        # is carried over to the following candidates.
                        if updated_text_share_short_name_list != text_share_short_name_list:
                            text = ' '.join([split for split in text.split()
                                             if split not in text_share_short_name_list])
                            text += ' ' + ' '.join(updated_text_share_short_name_list)
                            text_share_short_name_list = updated_text_share_short_name_list
                        if updated_copy_name_short_name_list != copy_name_short_name_list:
                            copy_name = ' '.join([split for split in copy_name.split()
                                                  if split not in copy_name_short_name_list])
                            copy_name += ' ' + ' '.join(updated_copy_name_short_name_list)
                            copy_name_short_name_list = updated_copy_name_short_name_list
                except Exception as e:
                    # best effort: fall through with the unmerged short names
                    print(e)

            try:
                similarity = get_jacard_similarity(text,
                                                   copy_name,
                                                   need_remove_numeric_characters=False)
            except Exception as e:
                print(e)
                print_exc()
                similarity = 0
            if similarity == 1:
                return full_name, similarity

            copy_name_2 = replace_abbrevation(copy_name)
            if copy_name != copy_name_2:
                similarity_2 = get_jacard_similarity(text,
                                                     copy_name_2,
                                                     need_remove_numeric_characters=False)
                if similarity_2 > similarity:
                    similarity = similarity_2

            if similarity > max_similarity:
                if matching_type == "share":
                    if text_currency is not None and len(text_currency) > 0 and \
                            copy_name_currency is not None and len(copy_name_currency) > 0:
                        if text_currency != copy_name_currency:
                            continue
                    if text_feature is not None and len(text_feature) > 0 and \
                            copy_name_feature is not None and len(copy_name_feature) > 0:
                        if text_feature != copy_name_feature:
                            # NOTE(review): "accmulation" looks like a typo of
                            # "accumulation", which makes the middle condition
                            # always true. Kept as-is because correcting it
                            # would loosen matching - confirm intent.
                            if text_feature.lower() not in copy_name.lower().split() and \
                                    copy_name_feature.lower() != "accmulation" and \
                                    copy_name_feature.lower() not in text.lower().split():
                                continue
                    if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \
                            copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0:
                        short_name_invalid = False
                        for short in text_share_short_name_list:
                            if short not in copy_name_short_name_list:
                                short_name_invalid = True
                                break
                        for compare_short in copy_name_short_name_list:
                            if compare_short not in text_share_short_name_list:
                                # a short word may belong to the fund name
                                # rather than to the share name
                                if compare_short.upper() not in raw_fund_name_split:
                                    short_name_invalid = True
                                    break
                        if short_name_invalid:
                            continue
                max_similarity = similarity
                max_similarity_full_name = full_name
                same_max_similarity_name_list = []
            elif matching_type == "fund" and max_similarity > 0 and max_similarity == similarity:
                if full_name is not None and max_similarity_full_name is not None and \
                        len(full_name.split()) > len(max_similarity_full_name.split()):
                    max_similarity_full_name = full_name
                    same_max_similarity_name_list = []
                else:
                    if full_name is not None:
                        same_max_similarity_name_list.append(full_name)
            if max_similarity == 1:
                break

        # several names with the same best similarity: refuse to choose
        if len(same_max_similarity_name_list) > 0:
            return None, 0.0
        if max_similarity < 0.35:
            return None, max_similarity
        return max_similarity_full_name, max_similarity
    except Exception as e:
        print(e)
        print_exc()
        return None, 0.0


def replace_share_name_for_multilingual(text: str, share_name: str):
    """
    Replace the first known non-English "share class" wording found in text
    with "Class", applying the same replacement to share_name when it is not
    blank. Returns (text, share_name); blank text is returned untouched.
    """
    if text is None or len(text.strip()) == 0:
        return text, share_name
    multilingual_share_list = ["Catégorie de parts", "Classe di quote", "Kategorie Anteile",
                               "Kategorie anteile", "Clase de participaciones", "Aandelenklasse",
                               "aandelenklasse", "Anteilklasse", "anteilklasse"]
    for multilingual_share in multilingual_share_list:
        if multilingual_share in text:
            text = text.replace(multilingual_share, "Class")
            if share_name is not None and len(share_name.strip()) > 0:
                share_name = share_name.replace(multilingual_share, "Class")
            break  # only the first wording that occurs is handled
    return text, share_name


def compare_both_short_name(text_short_name_list: list, compare_short_name_list: list):
    """
    Reconcile two short-name lists where one side holds single letters
    (e.g. ['A', 'H']) and the other the merged form (e.g. ['AH']).

    Works on deep copies; returns (updated text list, updated compare list).
    """
    copy_text_short_name_list = deepcopy(text_short_name_list)
    copy_compare_short_name_list = deepcopy(compare_short_name_list)
    copy_text_short_name_list = verify_short_name_container(copy_text_short_name_list,
                                                            copy_compare_short_name_list)
    copy_compare_short_name_list = verify_short_name_container(copy_compare_short_name_list,
                                                               copy_text_short_name_list)
    return copy_text_short_name_list, copy_compare_short_name_list


def verify_short_name_container(left_short_name_list: list, right_short_name_list: list):
    """
    If left holds more than one single-character name and right holds a name
    of exactly that many characters containing all of them, replace the single
    characters in left by that name.

    left_short_name_list is modified in place and returned.
    """
    single_char_list = [short_name for short_name in left_short_name_list if len(short_name) == 1]
    if len(single_char_list) <= 1:
        return left_short_name_list
    for compare_short_name in right_short_name_list:
        if len(compare_short_name) != len(single_char_list):
            continue
        if all(short_name in compare_short_name for short_name in single_char_list):
            for short_name in single_char_list:
                if short_name in left_short_name_list:
                    left_short_name_list.remove(short_name)
            left_short_name_list.append(compare_short_name)
    return left_short_name_list


def get_share_part_list(text_list: list):
    """
    For each name return the part after the last occurrence of the first
    matching keyword among "Funds", "Fund", "Portfolio", "Bonds", "Bond"
    (the whole name when none occurs), with the generic fund/share words of
    lower_pre_fix_fund_share removed.
    """
    share_part_list = []
    for text in text_list:
        text_split = [text]
        for keyword in ("Funds", "Fund", "Portfolio", "Bonds", "Bond"):
            text_split = text.split(keyword)
            if len(text_split) > 1:
                break
        # when no keyword matched, text_split is [text]
        share_part_text = text_split[-1].strip()
        share_part_text = ' '.join([split for split in share_part_text.split()
                                    if remove_special_characters(split).lower()
                                    not in lower_pre_fix_fund_share])
        share_part_list.append(share_part_text)
    return share_part_list


def get_share_short_name_from_text(text: str, confirm_text_share: bool = False):
    """
    Collect upper-cased short tokens (at most 3 characters, neither a share
    feature nor a currency) from the tail of text.

    The last 6 tokens are inspected when confirm_text_share is true, otherwise
    the last 4. Returns None for blank text, else the list ordered from the
    end of the text backwards.
    """
    if text is None or len(text.strip()) == 0:
        return None
    text = remove_special_characters(text.strip())
    temp_share_features = [feature.lower() for feature in share_features_full_name]
    count_threshold = 6 if confirm_text_share else 4

    share_short_name_list = []
    for split in text.split()[::-1][:count_threshold]:
        if split.lower() in temp_share_features or split.upper() in total_currency_list:
            continue
        if len(split) <= 3:
            share_short_name_list.append(split.upper())

    if len(share_short_name_list) > 1:
        # drop the run of purely numeric names at the end of the collected list
        remove_number = []
        for short_name in share_short_name_list[::-1]:
            if short_name.isdigit():
                remove_number.append(short_name)
            else:
                break
        for remove in remove_number:
            if remove in share_short_name_list:
                share_short_name_list.remove(remove)
    return share_short_name_list


def get_share_feature_from_text(text: str):
    """
    Return the lower-cased share feature (an entry of share_features_full_name)
    nearest to the end of text, looking at the last 4 tokens only; None when
    there is none or text is blank.
    """
    if text is None or len(text.strip()) == 0:
        return None
    temp_share_features = [feature.lower() for feature in share_features_full_name]
    for split in text.strip().lower().split()[::-1][:4]:
        if split in temp_share_features:
            return split
    return None


def get_currency_from_text(text: str):
    """
    Return the currency code found among the last 4 tokens of text, or None.

    When several codes are present, one 'USD'/'EUR' entry is discarded first
    (the one nearest the end of the text if it comes first, otherwise the
    first one found) and the first remaining code is returned.
    """
    if text is None or len(text.strip()) == 0:
        return None
    currency_list = [split.upper() for split in text.strip().split()[::-1][:4]
                     if split.upper() in total_currency_list]
    if len(currency_list) == 0:
        return None
    if len(currency_list) > 1:
        if currency_list[0] in ['USD', 'EUR']:
            currency_list.pop(0)
        else:
            for currency in currency_list:
                if currency in ['USD', 'EUR']:
                    currency_list.remove(currency)
                    break
    return currency_list[0]


def _contains_all_short_names(short_name_list: list, compare_text: str):
    """Return (all short names are tokens of compare_text, upper-cased tokens of compare_text)."""
    compare_split = [split.upper() for split in compare_text.split()]
    return all(short_name in compare_split for short_name in short_name_list), compare_split


def update_for_currency(text: str, share_name: str, compare_list: list):
    """
    Align currency information between text / share_name and the candidates.

    * text without currency while some candidates carry one: a currency is
      appended to text and share_name ('USD' unless exactly one candidate
      containing all share short names suggests another) and 'USD' is appended
      to the candidates without currency.
    * text with currency: when share_name carries several codes only the one
      reported by get_currency_from_text is kept in text / share_name; the
      first other code becomes the "default" currency.
    * finally the default currency ('USD' unless changed above) is removed
      from the share part of every candidate listing more than one currency.

    compare_list is modified in place. Returns (text, share_name, compare_list).
    """
    with_currency = get_currency_from_text(text) is not None
    with_currency_list = []
    without_currency_list = []
    for index, compare in enumerate(compare_list):
        if get_currency_from_text(compare) is not None:
            with_currency_list.append(index)
        else:
            without_currency_list.append(index)

    if not with_currency and len(with_currency_list) > 0:
        share_short_name_list = []
        if share_name is not None and len(share_name.strip()) > 0:
            share_short_name_list = get_share_short_name_from_text(share_name)
        updated = False
        if len(share_short_name_list) > 0:
            # a currency-less candidate carrying all short names implies USD
            for index in without_currency_list:
                all_in_list, _ = _contains_all_short_names(share_short_name_list, compare_list[index])
                if all_in_list:
                    text = text + ' ' + 'USD'
                    if share_name is not None:
                        share_name = share_name + ' ' + 'USD'
                    updated = True
                    break
            if not updated:
                currency_list = []
                for index in with_currency_list:
                    all_in_list, compare_split = _contains_all_short_names(share_short_name_list,
                                                                           compare_list[index])
                    if all_in_list:
                        current_currency_list = [split for split in compare_split
                                                 if split.upper() in total_currency_list]
                        if len(current_currency_list) > 0:
                            currency_list.append(current_currency_list[-1])
                if len(currency_list) == 1:
                    text = text + ' ' + currency_list[0]
                    if share_name is not None:
                        share_name = share_name + ' ' + currency_list[0]
                    updated = True
        for index in without_currency_list:
            compare_list[index] = compare_list[index] + ' ' + 'USD'
        if not updated:
            text = text + ' ' + 'USD'
            if share_name is not None:
                share_name = share_name + ' ' + 'USD'
    # NOTE(review): the original also had a branch for "text has a currency
    # and no candidate lacks one" that appended 'USD' to the candidates
    # lacking one, i.e. it looped over an empty list and did nothing. The
    # condition was possibly meant to be "> 0"; behaviour is kept (no-op)
    # until that is confirmed.

    default_currency = 'USD'
    # share_name may legitimately be None (it is optional for the caller);
    # there is nothing to normalise in that case.
    if with_currency and share_name is not None:
        share_name_split = share_name.split()
        share_name_currency = get_currency_from_text(share_name)
        if share_name_currency is not None and share_name_currency in total_currency_list:
            for split in share_name_split:
                if split in total_currency_list and split != share_name_currency:
                    default_currency = split
                    break
            new_share_name = ' '.join([split for split in share_name_split
                                       if split not in total_currency_list
                                       or (split == share_name_currency)])
            if share_name in text:
                text = text.replace(share_name, new_share_name)
            else:
                text = ' '.join([split for split in text.split()
                                 if split not in total_currency_list
                                 or (split == share_name_currency)])
            share_name = new_share_name

    for c_i in range(len(compare_list)):
        compare = compare_list[c_i]
        compare_share_part = get_share_part_list([compare])[0]
        compare_share_part_split = compare_share_part.split()
        compare_share_part_currency_list = []
        for split in compare_share_part_split:
            # store the upper-cased code so that the de-duplication and the
            # default-currency lookup compare like with like
            if split.upper() in total_currency_list and \
                    split.upper() not in compare_share_part_currency_list:
                compare_share_part_currency_list.append(split.upper())
        if len(compare_share_part_currency_list) > 1 and \
                default_currency in compare_share_part_currency_list:
            compare_share_part_split = [split for split in compare_share_part_split
                                        if split.upper() != default_currency]
            new_compare_share_part = ' '.join(compare_share_part_split)
            compare_list[c_i] = compare.replace(compare_share_part, new_compare_share_part)
    return text, share_name, compare_list


def remove_common_word(text_list: list):
    """
    Lower-case and clean the names, then remove the words they have in common.

    Example: 'Blackrock Global Fund' and 'Blackrock Growth Fund' give
    (['global', 'growth'], ['blackrock']) - 'fund' is a generic word that is
    stripped beforehand and currency codes are never treated as common words.

    Returns (cleaned names, common words); for None / empty input the input is
    returned together with an empty common-word list.
    """
    if text_list is None or len(text_list) == 0:
        return text_list, []
    new_text_list = []
    for text in text_list:
        text = remove_special_characters(text.lower())
        text = ' '.join([split for split in text.split()
                         if split.lower() not in lower_pre_fix_fund_share])
        new_text_list.append(text)

    # NOTE(review): the common words are built by pairwise intersection and an
    # empty intersection before any common word was found is skipped, so the
    # result is not always the intersection of *all* names.
    common_word_list = []
    new_text_splits_list = [text.split() for text in new_text_list]
    with_common_word = False
    for i in range(len(new_text_splits_list)):
        for j in range(i + 1, len(new_text_splits_list)):
            if common_word_list is None or len(common_word_list) == 0:
                common_word_list = list(
                    set(new_text_splits_list[i]).intersection(set(new_text_splits_list[j])))
            else:
                common_word_list = list(
                    set(common_word_list).intersection(set(new_text_splits_list[j])))
            if len(common_word_list) > 0:
                with_common_word = True
            if with_common_word and len(common_word_list) == 0:
                break
        if with_common_word and len(common_word_list) == 0:
            break

    # currency codes are kept in the names
    common_word_list = [word for word in common_word_list
                        if word.upper() not in total_currency_list]
    common_word_list = list(set(common_word_list))
    for text_splits in new_text_splits_list:
        for common_word in common_word_list:
            if common_word in text_splits:
                text_splits.remove(common_word)  # first occurrence only
    new_text_list = [' '.join(text_splits) for text_splits in new_text_splits_list]
    return new_text_list, common_word_list


def split_words_without_space(text: str):
    """
    Insert spaces around capitalised words glued together, e.g.
    'BlackrockGlobalFund' becomes 'Blackrock Global Fund'.

    A capitalised word that is part of a match of "two or more capitals
    followed by lower-case letters" (e.g. 'Hedged' in 'USDHedged') is left
    glued. Returns the resulting string ('' for None / blank input).
    """
    if text is None or len(text.strip()) == 0:
        return ""
    text = text.strip()
    # words made of one capital letter followed by lower-case letters
    word_list = re.findall(r"[A-Z][a-z]+", text)
    word_list2 = re.findall(r"[A-Z]{2,}[a-z]+", text)
    for word in word_list:
        if any(word in word2 for word2 in word_list2):
            continue
        text = text.replace(word, " " + word + " ")
    text = re.sub(r"(\s)+", " ", text)
    return text.strip()


def remove_special_characters(text):
    """Replace every non-word character with a space, collapse whitespace and trim."""
    text = re.sub(r'\W', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


def get_unique_words_text(text):
    """Return the distinct lower-cased words of text, sorted and joined by single spaces."""
    text = remove_special_characters(text).lower()
    return ' '.join(sorted(set(text.split())))


def remove_numeric_characters(text):
    """Replace every run of digits with a space, collapse whitespace and trim."""
    text = re.sub(r'\d+', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


def get_jacard_similarity(text_left,
                          text_right,
                          need_remove_special_characters=True,
                          need_remove_numeric_characters=True):
    """
    Case-insensitive Jaccard similarity of the word sets of two texts,
    rounded to 3 decimals (0 when both texts have no words).

    When the words unique to each side, sorted and concatenated, form the
    same string (e.g. 'ab c' versus 'a bc'), they are counted as matching.
    """
    if need_remove_special_characters:
        text_left = remove_special_characters(text_left)
        text_right = remove_special_characters(text_right)
    if need_remove_numeric_characters:
        text_left = remove_numeric_characters(text_left)
        text_right = remove_numeric_characters(text_right)
    left_words = set(text_left.lower().split())
    right_words = set(text_right.lower().split())

    intersection_count = len(left_words & right_words)
    union_count = len(left_words | right_words)

    differ_a = sorted(left_words - right_words)
    differ_b = sorted(right_words - left_words)
    if ''.join(differ_a) == ''.join(differ_b):
        intersection_count += len(differ_a) + len(differ_b)

    if union_count > 0:
        return round(intersection_count / union_count, 3)
    return 0


def simple_most_similarity_name(text: str, name_list: list):
    """
    Return (name, similarity) for the entry of name_list with the highest
    get_jacard_similarity to text; (None, 0.0) for blank text or an empty
    list, (None, 0) when no entry scores above 0.
    """
    if text is None or len(text.strip()) == 0 or \
            name_list is None or len(name_list) == 0:
        return None, 0.0
    max_similarity = 0
    max_similarity_name = None
    for full_name in name_list:
        similarity = get_jacard_similarity(text, full_name)
        if similarity > max_similarity:
            max_similarity = similarity
            max_similarity_name = full_name
        if max_similarity == 1:
            break
    return max_similarity_name, max_similarity


def get_beginning_common_words(text_list: list):
    """
    Return the leading words shared by every text in text_list, joined by
    spaces ('' when there are fewer than two texts or no shared prefix).
    """
    if text_list is None or len(text_list) < 2:
        return ""
    first_text_split = text_list[0].split()
    other_text_splits = [text.split() for text in text_list[1:]]
    common_words_list = []
    for w_i, word in enumerate(first_text_split):
        if all(w_i < len(text_split) and text_split[w_i] == word
               for text_split in other_text_splits):
            common_words_list.append(word)
        else:
            break
    return ' '.join(common_words_list).strip()


def _replace_currency_wording(text: str):
    """Rewrite the first matching currency wording/symbol rule in text to its ISO code."""
    lower_text = text.lower()
    for phrase, pattern, code in _CURRENCY_PHRASE_RULES:
        if phrase in lower_text:
            return re.sub(pattern, code, text, flags=re.IGNORECASE)
    text_splits = text.split()
    lower_text_splits = lower_text.split()
    for token, case_sensitive, pattern, code in _CURRENCY_TOKEN_RULES:
        if token in (text_splits if case_sensitive else lower_text_splits):
            return re.sub(pattern, code, text, flags=re.IGNORECASE)
    return text


def replace_abbrevation(text: str):
    """
    Normalise a fund / share name.

    Parentheses and hyphens become spaces, one currency wording or symbol
    (only the first rule that matches) is rewritten to its ISO code, known
    abbreviations are expanded ('Acc' -> 'Accumulation', 'Hdg' -> 'Hedged',
    ...), and remaining tokens go through split_short_name_with_share_features.
    Blank input is returned unchanged.
    """
    if text is None or len(text.strip()) == 0:
        return text
    text = text.replace('(', ' ').replace(')', ' ').replace('-', ' ')
    text = re.sub(r'\s+', ' ', text).strip()
    text = _replace_currency_wording(text)

    new_text_splits = []
    for split in text.split():
        expansion = _TOKEN_EXPANSIONS.get(split.lower())
        if expansion is None:
            expansion = split_short_name_with_share_features(split)
        new_text_splits.append(expansion)

    new_text = ' '.join(new_text_splits)
    new_text = re.sub(r'\s+', ' ', new_text).strip()
    return new_text


def split_short_name_with_share_features(text: str):
    """
    Split a glued share token into currency, remainder and share feature.

    Examples:
        CHFHInc      -> CHF H Income
        USDHAcc      -> USD H Accumulation
        GBPHInc      -> GBP H Income
        HAcc         -> H Accumulation
        GBPHedgedAcc -> GBP Hedged Accumulation
        HGBPInc      -> GBP H Income
        HNOKAcc      -> NOK H Accumulation

    The currency, when found, is always emitted first. Blank or multi-word
    input is returned unchanged.
    """
    if text is None or len(text.strip()) == 0:
        return text
    if len(text.split()) > 1:
        return text
    text = text.strip()
    share_features = {'Acc': 'Accumulation',
                      'Inc': 'Income',
                      'Dist': 'Distribution',
                      'Div': 'Dividend'}
    feature_name = ""
    for key, value in share_features.items():
        if len(text) > len(key) and text.endswith(key):
            feature_name = value
            # strip the suffix only, not every occurrence of the abbreviation
            text = text[:-len(key)]
            break

    currency_text = ""
    for currency in total_currency_list:
        if len(text) > len(currency) and currency in text:
            currency_text = currency
            text = text.replace(currency, '')
            break

    new_text = currency_text + ' ' + text + ' ' + feature_name
    new_text = re.sub(r'\s+', ' ', new_text).strip()
    return new_text