import re
from copy import deepcopy  # noqa: F401  (no longer used below; import retained for anything that relies on it)

# Generic words ignored when comparing a text against candidate names.
_NOISE_WORDS = ('fund', 'portfolio', 'class', 'share', 'shares')

# Minimum Jaccard similarity for a candidate to be accepted as a match.
_MIN_SIMILARITY = 0.35

# Multi-word currency names -> ISO code, checked in this order (first hit wins).
_CURRENCY_PHRASES = (
    ('swiss franc', 'CHF'),
    ('us dollar', 'USD'),
    ('singapore dollar', 'SGD'),
    ('hong kong dollar', 'HKD'),
    ('hongkong dollar', 'HKD'),
    ('australian dollar', 'AUD'),
    ('japanese yen', 'JPY'),
    ('south african rand', 'ZAR'),
    ('canadian dollar', 'CAD'),
    ('new zealand dollar', 'NZD'),
    ('norwegian krone', 'NOK'),
    ('danish krone', 'DKK'),
    ('swedish krona', 'SEK'),
    ('swedish kronor', 'SEK'),
)

# Single-token currency names: (lower-case token, substitution regex, ISO code).
# Only consulted when no multi-word phrase matched.
_CURRENCY_TOKENS = (
    ('sterling', r'\bsterling\b', 'GBP'),
    ('euro', r'\beuro\b', 'EUR'),
    ('€', '€', 'EUR'),
    ('rmb', r'\brmb\b', 'CNY'),
)

# Lower-case share-class abbreviation -> expanded form.
_ABBREVIATIONS = {
    'acc': 'Accumulation', 'acc.': 'Accumulation',
    'inc': 'Income', 'inc.': 'Income',
    'dist': 'Distribution', 'dist.': 'Distribution',
    'inv': 'Investor', 'inv.': 'Investor',
    'inst': 'Institutional', 'inst.': 'Institutional',
    'institution': 'Institutional',
    'cap': 'Capitalisation', 'cap.': 'Capitalisation',
    'adm': 'Admin', 'adm.': 'Admin',
    'adv': 'Advantage', 'adv.': 'Advantage',
    'hdg': 'Hedged', 'hgd': 'Hedged', 'hdg.': 'Hedged', 'hgd.': 'Hedged',
    '(h)': 'Hedged',
    'cl': 'Class', 'cl.': 'Class',
    'ser': 'Series', 'ser.': 'Series',
    'u.s.': 'US',
    'nc': 'no trail', 'nc.': 'no trail',
}


def add_slash_to_text_as_regex(text: str):
    """
    Turn plain text into a regex: every non-word, non-whitespace character
    is prefixed with a backslash and every whitespace run becomes ``\\s+``.

    ``None`` and the empty string are returned unchanged.  A character whose
    escaped form already occurs in the text is not escaped again, so
    already-escaped input passes through; as a consequence a literal
    backslash in the input is left as it is.
    """
    if text is None or len(text) == 0:
        return text

    # The iterator walks the ORIGINAL string; `text` is rebound as we go.
    for match in re.finditer(r"\W", text):
        char = match.group()
        if not char.strip():
            # whitespace is handled in one go at the end
            continue
        escaped = "\\" + char
        if escaped not in text:
            # replaces every occurrence of `char`, hence the "not in" guard
            text = re.sub(escaped, escaped, text)

    return re.sub(r"\s+", r"\\s+", text)


def clean_text(text: str) -> str:
    """
    Replace literal ``\\uXXXX`` escape sequences (backslash, 'u', four
    alphanumerics, e.g. ``\\u2004`` or ``\\u00a0``) with a space, strip the
    text and collapse runs of two or more spaces into one.
    """
    without_escapes = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
    return re.sub(r"( ){2,}", ' ', without_escapes.strip())


def get_most_similar_name(text: str, name_list: list, pre_common_word_list: list = None) -> tuple:
    """
    Get the most similar name from name_list by Jaccard similarity.

    Both sides are normalised first: abbreviations and currency names are
    expanded/replaced, CamelCase tokens are split, and the generic words
    'fund', 'portfolio', 'class', 'share', 'shares' are dropped.  Words
    shared by every candidate (plus ``pre_common_word_list``) are removed
    from the candidates when the text does not mention them, so they cannot
    dilute the score.

    Returns a ``(name, similarity)`` tuple:
      * ``(None, None)`` when ``text`` is None/blank or ``name_list`` is
        None/empty;
      * ``(None, similarity)`` when the best score is below 0.35;
      * ``(None, 0.0)`` when an unexpected error occurs (it is printed);
      * otherwise the original entry of ``name_list`` and its score.
    """
    try:
        if text is None or len(text.split()) == 0 or \
                name_list is None or len(name_list) == 0:
            return None, None

        candidates = [replace_abbrevation(name) for name in name_list]

        # words shared by all candidates carry no discriminating signal
        common_word_list = []
        if len(name_list) > 1:
            _, common_word_list = remove_common_word(candidates)
        if pre_common_word_list is not None and len(pre_common_word_list) > 0:
            common_word_list.extend([word for word in pre_common_word_list
                                     if word not in common_word_list])

        text = replace_abbrevation(remove_special_characters(text.strip()))

        # Tokenise the text.  This runs for single-token input as well; the
        # previous code left the token list undefined in that case, which
        # made every one-word lookup fail.
        text_tokens = []
        for token in text.split():
            if len(token) > 1:
                text_tokens.extend(split_words_without_space(token).split())
            else:
                text_tokens.append(token)
        lower_text_tokens = [token.lower() for token in text_tokens]

        # drop from the candidates: common words the text does not contain,
        # plus the generic noise words
        dropped_words = [word for word in common_word_list
                         if word not in lower_text_tokens]
        dropped_words.extend(_NOISE_WORDS)
        candidates = [
            ' '.join(token for token in candidate.split()
                     if remove_special_characters(token).lower() not in dropped_words)
            for candidate in candidates
        ]

        text = ' '.join(token for token in text_tokens
                        if token.lower() not in _NOISE_WORDS)

        max_similarity = 0
        max_similarity_fund_name = None
        for fund_name, candidate in zip(name_list, candidates):
            candidate = split_words_without_space(remove_special_characters(candidate))
            similarity = get_jacard_similarity(text, candidate,
                                               need_remove_numeric_characters=False)
            if similarity > max_similarity:
                max_similarity = similarity
                max_similarity_fund_name = fund_name
            if max_similarity == 1:
                # perfect match, nothing can beat it
                break

        if max_similarity < _MIN_SIMILARITY:
            return None, max_similarity
        return max_similarity_fund_name, max_similarity
    except Exception as e:
        # Deliberate best-effort: callers get "no match" instead of an error.
        print(e)
        return None, 0.0


def remove_common_word(text_list: list):
    """
    Lower-case and clean each text, drop the generic words 'fund',
    'portfolio', 'share' and 'class', then remove the words that occur in
    EVERY text.

    E.g. 'Blackrock Global Fund' and 'Blackrock Growth Fund' give
    ``(['global', 'growth'], ['blackrock'])``.

    Returns ``(new_text_list, common_word_list)``; the common words are
    sorted.  With a single text there are no common words.  NOTE: for
    ``None`` or an empty list the input itself is returned (not a tuple);
    this long-standing behaviour is kept for existing callers.
    """
    if text_list is None or len(text_list) == 0:
        return text_list

    generic_words = ('fund', 'portfolio', 'share', 'class')
    token_lists = []
    for text in text_list:
        tokens = remove_special_characters(text.lower()).split()
        token_lists.append([token for token in tokens if token not in generic_words])

    # Intersect across ALL texts.  (The previous pairwise loop restarted from
    # a later pair whenever the running intersection became empty, so the
    # result depended on the order of the texts.)
    if len(token_lists) > 1:
        common_word_list = sorted(set(token_lists[0]).intersection(*token_lists[1:]))
    else:
        common_word_list = []

    for tokens in token_lists:
        for common_word in common_word_list:
            if common_word in tokens:
                tokens.remove(common_word)

    new_text_list = [' '.join(tokens) for tokens in token_lists]
    return new_text_list, common_word_list


def split_words_without_space(text: str):
    """
    Split words without space, such as 'BlackrockGlobalFund' becomes
    'Blackrock Global Fund'.

    Every run of one capital letter followed by lower-case letters is
    surrounded by spaces, then whitespace is collapsed.  Returns a string;
    ``None`` or blank input gives ``''`` so callers can safely ``.split()``
    or score the result.
    """
    if text is None or len(text.strip()) == 0:
        return ''

    # A single positional substitution: replacing by word value would also
    # hit the same letters inside longer words ('Man' inside 'Management').
    spaced = re.sub(r'[A-Z][a-z]+', r' \g<0> ', text.strip())
    return re.sub(r'\s+', ' ', spaced).strip()


def remove_special_characters(text):
    """
    Replace every character that is not an ASCII letter, digit or
    whitespace with a space, collapse whitespace runs and strip the result.
    """
    only_alnum = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
    return re.sub(r'\s+', ' ', only_alnum).strip()


def get_unique_words_text(text):
    """
    Return the distinct lower-cased words of ``text`` (special characters
    removed), sorted alphabetically and joined by single spaces.
    """
    words = remove_special_characters(text).lower().split()
    return ' '.join(sorted(set(words)))


def remove_numeric_characters(text):
    """
    Replace every run of digits with a space, collapse whitespace runs and
    strip the result.
    """
    without_digits = re.sub(r'\d+', ' ', text)
    return re.sub(r'\s+', ' ', without_digits).strip()


def get_jacard_similarity(text_left,
                          text_right,
                          need_remove_special_characters=True,
                          need_remove_numeric_characters=True):
    """
    Jaccard similarity of the lower-cased word sets of two texts, rounded
    to 3 decimals.  Special characters and digits are removed first unless
    the corresponding flag is False.  Returns 0.0 when both texts have no
    words.
    """
    if need_remove_special_characters:
        text_left = remove_special_characters(text_left)
        text_right = remove_special_characters(text_right)
    if need_remove_numeric_characters:
        text_left = remove_numeric_characters(text_left)
        text_right = remove_numeric_characters(text_right)

    left_words = set(text_left.lower().split())
    right_words = set(text_right.lower().split())
    union = left_words | right_words
    if not union:
        return 0.0
    return round(len(left_words & right_words) / len(union), 3)


def get_beginning_common_words(text_list: list):
    """
    Get the beginning common words in text_list, i.e. the longest run of
    leading words (case-sensitive, position by position) shared by every
    text, joined by spaces.

    NOTE: returns an empty list (not a string) when ``text_list`` is None
    or has fewer than two entries; kept as-is for existing callers.
    """
    if text_list is None or len(text_list) < 2:
        return []

    first_words = text_list[0].split()
    other_word_lists = [text.split() for text in text_list[1:]]

    common_words_list = []
    for position, word in enumerate(first_words):
        shared = all(position < len(words) and words[position] == word
                     for words in other_word_lists)
        if not shared:
            break
        common_words_list.append(word)
    return ' '.join(common_words_list).strip()


def _replace_first_currency(text: str) -> str:
    """Replace the first recognised currency name in ``text`` by its ISO code."""
    lowered = text.lower()
    for phrase, code in _CURRENCY_PHRASES:
        if phrase in lowered:
            pattern = r'\s+'.join(phrase.split())
            return re.sub(pattern, code, text, flags=re.IGNORECASE)

    lowered_tokens = lowered.split()
    for token, pattern, code in _CURRENCY_TOKENS:
        if token in lowered_tokens:
            return re.sub(pattern, code, text, flags=re.IGNORECASE)
    return text


def replace_abbrevation(text: str):
    """
    Normalise a fund / share-class name.

    1. The first recognised currency name is replaced by its ISO code
       ('US Dollar' -> 'USD', 'Euro' -> 'EUR', 'RMB' -> 'CNY', ...).  Only
       one currency family is replaced per call.  'sterling', 'euro', '€'
       and 'rmb' must appear as stand-alone tokens to trigger, and the
       word forms are replaced on word boundaries so that e.g. 'European'
       is left intact.
    2. Each whitespace-separated token that is a known abbreviation
       ('acc', 'inc.', 'hdg', '(h)', 'cl', ...) is expanded.

    ``None`` and blank strings are returned unchanged.
    """
    if text is None or len(text.strip()) == 0:
        return text

    text = _replace_first_currency(text.strip())
    return ' '.join(_ABBREVIATIONS.get(token.lower(), token)
                    for token in text.split())