import re
from copy import deepcopy
from traceback import print_exc

try:
    from utils.logger import logger
except ImportError:
    # Keep these pure-text helpers importable outside the project tree.
    import logging

    logger = logging.getLogger(__name__)


# ISO-style currency codes recognised as currency tokens in names.
total_currency_list = [
    "USD", "EUR", "AUD", "JPY", "CHF", "GBP", "SEK", "CNY", "NZD", "CNH",
    "NOK", "SGD", "HKD", "ZAR", "PLN", "CAD", "CZK", "HUF", "DKK", "BRL",
    "SKK", "RON", "TRY", "BGN", "CUP", "MXN", "CLF", "XCD", "ISK", "IDR",
    "MNT", "AED", "AFN", "INR", "ESP", "RUB", "CLP", "KRW", "ETB", "DZD",
    "XEU", "XFO",
]

share_features_full_name = ['Accumulation', 'Income', 'Distribution',
                            'Investor', 'Institutional', 'Admin', 'Advantage']
share_features_abbrevation = ['Acc', 'Inc', 'Dist', 'Div', 'Inv', 'Inst',
                              'Adm', 'Adv']

# Generic words that carry no discriminating information in a name.
_GENERIC_NAME_WORDS = ('fund', "funds", 'portfolio', 'class', 'classes',
                       'share', 'shares')

# Names scoring below this Jaccard similarity are not reported as a match.
_MIN_SIMILARITY = 0.35

# How many trailing tokens are inspected for share class attributes.
_TRAILING_TOKEN_WINDOW = 4

# Keywords after which the share-class part of a name starts, tried in order.
_SHARE_PART_SPLITTERS = tuple(
    re.compile(pattern)
    for pattern in (r"Funds?", r"funds", r"Portfolio", r"Bonds?")
)

# An acronym-led word ("ETFs") is kept intact; a capitalised word is padded.
_CAMEL_CASE_WORD = re.compile(
    r"(?P<acronym>[A-Z]{2,}[a-z]+)|(?P<word>[A-Z][a-z]+)")

# (lower-case phrase gate, regex, currency code); first hit wins.
_CURRENCY_PHRASES = (
    ('swiss franc', r'swiss\s+franc', 'CHF'),
    ('us dollar', r'us\s+dollar', 'USD'),
    ('singapore dollar', r'singapore\s+dollar', 'SGD'),
    ('hong kong dollar', r'hong\s+kong\s+dollar', 'HKD'),
    ('hongkong dollar', r'hongkong\s+dollar', 'HKD'),
    ('australian dollar', r'australian\s+dollar', 'AUD'),
    ('japanese yen', r'japanese\s+yen', 'JPY'),
    ('south african rand', r'South\s+African\s+rand', 'ZAR'),
    ('canadian dollar', r'canadian\s+dollar', 'CAD'),
    ('new zealand dollar', r'new\s+zealand\s+dollar', 'NZD'),
    ('norwegian krone', r'norwegian\s+krone', 'NOK'),
    ('danish krone', r'danish\s+krone', 'DKK'),
    ('swedish krona', r'swedish\s+krona', 'SEK'),
    ('swedish kronor', r'swedish\s+kronor', 'SEK'),
)

# (token gate, gate is case sensitive, regex, currency code); first hit wins.
_CURRENCY_TOKENS = (
    ('GPB', True, r'\bGPB\b', 'GBP'),
    ('sterling', False, r'\bsterling\b', 'GBP'),
    ('euro', False, r'\beuro\b', 'EUR'),
    ('€', False, r'\€', 'EUR'),
    ('$', False, r'\$', 'USD'),
    ('£', False, r'\£', 'GBP'),
    ('RMB', True, r'\bRMB\b', 'CNY'),
)

# Lower-cased abbreviation token -> expanded wording.
_ABBREVIATION_EXPANSIONS = {
    'acc': 'Accumulation', 'acc.': 'Accumulation',
    'inc': 'Income', 'inc.': 'Income',
    'dist': 'Distribution', 'dist.': 'Distribution',
    'inv': 'Investor', 'inv.': 'Investor',
    'inst': 'Institutional', 'inst.': 'Institutional',
    'institution': 'Institutional',
    'cap': 'Capitalisation', 'cap.': 'Capitalisation',
    'div': 'Dividend', 'div.': 'Dividend',
    'adm': 'Admin', 'adm.': 'Admin',
    'adv': 'Advantage', 'adv.': 'Advantage',
    'hdg': 'Hedged', 'hgd': 'Hedged', 'hdg.': 'Hedged', 'hgd.': 'Hedged',
    '(h)': 'Hedged',
    'cl': 'Class', 'cl.': 'Class',
    'ser': 'Series', 'ser.': 'Series',
    'u.s.': 'US',
    'nc': 'no trail', 'nc.': 'no trail',
}

# Lower-cased tokens that are dropped entirely.
_DROPPED_TOKENS = ('unhgd',)


def add_slash_to_text_as_regex(text: str):
    """
    Turn a literal text into a regex pattern string.

    Every non-word, non-whitespace character is prefixed with a backslash and
    every whitespace run is replaced by the pattern ``\\s+``.
    None and empty text are returned unchanged.

    A character is left alone when its escaped form already occurs somewhere
    in the text. A literal backslash is not doubled: substituting it with
    itself is a no-op.
    """
    if text is None or len(text) == 0:
        return text
    # The iterator scans the original string even though ``text`` is rebound.
    for special_match in re.finditer(r"\W", text):
        special_char = special_match.group()
        if len(special_char.strip()) == 0:
            continue
        escaped = r"\{0}".format(special_char)
        if escaped not in text:
            text = re.sub(escaped, escaped, text)
    text = re.sub(r"\s+", r"\\s+", text)
    return text


def clean_text(text: str) -> str:
    """
    Replace literal ``\\uXXXX`` escape sequences (e.g. the six characters
    ``\\u00a0``) with a space, strip the text and collapse runs of spaces.
    """
    text = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
    text = re.sub(r"( ){2,}", ' ', text.strip())
    return text


def _share_attributes(source):
    """Return (short name, feature, currency) extracted from source."""
    return (get_share_short_name_from_text(source),
            get_share_feature_from_text(source),
            get_currency_from_text(source))


def _cached_share_attributes(cache_key, source, process_cache):
    """
    Return (short name, feature, currency) for source.

    When process_cache is a dict, the entry stored under cache_key is reused,
    or created from source on a miss.
    """
    if not isinstance(process_cache, dict):
        return _share_attributes(source)
    cached = process_cache.get(cache_key, None)
    if cached is not None:
        return (cached.get("share_short_name"),
                cached.get("share_feature"),
                cached.get("share_currency"))
    short_name, feature, currency = _share_attributes(source)
    process_cache[cache_key] = {
        "share_short_name": short_name,
        "share_feature": feature,
        "share_currency": currency
    }
    return short_name, feature, currency


def _both_present(left, right):
    """Return True when both values are non-empty strings."""
    return left is not None and len(left) > 0 and \
        right is not None and len(right) > 0


def _share_attributes_conflict(text, text_attributes, candidate_attributes):
    """
    Return True when a candidate's share attributes contradict the text's.

    An attribute only counts when both sides have it. A differing feature is
    tolerated when the candidate's feature still occurs as a word of text.
    """
    text_short_name, text_feature, text_currency = text_attributes
    short_name, feature, currency = candidate_attributes
    if _both_present(text_currency, currency) and text_currency != currency:
        return True
    if _both_present(text_feature, feature) and text_feature != feature:
        if feature.lower() not in text.lower().split():
            return True
    if _both_present(text_short_name, short_name) and \
            text_short_name != short_name:
        return True
    return False


def get_most_similar_name(text: str,
                          name_list: list,
                          share_name: str = None,
                          matching_type="share",
                          pre_common_word_list: list = None,
                          process_cache: dict = None) -> tuple:
    """
    Get the most similar name from name_list by Jaccard similarity.

    Args:
        text: the name to look up.
        name_list: candidate full names; the returned name is one of these.
        share_name: optional share class wording of text, used to extract the
            short name / feature / currency of the text.
        matching_type: "share" enables currency defaulting and the
            currency / feature / short-name compatibility checks; "fund"
            enables the tie handling between equally similar names.
        pre_common_word_list: extra words treated as common to all candidates.
        process_cache: optional dict, read and written in place, holding
            extracted share attributes keyed by normalised text.

    Returns:
        A (name, similarity) tuple:
        - (None, None) when text or name_list is empty;
        - (None, 0.0) when an ambiguous tie was recorded or an error occurred;
        - (None, similarity) when the best similarity is below 0.35;
        - (best name, similarity) otherwise.

    Any exception is printed with its traceback and reported as (None, 0.0).
    """
    try:
        copy_name_list = deepcopy(name_list)
        if text is None or len(text.split()) == 0 or \
                copy_name_list is None or len(copy_name_list) == 0:
            return None, None

        for index, copy_name in enumerate(copy_name_list):
            share_part = get_share_part_list([copy_name])[0]
            if '-' in share_part:
                copy_name = copy_name.replace('-', ' ')
            copy_name_list[index] = replace_abbrevation(copy_name)

        # Words shared by every candidate carry no discriminating information.
        common_word_list = []
        if len(name_list) > 1:
            _, common_word_list = remove_common_word(copy_name_list)
        if pre_common_word_list is not None and len(pre_common_word_list) > 0:
            common_word_list.extend([word for word in pre_common_word_list
                                     if word not in common_word_list])
        if len(common_word_list) > 0:
            common_word_list = [
                word for word in common_word_list
                if len(word) > 1 and word.upper() not in total_currency_list]

        text = text.strip()
        text = remove_special_characters(text)
        text = replace_abbrevation(text)
        if share_name is not None:
            share_name = remove_special_characters(share_name)
            share_name = replace_abbrevation(share_name)

        text_splits = text.split()
        if len(text_splits) == 1:
            # A single token may be several words glued together.
            text = split_words_without_space(text)
        else:
            new_splits = []
            for split in text_splits:
                if len(split) > 1:
                    new_splits.extend(split_words_without_space(split).split())
                else:
                    new_splits.append(split)

            lower_new_splits = [split.lower() for split in new_splits]
            for word in common_word_list:
                if word not in lower_new_splits:
                    # The text lacks this common word: drop it from candidates.
                    for index, copy_name in enumerate(copy_name_list):
                        copy_name_list[index] = ' '.join(
                            [split for split in copy_name.split()
                             if remove_special_characters(split).lower() != word])

            text = ' '.join([split for split in new_splits
                             if split.lower() not in _GENERIC_NAME_WORDS])

        # Share parts are taken before the generic words are stripped, because
        # the split relies on keywords such as "Fund".
        copy_share_name_list = get_share_part_list(copy_name_list)
        for index, copy_name in enumerate(copy_name_list):
            copy_name_list[index] = ' '.join(
                [split for split in copy_name.split()
                 if remove_special_characters(split).lower()
                 not in _GENERIC_NAME_WORDS])

        max_similarity = 0
        max_similarity_full_name = None
        text = remove_special_characters(text)
        if matching_type == "share":
            text, share_name, copy_name_list = update_for_currency(
                text, share_name, copy_name_list)

        text_attributes = (None, None, None)
        if matching_type == "share" and text is not None and \
                len(text.strip()) > 0:
            if isinstance(process_cache, dict):
                if share_name is not None and len(share_name.strip()) > 0:
                    attribute_source = share_name
                else:
                    attribute_source = text
            else:
                # NOTE(review): without a cache only share_name is inspected
                # (no fallback to text), so results can differ depending on
                # whether a cache is supplied - confirm this is intended.
                attribute_source = share_name
            text_attributes = _cached_share_attributes(
                text, attribute_source, process_cache)

        same_max_similarity_name_list = []
        for full_name, copy_name, copy_share_name in zip(
                name_list, copy_name_list, copy_share_name_list):
            copy_name = remove_special_characters(copy_name)
            copy_name = split_words_without_space(copy_name)
            similarity = get_jacard_similarity(
                text, copy_name, need_remove_numeric_characters=False)
            copy_name_2 = replace_abbrevation(copy_name)
            if copy_name != copy_name_2:
                similarity_2 = get_jacard_similarity(
                    text, copy_name_2, need_remove_numeric_characters=False)
                if similarity_2 > similarity:
                    similarity = similarity_2

            if similarity > max_similarity:
                if matching_type == "share":
                    candidate_attributes = _cached_share_attributes(
                        copy_name, copy_share_name, process_cache)
                    if _share_attributes_conflict(
                            text, text_attributes, candidate_attributes):
                        continue
                max_similarity = similarity
                max_similarity_full_name = full_name
                same_max_similarity_name_list = []
            elif matching_type == "fund" and max_similarity > 0 and \
                    max_similarity == similarity:
                # NOTE(review): on a tie the longer name wins; otherwise the
                # tie is recorded as ambiguous. Nesting inferred - confirm.
                if full_name is not None and \
                        max_similarity_full_name is not None and \
                        len(full_name.split()) > \
                        len(max_similarity_full_name.split()):
                    max_similarity_full_name = full_name
                    same_max_similarity_name_list = []
                elif full_name is not None:
                    same_max_similarity_name_list.append(full_name)
            if max_similarity == 1:
                break

        # Several names with the same similarity: refuse to guess.
        if len(same_max_similarity_name_list) > 0:
            return None, 0.0
        if max_similarity < _MIN_SIMILARITY:
            return None, max_similarity
        return max_similarity_full_name, max_similarity
    except Exception as e:
        # Deliberate best effort: report the failure and signal "no match".
        print(e)
        print_exc()
        return None, 0.0


def get_share_part_list(text_list: list):
    """
    Return, for each name, the part following its last fund keyword.

    The keywords "Fund(s)", "funds", "Portfolio" and "Bond(s)" are tried in
    that order and the first one present is used. The plural forms are
    consumed as a whole so no stray "s" is left on the share part. A name
    without any keyword is returned unchanged.
    """
    share_part_list = []
    for text in text_list:
        share_part = text
        for splitter in _SHARE_PART_SPLITTERS:
            pieces = splitter.split(text)
            if len(pieces) > 1:
                share_part = pieces[-1].strip()
                break
        share_part_list.append(share_part)
    return share_part_list


def get_share_short_name_from_text(text: str):
    """
    Return the share class short name found among the last four tokens.

    Scanning backwards, the first token of at most three characters that
    equals its own upper-case form is returned, skipping share feature words
    and currency codes. Returns None when there is none or text is empty.
    """
    if text is None or len(text.strip()) == 0:
        return None
    text = remove_special_characters(text.strip())
    feature_words = [feature.lower() for feature in share_features_full_name]
    for token in text.split()[::-1][:_TRAILING_TOKEN_WINDOW]:
        if token.lower() in feature_words or token in total_currency_list:
            continue
        if len(token) <= 3 and token.upper() == token:
            return token.upper()
    return None


def get_share_feature_from_text(text: str):
    """
    Return the lower-cased share feature word (e.g. "accumulation") found
    among the last four tokens, scanning backwards, or None.
    """
    if text is None or len(text.strip()) == 0:
        return None
    feature_words = [feature.lower() for feature in share_features_full_name]
    for token in text.strip().lower().split()[::-1][:_TRAILING_TOKEN_WINDOW]:
        if token in feature_words:
            return token
    return None


def get_currency_from_text(text: str):
    """
    Return the lower-cased currency code (e.g. "usd") found among the last
    four tokens, scanning backwards, or None.
    """
    if text is None or len(text.strip()) == 0:
        return None
    for token in text.strip().lower().split()[::-1][:_TRAILING_TOKEN_WINDOW]:
        if token.upper() in total_currency_list:
            return token
    return None


def _has_currency_token(tokens):
    """Return True when any token is a known currency code (any case)."""
    return any(token.upper() in total_currency_list for token in tokens)


def _append_currency(text, share_name, currency):
    """Append currency to text and, when given, to share_name."""
    text = text + ' ' + currency
    if share_name is not None:
        share_name = share_name + ' ' + currency
    return text, share_name


def update_for_currency(text: str, share_name: str, compare_list: list):
    """
    Make text and candidates comparable when only some of them name a currency.

    Nothing changes when text already names a currency, when no candidate
    does, or when text has no tokens. Otherwise:
    - if the last token of text looks like a share class short name (fewer
      than four characters, upper case) and a currency-less candidate
      contains it, text gets "USD";
    - else, if exactly one currency is collected from the currency-bearing
      candidates containing that token, text gets that currency;
    - else text gets "USD";
    - every currency-less candidate gets "USD".

    compare_list is modified in place and also returned.

    Returns:
        (text, share_name, compare_list)
    """
    text_split = text.split()
    if not text_split:
        # Nothing to align; also avoids indexing the last token of no tokens.
        return text, share_name, compare_list

    with_currency = _has_currency_token(text_split)
    with_currency_list = []
    without_currency_list = []
    for index, compare in enumerate(compare_list):
        if _has_currency_token(compare.split()):
            with_currency_list.append(index)
        else:
            without_currency_list.append(index)

    if with_currency or len(with_currency_list) == 0:
        # NOTE(review): the original had a branch for "text has a currency and
        # no candidate lacks one" that looped over the (then empty) list of
        # currency-less candidates. It was a no-op; possibly the condition was
        # meant to be "some candidate lacks one" - confirm before changing.
        return text, share_name, compare_list

    last_split = text_split[-1]
    updated = False
    if len(last_split) < 4 and last_split.upper() == last_split:
        if any(last_split in compare_list[index].split()
               for index in without_currency_list):
            text, share_name = _append_currency(text, share_name, 'USD')
            updated = True
        if not updated:
            currency_list = []
            for index in with_currency_list:
                compare_split = compare_list[index].split()
                if last_split in compare_split:
                    current_currency_list = [
                        split for split in compare_split
                        if split.upper() in total_currency_list]
                    if len(current_currency_list) > 0:
                        currency_list.append(current_currency_list[-1])
            if len(currency_list) == 1:
                text, share_name = _append_currency(
                    text, share_name, currency_list[0])
                updated = True

    for index in without_currency_list:
        compare_list[index] = compare_list[index] + ' ' + 'USD'
    if not updated:
        text, share_name = _append_currency(text, share_name, 'USD')
    return text, share_name, compare_list


def remove_common_word(text_list: list):
    """
    Remove the words shared by every text.

    Each text is lower-cased, stripped of special characters and of the words
    'fund', 'portfolio', 'share' and 'class'. E.g. 'Blackrock Global Fund' and
    'Blackrock Growth Fund' give (['global', 'growth'], ['blackrock']).
    Currency codes are never treated as common words. With fewer than two
    texts there are no common words.

    Returns:
        (texts without the common words, sorted common words); ([], []) for
        empty or None input.
    """
    if text_list is None or len(text_list) == 0:
        return [], []

    ignored_words = ('fund', 'portfolio', 'share', 'class')
    new_text_splits_list = []
    for text in text_list:
        text = remove_special_characters(text.lower())
        new_text_splits_list.append(
            [split for split in text.split() if split not in ignored_words])

    common_words = set()
    if len(new_text_splits_list) > 1:
        # A word is common only when every text contains it.
        common_words = set.intersection(
            *[set(text_splits) for text_splits in new_text_splits_list])
    common_word_list = sorted(
        word for word in common_words
        if word.upper() not in total_currency_list)

    for text_splits in new_text_splits_list:
        for common_word in common_word_list:
            if common_word in text_splits:
                # Only the first occurrence is dropped.
                text_splits.remove(common_word)
    new_text_list = [' '.join(text_splits)
                     for text_splits in new_text_splits_list]
    return new_text_list, common_word_list


def _pad_camel_case_word(match):
    """Surround a capitalised word with spaces; keep acronym-led words."""
    if match.group("word") is None:
        return match.group()
    return " " + match.group() + " "


def split_words_without_space(text: str):
    """
    Split words written without spaces, such as 'BlackrockGlobalFund' which
    becomes 'Blackrock Global Fund'.

    Each capital letter followed by lower-case letters starts a word. A word
    led by two or more capitals (e.g. 'ETFs') is kept as is. Whitespace is
    normalised to single spaces.

    Returns:
        The re-spaced text; '' for empty or None input.
    """
    if text is None or len(text.strip()) == 0:
        return ""
    # One pass over the text, so a word that is a prefix of another
    # ('Asia' / 'Asian') cannot split the longer word.
    text = _CAMEL_CASE_WORD.sub(_pad_camel_case_word, text.strip())
    text = re.sub(r"(\s)+", " ", text)
    return text.strip()


def remove_special_characters(text):
    """
    Replace every character that is not an ASCII letter, a digit or
    whitespace with a space, then collapse whitespace and strip.
    """
    text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


def get_unique_words_text(text):
    """
    Return the distinct lower-cased words of text, without special
    characters, sorted and joined by single spaces.
    """
    unique_words = set(remove_special_characters(text).lower().split())
    return ' '.join(sorted(unique_words))


def remove_numeric_characters(text):
    """Replace digit runs with a space, then collapse whitespace and strip."""
    text = re.sub(r'\d+', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


def get_jacard_similarity(text_left,
                          text_right,
                          need_remove_special_characters=True,
                          need_remove_numeric_characters=True):
    """
    Return the Jaccard similarity of the lower-cased word sets of two texts.

    Args:
        text_left, text_right: the texts to compare.
        need_remove_special_characters: strip special characters first.
        need_remove_numeric_characters: strip digits first.

    Returns:
        |intersection| / |union| rounded to three decimals, or 0 when both
        word sets are empty.
    """
    if need_remove_special_characters:
        text_left = remove_special_characters(text_left)
        text_right = remove_special_characters(text_right)
    if need_remove_numeric_characters:
        text_left = remove_numeric_characters(text_left)
        text_right = remove_numeric_characters(text_right)
    left_words = set(text_left.lower().split())
    right_words = set(text_right.lower().split())
    union = left_words | right_words
    if len(union) > 0:
        return round(len(left_words & right_words) / len(union), 3)
    return 0


def simple_most_similarity_name(text: str, name_list: list):
    """
    Return (name, similarity) for the name with the highest Jaccard
    similarity to text. The first name reaching the maximum wins.

    Returns (None, 0.0) for empty input and (None, 0) when no name shares a
    word with text.
    """
    if text is None or len(text.strip()) == 0 or \
            name_list is None or len(name_list) == 0:
        return None, 0.0
    max_similarity = 0
    max_similarity_name = None
    for full_name in name_list:
        similarity = get_jacard_similarity(text, full_name)
        if similarity > max_similarity:
            max_similarity = similarity
            max_similarity_name = full_name
        if max_similarity == 1:
            break
    return max_similarity_name, max_similarity


def get_beginning_common_words(text_list: list):
    """
    Get the beginning common words in text_list.

    Returns:
        The longest run of leading words shared by every text, joined by
        single spaces; '' when there is none or fewer than two texts.
    """
    if text_list is None or len(text_list) < 2:
        return ""
    first_tokens = text_list[0].split()
    other_token_lists = [text.split() for text in text_list[1:]]
    common_words_list = []
    for position, word in enumerate(first_tokens):
        shared = all(position < len(tokens) and tokens[position] == word
                     for tokens in other_token_lists)
        if not shared:
            break
        common_words_list.append(word)
    return ' '.join(common_words_list).strip()


def _replace_currency_wording(text):
    """
    Replace the first recognised currency wording or symbol with its code.

    Only one rule is applied per call: the first currency phrase found in the
    text, otherwise the first currency word or symbol present as a standalone
    token. Words are replaced on word boundaries only, so 'Euro' never
    rewrites 'European'.
    """
    lowered = text.lower()
    for phrase, pattern, code in _CURRENCY_PHRASES:
        if phrase in lowered:
            return re.sub(pattern, code, text, flags=re.IGNORECASE)
    tokens = text.split()
    lowered_tokens = lowered.split()
    for gate, case_sensitive, pattern, code in _CURRENCY_TOKENS:
        if gate in (tokens if case_sensitive else lowered_tokens):
            return re.sub(pattern, code, text, flags=re.IGNORECASE)
    return text


def replace_abbrevation(text: str):
    """
    Normalise currency wording and share class abbreviations in a name.

    Currency phrases, words and symbols ('US Dollar', 'Sterling', '€', ...)
    become currency codes, and abbreviation tokens ('Acc', 'Inst.', 'Hdg', ...)
    become full words. The token 'unhgd' is dropped. Whitespace is normalised
    to single spaces. None and blank text are returned unchanged.
    """
    if text is None or len(text.strip()) == 0:
        return text
    text = _replace_currency_wording(text.strip())
    new_text_splits = []
    for split in text.split():
        lowered = split.lower()
        if lowered in _DROPPED_TOKENS:
            continue
        new_text_splits.append(_ABBREVIATION_EXPANSIONS.get(lowered, split))
    return ' '.join(new_text_splits)