# dc-ml-emea-ar/core/data_mapping.py
# (335 lines, 16 KiB, Python)

import os
import json
import pandas as pd
from utils.biz_utils import get_most_similar_name, remove_common_word
from utils.sql_query_util import (
query_document_fund_mapping,
query_investment_by_provider,
)
from utils.logger import logger
class DataMapping:
    """Map raw fund / share-class names extracted from a document to database records.

    The candidate names come from two sources: the document-level mapping
    (``query_document_fund_mapping``) and the provider-level mapping
    (``query_investment_by_provider`` for every ``ProviderId`` found in the
    document-level mapping). Results are written to JSON and Excel files under
    ``output_data_folder``.
    """

    # ``investment_type`` values emitted for fund-level and share-level records.
    _FUND_INVESTMENT_TYPE = 33
    _SHARE_INVESTMENT_TYPE = 1

    # Minimum similarity required to accept a candidate name.
    _DOC_MATCH_THRESHOLD = 0.9
    _PROVIDER_FUND_MATCH_THRESHOLD = 0.7
    _PROVIDER_SHARE_MATCH_THRESHOLD = 0.5

    def __init__(
        self,
        doc_id,
        datapoints: list,
        raw_document_data_list: list,
        document_mapping_info_df: pd.DataFrame,
        output_data_folder: str,
    ):
        """Store the inputs, create the output folders and load mapping data.

        Args:
            doc_id: Identifier of the document being mapped.
            datapoints: Names of the datapoints to pick from each raw record.
            raw_document_data_list: Per-page extraction results; each item is
                read via ``doc_id``, ``page_index`` and ``extract_data.data``.
            document_mapping_info_df: Document-level mapping; when ``None`` or
                empty it is queried from the database using ``doc_id``.
            output_data_folder: Root folder for the ``json/`` and ``excel/``
                outputs; a default path is used when ``None`` or empty.
        """
        self.doc_id = doc_id
        self.datapoints = datapoints
        self.raw_document_data_list = raw_document_data_list

        if output_data_folder is None or len(output_data_folder) == 0:
            output_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
        os.makedirs(output_data_folder, exist_ok=True)

        self.output_data_json_folder = os.path.join(output_data_folder, "json/")
        os.makedirs(self.output_data_json_folder, exist_ok=True)

        self.output_data_excel_folder = os.path.join(output_data_folder, "excel/")
        os.makedirs(self.output_data_excel_folder, exist_ok=True)

        # set_mapping_data_by_db falls back to the database query itself when
        # the frame is None/empty, so the check is not repeated here.
        self.set_mapping_data_by_db(document_mapping_info_df)

    @staticmethod
    def _split_mapping_frame(mapping_df):
        """Return (fund names, share names, fund mapping, share-class mapping).

        An empty (or ``None``) frame yields empty lists and empty DataFrames.
        """
        if mapping_df is None or len(mapping_df) == 0:
            return [], [], pd.DataFrame(), pd.DataFrame()
        fund_name_list = mapping_df["FundName"].unique().tolist()
        share_name_list = mapping_df["ShareClassName"].unique().tolist()
        fund_mapping = mapping_df[["FundId", "FundName"]].drop_duplicates()
        fund_class_mapping = mapping_df[
            ["FundId", "SecId", "ShareClassName", "CurrencyId"]
        ].drop_duplicates()
        return fund_name_list, share_name_list, fund_mapping, fund_class_mapping

    def set_mapping_data_by_db(self, document_mapping_info_df: pd.DataFrame):
        """Populate the document-level and provider-level lookup attributes.

        Args:
            document_mapping_info_df: Document-level mapping; when ``None`` or
                empty it is queried from the database using ``self.doc_id``.
        """
        logger.info("Setting document mapping data")
        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
            self.document_mapping_info_df = query_document_fund_mapping(
                self.doc_id, rerun=False
            )
        else:
            self.document_mapping_info_df = document_mapping_info_df

        (
            self.doc_fund_name_list,
            self.doc_share_name_list,
            self.doc_fund_mapping,
            self.doc_fund_class_mapping,
        ) = self._split_mapping_frame(self.document_mapping_info_df)

        logger.info("Setting provider mapping data")
        self.provider_mapping_df = self.get_provider_mapping()
        (
            self.provider_fund_name_list,
            self.provider_share_name_list,
            self.provider_fund_mapping,
            self.provider_fund_class_mapping,
        ) = self._split_mapping_frame(self.provider_mapping_df)

    def get_provider_mapping(self):
        """Return the de-duplicated investments of every provider in the document mapping.

        Returns:
            A DataFrame with a fresh index; empty when there is no document
            mapping or when no provider query produced a frame.
        """
        document_df = self.document_mapping_info_df
        if document_df is None or len(document_df) == 0:
            return pd.DataFrame()

        provider_id_list = document_df["ProviderId"].unique().tolist()
        queried = (
            query_investment_by_provider(provider_id, rerun=False)
            for provider_id in provider_id_list
        )
        # pd.concat raises when it receives no usable frame at all.
        provider_mapping_list = [frame for frame in queried if frame is not None]
        if not provider_mapping_list:
            return pd.DataFrame()

        provider_mapping_df = pd.concat(provider_mapping_list).drop_duplicates()
        provider_mapping_df.reset_index(drop=True, inplace=True)
        return provider_mapping_df

    def mapping_raw_data(self):
        """Map every raw record to an investment and persist the result.

        Each emitted record has the keys:
            doc_id, page_index, raw_fund_name, raw_share_name, raw_name,
            datapoint, value, investment_type, investment_id,
            investment_name, similarity

        Records without a fund name are skipped. A record with a share name is
        matched at share level (restricted to the matched fund when one was
        found); otherwise it is matched at fund level. One record is emitted
        per datapoint of ``self.datapoints`` present in the raw record.

        Returns:
            The list of mapped records (also written to JSON and Excel).
        """
        logger.info(f"Mapping raw data for document {self.doc_id}")
        mapped_data_list = []
        # Caches keyed by raw name so each distinct name is matched only once.
        mapped_fund_cache = {}
        mapped_share_cache = {}

        for page_data in self.raw_document_data_list:
            doc_id = page_data.get("doc_id", "")
            page_index = page_data.get("page_index", "")
            raw_data_list = page_data.get("extract_data", {}).get("data", [])
            for raw_data in raw_data_list:
                raw_fund_name = raw_data.get("fund_name", "")
                if raw_fund_name is None or len(raw_fund_name) == 0:
                    continue
                # The key may be present with a None value; treat it as "no share".
                raw_share_name = raw_data.get("share_name") or ""

                if len(raw_share_name) > 0:
                    raw_name = self.integrate_share_name(raw_fund_name, raw_share_name)
                    investment_info = mapped_share_cache.get(raw_name)
                    if investment_info is None:
                        fund_info = self._match_fund_cached(
                            raw_fund_name, mapped_fund_cache
                        )
                        investment_info = self.matching_with_database(
                            raw_name,
                            parent_id=fund_info["id"],
                            matching_type="share",
                        )
                        mapped_share_cache[raw_name] = investment_info
                else:
                    raw_name = raw_fund_name
                    investment_info = self._match_fund_cached(
                        raw_fund_name, mapped_fund_cache
                    )

                for datapoint in self.datapoints:
                    if datapoint not in raw_data:
                        continue
                    mapped_data_list.append(
                        {
                            "doc_id": doc_id,
                            "page_index": page_index,
                            "raw_fund_name": raw_fund_name,
                            "raw_share_name": raw_share_name,
                            "raw_name": raw_name,
                            "datapoint": datapoint,
                            "value": raw_data[datapoint],
                            "investment_type": investment_info["investment_type"],
                            "investment_id": investment_info["id"],
                            "investment_name": investment_info["legal_name"],
                            "similarity": investment_info["similarity"],
                        }
                    )

        self._save_mapped_data(mapped_data_list)
        return mapped_data_list

    def _match_fund_cached(self, raw_fund_name: str, fund_cache: dict):
        """Return the fund-level match for ``raw_fund_name``, memoised in ``fund_cache``."""
        fund_info = fund_cache.get(raw_fund_name)
        if fund_info is None:
            fund_info = self.matching_with_database(
                raw_fund_name, matching_type="fund"
            )
            fund_cache[raw_fund_name] = fund_info
        return fund_info

    def _save_mapped_data(self, mapped_data_list: list):
        """Write the mapped records to ``<doc_id>.json`` and ``<doc_id>.xlsx``."""
        json_data_file = os.path.join(
            self.output_data_json_folder, f"{self.doc_id}.json"
        )
        with open(json_data_file, "w", encoding="utf-8") as f:
            json.dump(mapped_data_list, f, ensure_ascii=False, indent=4)

        extract_data_df = pd.DataFrame(self.raw_document_data_list)
        extract_data_df.reset_index(drop=True, inplace=True)
        mapping_data_df = pd.DataFrame(mapped_data_list)
        mapping_data_df.reset_index(drop=True, inplace=True)

        excel_data_file = os.path.join(
            self.output_data_excel_folder, f"{self.doc_id}.xlsx"
        )
        with pd.ExcelWriter(excel_data_file) as writer:
            mapping_data_df.to_excel(writer, sheet_name="mapping_data", index=False)
            extract_data_df.to_excel(writer, sheet_name="extract_data", index=False)

    def integrate_share_name(self, raw_fund_name: str, raw_share_name: str):
        """Combine a fund name and a share name into one name for matching.

        Some share names are very short, so the share-name words that are not
        already present are appended to the fund-name words.

        Returns:
            The combined name, or ``""`` when ``raw_share_name`` is ``None`` or empty.
        """
        if raw_share_name is None or len(raw_share_name) == 0:
            return ""
        combined_words = (raw_fund_name or "").split()
        for word in raw_share_name.split():
            if word not in combined_words:
                combined_words.append(word)
        return " ".join(combined_words)

    @staticmethod
    def _share_candidates_for_fund(class_mapping_df: pd.DataFrame, fund_id: str):
        """Return (rows, share names) of ``class_mapping_df`` belonging to ``fund_id``.

        The mapping may be a column-less empty DataFrame when no data was
        loaded; in that case it is returned unchanged with an empty name list.
        """
        if "FundId" not in class_mapping_df.columns:
            return class_mapping_df, []
        fund_rows = class_mapping_df[class_mapping_df["FundId"] == fund_id]
        return fund_rows, fund_rows["ShareClassName"].unique().tolist()

    @staticmethod
    def _lookup_id(mapping_df: pd.DataFrame, name_column: str, id_column: str, name: str):
        """Return the id of the first row of ``mapping_df`` whose name equals ``name``."""
        return mapping_df[mapping_df[name_column] == name][id_column].values[0]

    def matching_with_database(
        self, raw_name: str, parent_id: str = None, matching_type: str = "fund"
    ):
        """Find the best database match for ``raw_name``.

        Document-level names are tried first and accepted at similarity >= 0.9.
        Otherwise provider-level names are tried, accepted at >= 0.7, or
        >= 0.5 when ``matching_type`` is ``"share"``.

        Args:
            raw_name: Name extracted from the document.
            parent_id: For share matching, the FundId used to restrict the
                candidate share classes; ignored when ``None`` or empty.
            matching_type: ``"fund"`` for fund-level matching; any other value
                is handled as share-level matching.

        Returns:
            A dict with the keys ``name``, ``id``, ``legal_name``,
            ``investment_type`` and ``similarity``. ``id`` and ``legal_name``
            are ``""`` and ``similarity`` is ``0`` when nothing matched.
        """
        is_fund = matching_type == "fund"
        investment_type = (
            self._FUND_INVESTMENT_TYPE if is_fund else self._SHARE_INVESTMENT_TYPE
        )
        # Initialised up front so that every return path yields the same keys.
        data_info = {
            "name": raw_name,
            "id": "",
            "legal_name": "",
            "investment_type": investment_type,
            "similarity": 0,
        }
        if len(self.doc_fund_name_list) == 0 and len(self.provider_fund_name_list) == 0:
            return data_info

        if is_fund:
            doc_compare_name_list = self.doc_fund_name_list
            doc_compare_mapping = self.doc_fund_mapping
            provider_compare_name_list = self.provider_fund_name_list
            provider_compare_mapping = self.provider_fund_mapping
            compare_name_dp = "FundName"
            compare_id_dp = "FundId"
        else:
            if parent_id is not None and len(parent_id) > 0:
                # Only share classes of the already matched fund are candidates.
                doc_compare_mapping, doc_compare_name_list = (
                    self._share_candidates_for_fund(
                        self.doc_fund_class_mapping, parent_id
                    )
                )
                provider_compare_mapping, provider_compare_name_list = (
                    self._share_candidates_for_fund(
                        self.provider_fund_class_mapping, parent_id
                    )
                )
            else:
                doc_compare_name_list = self.doc_share_name_list
                doc_compare_mapping = self.doc_fund_class_mapping
                provider_compare_name_list = self.provider_share_name_list
                provider_compare_mapping = self.provider_fund_class_mapping
            compare_name_dp = "ShareClassName"
            compare_id_dp = "SecId"

        # NOTE(review): no matching at all is attempted when there are no
        # provider candidates, even if document-level candidates exist.
        # Kept as in the original; confirm this is intended.
        if len(provider_compare_name_list) == 0:
            return data_info

        pre_common_word_list = []
        if doc_compare_name_list is not None and len(doc_compare_name_list) > 0:
            _, pre_common_word_list = remove_common_word(doc_compare_name_list)
            max_similarity_name, max_similarity = get_most_similar_name(
                raw_name, doc_compare_name_list
            )
            if (
                max_similarity is not None
                and max_similarity >= self._DOC_MATCH_THRESHOLD
            ):
                data_info["id"] = self._lookup_id(
                    doc_compare_mapping,
                    compare_name_dp,
                    compare_id_dp,
                    max_similarity_name,
                )
                data_info["legal_name"] = max_similarity_name
                data_info["similarity"] = max_similarity

        matched_id = data_info.get("id")
        if matched_id is None or len(matched_id) == 0:
            # The document mapping for a fund may differ from the provider
            # mapping, so the common words found in the document names are
            # passed along to improve the similarity against provider names.
            max_similarity_name, max_similarity = get_most_similar_name(
                raw_name,
                provider_compare_name_list,
                pre_common_word_list=pre_common_word_list,
            )
            threshold = self._PROVIDER_FUND_MATCH_THRESHOLD
            if matching_type == "share":
                threshold = self._PROVIDER_SHARE_MATCH_THRESHOLD
            if max_similarity is not None and max_similarity >= threshold:
                data_info["id"] = self._lookup_id(
                    provider_compare_mapping,
                    compare_name_dp,
                    compare_id_dp,
                    max_similarity_name,
                )
                data_info["legal_name"] = max_similarity_name
                data_info["similarity"] = max_similarity
            else:
                data_info["id"] = ""
                data_info["legal_name"] = ""
                data_info["similarity"] = 0
        return data_info