# dc-ml-emea-ar/utils/biz_utils.py
#
# 1121 lines
# 49 KiB
# Python

import re
import os
import time
from utils.logger import logger
from copy import deepcopy
from traceback import print_exc
import utils.benchmark_names
# Currency codes recognised when parsing fund / share-class names.
# Order matters: split_short_name_with_share_features stops at the first code
# it finds inside a token.
total_currency_list = [
    "USD", "EUR", "AUD", "JPY", "CHF", "GBP", "SEK",
    "CNY", "NZD", "CNH", "NOK", "SGD", "HKD", "ZAR",
    "PLN", "CAD", "CZK", "HUF", "DKK", "BRL", "SKK",
    "RON", "TRY", "BGN", "CUP", "MXN", "CLF", "XCD",
    "ISK", "IDR", "MNT", "AED", "AFN", "INR", "ESP",
    "RUB", "CLP", "KRW", "ETB", "DZD", "XEU", "XFO",
]

# Share-class feature words in their spelled-out form.
share_features_full_name = [
    'Accumulation',
    'Income',
    'Distribution',
    'Investor',
    'Institutional',
    'Admin',
    'Advantage',
]

# Short forms of share-class feature words.
share_features_abbrevation = [
    'Acc', 'Inc', 'Dist', 'Div', 'Inv', 'Inst', 'Adm', 'Adv',
]

# Generic lower-case words that are stripped from names before comparison.
lower_pre_fix_fund_share = [
    'fund', "funds", 'portfolio',
    'bond', 'bonds',
    'class', 'classes',
    'share', 'shares',
]
def add_slash_to_text_as_regex(text: str):
    r"""
    Turn plain text into a tolerant regular-expression pattern.

    Each non-word, non-whitespace character found in ``text`` is replaced by
    ``\W*``; runs of spaces are collapsed and every remaining space becomes
    ``\s*``.

    Args:
        text: the text to convert; ``None`` and ``""`` are returned unchanged.

    Returns:
        The pattern string (or the input itself when it is ``None`` / empty).
    """
    if text is None or len(text) == 0:
        return text
    # Raw string: "\W" in a normal string literal is an invalid escape
    # sequence and triggers a SyntaxWarning on recent Python versions.
    for special_match in re.finditer(r"\W", text):
        special_char = special_match.group()
        if len(special_char.strip()) == 0:
            # whitespace is handled by the space replacement below
            continue
        escaped_char = r"\{0}".format(special_char)
        # skip characters whose escaped form is already present in the text
        if escaped_char not in text:
            text = re.sub(escaped_char, r"\\W*", text)
    text = re.sub(r"( ){2,}", " ", text)
    text = text.replace(" ", r"\s*")
    return text
def clean_text(text: str) -> str:
    r"""
    Normalise whitespace-like noise in ``text``.

    Literal escape sequences of the form ``\uXXXX`` (a backslash, ``u`` and
    four alphanumeric characters, e.g. ``\u2004`` or ``\u00a0``) are replaced
    by a space, the result is stripped and runs of two or more spaces are
    collapsed to one.
    """
    without_escapes = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
    return re.sub(r"( ){2,}", ' ', without_escapes.strip())
def get_most_similar_name(text: str,
                          name_list: list,
                          share_name: str = None,
                          fund_name: str = None,
                          matching_type="share",
                          pre_common_word_list: list = None,
                          process_cache: dict = None) -> tuple:
    """
    Pick the entry of name_list that is most similar to text, scored with
    get_jacard_similarity on normalised copies of both sides.

    Args:
        text: the raw name to look up.
        name_list: candidate full names; a deep copy is normalised, the
            original entries are what gets returned.
        share_name: optional share-class part of text; when given it is used
            to derive the short names, feature and currency of text.
        fund_name: optional fund name; its upper-cased words are tolerated
            as extra short names on the candidate side.
        matching_type: "share" enables the currency / feature / short-name
            consistency checks; "fund" enables the tie handling.
        pre_common_word_list: extra words to treat as common words.
        process_cache: optional dict used to memoise the derived
            share_short_name / share_feature / share_currency per text.
            It is written to by this function.

    Returns:
        A (name, similarity) tuple:
        - (None, None) when text or name_list is empty,
        - (full_name, 1) as soon as a candidate scores exactly 1,
        - (None, 0.0) when tied candidates were collected or on any exception,
        - (None, max_similarity) when the best score is below 0.35,
        - (best_full_name, max_similarity) otherwise.
    """
    try:
        # work on a copy so the caller's list is never modified
        copy_name_list = deepcopy(name_list)
        if text is None or len(text.split()) == 0 or \
                copy_name_list is None or len(copy_name_list) == 0:
            return None, None

        # --- normalise every candidate name ---
        for i in range(len(copy_name_list)):
            copy_name = copy_name_list[i]
            if matching_type == "share":
                copy_name, _ = replace_share_name_for_multilingual(copy_name, None)
                share_part = get_share_part_list([copy_name])[0]
                if '-' in share_part:
                    copy_name = copy_name.replace('-', ' ')
            copy_name = replace_abbrevation(copy_name)
            copy_name_list[i] = copy_name

        # get common words in fund_name_list
        common_word_list = []
        if len(name_list) > 1:
            _, common_word_list = remove_common_word(copy_name_list)
        if pre_common_word_list is not None and len(pre_common_word_list) > 0:
            common_word_list.extend([word for word in pre_common_word_list
                                     if word not in common_word_list])
        if len(common_word_list) > 0:
            # single characters and currency codes are never treated as common words
            common_word_list = [word for word in common_word_list
                                if len(word) > 1 and word.upper() not in total_currency_list]

        # --- normalise the query text, fund name and share name ---
        text = text.strip()
        text = remove_special_characters(text)
        text = replace_abbrevation(text)
        raw_fund_name_split = []
        if fund_name is not None and len(fund_name.strip()) > 0:
            fund_name = fund_name.strip()
            fund_name = remove_special_characters(fund_name)
            raw_fund_name_split = fund_name.upper().split()
        if share_name is not None:
            share_name = remove_special_characters(share_name)
            share_name = replace_abbrevation(share_name)
            text, share_name = replace_share_name_for_multilingual(text, share_name)
        if matching_type == "share" and share_name is None:
            text, share_name = replace_share_name_for_multilingual(text, None)

        # split glued words such as 'GlobalFund' into separate tokens
        text_splits = text.split()
        if len(text_splits) == 1:
            text = split_words_without_space(text)
        else:
            new_splits = []
            for split in text_splits:
                if len(split) > 1:
                    new_splits.extend(split_words_without_space(split).split())
                else:
                    new_splits.append(split)
            text = ' '.join(new_splits)
            lower_new_splits = [split.lower() for split in new_splits]
            # a common word that the query does not contain is dropped from
            # every candidate so it cannot lower the score
            for word in common_word_list:
                if word not in lower_new_splits:
                    # remove word in fund_name_list
                    for i in range(len(copy_name_list)):
                        temp_splits = copy_name_list[i].split()
                        copy_name_list[i] = ' '.join([split for split in temp_splits
                                                      if remove_special_characters(split).lower() != word])

        max_similarity = 0
        max_similarity_full_name = None
        text = remove_special_characters(text)
        if matching_type == "share":
            text, share_name, copy_name_list = update_for_currency(text, share_name, copy_name_list)
        # strip generic words ('fund', 'class', 'share', ...) from both sides
        text = ' '.join([split for split in text.split()
                         if split.lower() not in lower_pre_fix_fund_share])
        if share_name is not None:
            share_name = ' '.join([split for split in share_name.split()
                                   if split.lower() not in lower_pre_fix_fund_share])
        copy_share_name_list = get_share_part_list(copy_name_list)
        for i in range(len(copy_name_list)):
            temp_splits = copy_name_list[i].split()
            copy_name_list[i] = ' '.join([split for split in temp_splits
                                          if remove_special_characters(split).lower()
                                          not in lower_pre_fix_fund_share])

        # --- derive short names / feature / currency of the query (share matching only) ---
        text_currency = None
        text_feature = None
        text_share_short_name_list = None
        if matching_type == "share" and text is not None and len(text.strip()) > 0:
            if process_cache is not None and isinstance(process_cache, dict):
                if process_cache.get(text, None) is not None:
                    cache = process_cache.get(text)
                    text_share_short_name_list = cache.get("share_short_name")
                    text_feature = cache.get("share_feature")
                    text_currency = cache.get("share_currency")
                else:
                    if share_name is not None and len(share_name.strip()) > 0:
                        text_share_short_name_list = get_share_short_name_from_text(share_name,
                                                                                    confirm_text_share=True)
                        text_feature = get_share_feature_from_text(share_name)
                        text_currency = get_currency_from_text(share_name)
                    else:
                        text_share_short_name_list = get_share_short_name_from_text(text,
                                                                                    confirm_text_share=True)
                        text_feature = get_share_feature_from_text(text)
                        text_currency = get_currency_from_text(text)
                    # sort text_share_short_name_list
                    text_share_short_name_list.sort()
                    process_cache[text] = {
                        "share_short_name": text_share_short_name_list,
                        "share_feature": text_feature,
                        "share_currency": text_currency
                    }
            else:
                if share_name is not None and len(share_name.strip()) > 0:
                    text_share_short_name_list = get_share_short_name_from_text(share_name,
                                                                                confirm_text_share=True)
                    text_share_short_name_list.sort()
                    text_feature = get_share_feature_from_text(share_name)
                    text_currency = get_currency_from_text(share_name)
                else:
                    text_share_short_name_list = get_share_short_name_from_text(text,
                                                                                confirm_text_share=True)
                    text_feature = get_share_feature_from_text(text)
                    text_currency = get_currency_from_text(text)

        # logger.info(f"Source text: {text}, candidate names count: {len(copy_name_list)}")
        # --- score every candidate ---
        # NOTE(review): text and text_feature are reassigned inside this loop,
        # so changes made for one candidate appear to carry over to the
        # following candidates - confirm this is intended.
        same_max_similarity_name_list = []
        for full_name, copy_name, copy_share_name in zip(name_list , copy_name_list, copy_share_name_list):
            if not isinstance(copy_name, str) or len(copy_name.strip()) == 0:
                continue
            copy_name = remove_special_characters(copy_name)
            copy_name = split_words_without_space(copy_name)
            copy_name_short_name_list = None
            copy_name_feature = None
            copy_name_currency = None
            if matching_type == "share":
                if process_cache is not None and isinstance(process_cache, dict):
                    if process_cache.get(copy_name, None) is not None:
                        cache = process_cache.get(copy_name)
                        copy_name_short_name_list = cache.get("share_short_name")
                        copy_name_feature = cache.get("share_feature")
                        copy_name_currency = cache.get("share_currency")
                    else:
                        copy_name_short_name_list = get_share_short_name_from_text(copy_share_name)
                        if copy_name_short_name_list is not None:
                            copy_name_short_name_list.sort()
                        copy_name_feature = get_share_feature_from_text(copy_share_name)
                        copy_name_currency = get_currency_from_text(copy_share_name)
                        process_cache[copy_name] = {
                            "share_short_name": copy_name_short_name_list,
                            "share_feature": copy_name_feature,
                            "share_currency": copy_name_currency
                        }
                else:
                    copy_name_short_name_list = get_share_short_name_from_text(copy_share_name)
                    # NOTE(review): unlike the cached branch above there is no
                    # None check before .sort(); get_share_short_name_from_text
                    # returns None for blank input - confirm this cannot happen here.
                    copy_name_short_name_list.sort()
                    copy_name_feature = get_share_feature_from_text(copy_share_name)
                    copy_name_currency = get_currency_from_text(copy_share_name)
            try:
                # reconcile short names written as separate letters on one side
                # and as one combined token on the other
                if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \
                        copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0:
                    updated_text_share_short_name_list, updated_copy_name_short_name_list = \
                        compare_both_short_name(text_share_short_name_list, copy_name_short_name_list)
                    if updated_text_share_short_name_list != text_share_short_name_list:
                        text = ' '.join([split for split in text.split()
                                         if split not in text_share_short_name_list])
                        text += ' ' + ' '.join(updated_text_share_short_name_list)
                        text_share_short_name_list = updated_text_share_short_name_list
                    if updated_copy_name_short_name_list != copy_name_short_name_list:
                        copy_name = ' '.join([split for split in copy_name.split()
                                              if split not in copy_name_short_name_list])
                        copy_name += ' ' + ' '.join(updated_copy_name_short_name_list)
                        copy_name_short_name_list = updated_copy_name_short_name_list
            except Exception as e:
                print(e)
            compare_text = text
            try:
                text_split = text.split()
                text_split_lower = text.lower().split()
                copy_name_split_lower = copy_name.lower().split()
                # align the query's feature wording with the candidate's
                # feature before scoring
                if copy_name_feature == "accumulation" and \
                        (text_feature is None or len(text_feature) == 0 or
                         text_feature in ["capitalisation", "institutional"]
                         or "capitalisation" in text_split_lower or "institutional" in text_split_lower):
                    if "capitalisation" not in copy_name_split_lower:
                        compare_text = " ".join([split for split in text_split
                                                 if split.lower() not in ["cap", "cap.", "capitalisation"]])
                        text_split = compare_text.split()
                    if "institutional" not in copy_name_split_lower:
                        compare_text = " ".join([split for split in text_split
                                                 if split.lower() not in ["inst", "inst.", "institutional"]])
                        text_split = compare_text.split()
                    if text_feature is not None and len(text_feature) > 0:
                        compare_text = " ".join([split for split in text_split
                                                 if split.lower() != text_feature])
                    compare_text += " accumulation"
                    text_feature = "accumulation"
                elif copy_name_feature == "income" and \
                        (text_feature is None or len(text_feature) == 0 or text_feature == "distribution"):
                    if "dist" in text_split_lower or "dist." in text_split_lower or "distribution" in text_split_lower:
                        compare_text = " ".join([split for split in text_split
                                                 if split.lower() not in ["dist", "dist.", "distribution"]])
                    compare_text += " income"
                    text_feature = "income"
                else:
                    pass
                similarity = get_jacard_similarity(compare_text,
                                                   copy_name,
                                                   need_remove_numeric_characters=False)
            except Exception as e:
                print(e)
                print_exc()
                similarity = 0
            # a perfect score short-circuits all further checks
            if similarity == 1:
                return full_name, similarity
            # retry with abbreviations expanded on the candidate and keep the better score
            copy_name_2 = replace_abbrevation(copy_name)
            if copy_name != copy_name_2:
                similarity_2 = get_jacard_similarity(compare_text,
                                                     copy_name_2,
                                                     need_remove_numeric_characters=False)
                if similarity_2 > similarity:
                    similarity = similarity_2
            if similarity > max_similarity:
                if matching_type == "share":
                    # reject candidates whose currency differs from the query's
                    if text_currency is not None and len(text_currency) > 0 and \
                            copy_name_currency is not None and len(copy_name_currency) > 0:
                        if text_currency != copy_name_currency:
                            continue
                    if text_feature is not None and len(text_feature) > 0 and \
                            copy_name_feature is not None and len(copy_name_feature) > 0:
                        if text_feature != copy_name_feature:
                            # NOTE(review): "accmulation" looks like a misspelling of
                            # "accumulation"; as written this comparison appears to be
                            # always true for features produced by
                            # get_share_feature_from_text - confirm intent.
                            if text_feature.lower() not in copy_name.lower().split() and \
                                    copy_name_feature.lower() != "accmulation" and \
                                    copy_name_feature.lower() not in compare_text.lower().split():
                                continue
                if matching_type == "share":
                    if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \
                            copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0:
                        short_name_invalid = False
                        # every short name of the query must exist on the candidate
                        for short in text_share_short_name_list:
                            if short not in copy_name_short_name_list:
                                short_name_invalid = True
                                break
                        for compare_short in copy_name_short_name_list:
                            if compare_short not in text_share_short_name_list:
                                # some short word is in fund name, but not belong to share name
                                if compare_short.upper() not in raw_fund_name_split:
                                    short_name_invalid = True
                                    break
                        if short_name_invalid:
                            continue
                max_similarity = similarity
                max_similarity_full_name = full_name
                same_max_similarity_name_list = []
            elif matching_type == "fund" and max_similarity > 0 and max_similarity == similarity:
                # tie in fund matching: prefer the name with more words,
                # otherwise remember the tie
                if full_name is not None and max_similarity_full_name is not None and \
                        len(full_name.split()) > len(max_similarity_full_name.split()):
                    max_similarity_full_name = full_name
                    same_max_similarity_name_list = []
                else:
                    if full_name is not None:
                        same_max_similarity_name_list.append(full_name)
            if max_similarity == 1:
                break
        # if there are multiple names with the same similarity, return None
        if len(same_max_similarity_name_list) > 0:
            return None, 0.0
        # scores below 0.35 are treated as "no match"
        if max_similarity < 0.35:
            return None, max_similarity
        return max_similarity_full_name, max_similarity
    except Exception as e:
        # any unexpected failure is reported and mapped to "no match"
        print(e)
        print_exc()
        return None, 0.0
def replace_share_name_for_multilingual(text: str, share_name: str):
    """
    Replace the first non-English share-class keyword found in text with "Class".

    The keywords are tried in list order and only the first one contained in
    text is replaced (in text and, when it is non-blank, in share_name).

    Args:
        text: the name to normalise; None / blank text is returned unchanged.
        share_name: optional share-class part of the same name.

    Returns:
        A (text, share_name) tuple with the keyword replaced.
    """
    if text is None or len(text.strip()) == 0:
        return text, share_name
    # Longer keywords must come before the shorter keywords they contain
    # ("Aktienklasse" before "Aktien"); otherwise "Aktienklasse A" would be
    # rewritten to "Classklasse A".
    multilingual_share_list = ["Catégorie de parts", "Classe di quote",
                               "Kategorie Anteile", "Kategorie anteile",
                               "Clase de participaciones", "Aandelenklasse",
                               "aandelenklasse", "Anteilklasse", "anteilklasse",
                               "Aktienklasse", "aktienklasse", "Aktien", "aktien",
                               "Klasse"]
    for multilingual_share in multilingual_share_list:
        if multilingual_share in text:
            text = text.replace(multilingual_share, "Class")
            if share_name is not None and len(share_name.strip()) > 0:
                share_name = share_name.replace(multilingual_share, "Class")
            break
    return text, share_name
def compare_both_short_name(text_short_name_list: list, compare_short_name_list: list):
    """
    Reconcile two short-name lists against each other.

    Both inputs are deep-copied, so the caller's lists are left untouched.
    The text side is reconciled against the compare side first; the compare
    side is then reconciled against the already-updated text side.

    Returns:
        (updated text short names, updated compare short names)
    """
    text_side = deepcopy(text_short_name_list)
    compare_side = deepcopy(compare_short_name_list)
    text_side = verify_short_name_container(text_side, compare_side)
    compare_side = verify_short_name_container(compare_side, text_side)
    return text_side, compare_side
def verify_short_name_container(left_short_name_list: list, right_short_name_list: list):
    """
    Merge single-character short names of the left list into a combined
    short name taken from the right list.

    When the left list holds more than one single-character entry, every
    right-hand entry whose length equals that count and which contains all
    of those characters replaces them: the single characters are removed
    from the left list and the right-hand entry is appended.

    The left list is modified in place and also returned.
    """
    single_chars = [name for name in left_short_name_list if len(name) == 1]
    if len(single_chars) <= 1:
        return left_short_name_list

    for candidate in right_short_name_list:
        if len(candidate) != len(single_chars):
            continue
        if not all(char in candidate for char in single_chars):
            continue
        for char in single_chars:
            if char in left_short_name_list:
                left_short_name_list.remove(char)
        left_short_name_list.append(candidate)
    return left_short_name_list
def get_share_part_list(text_list: list):
    """
    Extract the share-class part of each name in text_list.

    A name is split on the first of "Funds", "Fund", "Portfolio", "Bonds",
    "Bond" that occurs in it and the text after the last occurrence is kept;
    names containing none of them are kept whole. Generic words listed in
    lower_pre_fix_fund_share are then dropped.

    Returns:
        A list of share-part strings, one per input name.
    """
    separators = ("Funds", "Fund", "Portfolio", "Bonds", "Bond")
    share_parts = []
    for name in text_list:
        pieces = [name]
        for separator in separators:
            pieces = name.split(separator)
            if len(pieces) > 1:
                break
        tail = pieces[-1].strip() if len(pieces) > 1 else name.strip()
        kept_tokens = [token for token in tail.split()
                       if remove_special_characters(token).lower()
                       not in lower_pre_fix_fund_share]
        share_parts.append(' '.join(kept_tokens))
    return share_parts
def get_share_short_name_from_text(text: str, confirm_text_share: bool = False):
    """
    Collect the short share-class tokens found near the end of text.

    Only the last 6 tokens (confirm_text_share=True) or the last 4 tokens
    are inspected, walking backwards. A token is collected, upper-cased,
    when it is at most 3 characters long and is neither a share feature
    word nor a currency code. When more than one token was collected, the
    purely numeric ones at the end of the collected list are removed.

    Returns:
        The list of short names, or None for None / blank text.
    """
    if text is None or not text.strip():
        return None
    tokens = remove_special_characters(text.strip()).split()
    feature_words = [feature.lower() for feature in share_features_full_name]
    look_back = 6 if confirm_text_share else 4

    short_names = []
    for token in reversed(tokens[-look_back:]):
        if token.lower() in feature_words or token.upper() in total_currency_list:
            continue
        if len(token) <= 3:
            short_names.append(token.upper())

    if len(short_names) > 1:
        # numeric entries at the tail of the collected list
        trailing_numbers = []
        for name in reversed(short_names):
            if not name.isdigit():
                break
            trailing_numbers.append(name)
        for number in trailing_numbers:
            if number in short_names:
                short_names.remove(number)
    return short_names
def get_share_feature_from_text(text: str):
    """
    Return the share feature word closest to the end of text.

    The text is lower-cased and its last 4 tokens are checked from the end
    backwards against the lower-cased share_features_full_name entries.

    Returns:
        The matching lower-case token, or None when text is None / blank
        or no feature word is found among those tokens.
    """
    if text is None or not text.strip():
        return None
    tokens = text.strip().lower().split()
    known_features = [feature.lower() for feature in share_features_full_name]
    for token in reversed(tokens[-4:]):
        if token.lower() in known_features:
            return token
    return None
def get_currency_from_text(text: str):
    """
    Return the currency code found near the end of text.

    The last 4 tokens are scanned from the end backwards and every token
    that is a known currency code is collected in upper case. When more
    than one code is collected, the first USD / EUR entry (if any) is
    discarded and the first remaining code is returned.

    Returns:
        The currency code, or None when text is None / blank or no code
        is found among those tokens.
    """
    if text is None or not text.strip():
        return None
    tail_tokens = text.strip().split()[-4:]
    found_codes = [token.upper() for token in reversed(tail_tokens)
                   if token.upper() in total_currency_list]
    if not found_codes:
        return None
    if len(found_codes) > 1:
        # remove the first USD / EUR entry from the collected codes
        for code in found_codes:
            if code in ('USD', 'EUR'):
                found_codes.remove(code)
                break
    return found_codes[0]
def update_for_currency(text: str, share_name: str, compare_list: list):
    """
    Make the currency information of text / share_name and of the candidate
    names in compare_list comparable before similarity scoring.

    Depending on whether a currency is detected in text and in the
    candidates, a currency code ('USD' by default) is appended to the side
    that lacks one, and surplus currency codes are removed.

    Args:
        text: the normalised query name.
        share_name: the share-class part of the query, may be None.
        compare_list: candidate names; entries are rewritten in place.

    Returns:
        (text, share_name, compare_list) after the adjustments. Any
        exception is logged and the values reached so far are returned.
    """
    try:
        currency_in_text = get_currency_from_text(text)
        with_currency = False
        if currency_in_text is not None:
            with_currency = True
        # indexes of the candidates with / without a detectable currency
        with_currency_list = []
        without_currency_list = []
        for index, compare in enumerate(compare_list):
            # compare_split = compare.split()
            with_currency_compare = False
            currecy_in_compare = get_currency_from_text(compare)
            if currecy_in_compare is not None:
                with_currency_compare = True
            if with_currency_compare:
                with_currency_list.append(index)
            else:
                without_currency_list.append(index)
        if not with_currency and len(with_currency_list) == 0:
            # neither side carries a currency: nothing to align
            pass
        elif not with_currency and len(with_currency_list) > 0:
            # the query has no currency but some candidates do
            share_short_name_list = []
            if share_name is not None and len(share_name.strip()) > 0:
                share_short_name_list = get_share_short_name_from_text(share_name)
            updated = False
            if len(share_short_name_list) > 0:
                if len(without_currency_list) > 0:
                    # a currency-less candidate containing all short names of
                    # the query makes the query default to USD
                    for index in without_currency_list:
                        all_in_list = True
                        compare_split = [split.upper() for split in compare_list[index].split()]
                        for share_shot_name in share_short_name_list:
                            if share_shot_name not in compare_split:
                                all_in_list = False
                                break
                        if all_in_list:
                            text = text + ' ' + 'USD'
                            if share_name is not None:
                                share_name = share_name + ' ' + 'USD'
                            updated = True
                            break
                if not updated:
                    # otherwise borrow the currency from the candidates that
                    # contain all short names, but only if exactly one
                    # currency entry was collected
                    currency_list = []
                    for index in with_currency_list:
                        all_in_list = True
                        compare_split = [split.upper() for split in compare_list[index].split()]
                        for share_shot_name in share_short_name_list:
                            if share_shot_name not in compare_split:
                                all_in_list = False
                                break
                        if all_in_list:
                            current_currency_list = [split for split in compare_split
                                                     if split.upper() in total_currency_list]
                            if len(current_currency_list) > 0:
                                currency_list.append(current_currency_list[-1])
                    if len(currency_list) == 1:
                        text = text + ' ' + currency_list[0]
                        if share_name is not None:
                            share_name = share_name + ' ' + currency_list[0]
                        updated = True
            # candidates without a currency are treated as USD
            for index in without_currency_list:
                compare_list[index] = compare_list[index] + ' ' + 'USD'
            if not updated:
                text = text + ' ' + 'USD'
                if share_name is not None:
                    share_name = share_name + ' ' + 'USD'
            # return text, share_name, compare_list
        elif with_currency and len(without_currency_list) == 0:
            # NOTE(review): this branch is only entered when
            # without_currency_list is empty, so the loop below never runs;
            # the condition possibly meant "> 0" - confirm.
            for index in without_currency_list:
                compare_list[index] = compare_list[index] + ' ' + 'USD'
            # return text, share_name, compare_list
        else:
            # return text, share_name, compare_list
            pass
        default_currency = 'USD'
        if with_currency and share_name is not None:
            share_name_split = share_name.split()
            share_name_currency = get_currency_from_text(share_name)
            if share_name_currency is not None and share_name_currency in total_currency_list:
                # the first other currency in the share name becomes the
                # "default" currency that is stripped from candidates below
                for split in share_name_split:
                    if split in total_currency_list and split != share_name_currency:
                        default_currency = split
                        break
                # keep only the detected share currency in the share name / text
                new_share_name = ' '.join([split for split in share_name_split
                                           if split not in total_currency_list
                                           or (split == share_name_currency)])
                if share_name in text:
                    text = text.replace(share_name, new_share_name)
                else:
                    text = ' '.join([split for split in text.split()
                                     if split not in total_currency_list
                                     or (split == share_name_currency)])
                share_name = new_share_name
        # drop the default currency from candidates whose share part lists
        # more than one currency
        for c_i in range(len(compare_list)):
            compare = compare_list[c_i]
            compare_share_part = get_share_part_list([compare])[0]
            compare_share_part_split = compare_share_part.split()
            compare_share_part_currency_list = []
            # NOTE(review): membership is tested on split.upper() but the raw
            # split is appended, so mixed-case codes may be handled
            # inconsistently - confirm inputs are always upper case here.
            for split in compare_share_part_split:
                if split.upper() in total_currency_list and split.upper() not in compare_share_part_currency_list:
                    compare_share_part_currency_list.append(split)
            if len(compare_share_part_currency_list) > 1 and default_currency in compare_share_part_currency_list:
                compare_share_part_split = [split for split in compare_share_part_split if split.upper() != default_currency]
                new_compare_share_part = ' '.join(compare_share_part_split)
                compare_list[c_i] = compare.replace(compare_share_part, new_compare_share_part)
    except Exception as e:
        logger.error(f"Error in update_for_currency: {e}")
    return text, share_name, compare_list
def remove_common_word(text_list: list):
    """
    Find the words shared by the names in text_list and strip them.

    Each name is lower-cased, cleaned with remove_special_characters and
    stripped of the generic words in lower_pre_fix_fund_share. The words
    common to the names are then collected (currency codes are never
    treated as common) and one occurrence of each is removed from every name.

    Args:
        text_list: the names to compare.

    Returns:
        A (new_text_list, common_word_list) tuple. For None / empty input
        the input is returned together with an empty common-word list, so
        callers can always unpack two values.
    """
    if text_list is None or len(text_list) == 0:
        # previously a bare `text_list` was returned here, which broke
        # callers unpacking `names, common_words = remove_common_word(...)`
        return text_list, []
    new_text_list = []
    for text in text_list:
        text = text.lower()
        text = remove_special_characters(text)
        text_splits = text.split()
        text = ' '.join([split for split in text_splits
                         if split.lower() not in lower_pre_fix_fund_share])
        new_text_list.append(text)
    # remove common word in new_text_list, such as 'Blackrock Global Fund' and 'Blackrock Growth Fund', then 'Blackrock', 'Fund' are common words
    # the result is ['Global', 'Growth']
    common_word_list = []
    new_text_splits_list = [text.split() for text in new_text_list]
    with_common_word = False
    for i in range(len(new_text_splits_list)):
        for j in range(i + 1, len(new_text_splits_list)):
            if common_word_list is None or len(common_word_list) == 0:
                # nothing in common yet: start from this pair
                common_word_list = list(
                    set(new_text_splits_list[i]).intersection(set(new_text_splits_list[j])))
            else:
                # narrow the running intersection with the next name
                common_word_list = list(
                    set(common_word_list).intersection(set(new_text_splits_list[j])))
            if len(common_word_list) > 0:
                with_common_word = True
            # once a non-empty intersection has shrunk to nothing, stop
            if with_common_word and len(common_word_list) == 0:
                break
        if with_common_word and len(common_word_list) == 0:
            break
    # if exists the share name and currency name, remove from the list
    remove_list = [word for word in common_word_list
                   if word.upper() in total_currency_list]
    for remove in remove_list:
        if remove in common_word_list:
            common_word_list.remove(remove)
    common_word_list = list(set(common_word_list))
    for text_splits in new_text_splits_list:
        for common_word in common_word_list:
            if common_word in text_splits:
                text_splits.remove(common_word)
    new_text_list = [' '.join(text_splits)
                     for text_splits in new_text_splits_list]
    return new_text_list, common_word_list
def split_words_without_space(text: str):
    """
    Separate capitalised words that were written without spaces, e.g.
    'BlackrockGlobalFund' becomes 'Blackrock Global Fund'.

    A capitalised word that is part of an acronym-led word (two or more
    capitals followed by lower-case letters, e.g. 'USDHedged') is left
    untouched.

    Args:
        text: the text to split.

    Returns:
        The text with single spaces around the detected words, stripped.
        None / blank input yields an empty string, so the result can
        always be used as a string.
    """
    if text is None or len(text.strip()) == 0:
        # previously [] was returned here although every other path returns str
        return ""
    text = text.strip()
    # find all words with capital letter + lower letter
    capitalised_words = re.findall(r"[A-Z][a-z]+", text)
    acronym_led_words = re.findall(r"[A-Z]{2,}[a-z]+", text)
    for word in capitalised_words:
        if any(word in acronym_word for acronym_word in acronym_led_words):
            continue
        text = text.replace(word, " " + word + " ")
    text = re.sub(r"(\s)+", " ", text)
    return text.strip()
def remove_special_characters(text):
    """
    Replace every non-word character with a space, collapse whitespace
    runs to a single space and strip both ends.
    """
    spaced = re.sub(r'\W', ' ', text)
    return re.sub(r'\s+', ' ', spaced).strip()
def get_unique_words_text(text):
    """
    Return the distinct lower-case words of text, sorted and joined by
    single spaces. Special characters are removed first.
    """
    words = remove_special_characters(text).lower().split()
    return ' '.join(sorted(set(words)))
def remove_numeric_characters(text):
    """
    Replace every run of digits with a space, collapse whitespace runs to
    a single space and strip both ends.
    """
    without_digits = re.sub(r'\d+', ' ', text)
    return re.sub(r'\s+', ' ', without_digits).strip()
def get_jacard_similarity(text_left,
                          text_right,
                          need_remove_special_characters=True,
                          need_remove_numeric_characters=True):
    """
    Jaccard similarity of the lower-cased word sets of two texts.

    Special and numeric characters are removed first when the matching
    flag is set. When the words unique to each side, sorted and
    concatenated, spell the same string (e.g. 'ab c' vs 'a bc'), those
    words are counted as shared.

    Returns:
        shared / total word count rounded to 3 decimals, or 0 when both
        word sets are empty.
    """
    if need_remove_special_characters:
        text_left = remove_special_characters(text_left)
        text_right = remove_special_characters(text_right)
    if need_remove_numeric_characters:
        text_left = remove_numeric_characters(text_left)
        text_right = remove_numeric_characters(text_right)

    left_words = set(text_left.lower().split())
    right_words = set(text_right.lower().split())
    shared_count = len(left_words & right_words)
    total_count = len(left_words | right_words)

    only_left = sorted(left_words - right_words)
    only_right = sorted(right_words - left_words)
    if ''.join(only_left) == ''.join(only_right):
        shared_count += len(only_left) + len(only_right)

    if total_count > 0:
        return round(shared_count / total_count, 3)
    return 0
def simple_most_similarity_name(text: str, name_list: list):
    """
    Return the entry of name_list with the highest get_jacard_similarity
    score against text, together with that score.

    Scanning stops as soon as the best score reaches 1.

    Returns:
        (best_name, best_score); (None, 0.0) when text is None / blank or
        name_list is None / empty.
    """
    if text is None or not text.strip() or \
            name_list is None or len(name_list) == 0:
        return None, 0.0
    best_name, best_score = None, 0
    for candidate in name_list:
        score = get_jacard_similarity(text, candidate)
        if score > best_score:
            best_name, best_score = candidate, score
        if best_score == 1:
            break
    return best_name, best_score
def get_beginning_common_words(text_list: list):
    """
    Return the leading words shared by every text in text_list.

    The texts are compared word by word from the start; comparison stops
    at the first position where any text differs or has run out of words.

    Returns:
        The shared leading words joined by single spaces. Note that an
        empty list (not an empty string) is returned when text_list is
        None or has fewer than two entries.
    """
    if text_list is None or len(text_list) < 2:
        return []
    word_rows = [entry.split() for entry in text_list]
    shared_prefix = []
    # zip stops at the shortest text, which ends the common prefix as well
    for column in zip(*word_rows):
        leading_word = column[0]
        if any(word != leading_word for word in column[1:]):
            break
        shared_prefix.append(leading_word)
    return ' '.join(shared_prefix).strip()
# Currency wording rules for replace_abbrevation, tried in order; only the
# first matching rule is applied. Each rule is
# (match kind, needle, regex pattern, replacement) where the match kind is
#   "phrase"      - needle is a substring of the lower-cased text
#   "token"       - needle is a whole token of the lower-cased text
#   "exact_token" - needle is a whole token of the text, case-sensitive
_CURRENCY_ABBREVIATION_RULES = (
    ("phrase", 'swiss franc', r'swiss\s+franc', 'CHF'),
    ("phrase", 'us dollar', r'us\s+dollar', 'USD'),
    ("phrase", 'singapore dollar', r'singapore\s+dollar', 'SGD'),
    ("phrase", 'hong kong dollar', r'hong\s+kong\s+dollar', 'HKD'),
    ("phrase", 'hongkong dollar', r'hongkong\s+dollar', 'HKD'),
    ("phrase", 'australian dollar', r'australian\s+dollar', 'AUD'),
    ("phrase", 'japanese yen', r'japanese\s+yen', 'JPY'),
    ("phrase", 'south african rand', r'South\s+African\s+rand', 'ZAR'),
    ("phrase", 'canadian dollar', r'canadian\s+dollar', 'CAD'),
    ("phrase", 'new zealand dollar', r'new\s+zealand\s+dollar', 'NZD'),
    ("phrase", 'norwegian krone', r'norwegian\s+krone', 'NOK'),
    ("phrase", 'danish krone', r'danish\s+krone', 'DKK'),
    ("phrase", 'swedish krona', r'swedish\s+krona', 'SEK'),
    ("phrase", 'swedish kronor', r'swedish\s+kronor', 'SEK'),
    ("exact_token", "GPB", r"GPB", "GBP"),
    # word boundaries keep words such as 'European' from becoming 'EURpean'
    ("token", 'sterling', r'\bsterling\b', 'GBP'),
    ("token", 'euro', r'\beuro\b', 'EUR'),
    ("token", '€', r'€', 'EUR'),
    ("token", '$', r'\$', 'USD'),
    ("token", '£', r'£', 'GBP'),
    ("exact_token", 'RMB', r'RMB', 'CNY'),
)

# Lower-cased token -> expanded wording used by replace_abbrevation.
_ABBREVIATION_EXPANSIONS = {}
for _expansion, _tokens in (
        ('Accumulation', ['acc', 'acc.', 'accumulating',
                          'thesaurierende', 'thes.', 'accumulazione',
                          'akkumulation', 'acumulación',
                          'accumulatie']),
        ('Income', ['inc', 'inc.']),
        ('Distribution', ['dist', 'dist.', 'dis',
                          'dis.', 'distributing', 'ausschüttende',
                          'aussch.', 'distribuzione']),
        ('Investor', ['inv', 'inv.']),
        ('Institutional', ['inst', 'inst.', 'institution']),
        ('Capitalisation', ['cap', 'cap.']),
        ('Dividend', ['div', 'div.']),
        ('Admin', ['adm', 'adm.']),
        ('Advantage', ['adv', 'adv.']),
        ('Hedged', ['hdg', 'hgd', 'hdg.', 'hgd.', '(h)']),
        ('Class', ['cl', 'cl.']),
        ('Series', ['ser', 'ser.']),
        ('US', ['u.s.']),
        ('no trail', ['nc', 'nc.']),
        ('Not', ['non']),
        # these tokens are dropped altogether
        ('', ['net', 'unhgd'])):
    for _token in _tokens:
        _ABBREVIATION_EXPANSIONS[_token] = _expansion


def replace_abbrevation(text: str):
    """
    Expand currency wording and share-class abbreviations in text.

    Steps:
      1. '(', ')' and '-' become spaces and whitespace is collapsed.
      2. The first matching currency rule is applied (e.g. 'US Dollar'
         becomes 'USD', a standalone '€' or 'euro' becomes 'EUR').
      3. Every token is mapped to its expanded form (e.g. 'Acc' becomes
         'Accumulation'); the tokens 'net', 'unhgd' and a lower-case 'fl'
         are dropped; any other token is passed through
         split_short_name_with_share_features.

    Args:
        text: the text to expand; None / blank text is returned unchanged.

    Returns:
        The expanded text with single spaces between tokens.
    """
    if text is None or len(text.strip()) == 0:
        return text
    text = text.replace('(', ' ').replace(')', ' ').replace('-', ' ')
    text = re.sub(r'\s+', ' ', text).strip()

    lower_text = text.lower()
    exact_tokens = text.split()
    lower_tokens = lower_text.split()
    for kind, needle, pattern, replacement in _CURRENCY_ABBREVIATION_RULES:
        if kind == "phrase":
            matched = needle in lower_text
        elif kind == "token":
            matched = needle in lower_tokens
        else:
            matched = needle in exact_tokens
        if matched:
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
            # only the first matching currency rule is applied
            break

    new_text_splits = []
    for split in text.split():
        lower_split = split.lower()
        if lower_split in _ABBREVIATION_EXPANSIONS:
            new_text_splits.append(_ABBREVIATION_EXPANSIONS[lower_split])
        elif split == "fl":
            # dropped only when written in lower case
            new_text_splits.append('')
        else:
            new_text_splits.append(split_short_name_with_share_features(split))
    new_text = ' '.join(new_text_splits)
    new_text = re.sub(r'\s+', ' ', new_text).strip()
    return new_text
def split_short_name_with_share_features(text: str):
    """
    Split a glued share-class token into currency, remainder and feature.

    A trailing 'Acc' / 'Inc' / 'Dist' / 'Div' is expanded to its full
    feature name and the first currency code contained in the token is
    pulled out. The result is always ordered
    "<currency> <remainder> <feature>", for example:
        CHFHInc      -> CHF H Income
        USDHAcc      -> USD H Accumulation
        HAcc         -> H Accumulation
        GBPHedgedAcc -> GBP Hedged Accumulation
        HGBPInc      -> GBP H Income
        HNOKAcc      -> NOK H Accumulation

    None, blank text and text containing more than one token are
    returned unchanged.
    """
    if text is None or len(text.strip()) == 0:
        return text
    if len(text.split()) > 1:
        return text
    token = text.strip()

    feature_suffixes = (('Acc', 'Accumulation'),
                        ('Inc', 'Income'),
                        ('Dist', 'Distribution'),
                        ('Div', 'Dividend'))
    feature_name = ""
    for suffix, full_name in feature_suffixes:
        if len(token) > len(suffix) and token.endswith(suffix):
            feature_name = full_name
            token = token.replace(suffix, '')
            break

    currency_text = ""
    for currency in total_currency_list:
        if len(token) > len(currency) and currency in token:
            currency_text = currency
            token = token.replace(currency, '')
            break

    combined = currency_text + ' ' + token + ' ' + feature_name
    return re.sub(r'\s+', ' ', combined).strip()
def clean_folder(folder_path: str, expired_days: int = 5):
    """
    Delete files under folder_path that were last modified more than
    expired_days days ago.

    The folder is walked recursively; directories themselves are kept.
    Deletion is best effort: files that cannot be inspected or removed are
    skipped silently.

    Args:
        folder_path: root folder to clean; nothing happens if it does not exist.
        expired_days: age threshold in days.
    """
    if not os.path.exists(folder_path):
        return
    seconds_per_day = 60 * 60 * 24
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                file_time = os.path.getmtime(file_path)
            except OSError:
                # file vanished since the walk listed it, or is a broken link
                continue
            if (time.time() - file_time) / seconds_per_day > expired_days:
                try:
                    os.remove(file_path)
                except OSError:
                    # best effort: only OS-level failures are ignored, so
                    # KeyboardInterrupt / SystemExit still propagate
                    pass
def remove_abundant_data(data_list: list):
    """
    Remove redundant values across all items of data_list, only keeping
    the first one with value.

    For each item, the list under item["extract_data"]["data"] is
    de-duplicated with remove_abundant_data_detail against the records seen
    in earlier items and written back. The "extract_data" key must exist on
    every item, because the result is assigned through it.

    Returns:
        The same data_list, modified in place.
    """
    seen_records = []
    for item in data_list:
        details = item.get("extract_data", {}).get("data", [])
        item["extract_data"]["data"] = remove_abundant_data_detail(details,
                                                                   seen_records)
    return data_list
def remove_abundant_data_detail(data_detail_list: list,
                                exist_data_list: list):
    """
    Drop attribute values that were already recorded for the same
    fund_name / share_name pair, keeping only the first non-None value.

    For every detail dict, each attribute other than "fund_name" and
    "share_name" is compared with the record for that pair in
    exist_data_list:
      - if the record already holds a non-None value for the attribute, the
        attribute is removed from the detail dict;
      - otherwise the detail's value is stored in the record. This includes
        replacing a previously recorded None, so that later duplicates of the
        first real value are recognised.
    Detail dicts left with no attribute besides the two name fields are
    removed from data_detail_list.

    Args:
        data_detail_list: list of detail dicts; modified in place.
        exist_data_list: records seen so far, one dict per name pair;
            extended and updated in place so it can be shared between calls.

    Returns:
        The same data_detail_list object.
    """
    if not data_detail_list:
        return data_detail_list
    regular_attributes = ("fund_name", "share_name")
    for data_detail in data_detail_list:
        fund_name = data_detail.get("fund_name", "")
        share_name = data_detail.get("share_name", "")
        # Look the record up once per detail instead of once per attribute.
        known_record = next(
            (record for record in exist_data_list
             if record["fund_name"] == fund_name
             and record["share_name"] == share_name),
            None,
        )
        if known_record is None:
            known_record = {"fund_name": fund_name, "share_name": share_name}
            exist_data_list.append(known_record)
        # Collect first and pop afterwards: a dict must not change size
        # while it is being iterated.
        pop_keys = []
        for data_key, data_value in data_detail.items():
            if data_key in regular_attributes:
                continue
            if known_record.get(data_key) is not None:
                pop_keys.append(data_key)
            else:
                known_record[data_key] = data_value
        for pop_key in pop_keys:
            data_detail.pop(pop_key)
    # Slice assignment keeps the caller's list object while dropping details
    # that carry nothing but the name fields.
    data_detail_list[:] = [
        data_detail for data_detail in data_detail_list
        if any(key not in regular_attributes for key in data_detail)
    ]
    return data_detail_list
def replace_special_table_header(replace_table_header_config: list, page_text: str):
    r"""
    Replace special two-layer table headers with a standard single-layer
    header, then split decimal numbers that were glued together.

    Example of the headers this is meant for:
        raw header 1:
            Investment Option \n
            Management \nfee (i) \n(% pa) \n
            Indirect costs (i) \n(% pa) \n
            Estimated performance fees (ii) \n(% pa) \n
            Transaction \ncosts (% pa) \n
            Buy/sell \nspreads (%) \n
            Recoverable \nexpenses (iii) \n
            Estimated \nother \nindirect costs \n
            Performance fees \ncharged to the \nInvestment \nOption by \nunderlying \nmanagers \n
            Performance fees \ncharged by \ninterposed \nvehicles \n
        raw header 2:
            Fund \n
            Management \nfee 1 \n(% pa) \n
            Indirect costs1\n(% pa)\n
            Estimated performance fees2\n(% pa)\n
            Transaction \ncosts \n(% pa) \n
            Buy/sell \nspreads (%) \n
            Recoverable \nexpenses 3 \n
            Estimated \nother indirect \ncosts \n
            Performance \nfees charged \nto the Fund \nby underlying \nmanagers \n
            Performance \nfees charged \nby interposed \nvehicles \n

    These headers have two layers: the first is the main header, the second
    the sub header. The purpose is to merge the sub header into the main one:
        "Indirect costs (i) \n(% pa)" becomes
            "Recoverable expenses\nEstimated other indirect costs"
        "Estimated performance fees2\n(% pa)" becomes
            "Performance fees charged to the Fund by underlying managers\n
             Performance fees charged by interposed vehicles"
    and the second-layer lines ("Recoverable \nexpenses (iii) \n",
    "Estimated \nother \nindirect costs \n", "Performance fees \ncharged ...")
    are removed.

    Args:
        replace_table_header_config: list of dicts with the keys
            "regex_all_list" (regex patterns) and "replace_text" (replacement
            passed to re.sub, so regex replacement escapes apply).
        page_text: text of the page.

    Returns:
        The updated page text. Only the first pattern that matches, in
        config order, is applied; if its entry has no "replace_text" the
        header is left as it is. When the config is None or empty, page_text
        is returned unchanged, without the number splitting.
    """
    if replace_table_header_config is None or len(replace_table_header_config) == 0:
        return page_text

    # Find the first (entry, pattern) pair that matches the page.
    first_hit = next(
        ((replace_info, regex_all)
         for replace_info in replace_table_header_config
         for regex_all in (replace_info.get("regex_all_list") or [])
         if re.search(regex_all, page_text) is not None),
        None,
    )
    if first_hit is not None:
        replace_info, regex_all = first_hit
        replace_text = replace_info.get("replace_text")
        # Without a replacement there is nothing to change. Feeding the
        # matched text back into re.sub would treat it as a replacement
        # template, which can fail on backslashes.
        if replace_text is not None:
            page_text = re.sub(regex_all, replace_text, page_text)

    # Split glued numbers such as 1.320.00 into 1.32 0.00. The look-ahead
    # does not consume the following number, so longer runs like
    # 1.320.001.50 are split completely (1.32 0.00 1.50).
    page_text = re.sub(r'(\d\.\d{2})(?=\d\.\d{2})', r'\1 ', page_text)
    return page_text
def get_bechmark_name(text, search_terms, word_count=300):
    """
    Collect the text snippets that start at each search term.

    For every term, each case-insensitive whole-word occurrence in text is
    captured together with up to word_count following whitespace-separated
    words. Snippets are gathered term by term, in the order of search_terms.

    Args:
        text: text to search.
        search_terms: iterable of literal terms (they are regex-escaped).
        word_count: maximum number of words captured after a term.

    Returns:
        All snippets joined by a single space; an empty string when text or
        search_terms is empty/None or nothing matches. A term that cannot be
        processed is logged and skipped.
    """
    if not text or not search_terms:
        return ""
    results = []
    # Same for every term: "up to word_count words after the term".
    trailing_words = r'\b(?:\s+\S+){0,' + str(word_count) + '}'
    for term in search_terms:
        # The try is per term so that `term` is always bound in the handler
        # and one bad term does not discard the remaining ones.
        try:
            pattern = r'\b' + re.escape(term) + trailing_words
            results.extend(match.group()
                           for match in re.finditer(pattern, text, re.IGNORECASE))
        except Exception as e:
            logger.error(f"An error occurred while processing the term '{term}': {e}")
    return " ".join(results)
# Example usage:
# benchmark_name = get_bechmark_name(text, utils.benchmark_names.benchmark_keywords)