# dc-ml-emea-ar/utils/biz_utils.py
#
# 383 lines
# 16 KiB
# Python

import re
from copy import deepcopy
from traceback import print_exc
def add_slash_to_text_as_regex(text: str):
    r"""
    Turn plain text into a regex pattern string.

    Every character that is neither a word character nor whitespace gets a
    backslash in front of it, and every run of whitespace is replaced by the
    pattern text ``\s+``.

    A character is left alone when its escaped form (backslash + character)
    already occurs somewhere in the text, so text that was escaped before is
    not escaped twice.

    NOTE: a literal backslash in the input is not escaped; the substitution
    replaces it with itself.

    :param text: plain text; None and the empty string are returned unchanged.
    :return: the escaped pattern string.
    """
    if text is None or len(text) == 0:
        return text
    # The iterator walks the original string; rebinding ``text`` inside the
    # loop does not affect which characters are visited.
    for special_match in re.finditer(r"\W", text):
        special_char = special_match.group()
        if not special_char.strip():
            # whitespace is handled in one go after the loop
            continue
        escaped = "\\" + special_char
        if escaped not in text:
            text = re.sub(escaped, escaped, text)
    return re.sub(r"\s+", r"\\s+", text)
def clean_text(text: str) -> str:
    """
    Normalise spacing in text.

    Escape text of the form backslash + 'u' + four ASCII letters/digits
    (e.g. the six characters of an escaped 2004 or 00a0 code point) is
    replaced by a space, the result is stripped, and every run of two or
    more spaces is collapsed to a single space.
    """
    without_escapes = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
    trimmed = without_escapes.strip()
    return re.sub(r"( ){2,}", ' ', trimmed)
def get_most_similar_name(text: str, name_list: list, pre_common_word_list: list = None,
                          min_similarity: float = 0.35) -> tuple:
    """
    Get the most similar name from name_list by Jaccard similarity.

    Both the text and a working copy of the names are normalised before they
    are compared (abbreviations expanded, special characters removed, glued
    words split, currency information aligned). name_list itself is not
    modified; the returned name is the original, un-normalised entry.

    :param text: the name to look up.
    :param name_list: candidate names.
    :param pre_common_word_list: extra lower-case words treated like the words
        shared by the candidates: they are dropped from the candidates when the
        text does not contain them.
    :param min_similarity: scores below this value count as "no match".
    :return: a tuple (name, similarity):
        (None, None) when text is None/blank or name_list is None/empty,
        (None, best_score) when the best score is below min_similarity,
        (None, 0.0) when any exception is raised (it is printed, not raised),
        otherwise (best matching entry of name_list, its score).
    """
    generic_words = ('fund', 'portfolio', 'class', 'share', 'shares')
    try:
        copy_fund_name_list = deepcopy(name_list)
        if text is None or len(text.split()) == 0 or \
                copy_fund_name_list is None or len(copy_fund_name_list) == 0:
            return None, None
        copy_fund_name_list = [replace_abbrevation(name) for name in copy_fund_name_list]

        # words shared by the candidates, plus the caller supplied ones
        common_word_list = []
        if len(name_list) > 1:
            _, common_word_list = remove_common_word(copy_fund_name_list)
        if pre_common_word_list is not None and len(pre_common_word_list) > 0:
            common_word_list.extend([word for word in pre_common_word_list
                                     if word not in common_word_list])

        text = replace_abbrevation(remove_special_characters(text.strip()))
        text_splits = text.split()
        if len(text_splits) == 1:
            # A single-token text is only un-glued; the common-word and
            # generic-word filtering below is not applied in this case.
            text = split_words_without_space(text)
        else:
            new_splits = []
            for split in text_splits:
                if len(split) > 1:
                    new_splits.extend(split_words_without_space(split).split())
                else:
                    new_splits.append(split)
            lower_new_splits = [split.lower() for split in new_splits]

            # Shared words the text does not mention carry no signal, and the
            # generic words never do: drop both from every candidate.
            dropped_words = [word for word in common_word_list
                             if word not in lower_new_splits]
            dropped_words.extend(generic_words)
            copy_fund_name_list = [
                ' '.join(split for split in name.split()
                         if remove_special_characters(split).lower() not in dropped_words)
                for name in copy_fund_name_list
            ]
            text = ' '.join(split for split in new_splits
                            if split.lower() not in generic_words)

        max_similarity = 0
        max_similarity_fund_name = None
        text = remove_special_characters(text)
        text, copy_fund_name_list = update_for_currency(text, copy_fund_name_list)
        for fund_name, copy_fund_name in zip(name_list, copy_fund_name_list):
            copy_fund_name = split_words_without_space(
                remove_special_characters(copy_fund_name))
            similarity = get_jacard_similarity(text,
                                               copy_fund_name,
                                               need_remove_numeric_characters=False)
            if similarity > max_similarity:
                max_similarity = similarity
                max_similarity_fund_name = fund_name
            if max_similarity == 1:
                # cannot get better than a perfect score
                break
        if max_similarity < min_similarity:
            return None, max_similarity
        return max_similarity_fund_name, max_similarity
    except Exception as e:
        # best effort: report the problem and signal "no match"
        print(e)
        print_exc()
        return None, 0.0
def update_for_currency(text: str, compare_list: list):
    """
    Align currency information between text and the names in compare_list.

    A string "has a currency" when one of its whitespace separated tokens,
    upper-cased, is one of the known three-letter codes.

    - Nobody has a currency: nothing changes.
    - text has a currency: 'USD' is appended to every name without one.
    - text has none but some names do: 'USD' is appended to every name without
      a currency, and text receives a currency as well. When the last token of
      text is shorter than 4 characters and equal to its upper-case form, it is
      looked up in the names: if a name without a currency contains it, text
      gets 'USD'; otherwise, if exactly one currency is collected from the
      names with a currency that contain it, text gets that currency. In every
      other case text gets 'USD'.

    :param text: the text being matched.
    :param compare_list: candidate names; modified in place.
    :return: a tuple (text, compare_list).
    :raises IndexError: when text has no tokens while at least one name
        carries a currency.
    """
    currency_codes = {'USD', 'EUR', 'AUD', 'JPY', 'CHF', 'GBP', 'SEK', 'CNY',
                      'NZD', 'CNH', 'NOK', 'SGD', 'HKD', 'ZAR', 'PLN', 'CAD',
                      'CZK', 'HUF', 'DKK', 'BRL', 'SKK', 'RON', 'TRY', 'BGN',
                      'CUP', 'MXN', 'TOP', 'ILS', 'CLF', 'XCD', 'ISK', 'IDR',
                      'MNT', 'AED', 'AFN', 'INR', 'ESP', 'RUB', 'CLP', 'KRW',
                      'ETB', 'DZD', 'XEU', 'XFO'}
    default_currency = 'USD'

    def has_currency(tokens):
        return any(token.upper() in currency_codes for token in tokens)

    text_split = text.split()
    with_currency = has_currency(text_split)

    with_currency_list = []
    without_currency_list = []
    for index, compare in enumerate(compare_list):
        if has_currency(compare.split()):
            with_currency_list.append(index)
        else:
            without_currency_list.append(index)

    if with_currency:
        # Names without an explicit currency are assumed to be in the default
        # one so that they compare fairly against a text that states it.
        for index in without_currency_list:
            compare_list[index] = compare_list[index] + ' ' + default_currency
        return text, compare_list

    if len(with_currency_list) == 0:
        return text, compare_list

    last_split = text_split[-1]
    updated = False
    if len(last_split) < 4 and last_split.upper() == last_split:
        for index in without_currency_list:
            if last_split in compare_list[index].split():
                text = text + ' ' + default_currency
                updated = True
                break
        if not updated:
            currency_list = []
            for index in with_currency_list:
                compare_split = compare_list[index].split()
                if last_split in compare_split:
                    current_currency_list = [split for split in compare_split
                                             if split.upper() in currency_codes]
                    if len(current_currency_list) > 0:
                        currency_list.append(current_currency_list[-1])
            # only an unambiguous currency is copied over
            if len(currency_list) == 1:
                text = text + ' ' + currency_list[0]
                updated = True

    for index in without_currency_list:
        compare_list[index] = compare_list[index] + ' ' + default_currency
    if not updated:
        text = text + ' ' + default_currency
    return text, compare_list
def remove_common_word(text_list: list):
    """
    Strip the words shared by every text in text_list.

    Each text is lower-cased, cleaned with remove_special_characters and
    stripped of the generic words 'fund', 'portfolio', 'share' and 'class'.
    The words that then occur in every text are the common words.
    E.g. 'Blackrock Global Fund' and 'Blackrock Growth Fund' give the texts
    ['global', 'growth'] and the common words ['blackrock'].

    With a single text there is nothing to compare against, so the common word
    list is empty.

    :param text_list: the texts to compare.
    :return: a tuple (texts without generic and common words, sorted list of
        common words). NOTE: when text_list is None or empty it is returned
        as-is, NOT wrapped in a tuple.
    """
    if text_list is None or len(text_list) == 0:
        return text_list

    generic_words = ('fund', 'portfolio', 'share', 'class')
    token_lists = []
    for text in text_list:
        cleaned = remove_special_characters(text.lower())
        token_lists.append([token for token in cleaned.split()
                            if token not in generic_words])

    if len(token_lists) > 1:
        common_words = set(token_lists[0]).intersection(*token_lists[1:])
    else:
        common_words = set()

    new_text_list = [' '.join(token for token in tokens if token not in common_words)
                     for tokens in token_lists]
    # sorted so that the result does not depend on set iteration order
    return new_text_list, sorted(common_words)
def split_words_without_space(text: str):
    """
    Separate words that were glued together without spaces.

    Every capital letter followed by one or more lower-case letters is treated
    as a word and surrounded by spaces, e.g. 'BlackrockGlobalFund' becomes
    'Blackrock Global Fund' and 'USDHedged' becomes 'USD Hedged'. Whitespace
    runs are collapsed to a single space afterwards.

    :param text: the text to split.
    :return: the spaced text as a single string; an empty string when text is
        None or blank.
    """
    if text is None or len(text.strip()) == 0:
        return ''
    # One pass over the matches: a word that is a prefix of another word
    # ('Fun' / 'Fund') must not be cut out of the longer one.
    spaced = re.sub(r'[A-Z][a-z]+', r' \g<0> ', text.strip())
    return re.sub(r'(\s)+', ' ', spaced).strip()
def remove_special_characters(text):
    """
    Replace every character that is not an ASCII letter, a digit or whitespace
    with a space, collapse whitespace runs to one space and strip the ends.
    """
    spaced = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
    return re.sub(r'\s+', ' ', spaced).strip()
def get_unique_words_text(text):
    """
    Return the distinct words of text as one string: special characters are
    removed, the text is lower-cased, duplicates are dropped and the remaining
    words are sorted and joined with single spaces.
    """
    normalized = remove_special_characters(text).lower()
    return ' '.join(sorted(set(normalized.split())))
def remove_numeric_characters(text):
    """
    Replace every run of digits with a space, collapse whitespace runs to one
    space and strip the ends.
    """
    without_digits = re.sub(r'\d+', ' ', text)
    return re.sub(r'\s+', ' ', without_digits).strip()
def get_jacard_similarity(text_left,
                          text_right,
                          need_remove_special_characters=True,
                          need_remove_numeric_characters=True):
    """
    Jaccard similarity of the word sets of two texts.

    The texts are optionally cleaned of special and/or numeric characters,
    lower-cased and split on whitespace. The result is
    |common words| / |all words| rounded to 3 decimals, or 0 when neither
    text contains a word.
    """
    texts = [text_left, text_right]
    if need_remove_special_characters:
        texts = [remove_special_characters(item) for item in texts]
    if need_remove_numeric_characters:
        texts = [remove_numeric_characters(item) for item in texts]

    left_words, right_words = (set(item.lower().split()) for item in texts)
    all_words = left_words | right_words
    if not all_words:
        return 0
    return round(len(left_words & right_words) / len(all_words), 3)
def get_beginning_common_words(text_list: list):
    """
    Get the leading words shared by every text in text_list.

    The texts are compared word by word (case sensitive) from the start; the
    comparison stops at the first position where the texts differ or where
    one of them has run out of words.

    Returns the shared words joined by single spaces, or an empty list when
    text_list is None or holds fewer than two texts.
    """
    if text_list is None or len(text_list) < 2:
        return []
    split_texts = [text.split() for text in text_list]
    leading_words = []
    # zip stops at the shortest text, i.e. as soon as any text has no word left
    for column in zip(*split_texts):
        first_word = column[0]
        if any(other != first_word for other in column[1:]):
            break
        leading_words.append(first_word)
    return ' '.join(leading_words).strip()
# Currency names written out in full: (lower-case phrase looked up as a
# substring of the lower-cased text, regex that rewrites it, currency code).
_CURRENCY_PHRASES = (
    ('swiss franc', r'swiss\s+franc', 'CHF'),
    ('us dollar', r'us\s+dollar', 'USD'),
    ('singapore dollar', r'singapore\s+dollar', 'SGD'),
    ('hong kong dollar', r'hong\s+kong\s+dollar', 'HKD'),
    ('hongkong dollar', r'hongkong\s+dollar', 'HKD'),
    ('australian dollar', r'australian\s+dollar', 'AUD'),
    ('japanese yen', r'japanese\s+yen', 'JPY'),
    ('south african rand', r'South\s+African\s+rand', 'ZAR'),
    ('canadian dollar', r'canadian\s+dollar', 'CAD'),
    ('new zealand dollar', r'new\s+zealand\s+dollar', 'NZD'),
    ('norwegian krone', r'norwegian\s+krone', 'NOK'),
    ('danish krone', r'danish\s+krone', 'DKK'),
    ('swedish krona', r'swedish\s+krona', 'SEK'),
    ('swedish kronor', r'swedish\s+kronor', 'SEK'),
)

# Currency words / symbols that must appear as a whitespace separated token of
# the lower-cased text: (token, regex that rewrites it, currency code).
# The word patterns are bounded so that e.g. 'European' is left intact.
_CURRENCY_TOKENS = (
    ('sterling', r'\bsterling\b', 'GBP'),
    ('euro', r'\beuro\b', 'EUR'),
    ('\u20ac', '\u20ac', 'EUR'),  # euro sign
    ('$', r'\$', 'USD'),
    ('\u00a3', '\u00a3', 'GBP'),  # pound sign
    ('rmb', r'\bRMB\b', 'CNY'),
)

# Lower-cased token -> expanded term.
_TERM_ABBREVIATIONS = {
    'acc': 'Accumulation', 'acc.': 'Accumulation',
    'inc': 'Income', 'inc.': 'Income',
    'dist': 'Distribution', 'dist.': 'Distribution',
    'inv': 'Investor', 'inv.': 'Investor',
    'inst': 'Institutional', 'inst.': 'Institutional', 'institution': 'Institutional',
    'cap': 'Capitalisation', 'cap.': 'Capitalisation',
    'adm': 'Admin', 'adm.': 'Admin',
    'adv': 'Advantage', 'adv.': 'Advantage',
    'hdg': 'Hedged', 'hgd': 'Hedged', 'hdg.': 'Hedged', 'hgd.': 'Hedged', '(h)': 'Hedged',
    'cl': 'Class', 'cl.': 'Class',
    'ser': 'Series', 'ser.': 'Series',
    'u.s.': 'US',
    'nc': 'no trail', 'nc.': 'no trail',
}


def replace_abbrevation(text: str):
    """
    Replace currency names/symbols by their three-letter code and expand the
    usual share class abbreviations.

    Currency: the spelled-out names are checked first, then the standalone
    words and symbols, each in table order. Only the first entry that is
    found is rewritten (case-insensitively, every occurrence of it).

    Abbreviations: every whitespace separated token that is a known
    abbreviation (compared case-insensitively) is replaced by its expansion;
    the tokens are re-joined with single spaces.

    :param text: the text to normalise; None and blank strings are returned
        unchanged.
    :return: the normalised text.
    """
    if text is None or len(text.strip()) == 0:
        return text
    text = text.strip()

    lowered = text.lower()
    rule = next(((pattern, code) for phrase, pattern, code in _CURRENCY_PHRASES
                 if phrase in lowered), None)
    if rule is None:
        lowered_tokens = lowered.split()
        rule = next(((pattern, code) for token, pattern, code in _CURRENCY_TOKENS
                     if token in lowered_tokens), None)
    if rule is not None:
        pattern, code = rule
        text = re.sub(pattern, code, text, flags=re.IGNORECASE)

    return ' '.join(_TERM_ABBREVIATIONS.get(split.lower(), split)
                    for split in text.split())