dc-ml-emea-ar/utils/biz_utils.py

874 lines
37 KiB
Python

import re
from utils.logger import logger
from copy import deepcopy
from traceback import print_exc
total_currency_list = [
"USD",
"EUR",
"AUD",
"JPY",
"CHF",
"GBP",
"SEK",
"CNY",
"NZD",
"CNH",
"NOK",
"SGD",
"HKD",
"ZAR",
"PLN",
"CAD",
"CZK",
"HUF",
"DKK",
"BRL",
"SKK",
"RON",
"TRY",
"BGN",
"CUP",
"MXN",
"CLF",
"XCD",
"ISK",
"IDR",
"MNT",
"AED",
"AFN",
"INR",
"ESP",
"RUB",
"CLP",
"KRW",
"ETB",
"DZD",
"XEU",
"XFO",
]
share_features_full_name = ['Accumulation', 'Income', 'Distribution', 'Investor', 'Institutional', 'Admin', 'Advantage']
share_features_abbrevation = ['Acc', 'Inc', 'Dist', 'Div', 'Inv', 'Inst', 'Adm', 'Adv']
lower_pre_fix_fund_share = ['fund', "funds", 'portfolio',
'bond', 'bonds', 'class',
'classes', 'share', 'shares']
def add_slash_to_text_as_regex(text: str):
if text is None or len(text) == 0:
return text
special_char_iter = re.finditer("\W", text)
for special_iter in special_char_iter:
if len(special_iter.group().strip()) == 0:
continue
replace = r"\{0}".format(special_iter.group())
if replace not in text:
text = re.sub(replace, replace, text)
text = re.sub(r"\s+", r"\\s+", text)
return text
def clean_text(text: str) -> str:
# text = text.lower()
# update the specical character which begin with \u, e.g \u2004 or \u00a0 to be space
text = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
text = re.sub(r"( ){2,}", ' ', text.strip())
return text
def get_most_similar_name(text: str,
name_list: list,
share_name: str = None,
fund_name: str = None,
matching_type="share",
pre_common_word_list: list = None,
process_cache: dict = None) -> str:
"""
Get the most similar fund name from fund_name_list by jacard similarity
"""
try:
copy_name_list = deepcopy(name_list)
if text is None or len(text.split()) == 0 or \
copy_name_list is None or len(copy_name_list) == 0:
return None, None
for i in range(len(copy_name_list)):
copy_name = copy_name_list[i]
share_part = get_share_part_list([copy_name])[0]
if '-' in share_part:
copy_name = copy_name.replace('-', ' ')
copy_name = replace_abbrevation(copy_name)
copy_name_list[i] = copy_name
# get common words in fund_name_list
common_word_list = []
if len(name_list) > 1:
_, common_word_list = remove_common_word(copy_name_list)
if pre_common_word_list is not None and len(pre_common_word_list) > 0:
common_word_list.extend([word for word in pre_common_word_list
if word not in common_word_list])
if len(common_word_list) > 0:
common_word_list = [word for word in common_word_list
if len(word) > 1 and word.upper() not in total_currency_list]
text = text.strip()
text = remove_special_characters(text)
text = replace_abbrevation(text)
raw_fund_name_split = []
if fund_name is not None and len(fund_name.strip()) > 0:
fund_name = fund_name.strip()
fund_name = remove_special_characters(fund_name)
raw_fund_name_split = fund_name.upper().split()
if share_name is not None:
share_name = remove_special_characters(share_name)
share_name = replace_abbrevation(share_name)
text_splits = text.split()
if len(text_splits) == 1:
text = split_words_without_space(text)
else:
new_splits = []
for split in text_splits:
if len(split) > 1:
new_splits.extend(split_words_without_space(split).split())
else:
new_splits.append(split)
text = ' '.join(new_splits)
lower_new_splits = [split.lower() for split in new_splits]
for word in common_word_list:
if word not in lower_new_splits:
# remove word in fund_name_list
for i in range(len(copy_name_list)):
temp_splits = copy_name_list[i].split()
copy_name_list[i] = ' '.join([split for split in temp_splits
if remove_special_characters(split).lower() != word])
max_similarity = 0
max_similarity_full_name = None
text = remove_special_characters(text)
if matching_type == "share":
text, share_name, copy_name_list = update_for_currency(text, share_name, copy_name_list)
text = ' '.join([split for split in text.split()
if split.lower() not in lower_pre_fix_fund_share])
if share_name is not None:
share_name = ' '.join([split for split in share_name.split()
if split.lower() not in lower_pre_fix_fund_share])
copy_share_name_list = get_share_part_list(copy_name_list)
for i in range(len(copy_name_list)):
temp_splits = copy_name_list[i].split()
copy_name_list[i] = ' '.join([split for split in temp_splits
if remove_special_characters(split).lower()
not in lower_pre_fix_fund_share])
text_currency = None
text_feature = None
text_share_short_name_list = None
if matching_type == "share" and text is not None and len(text.strip()) > 0:
if process_cache is not None and isinstance(process_cache, dict):
if process_cache.get(text, None) is not None:
cache = process_cache.get(text)
text_share_short_name_list = cache.get("share_short_name")
text_feature = cache.get("share_feature")
text_currency = cache.get("share_currency")
else:
if share_name is not None and len(share_name.strip()) > 0:
text_share_short_name_list = get_share_short_name_from_text(share_name,
confirm_text_share=True)
text_feature = get_share_feature_from_text(share_name)
text_currency = get_currency_from_text(share_name)
else:
text_share_short_name_list = get_share_short_name_from_text(text,
confirm_text_share=True)
text_feature = get_share_feature_from_text(text)
text_currency = get_currency_from_text(text)
# sort text_share_short_name_list
text_share_short_name_list.sort()
process_cache[text] = {
"share_short_name": text_share_short_name_list,
"share_feature": text_feature,
"share_currency": text_currency
}
else:
if share_name is not None and len(share_name.strip()) > 0:
text_share_short_name_list = get_share_short_name_from_text(share_name,
confirm_text_share=True)
text_share_short_name_list.sort()
text_feature = get_share_feature_from_text(share_name)
text_currency = get_currency_from_text(share_name)
else:
text_share_short_name_list = get_share_short_name_from_text(text,
confirm_text_share=True)
text_feature = get_share_feature_from_text(text)
text_currency = get_currency_from_text(text)
# logger.info(f"Source text: {text}, candidate names count: {len(copy_name_list)}")
same_max_similarity_name_list = []
for full_name, copy_name, copy_share_name in zip(name_list , copy_name_list, copy_share_name_list):
if not isinstance(copy_name, str) or len(copy_name.strip()) == 0:
continue
copy_name = remove_special_characters(copy_name)
copy_name = split_words_without_space(copy_name)
copy_name_short_name_list = None
copy_name_feature = None
copy_name_currency = None
if matching_type == "share":
if process_cache is not None and isinstance(process_cache, dict):
if process_cache.get(copy_name, None) is not None:
cache = process_cache.get(copy_name)
copy_name_short_name_list = cache.get("share_short_name")
copy_name_feature = cache.get("share_feature")
copy_name_currency = cache.get("share_currency")
else:
copy_name_short_name_list = get_share_short_name_from_text(copy_share_name)
if copy_name_short_name_list is not None:
copy_name_short_name_list.sort()
copy_name_feature = get_share_feature_from_text(copy_share_name)
copy_name_currency = get_currency_from_text(copy_share_name)
process_cache[copy_name] = {
"share_short_name": copy_name_short_name_list,
"share_feature": copy_name_feature,
"share_currency": copy_name_currency
}
else:
copy_name_short_name_list = get_share_short_name_from_text(copy_share_name)
copy_name_short_name_list.sort()
copy_name_feature = get_share_feature_from_text(copy_share_name)
copy_name_currency = get_currency_from_text(copy_share_name)
try:
if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \
copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0:
updated_text_share_short_name_list, updated_copy_name_short_name_list = \
compare_both_short_name(text_share_short_name_list, copy_name_short_name_list)
if updated_text_share_short_name_list != text_share_short_name_list:
text = ' '.join([split for split in text.split()
if split not in text_share_short_name_list])
text += ' ' + ' '.join(updated_text_share_short_name_list)
text_share_short_name_list = updated_text_share_short_name_list
if updated_copy_name_short_name_list != copy_name_short_name_list:
copy_name = ' '.join([split for split in copy_name.split()
if split not in copy_name_short_name_list])
copy_name += ' ' + ' '.join(updated_copy_name_short_name_list)
copy_name_short_name_list = updated_copy_name_short_name_list
except Exception as e:
print(e)
try:
similarity = get_jacard_similarity(text,
copy_name,
need_remove_numeric_characters=False)
except Exception as e:
print(e)
print_exc()
similarity = 0
if similarity == 1:
return full_name, similarity
copy_name_2 = replace_abbrevation(copy_name)
if copy_name != copy_name_2:
similarity_2 = get_jacard_similarity(text,
copy_name_2,
need_remove_numeric_characters=False)
if similarity_2 > similarity:
similarity = similarity_2
if similarity > max_similarity:
if matching_type == "share":
if text_currency is not None and len(text_currency) > 0 and \
copy_name_currency is not None and len(copy_name_currency) > 0:
if text_currency != copy_name_currency:
continue
if text_feature is not None and len(text_feature) > 0 and \
copy_name_feature is not None and len(copy_name_feature) > 0:
if text_feature != copy_name_feature:
if text_feature.lower() not in copy_name.lower().split() and \
copy_name_feature.lower() != "accmulation" and \
copy_name_feature.lower() not in text.lower().split():
continue
if matching_type == "share":
if text_share_short_name_list is not None and len(text_share_short_name_list) > 0 and \
copy_name_short_name_list is not None and len(copy_name_short_name_list) > 0:
short_name_invalid = False
for short in text_share_short_name_list:
if short not in copy_name_short_name_list:
short_name_invalid = True
break
for compare_short in copy_name_short_name_list:
if compare_short not in text_share_short_name_list:
# some short word is in fund name, but not belong to share name
if compare_short.upper() not in raw_fund_name_split:
short_name_invalid = True
break
if short_name_invalid:
continue
max_similarity = similarity
max_similarity_full_name = full_name
same_max_similarity_name_list = []
elif matching_type == "fund" and max_similarity > 0 and max_similarity == similarity:
if full_name is not None and max_similarity_full_name is not None and \
len(full_name.split()) > len(max_similarity_full_name.split()):
max_similarity_full_name = full_name
same_max_similarity_name_list = []
else:
if full_name is not None:
same_max_similarity_name_list.append(full_name)
if max_similarity == 1:
break
# if there are multiple names with the same similarity, return None
if len(same_max_similarity_name_list) > 0:
return None, 0.0
if max_similarity < 0.35:
return None, max_similarity
return max_similarity_full_name, max_similarity
except Exception as e:
print(e)
print_exc()
return None, 0.0
def compare_both_short_name(text_short_name_list: list, compare_short_name_list: list):
copy_text_short_name_list = deepcopy(text_short_name_list)
copy_compare_short_name_list = deepcopy(compare_short_name_list)
copy_text_short_name_list = verify_short_name_container(copy_text_short_name_list,
copy_compare_short_name_list)
copy_compare_short_name_list = verify_short_name_container(copy_compare_short_name_list,
copy_text_short_name_list)
return copy_text_short_name_list, copy_compare_short_name_list
def verify_short_name_container(left_short_name_list: list, right_short_name_list: list):
length_1_over_1 = False
length_1_count = 0
length_1_list = []
for short_name in left_short_name_list:
if len(short_name) == 1:
length_1_count += 1
length_1_list.append(short_name)
if length_1_count > 1:
length_1_over_1 = True
if length_1_over_1:
for compare_short_name in right_short_name_list:
if len(compare_short_name) == length_1_count:
all_in = True
for short_name in length_1_list:
if short_name not in compare_short_name:
all_in = False
break
if all_in:
for short_name in length_1_list:
if short_name in left_short_name_list:
left_short_name_list.remove(short_name)
left_short_name_list.append(compare_short_name)
return left_short_name_list
def get_share_part_list(text_list: list):
share_part_list = []
for text in text_list:
text_split = text.split("Funds")
if len(text_split) == 1:
text_split = text.split("Fund")
if len(text_split) == 1:
text_split = text.split("Portfolio")
if len(text_split) == 1:
text_split = text.split("Bonds")
if len(text_split) == 1:
text_split = text.split("Bond")
if len(text_split) > 1:
share_part_text = text_split[-1].strip()
else:
share_part_text = text.strip()
share_part_text = ' '.join([split for split in share_part_text.split()
if remove_special_characters(split).lower()
not in lower_pre_fix_fund_share])
share_part_list.append(share_part_text)
return share_part_list
def get_share_short_name_from_text(text: str, confirm_text_share: bool = False):
if text is None or len(text.strip()) == 0:
return None
text = remove_special_characters(text.strip())
text_split = text.split()
temp_share_features = [feature.lower() for feature in share_features_full_name]
count = 0
share_short_name_list = []
if confirm_text_share:
count_threshold = 6
else:
count_threshold = 4
for split in text_split[::-1]:
if count == count_threshold:
break
if split.lower() not in temp_share_features and \
split.upper() not in total_currency_list:
if len(split) <= 3:
share_short_name_list.append(split.upper())
count += 1
if len(share_short_name_list) > 1:
remove_number = []
for short_name in share_short_name_list[::-1]:
if short_name.isdigit():
remove_number.append(short_name)
else:
break
for remove in remove_number:
if remove in share_short_name_list:
share_short_name_list.remove(remove)
return share_short_name_list
def get_share_feature_from_text(text: str):
if text is None or len(text.strip()) == 0:
return None
text = text.strip()
text = text.lower()
text_split = text.split()
temp_share_features = [feature.lower() for feature in share_features_full_name]
count = 0
for split in text_split[::-1]:
if count == 4:
break
if split.lower() in temp_share_features:
return split
count += 1
return None
def get_currency_from_text(text: str):
if text is None or len(text.strip()) == 0:
return None
text = text.strip()
text_split = text.split()
count = 0
for split in text_split[::-1]:
if count == 4:
break
if split.upper() in total_currency_list:
return split
count += 1
return None
def update_for_currency(text: str, share_name: str, compare_list: list):
currency_in_text = get_currency_from_text(text)
with_currency = False
if currency_in_text is not None:
with_currency = True
with_currency_list = []
without_currency_list = []
for index, compare in enumerate(compare_list):
# compare_split = compare.split()
with_currency_compare = False
currecy_in_compare = get_currency_from_text(compare)
if currecy_in_compare is not None:
with_currency_compare = True
if with_currency_compare:
with_currency_list.append(index)
else:
without_currency_list.append(index)
if not with_currency and len(with_currency_list) == 0:
pass
elif not with_currency and len(with_currency_list) > 0:
share_short_name_list = []
if share_name is not None and len(share_name.strip()) > 0:
share_short_name_list = get_share_short_name_from_text(share_name)
updated = False
if len(share_short_name_list) > 0:
if len(without_currency_list) > 0:
for index in without_currency_list:
all_in_list = True
compare_split = [split.upper() for split in compare_list[index].split()]
for share_shot_name in share_short_name_list:
if share_shot_name not in compare_split:
all_in_list = False
break
if all_in_list:
text = text + ' ' + 'USD'
if share_name is not None:
share_name = share_name + ' ' + 'USD'
updated = True
break
if not updated:
currency_list = []
for index in with_currency_list:
all_in_list = True
compare_split = [split.upper() for split in compare_list[index].split()]
for share_shot_name in share_short_name_list:
if share_shot_name not in compare_split:
all_in_list = False
break
if all_in_list:
current_currency_list = [split for split in compare_split
if split.upper() in total_currency_list]
if len(current_currency_list) > 0:
currency_list.append(current_currency_list[-1])
if len(currency_list) == 1:
text = text + ' ' + currency_list[0]
if share_name is not None:
share_name = share_name + ' ' + currency_list[0]
updated = True
for index in without_currency_list:
compare_list[index] = compare_list[index] + ' ' + 'USD'
if not updated:
text = text + ' ' + 'USD'
if share_name is not None:
share_name = share_name + ' ' + 'USD'
# return text, share_name, compare_list
elif with_currency and len(without_currency_list) == 0:
for index in without_currency_list:
compare_list[index] = compare_list[index] + ' ' + 'USD'
# return text, share_name, compare_list
else:
# return text, share_name, compare_list
pass
if with_currency:
share_name_split = share_name.split()
share_name_currency_list = []
for split in share_name_split:
if split.upper() in total_currency_list and split.upper() not in share_name_currency_list:
share_name_currency_list.append(split)
if len(share_name_currency_list) > 1 and 'USD' in share_name_currency_list:
new_share_name = ' '.join([split for split in share_name_split if split.upper() != 'USD'])
if share_name in text:
text = text.replace(share_name, new_share_name)
else:
text = ' '.join([split for split in text.split() if split.upper() != 'USD'])
share_name = new_share_name
for c_i in range(len(compare_list)):
compare = compare_list[c_i]
compare_share_part = get_share_part_list([compare])[0]
compare_share_part_split = compare_share_part.split()
compare_share_part_currency_list = []
for split in compare_share_part_split:
if split.upper() in total_currency_list and split.upper() not in compare_share_part_currency_list:
compare_share_part_currency_list.append(split)
if len(compare_share_part_currency_list) > 1 and 'USD' in compare_share_part_currency_list:
compare_share_part_split = [split for split in compare_share_part_split if split.upper() != 'USD']
new_compare_share_part = ' '.join(compare_share_part_split)
compare_list[c_i] = compare.replace(compare_share_part, new_compare_share_part)
return text, share_name, compare_list
def remove_common_word(text_list: list):
if text_list is None or len(text_list) == 0:
return text_list
new_text_list = []
for text in text_list:
text = text.lower()
text = remove_special_characters(text)
text_splits = text.split()
text = ' '.join([split for split in text_splits
if split.lower() not in lower_pre_fix_fund_share])
new_text_list.append(text)
# remove common word in new_text_list, such as 'Blackrock Global Fund' and 'Blackrock Growth Fund', then 'Blackrock', 'Fund' are common words
# the result is ['Global', 'Growth']
common_word_list = []
new_text_splits_list = [text.split() for text in new_text_list]
with_common_word = False
for i in range(len(new_text_splits_list)):
for j in range(i+1, len(new_text_splits_list)):
if common_word_list is None or len(common_word_list) == 0:
common_word_list = list(
set(new_text_splits_list[i]).intersection(set(new_text_splits_list[j])))
else:
common_word_list = list(
set(common_word_list).intersection(set(new_text_splits_list[j])))
if len(common_word_list) > 0:
with_common_word = True
if with_common_word and len(common_word_list) == 0:
break
if with_common_word and len(common_word_list) == 0:
break
remove_list = []
# if exists the share name and currency name, remove from the list
for word in common_word_list:
if word.upper() in total_currency_list:
remove_list.append(word)
for remove in remove_list:
if remove in common_word_list:
common_word_list.remove(remove)
common_word_list = list(set(common_word_list))
for i in range(len(new_text_splits_list)):
for common_word in common_word_list:
if common_word in new_text_splits_list[i]:
new_text_splits_list[i].remove(common_word)
new_text_list = [' '.join(text_splits)
for text_splits in new_text_splits_list]
return new_text_list, common_word_list
def split_words_without_space(text: str):
"""
Split words without space, such as 'BlackrockGlobalFund' will be split to 'Blackrock', 'Global', 'Fund'
"""
if text is None or len(text.strip()) == 0:
return []
text = text.strip()
# splits = text.split()
# if len(splits) > 1:
# return text
# find all words with capital letter + lower letter
regex = r"[A-Z][a-z]+"
regex2 = r"[A-Z]{2,}[a-z]+"
word_list = re.findall(regex, text)
word_list2 = re.findall(regex2, text)
if len(word_list) > 0:
for word in word_list:
if len(word_list2) > 0:
word_exists_in_word2 = False
for word2 in word_list2:
if word in word2:
word_exists_in_word2 = True
break
if word_exists_in_word2:
continue
text = text.replace(word, " " + word + " ")
text = re.sub(r"(\s)+", " ", text)
return text.strip()
def remove_special_characters(text):
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
text = re.sub(r'\s+', ' ', text)
text = text.strip()
return text
def get_unique_words_text(text):
text = remove_special_characters(text)
text = text.lower()
text_split = text.split()
text_split = list(set(text_split))
# sort the list
text_split.sort()
return_text = ' '.join(text_split)
return return_text
def remove_numeric_characters(text):
# remove numeric characters
text = re.sub(r'\d+', ' ', text)
text = re.sub(r'\s+', ' ', text)
text = text.strip()
return text
def get_jacard_similarity(text_left,
text_right,
need_remove_special_characters=True,
need_remove_numeric_characters=True):
if need_remove_special_characters:
text_left = remove_special_characters(text_left)
text_right = remove_special_characters(text_right)
if need_remove_numeric_characters:
text_left = remove_numeric_characters(text_left)
text_right = remove_numeric_characters(text_right)
text_left = text_left.lower()
text_right = text_right.lower()
text_left = text_left.split()
text_right = text_right.split()
intersection = set(text_left).intersection(set(text_right))
union = set(text_left).union(set(text_right))
intersection_count = len(intersection)
union_count = len(union)
differ_a = list(set(text_left).difference(set(text_right)))
differ_a.sort()
differ_b = list(set(text_right).difference(set(text_left)))
differ_b.sort()
if ''.join(differ_a) == ''.join(differ_b):
intersection_count += len(differ_a) + len(differ_b)
if union_count > 0:
return round(intersection_count / union_count, 3)
else:
return 0
def simple_most_similarity_name(text: str, name_list: list):
if text is None or len(text.strip()) == 0 or \
name_list is None or len(name_list) == 0:
return None, 0.0
max_similarity = 0
max_similarity_name = None
for full_name in name_list:
similarity = get_jacard_similarity(text, full_name)
if similarity > max_similarity:
max_similarity = similarity
max_similarity_name = full_name
if max_similarity == 1:
break
return max_similarity_name, max_similarity
def get_beginning_common_words(text_list: list):
"""
Get the beginning common words in text_list
"""
if text_list is None or len(text_list) < 2:
return []
common_words_list = []
first_text_split = text_list[0].split()
for w_i, word in enumerate(first_text_split):
all_same = True
for text in text_list[1:]:
text_split = text.split()
if w_i >= len(text_split):
all_same = False
break
if text_split[w_i] != word:
all_same = False
break
if all_same:
common_words_list.append(word)
else:
break
return ' '.join(common_words_list).strip()
def replace_abbrevation(text: str):
if text is None or len(text.strip()) == 0:
return text
text = text.replace('(', ' ').replace(')', ' ').replace('-', ' ')
text = re.sub(r'\s+', ' ', text).strip()
if 'swiss franc' in text.lower():
text = re.sub(r'swiss\s+franc', 'CHF', text, flags=re.IGNORECASE)
elif 'us dollar' in text.lower():
text = re.sub(r'us\s+dollar', 'USD', text, flags=re.IGNORECASE)
elif 'singapore dollar' in text.lower():
text = re.sub(r'singapore\s+dollar', 'SGD', text, flags=re.IGNORECASE)
elif 'hong kong dollar' in text.lower():
text = re.sub(r'hong\s+kong\s+dollar', 'HKD', text, flags=re.IGNORECASE)
elif 'hongkong dollar' in text.lower():
text = re.sub(r'hongkong\s+dollar', 'HKD', text, flags=re.IGNORECASE)
elif 'australian dollar' in text.lower():
text = re.sub(r'australian\s+dollar', 'AUD', text, flags=re.IGNORECASE)
elif 'japanese yen' in text.lower():
text = re.sub(r'japanese\s+yen', 'JPY', text, flags=re.IGNORECASE)
elif 'south african rand' in text.lower():
text = re.sub(r'South\s+African\s+rand', 'ZAR', text, flags=re.IGNORECASE)
elif 'canadian dollar' in text.lower():
text = re.sub(r'canadian\s+dollar', 'CAD', text, flags=re.IGNORECASE)
elif 'new zealand dollar' in text.lower():
text = re.sub(r'new\s+zealand\s+dollar', 'NZD', text, flags=re.IGNORECASE)
elif 'norwegian krone' in text.lower():
text = re.sub(r'norwegian\s+krone', 'NOK', text, flags=re.IGNORECASE)
elif 'danish krone' in text.lower():
text = re.sub(r'danish\s+krone', 'DKK', text, flags=re.IGNORECASE)
elif 'swedish krona' in text.lower():
text = re.sub(r'swedish\s+krona', 'SEK', text, flags=re.IGNORECASE)
elif 'swedish kronor' in text.lower():
text = re.sub(r'swedish\s+kronor', 'SEK', text, flags=re.IGNORECASE)
elif "GPB" in text.split():
text = re.sub(r"GPB", "GBP", text, flags=re.IGNORECASE)
elif 'sterling' in text.lower().split():
text = re.sub(r'sterling', 'GBP', text, flags=re.IGNORECASE)
elif 'euro' in text.lower().split():
text = re.sub(r'euro', 'EUR', text, flags=re.IGNORECASE)
elif '' in text.lower().split():
text = re.sub(r'\', 'EUR', text, flags=re.IGNORECASE)
elif '$' in text.lower().split():
text = re.sub(r'\$', 'USD', text, flags=re.IGNORECASE)
elif '£' in text.lower().split():
text = re.sub(r'\£', 'GBP', text, flags=re.IGNORECASE)
elif 'RMB' in text.split():
text = re.sub(r'RMB', 'CNY', text, flags=re.IGNORECASE)
else:
pass
text_splits = text.split()
new_text_splits = []
for split in text_splits:
if split.lower() in ['acc', 'acc.', 'accumulating']:
new_text_splits.append('Accumulation')
elif split.lower() in ['inc', 'inc.']:
new_text_splits.append('Income')
elif split.lower() in ['dist', 'dist.', 'dis', 'dis.', "distributing"]:
new_text_splits.append('Distribution')
elif split.lower() in ['inv', 'inv.']:
new_text_splits.append('Investor')
elif split.lower() in ['inst', 'inst.', 'institution']:
new_text_splits.append('Institutional')
elif split.lower() in ['cap', 'cap.']:
new_text_splits.append('Capitalisation')
elif split.lower() in ['div', 'div.']:
new_text_splits.append('Dividend')
elif split.lower() in ['adm', 'adm.']:
new_text_splits.append('Admin')
elif split.lower() in ['adv', 'adv.']:
new_text_splits.append('Advantage')
elif split.lower() in ['hdg', 'hgd', 'hdg.', 'hgd.', '(h)']:
new_text_splits.append('Hedged')
elif split.lower() in ['cl', 'cl.']:
new_text_splits.append('Class')
elif split.lower() in ['ser', 'ser.']:
new_text_splits.append('Series')
elif split.lower() in ['u.s.']:
new_text_splits.append('US')
elif split.lower() in ['nc', 'nc.']:
new_text_splits.append('no trail')
elif split.lower() in ['non']:
new_text_splits.append('Not')
elif split.lower() in ['net', 'unhgd']:
new_text_splits.append('')
else:
split = split_short_name_with_share_features(split)
new_text_splits.append(split)
new_text = ' '.join(new_text_splits)
new_text = re.sub(r'\s+', ' ', new_text).strip()
return new_text
def split_short_name_with_share_features(text: str):
"""
Split short name with share features,
for examples:
Document mapping for 532422720
CHFHInc to be CHF H Income
USDHAcc to be USD H Accumulation
GBPHInc to be GBP H Income
HAcc to be H Accumulation
GBPHedgedAcc to be GBP Hedged Accumulation
HGBPInc to be H GBP Income
HNOKAcc to be H NOK Accumulation
"""
if text is None or len(text.strip()) == 0:
return text
if len(text.split()) > 1:
return text
text = text.strip()
share_features = {'Acc': 'Accumulation',
'Inc': 'Income',
'Dist': 'Distribution',
'Div': 'Dividend',}
feature_name = ""
for key, value in share_features.items():
if len(text) > len(key) and text.endswith(key):
feature_name = value
text = text.replace(key, '')
break
currency_text = ""
for currency in total_currency_list:
if len(text) > len(currency) and currency in text:
currency_text = currency
text = text.replace(currency, '')
break
new_text = currency_text + ' ' + text + ' ' + feature_name
new_text = re.sub(r'\s+', ' ', new_text).strip()
return new_text