import re import os import time from utils.logger import logger from copy import deepcopy from traceback import print_exc total_currency_list = [ "USD", "EUR", "AUD", "JPY", "CHF", "GBP", "SEK", "CNY", "NZD", "CNH", "NOK", "SGD", "HKD", "ZAR", "PLN", "CAD", "CZK", "HUF", "DKK", "BRL", "SKK", "RON", "TRY", "BGN", "CUP", "MXN", "CLF", "XCD", "ISK", "IDR", "MNT", "AED", "AFN", "INR", "ESP", "RUB", "CLP", "KRW", "ETB", "DZD", "XEU", "XFO", ] share_features_full_name = ['Accumulation', 'Income', 'Distribution', 'Investor', 'Institutional', 'Admin', 'Advantage'] share_features_abbrevation = ['Acc', 'Inc', 'Dist', 'Div', 'Inv', 'Inst', 'Adm', 'Adv'] lower_pre_fix_fund_share = ['fund', "funds", 'portfolio', 'bond', 'bonds', 'class', 'classes', 'share', 'shares'] def add_slash_to_text_as_regex(text: str): if text is None or len(text) == 0: return text special_char_iter = re.finditer("\W", text) for special_iter in special_char_iter: if len(special_iter.group().strip()) == 0: continue replace = r"\{0}".format(special_iter.group()) if replace not in text: text = re.sub(replace, r"\\W*", text) text = re.sub(r"( ){2,}", " ", text) text = text.replace(" ", r"\s*") return text def clean_text(text: str) -> str: # text = text.lower() # update the specical character which begin with \u, e.g \u2004 or \u00a0 to be space text = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text) text = re.sub(r"( ){2,}", ' ', text.strip()) return text def get_most_similar_name(text: str, name_list: list, share_name: str = None, fund_name: str = None, matching_type="share", pre_common_word_list: list = None, process_cache: dict = None) -> str: """ Get the most similar fund name from fund_name_list by jacard similarity """ try: copy_name_list = deepcopy(name_list) if text is None or len(text.split()) == 0 or \ copy_name_list is None or len(copy_name_list) == 0: return None, None for i in range(len(copy_name_list)): copy_name = copy_name_list[i] if matching_type == "share": copy_name, _ = replace_share_name_for_multilingual(copy_name, None) share_part = get_share_part_list([copy_name])[0] if '-' in share_part: copy_name = copy_name.replace('-', ' ') copy_name = replace_abbrevation(copy_name) copy_name_list[i] = copy_name # get common words in fund_name_list common_word_list = [] if len(name_list) > 1: _, common_word_list = remove_common_word(copy_name_list) if pre_common_word_list is not None and len(pre_common_word_list) > 0: common_word_list.extend([word for word in pre_common_word_list if word not in common_word_list]) if len(common_word_list) > 0: common_word_list = [word for word in common_word_list if len(word) > 1 and word.upper() not in total_currency_list] text = text.strip() text = remove_special_characters(text) text = replace_abbrevation(text) raw_fund_name_split = [] if fund_name is not None and len(fund_name.strip()) > 0: fund_name = fund_name.strip() fund_name = remove_special_characters(fund_name) raw_fund_name_split = fund_name.upper().split() if share_name is not None: share_name = remove_special_characters(share_name) share_name = replace_abbrevation(share_name) text, share_name = replace_share_name_for_multilingual(text, share_name) if matching_type == "share" and share_name is None: text, share_name = replace_share_name_for_multilingual(text, None) text_splits = text.split() if len(text_splits) == 1: text = split_words_without_space(text) else: new_splits = [] for split in text_splits: if len(split) > 1: new_splits.extend(split_words_without_space(split).split()) else: new_splits.append(split) text = ' '.join(new_splits) lower_new_splits = [split.lower() for split in new_splits] for word in common_word_list: if word not in lower_new_splits: # remove word in fund_name_list for i in range(len(copy_name_list)): temp_splits = copy_name_list[i].split() copy_name_list[i] = ' '.join([split for split in temp_splits if remove_special_characters(split).lower() != word]) max_similarity = 0 max_similarity_full_name = None text = remove_special_characters(text) if matching_type == "share": text, share_name, copy_name_list = update_for_currency(text, share_name, copy_name_list) text = ' '.join([split for split in text.split() if split.lower() not in lower_pre_fix_fund_share]) if share_name is not None: share_name = ' '.join([split for split in share_name.split() if split.lower() not in lower_pre_fix_fund_share]) copy_share_name_list = get_share_part_list(copy_name_list) for i in range(len(copy_name_list)): temp_splits = copy_name_list[i].split() copy_name_list[i] = ' '.join([split for split in temp_splits if remove_special_characters(split).lower() not in lower_pre_fix_fund_share]) text_currency = None text_feature = None text_share_short_name_list = None if matching_type == "share" and text is not None and len(text.strip()) > 0: if process_cache is not None and isinstance(process_cache, dict): if process_cache.get(text, None) is not None: cache = process_cache.get(text) text_share_short_name_list = cache.get("share_short_name") text_feature = cache.get("share_feature") text_currency = cache.get("share_currency") else: if share_name is not None and len(share_name.strip()) > 0: text_share_short_name_list = get_share_short_name_from_text(share_name, confirm_text_share=True) text_feature = get_share_feature_from_text(share_name) text_currency = get_currency_from_text(share_name) else: text_share_short_name_list = get_share_short_name_from_text(text, confirm_text_share=True) text_feature = get_share_feature_from_text(text) text_currency = get_currency_from_text(text) # sort text_share_short_name_list text_share_short_name_list.sort() process_cache[text] = { "share_short_name": text_share_short_name_list, "share_feature": text_feature, "share_currency": text_currency } else: if share_name is not None and len(share_name.strip()) > 0: text_share_short_name_list = get_share_short_name_from_text(share_name, confirm_text_share=True) text_share_short_name_list.sort() text_feature = get_share_feature_from_text(share_name) text_currency = get_currency_from_text(share_name) else: text_share_short_name_list = get_share_short_name_from_text(text, confirm_text_share=True) text_feature = get_share_feature_from_text(text) text_currency = get_currency_from_text(text) # logger.info(f"Source text: {text}, candidate names count: {len(copy_name_list)}") same_max_similarity_name_list = [] for full_name, copy_name, copy_share_name in zip(name_list , copy_name_list, copy_share_name_list): if not isinstance(copy_name, str) or len(copy_name.strip()) == 0: continue copy_name = remove_special_characters(copy_name) copy_name = split_words_without_space(copy_name) copy_name_short_name_list = None copy_name_feature = None copy_name_currency = None if matching_type == "share": if process_cache is not None and isinstance(process_cache, dict): if process_cache.get(copy_name, None) is not None: cache = process_cache.get(copy_name) copy_name_short_name_list = cache.get("share_short_name") copy_name_feature = cache.get("share_feature") copy_name_currency = cache.get("share_currency") else: copy_name_short_name_list = get_share_short_name_from_text(copy_share_name) if copy_name_short_name_list is not None: copy_name_short_name_list.sort() copy_name_feature = get_share_feature_from_text(copy_share_name) copy_name_currency = get_currency_from_text(copy_share_name) process_cache[copy_name] = { "share_short_name": copy_name_short_name_list, "share_feature": copy_name_feature, "share_currency": copy_name_currency } else: copy_name_short_name_list = get_share_short_name_from_text(copy_share_name) copy_name_short_name_list.sort() copy_name_feature = get_share_feature_from_text(copy_share_name) copy_name_currency = get_currency_from_text(copy_share_name) try: if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \ copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0: updated_text_share_short_name_list, updated_copy_name_short_name_list = \ compare_both_short_name(text_share_short_name_list, copy_name_short_name_list) if updated_text_share_short_name_list != text_share_short_name_list: text = ' '.join([split for split in text.split() if split not in text_share_short_name_list]) text += ' ' + ' '.join(updated_text_share_short_name_list) text_share_short_name_list = updated_text_share_short_name_list if updated_copy_name_short_name_list != copy_name_short_name_list: copy_name = ' '.join([split for split in copy_name.split() if split not in copy_name_short_name_list]) copy_name += ' ' + ' '.join(updated_copy_name_short_name_list) copy_name_short_name_list = updated_copy_name_short_name_list except Exception as e: print(e) compare_text = text try: text_split = text.split() text_split_lower = text.lower().split() copy_name_split_lower = copy_name.lower().split() if copy_name_feature == "accumulation" and \ (text_feature is None or len(text_feature) == 0 or text_feature in ["capitalisation", "institutional"] or "capitalisation" in text_split_lower or "institutional" in text_split_lower): if "capitalisation" not in copy_name_split_lower: compare_text = " ".join([split for split in text_split if split.lower() not in ["cap", "cap.", "capitalisation"]]) text_split = compare_text.split() if "institutional" not in copy_name_split_lower: compare_text = " ".join([split for split in text_split if split.lower() not in ["inst", "inst.", "institutional"]]) text_split = compare_text.split() if text_feature is not None and len(text_feature) > 0: compare_text = " ".join([split for split in text_split if split.lower() != text_feature]) compare_text += " accumulation" text_feature = "accumulation" elif copy_name_feature == "income" and \ (text_feature is None or len(text_feature) == 0 or text_feature == "distribution"): if "dist" in text_split_lower or "dist." in text_split_lower or "distribution" in text_split_lower: compare_text = " ".join([split for split in text_split if split.lower() not in ["dist", "dist.", "distribution"]]) compare_text += " income" text_feature = "income" else: pass similarity = get_jacard_similarity(compare_text, copy_name, need_remove_numeric_characters=False) except Exception as e: print(e) print_exc() similarity = 0 if similarity == 1: return full_name, similarity copy_name_2 = replace_abbrevation(copy_name) if copy_name != copy_name_2: similarity_2 = get_jacard_similarity(compare_text, copy_name_2, need_remove_numeric_characters=False) if similarity_2 > similarity: similarity = similarity_2 if similarity > max_similarity: if matching_type == "share": if text_currency is not None and len(text_currency) > 0 and \ copy_name_currency is not None and len(copy_name_currency) > 0: if text_currency != copy_name_currency: continue if text_feature is not None and len(text_feature) > 0 and \ copy_name_feature is not None and len(copy_name_feature) > 0: if text_feature != copy_name_feature: if text_feature.lower() not in copy_name.lower().split() and \ copy_name_feature.lower() != "accmulation" and \ copy_name_feature.lower() not in compare_text.lower().split(): continue if matching_type == "share": if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \ copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0: short_name_invalid = False for short in text_share_short_name_list: if short not in copy_name_short_name_list: short_name_invalid = True break for compare_short in copy_name_short_name_list: if compare_short not in text_share_short_name_list: # some short word is in fund name, but not belong to share name if compare_short.upper() not in raw_fund_name_split: short_name_invalid = True break if short_name_invalid: continue max_similarity = similarity max_similarity_full_name = full_name same_max_similarity_name_list = [] elif matching_type == "fund" and max_similarity > 0 and max_similarity == similarity: if full_name is not None and max_similarity_full_name is not None and \ len(full_name.split()) > len(max_similarity_full_name.split()): max_similarity_full_name = full_name same_max_similarity_name_list = [] else: if full_name is not None: same_max_similarity_name_list.append(full_name) if max_similarity == 1: break # if there are multiple names with the same similarity, return None if len(same_max_similarity_name_list) > 0: return None, 0.0 if max_similarity < 0.35: return None, max_similarity return max_similarity_full_name, max_similarity except Exception as e: print(e) print_exc() return None, 0.0 def replace_share_name_for_multilingual(text: str, share_name: str): if text is None or len(text.strip()) == 0: return text, share_name multilingual_share_list = ["Catégorie de parts", "Classe di quote", "Kategorie Anteile", "Kategorie anteile", "Clase de participaciones", "Aandelenklasse", "aandelenklasse", "Anteilklasse", "anteilklasse", "Aktien", "Aktienklasse", "aktien", "aktienklasse", "Klasse"] for multilingual_share in multilingual_share_list: if multilingual_share in text: text = text.replace(multilingual_share, "Class") if share_name is not None and len(share_name.strip()) > 0: share_name = share_name.replace(multilingual_share, "Class") break return text, share_name def compare_both_short_name(text_short_name_list: list, compare_short_name_list: list): copy_text_short_name_list = deepcopy(text_short_name_list) copy_compare_short_name_list = deepcopy(compare_short_name_list) copy_text_short_name_list = verify_short_name_container(copy_text_short_name_list, copy_compare_short_name_list) copy_compare_short_name_list = verify_short_name_container(copy_compare_short_name_list, copy_text_short_name_list) return copy_text_short_name_list, copy_compare_short_name_list def verify_short_name_container(left_short_name_list: list, right_short_name_list: list): length_1_over_1 = False length_1_count = 0 length_1_list = [] for short_name in left_short_name_list: if len(short_name) == 1: length_1_count += 1 length_1_list.append(short_name) if length_1_count > 1: length_1_over_1 = True if length_1_over_1: for compare_short_name in right_short_name_list: if len(compare_short_name) == length_1_count: all_in = True for short_name in length_1_list: if short_name not in compare_short_name: all_in = False break if all_in: for short_name in length_1_list: if short_name in left_short_name_list: left_short_name_list.remove(short_name) left_short_name_list.append(compare_short_name) return left_short_name_list def get_share_part_list(text_list: list): share_part_list = [] for text in text_list: text_split = text.split("Funds") if len(text_split) == 1: text_split = text.split("Fund") if len(text_split) == 1: text_split = text.split("Portfolio") if len(text_split) == 1: text_split = text.split("Bonds") if len(text_split) == 1: text_split = text.split("Bond") if len(text_split) > 1: share_part_text = text_split[-1].strip() else: share_part_text = text.strip() share_part_text = ' '.join([split for split in share_part_text.split() if remove_special_characters(split).lower() not in lower_pre_fix_fund_share]) share_part_list.append(share_part_text) return share_part_list def get_share_short_name_from_text(text: str, confirm_text_share: bool = False): if text is None or len(text.strip()) == 0: return None text = remove_special_characters(text.strip()) text_split = text.split() temp_share_features = [feature.lower() for feature in share_features_full_name] count = 0 share_short_name_list = [] if confirm_text_share: count_threshold = 6 else: count_threshold = 4 for split in text_split[::-1]: if count == count_threshold: break if split.lower() not in temp_share_features and \ split.upper() not in total_currency_list: if len(split) <= 3: share_short_name_list.append(split.upper()) count += 1 if len(share_short_name_list) > 1: remove_number = [] for short_name in share_short_name_list[::-1]: if short_name.isdigit(): remove_number.append(short_name) else: break for remove in remove_number: if remove in share_short_name_list: share_short_name_list.remove(remove) return share_short_name_list def get_share_feature_from_text(text: str): if text is None or len(text.strip()) == 0: return None text = text.strip() text = text.lower() text_split = text.split() temp_share_features = [feature.lower() for feature in share_features_full_name] count = 0 for split in text_split[::-1]: if count == 4: break if split.lower() in temp_share_features: return split count += 1 return None def get_currency_from_text(text: str): if text is None or len(text.strip()) == 0: return None text = text.strip() text_split = text.split() count = 0 currency_list = [] for split in text_split[::-1]: if count == 4: break if split.upper() in total_currency_list: currency_list.append(split.upper()) count += 1 if len(currency_list) > 1: # remove the first currency from currency list if currency_list[0] in ['USD', 'EUR']: currency_list.pop(0) else: remove_currency = None for currency in currency_list: if currency in ['USD', 'EUR']: remove_currency = currency break if remove_currency is not None: currency_list.remove(remove_currency) return currency_list[0] elif len(currency_list) == 1: return currency_list[0] else: return None def update_for_currency(text: str, share_name: str, compare_list: list): try: currency_in_text = get_currency_from_text(text) with_currency = False if currency_in_text is not None: with_currency = True with_currency_list = [] without_currency_list = [] for index, compare in enumerate(compare_list): # compare_split = compare.split() with_currency_compare = False currecy_in_compare = get_currency_from_text(compare) if currecy_in_compare is not None: with_currency_compare = True if with_currency_compare: with_currency_list.append(index) else: without_currency_list.append(index) if not with_currency and len(with_currency_list) == 0: pass elif not with_currency and len(with_currency_list) > 0: share_short_name_list = [] if share_name is not None and len(share_name.strip()) > 0: share_short_name_list = get_share_short_name_from_text(share_name) updated = False if len(share_short_name_list) > 0: if len(without_currency_list) > 0: for index in without_currency_list: all_in_list = True compare_split = [split.upper() for split in compare_list[index].split()] for share_shot_name in share_short_name_list: if share_shot_name not in compare_split: all_in_list = False break if all_in_list: text = text + ' ' + 'USD' if share_name is not None: share_name = share_name + ' ' + 'USD' updated = True break if not updated: currency_list = [] for index in with_currency_list: all_in_list = True compare_split = [split.upper() for split in compare_list[index].split()] for share_shot_name in share_short_name_list: if share_shot_name not in compare_split: all_in_list = False break if all_in_list: current_currency_list = [split for split in compare_split if split.upper() in total_currency_list] if len(current_currency_list) > 0: currency_list.append(current_currency_list[-1]) if len(currency_list) == 1: text = text + ' ' + currency_list[0] if share_name is not None: share_name = share_name + ' ' + currency_list[0] updated = True for index in without_currency_list: compare_list[index] = compare_list[index] + ' ' + 'USD' if not updated: text = text + ' ' + 'USD' if share_name is not None: share_name = share_name + ' ' + 'USD' # return text, share_name, compare_list elif with_currency and len(without_currency_list) == 0: for index in without_currency_list: compare_list[index] = compare_list[index] + ' ' + 'USD' # return text, share_name, compare_list else: # return text, share_name, compare_list pass default_currency = 'USD' if with_currency and share_name is not None: share_name_split = share_name.split() share_name_currency = get_currency_from_text(share_name) if share_name_currency is not None and share_name_currency in total_currency_list: for split in share_name_split: if split in total_currency_list and split != share_name_currency: default_currency = split break new_share_name = ' '.join([split for split in share_name_split if split not in total_currency_list or (split == share_name_currency)]) if share_name in text: text = text.replace(share_name, new_share_name) else: text = ' '.join([split for split in text.split() if split not in total_currency_list or (split == share_name_currency)]) share_name = new_share_name for c_i in range(len(compare_list)): compare = compare_list[c_i] compare_share_part = get_share_part_list([compare])[0] compare_share_part_split = compare_share_part.split() compare_share_part_currency_list = [] for split in compare_share_part_split: if split.upper() in total_currency_list and split.upper() not in compare_share_part_currency_list: compare_share_part_currency_list.append(split) if len(compare_share_part_currency_list) > 1 and default_currency in compare_share_part_currency_list: compare_share_part_split = [split for split in compare_share_part_split if split.upper() != default_currency] new_compare_share_part = ' '.join(compare_share_part_split) compare_list[c_i] = compare.replace(compare_share_part, new_compare_share_part) except Exception as e: logger.error(f"Error in update_for_currency: {e}") return text, share_name, compare_list def remove_common_word(text_list: list): if text_list is None or len(text_list) == 0: return text_list new_text_list = [] for text in text_list: text = text.lower() text = remove_special_characters(text) text_splits = text.split() text = ' '.join([split for split in text_splits if split.lower() not in lower_pre_fix_fund_share]) new_text_list.append(text) # remove common word in new_text_list, such as 'Blackrock Global Fund' and 'Blackrock Growth Fund', then 'Blackrock', 'Fund' are common words # the result is ['Global', 'Growth'] common_word_list = [] new_text_splits_list = [text.split() for text in new_text_list] with_common_word = False for i in range(len(new_text_splits_list)): for j in range(i+1, len(new_text_splits_list)): if common_word_list is None or len(common_word_list) == 0: common_word_list = list( set(new_text_splits_list[i]).intersection(set(new_text_splits_list[j]))) else: common_word_list = list( set(common_word_list).intersection(set(new_text_splits_list[j]))) if len(common_word_list) > 0: with_common_word = True if with_common_word and len(common_word_list) == 0: break if with_common_word and len(common_word_list) == 0: break remove_list = [] # if exists the share name and currency name, remove from the list for word in common_word_list: if word.upper() in total_currency_list: remove_list.append(word) for remove in remove_list: if remove in common_word_list: common_word_list.remove(remove) common_word_list = list(set(common_word_list)) for i in range(len(new_text_splits_list)): for common_word in common_word_list: if common_word in new_text_splits_list[i]: new_text_splits_list[i].remove(common_word) new_text_list = [' '.join(text_splits) for text_splits in new_text_splits_list] return new_text_list, common_word_list def split_words_without_space(text: str): """ Split words without space, such as 'BlackrockGlobalFund' will be split to 'Blackrock', 'Global', 'Fund' """ if text is None or len(text.strip()) == 0: return [] text = text.strip() # splits = text.split() # if len(splits) > 1: # return text # find all words with capital letter + lower letter regex = r"[A-Z][a-z]+" regex2 = r"[A-Z]{2,}[a-z]+" word_list = re.findall(regex, text) word_list2 = re.findall(regex2, text) if len(word_list) > 0: for word in word_list: if len(word_list2) > 0: word_exists_in_word2 = False for word2 in word_list2: if word in word2: word_exists_in_word2 = True break if word_exists_in_word2: continue text = text.replace(word, " " + word + " ") text = re.sub(r"(\s)+", " ", text) return text.strip() def remove_special_characters(text): text = re.sub(r'\W', ' ', text) text = re.sub(r'\s+', ' ', text) text = text.strip() return text def get_unique_words_text(text): text = remove_special_characters(text) text = text.lower() text_split = text.split() text_split = list(set(text_split)) # sort the list text_split.sort() return_text = ' '.join(text_split) return return_text def remove_numeric_characters(text): # remove numeric characters text = re.sub(r'\d+', ' ', text) text = re.sub(r'\s+', ' ', text) text = text.strip() return text def get_jacard_similarity(text_left, text_right, need_remove_special_characters=True, need_remove_numeric_characters=True): if need_remove_special_characters: text_left = remove_special_characters(text_left) text_right = remove_special_characters(text_right) if need_remove_numeric_characters: text_left = remove_numeric_characters(text_left) text_right = remove_numeric_characters(text_right) text_left = text_left.lower() text_right = text_right.lower() text_left = text_left.split() text_right = text_right.split() intersection = set(text_left).intersection(set(text_right)) union = set(text_left).union(set(text_right)) intersection_count = len(intersection) union_count = len(union) differ_a = list(set(text_left).difference(set(text_right))) differ_a.sort() differ_b = list(set(text_right).difference(set(text_left))) differ_b.sort() if ''.join(differ_a) == ''.join(differ_b): intersection_count += len(differ_a) + len(differ_b) if union_count > 0: return round(intersection_count / union_count, 3) else: return 0 def simple_most_similarity_name(text: str, name_list: list): if text is None or len(text.strip()) == 0 or \ name_list is None or len(name_list) == 0: return None, 0.0 max_similarity = 0 max_similarity_name = None for full_name in name_list: similarity = get_jacard_similarity(text, full_name) if similarity > max_similarity: max_similarity = similarity max_similarity_name = full_name if max_similarity == 1: break return max_similarity_name, max_similarity def get_beginning_common_words(text_list: list): """ Get the beginning common words in text_list """ if text_list is None or len(text_list) < 2: return [] common_words_list = [] first_text_split = text_list[0].split() for w_i, word in enumerate(first_text_split): all_same = True for text in text_list[1:]: text_split = text.split() if w_i >= len(text_split): all_same = False break if text_split[w_i] != word: all_same = False break if all_same: common_words_list.append(word) else: break return ' '.join(common_words_list).strip() def replace_abbrevation(text: str): if text is None or len(text.strip()) == 0: return text text = text.replace('(', ' ').replace(')', ' ').replace('-', ' ') text = re.sub(r'\s+', ' ', text).strip() if 'swiss franc' in text.lower(): text = re.sub(r'swiss\s+franc', 'CHF', text, flags=re.IGNORECASE) elif 'us dollar' in text.lower(): text = re.sub(r'us\s+dollar', 'USD', text, flags=re.IGNORECASE) elif 'singapore dollar' in text.lower(): text = re.sub(r'singapore\s+dollar', 'SGD', text, flags=re.IGNORECASE) elif 'hong kong dollar' in text.lower(): text = re.sub(r'hong\s+kong\s+dollar', 'HKD', text, flags=re.IGNORECASE) elif 'hongkong dollar' in text.lower(): text = re.sub(r'hongkong\s+dollar', 'HKD', text, flags=re.IGNORECASE) elif 'australian dollar' in text.lower(): text = re.sub(r'australian\s+dollar', 'AUD', text, flags=re.IGNORECASE) elif 'japanese yen' in text.lower(): text = re.sub(r'japanese\s+yen', 'JPY', text, flags=re.IGNORECASE) elif 'south african rand' in text.lower(): text = re.sub(r'South\s+African\s+rand', 'ZAR', text, flags=re.IGNORECASE) elif 'canadian dollar' in text.lower(): text = re.sub(r'canadian\s+dollar', 'CAD', text, flags=re.IGNORECASE) elif 'new zealand dollar' in text.lower(): text = re.sub(r'new\s+zealand\s+dollar', 'NZD', text, flags=re.IGNORECASE) elif 'norwegian krone' in text.lower(): text = re.sub(r'norwegian\s+krone', 'NOK', text, flags=re.IGNORECASE) elif 'danish krone' in text.lower(): text = re.sub(r'danish\s+krone', 'DKK', text, flags=re.IGNORECASE) elif 'swedish krona' in text.lower(): text = re.sub(r'swedish\s+krona', 'SEK', text, flags=re.IGNORECASE) elif 'swedish kronor' in text.lower(): text = re.sub(r'swedish\s+kronor', 'SEK', text, flags=re.IGNORECASE) elif "GPB" in text.split(): text = re.sub(r"GPB", "GBP", text, flags=re.IGNORECASE) elif 'sterling' in text.lower().split(): text = re.sub(r'sterling', 'GBP', text, flags=re.IGNORECASE) elif 'euro' in text.lower().split(): text = re.sub(r'euro', 'EUR', text, flags=re.IGNORECASE) elif '€' in text.lower().split(): text = re.sub(r'\€', 'EUR', text, flags=re.IGNORECASE) elif '$' in text.lower().split(): text = re.sub(r'\$', 'USD', text, flags=re.IGNORECASE) elif '£' in text.lower().split(): text = re.sub(r'\£', 'GBP', text, flags=re.IGNORECASE) elif 'RMB' in text.split(): text = re.sub(r'RMB', 'CNY', text, flags=re.IGNORECASE) else: pass text_splits = text.split() new_text_splits = [] for split in text_splits: if split.lower() in ['acc', 'acc.', 'accumulating', 'thesaurierende', 'thes.', 'accumulazione', 'akkumulation', 'acumulación', 'accumulatie']: new_text_splits.append('Accumulation') elif split.lower() in ['inc', 'inc.']: new_text_splits.append('Income') elif split.lower() in ['dist', 'dist.', 'dis', 'dis.', 'distributing', 'ausschüttende', 'aussch.', 'distribuzione']: new_text_splits.append('Distribution') elif split.lower() in ['inv', 'inv.']: new_text_splits.append('Investor') elif split.lower() in ['inst', 'inst.', 'institution']: new_text_splits.append('Institutional') elif split.lower() in ['cap', 'cap.']: new_text_splits.append('Capitalisation') elif split.lower() in ['div', 'div.']: new_text_splits.append('Dividend') elif split.lower() in ['adm', 'adm.']: new_text_splits.append('Admin') elif split.lower() in ['adv', 'adv.']: new_text_splits.append('Advantage') elif split.lower() in ['hdg', 'hgd', 'hdg.', 'hgd.', '(h)']: new_text_splits.append('Hedged') elif split.lower() in ['cl', 'cl.']: new_text_splits.append('Class') elif split.lower() in ['ser', 'ser.']: new_text_splits.append('Series') elif split.lower() in ['u.s.']: new_text_splits.append('US') elif split.lower() in ['nc', 'nc.']: new_text_splits.append('no trail') elif split.lower() in ['non']: new_text_splits.append('Not') elif split.lower() in ['net', 'unhgd'] or split == "fl": new_text_splits.append('') else: split = split_short_name_with_share_features(split) new_text_splits.append(split) new_text = ' '.join(new_text_splits) new_text = re.sub(r'\s+', ' ', new_text).strip() return new_text def split_short_name_with_share_features(text: str): """ Split short name with share features, for examples: Document mapping for 532422720 CHFHInc to be CHF H Income USDHAcc to be USD H Accumulation GBPHInc to be GBP H Income HAcc to be H Accumulation GBPHedgedAcc to be GBP Hedged Accumulation HGBPInc to be H GBP Income HNOKAcc to be H NOK Accumulation """ if text is None or len(text.strip()) == 0: return text if len(text.split()) > 1: return text text = text.strip() share_features = {'Acc': 'Accumulation', 'Inc': 'Income', 'Dist': 'Distribution', 'Div': 'Dividend',} feature_name = "" for key, value in share_features.items(): if len(text) > len(key) and text.endswith(key): feature_name = value text = text.replace(key, '') break currency_text = "" for currency in total_currency_list: if len(text) > len(currency) and currency in text: currency_text = currency text = text.replace(currency, '') break new_text = currency_text + ' ' + text + ' ' + feature_name new_text = re.sub(r'\s+', ' ', new_text).strip() return new_text def clean_folder(folder_path: str, expired_days: int = 5): if not os.path.exists(folder_path): return for root, dirs, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) if os.path.exists(file_path): file_time = os.path.getmtime(file_path) current_time = time.time() if (current_time - file_time) / (60 * 60 * 24) > expired_days: try: os.remove(file_path) except: pass def remove_abundant_data(data_list: list): exist_data_list = [] # remove abundant data, only keep the first one with value for data in data_list: extract_data = data.get("extract_data", {}) data_detail_list = extract_data.get("data", []) data_detail_list = remove_abundant_data_detail(data_detail_list, exist_data_list) data["extract_data"]["data"] = data_detail_list return data_list def remove_abundant_data_detail(data_detail_list: list, exist_data_list: list): regular_attributes = ["fund_name", "share_name"] remove_list = [] for data_detail in data_detail_list: fund_name = data_detail.get("fund_name", "") share_name = data_detail.get("share_name", "") is_exist_data = False for exist_data in exist_data_list: if fund_name == exist_data["fund_name"] and share_name == exist_data["share_name"]: is_exist_data = True break if not is_exist_data: exist_data_list.append({"fund_name": fund_name, "share_name": share_name}) pop_keys = [] for data_key, data_value in data_detail.items(): if data_key in regular_attributes: continue for exist_data in exist_data_list: if fund_name == exist_data["fund_name"] and share_name == exist_data["share_name"]: if data_key in exist_data.keys(): # fund_data remove fund_key exist_data_value = exist_data.get(data_key, None) if exist_data_value is not None: pop_keys.append(data_key) else: exist_data[data_key] = data_value if len(pop_keys) > 0: for pop_key in pop_keys: data_detail.pop(pop_key) value_keys = [value_key for value_key in list(data_detail.keys()) if value_key not in regular_attributes] if len(value_keys) == 0: remove_list.append(data_detail) for remove_data in remove_list: if remove_data in data_detail_list: data_detail_list.remove(remove_data) return data_detail_list def replace_special_table_header(page_text: str): """ For some special table header, replace to the standard header e.g. raw header 1: Investment Option \n Management \nfee (i) \n(% pa) \n Indirect costs (i) \n(% pa) \n Estimated performance fees (ii) \n(% pa) \n Transaction \ncosts (% pa) \n Buy/sell \nspreads (%) \n Recoverable \nexpenses (iii) \n Estimated \nother \nindirect costs \n Performance fees \ncharged to the \nInvestment \nOption by \nunderlying \nmanagers \n Performance fees \ncharged by \ninterposed \nvehicles \n raw header 2: Fund \n Management \nfee 1 \n(% pa) \n Indirect costs1\n(% pa)\n Estimated performance fees2\n(% pa)\n Transaction \ncosts \n(% pa) \n Buy/sell \nspreads (%) \n Recoverable \nexpenses 3 \n Estimated \nother indirect \ncosts \n Performance \nfees charged \nto the Fund \nby underlying \nmanagers \n Performance \nfees charged \nby interposed \nvehicles \n There are 2 layers of headers, the first layer is the main header, the second layer is the sub header The purpose is to merge the sub header to the main header Indirect costs (i) \n(% pa) replace to Recoverable expenses\nEstimated other indirect costs Estimated performance fees2\n(% pa) replace to Performance fees charged to the Fund by underlying managers\nPerformance fees charged by interposed vehicles Remove the second layer header. e.g. Recoverable \nexpenses (iii) \n Estimated \nother \nindirect costs \n Performance fees \ncharged to the \nInvestment \nOption by \nunderlying \nmanagers \n Performance fees \ncharged by \ninterposed \nvehicles \n or Recoverable \nexpenses 3 \n Estimated \nother indirect \ncosts \n Performance \nfees charged \nto the Fund \nby underlying \nmanagers \n Performance \nfees charged \nby interposed \nvehicles \n """ replace_info_list = [ { # item 0: document 410899007 # item 1: document 539266880, 539266817, 539261734 # item 2: document 539266893 "regex_all_list": [r"\nIndirect costs[\s\S]*?Estimated\s*performance\s*fees[\s\S]*?Investment\s*Option\s*Management\s*fee[\s\S]*?Buy\/sell\s*spreads[\s\S]*?Recoverable\s*expenses[\s\S]*?interposed\s*vehicles\s*\n", r"\n(Investment\s*Option|Fund)[\s\S]*?Management\s*fee[\s\S]*?Indirect\s*costs[\s\S]*?performance\s*fees[\s\S]*?Transaction\s*costs[\s\S]*?Buy\/sell\s*spreads[\s\S]*?Recoverable\s*expenses[\s\S]*?indirect\s*costs[\s\S]*?(interposed\s*vehicles|managers\s*vehicles)\s*\n", r"\nOption\s*name\s*Indirect costs[\s\S]*?Estimated\s*performance\s*fees[\s\S]*?Management\s*fee[\s\S]*?Buy\/sell\s*spreads[\s\S]*?Recoverable\s*expenses[\s\S]*?interposed\s*vehicles\s*\n"], "replace_text": "\nInvestment Option \nManagement fee (% pa) \nRecoverable expenses \nEstimated other indirect costs \nPerformance fees charged to the Fund by underlying managers \nPerformance fees charged by interposed vehicles \nTransaction costs (% pa) \nBuy/sell spreads (%) \n" }, { # item 0: document 410899007 "regex_all_list": [r"Indirect costs[\s\S]*?Estimated\s*performance\s*fees[\s\S]*?Investment\s*Option\s*Management\s*fee[\s\S]*?Transactions\s*costs[\s\S]*?Buy\/sell\s*spreads\s*\(\%\)\s*\n"], "replace_text": "\nInvestment Option \nManagement fee (% pa) \nRecoverable expenses \nEstimated other indirect costs \nPerformance fees charged to the Fund by underlying managers \nPerformance fees charged by interposed vehicles \nTransaction costs (% pa) \nBuy/sell spreads (%) \n" }, { # item 0: document 401212184, page 17 - 20 "regex_all_list": [r"Management\s*Fees\s*and\s*costs\s*[\s\S]*?Ongoing\s*Fee.*?\(A\)[\s\S]*?\(D\)\s*Total\s*Fees\s*and\s*Costs\s*Investment\s*fund\s*Entry\s*Fee[\s\S]*?Nil\s*Entry[\s\S]*?Other\s*investment\s*costs[\s\S]*?Performance\s*fees[\s\S]*?Transaction\s*costs[\s\S]*?Nil\s*Entry\s*Fee\s*.*\n", r"Management\s*Fees\s*and\s*costs\s*[\s\S]*?Ongoing\s*Fee.*?\(A\)[\s\S]*?\(D\)\s*Total\s*Fees\s*and\s*Costs\s*Investment\s*fund\s*Estimated\s*Other[\s\S]*?Entry\s*Fee\s*Nil\s*Entry[\s\S]*?Nil\s*Entry[\s\S]*?Performance\s*fees[\s\S]*?Transaction\s*costs[\s\S]*?Fee\s*option.*\n"], "replace_text": "\nManagement Fees and costs \nInvestment fund \nEntry Fee option \nNil Entry option \nEstimated Other investment costs \nEstimated Performance fees \nEstimated Transaction costs \nOther option 1 \nOther option 2 \n" }, { "regex_all_list": [r"Investment\s*option\s*Administration fees[\s\S]*?administration\s*costs\s*Investment\s*fees[\s\S]*?investment\s*costs\s*Administration\s*fees[\s\S]*?Investment\s*fees[\s\S]*?Estimated\s*administration[\s\S]*?transaction\s*costs[\s\S]*?annual\s*fees\s*and\s*costs\s*\(\%\s*pa\)\s*\n"], "replace_text": "\nInvestment option \nAdministration fees \nEstimated administration costs \nInvestment fees \nEstimated performance fees \nEstimated other investment costs \nEstimated transaction costs \nEstimated total ongoing annual fees and costs \n" } ] updated_text = False for replace_info in replace_info_list: for regex_all in replace_info["regex_all_list"]: if re.search(regex_all, page_text) is not None: page_text = re.sub(regex_all, replace_info["replace_text"], page_text) updated_text = True break if updated_text: break return page_text