diff --git a/core/data_extraction.py b/core/data_extraction.py
index 5abef3e..ecd925e 100644
--- a/core/data_extraction.py
+++ b/core/data_extraction.py
@@ -19,6 +19,7 @@ class DataExtraction:
         output_data_folder: str,
         page_text_dict: dict,
         datapoint_page_info: dict,
+        datapoints: list,
         document_mapping_info_df: pd.DataFrame,
     ) -> None:
         self.doc_id = doc_id
@@ -43,8 +44,7 @@ class DataExtraction:
         self.document_mapping_info_df = document_mapping_info_df
         self.datapoint_page_info = datapoint_page_info
         self.page_nums_with_datapoints = self.get_page_nums_from_datapoint_page_info()
-
-        self.datapoints = self.get_datapoints_from_datapoint_page_info()
+        self.datapoints = datapoints
         self.instructions_config = self.get_instructions_config()
         self.datapoint_level_config = self.get_datapoint_level()
         self.datapoint_name_config = self.get_datapoint_name()
@@ -72,12 +72,6 @@ class DataExtraction:
         success, text, page_text_dict = pdf_util.extract_text()
         return page_text_dict
 
-    def get_datapoints_from_datapoint_page_info(self) -> list:
-        datapoints = list(self.datapoint_page_info.keys())
-        if "doc_id" in datapoints:
-            datapoints.remove("doc_id")
-        return datapoints
-
     def get_page_nums_from_datapoint_page_info(self) -> list:
         page_nums_with_datapoints = []
         for datapoint, page_nums in self.datapoint_page_info.items():
@@ -218,6 +212,8 @@ class DataExtraction:
             data = json_repair.loads(response)
         except:
             data = {"data": []}
+        data = self.validate_data(data)
+
         data_dict = {"doc_id": self.doc_id}
         data_dict["page_index"] = page_num
         data_dict["datapoints"] = ", ".join(page_datapoints)
@@ -226,6 +222,49 @@ class DataExtraction:
         data_dict["raw_answer"] = response
         data_dict["extract_data"] = data
         return data_dict
+
+    def validate_data(self, extract_data_info: dict) -> dict:
+        """
+        Validate extracted records by two rules:
+        1. Each record must carry a fund name.
+        2. For share-level datapoints, the record must also carry a share name.
+        """
+        data_list = extract_data_info.get("data", [])
+        if len(data_list) == 0:
+            return extract_data_info
+        remove_list = []
+        for data in data_list:
+            if data.get("fund name", "") == "":
+                remove_list.append(data)
+            keys = list(data.keys())
+            for key in keys:
+                if self.datapoint_level_config.get(key, "") == "share_level":
+                    if data.get("share name", "") == "":
+                        remove_list.append(data)
+                        break
+        for remove_data in remove_list:
+            if remove_data in data_list:
+                data_list.remove(remove_data)
+        # rename "fund name" to "fund_name" and "share name" to "share_name"
+        new_data_list = []
+        for data in data_list:
+            new_data = {}
+            fund_name = data.get("fund name", "")
+            if fund_name != "":
+                new_data["fund_name"] = fund_name
+            share_name = data.get("share name", "")
+            if share_name != "":
+                new_data["share_name"] = share_name
+
+            for key, value in data.items():
+                if key not in ["fund name", "share name"]:
+                    new_data[key] = value
+            new_data_list.append(new_data)
+
+        extract_data_info["data"] = new_data_list
+        return extract_data_info
+
 
     def get_datapoints_by_page_num(self, page_num: int) -> list:
         datapoints = []
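For context, a minimal sketch of what validate_data does to one raw GPT payload, assuming a hypothetical config where "ter" is share-level and "tna" is fund-level (sample values invented):

    raw = {"data": [
        {"fund name": "Global Equity Fund", "share name": "Class A", "ter": "0.85"},
        {"fund name": "", "tna": "1200000"},
        {"fund name": "Global Bond Fund", "ter": "0.40"},
    ]}
    # validate_data(raw) drops the second record (no fund name) and the third
    # (share-level "ter" with no share name), then snake_cases the name keys:
    # {"data": [{"fund_name": "Global Equity Fund", "share_name": "Class A", "ter": "0.85"}]}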
diff --git a/core/data_mapping.py b/core/data_mapping.py
new file mode 100644
index 0000000..d8241ac
--- /dev/null
+++ b/core/data_mapping.py
@@ -0,0 +1,323 @@
+import os
+import json
+import pandas as pd
+from utils.biz_utils import get_most_similar_name
+from utils.sql_query_util import (
+    query_document_fund_mapping,
+    query_investment_by_provider,
+)
+from utils.logger import logger
+
+
+class DataMapping:
+    def __init__(
+        self,
+        doc_id,
+        datapoints: list,
+        raw_document_data_list: list,
+        document_mapping_info_df: pd.DataFrame,
+        output_data_folder: str,
+    ):
+        self.doc_id = doc_id
+        self.datapoints = datapoints
+        self.raw_document_data_list = raw_document_data_list
+        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
+            self.document_mapping_info_df = query_document_fund_mapping(doc_id)
+        else:
+            self.document_mapping_info_df = document_mapping_info_df
+
+        if output_data_folder is None or len(output_data_folder) == 0:
+            output_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
+        os.makedirs(output_data_folder, exist_ok=True)
+
+        self.output_data_json_folder = os.path.join(output_data_folder, "json/")
+        os.makedirs(self.output_data_json_folder, exist_ok=True)
+
+        self.output_data_excel_folder = os.path.join(output_data_folder, "excel/")
+        os.makedirs(self.output_data_excel_folder, exist_ok=True)
+
+        self.set_mapping_data_by_db(self.document_mapping_info_df)
+
+    def set_mapping_data_by_db(self, document_mapping_info_df: pd.DataFrame):
+        logger.info("Setting document mapping data")
+        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
+            self.document_mapping_info_df = query_document_fund_mapping(self.doc_id)
+        else:
+            self.document_mapping_info_df = document_mapping_info_df
+        if len(self.document_mapping_info_df) == 0:
+            self.doc_fund_name_list = []
+            self.doc_share_name_list = []
+            self.doc_fund_mapping = pd.DataFrame()
+            self.doc_fund_class_mapping = pd.DataFrame()
+        else:
+            self.doc_fund_name_list = (
+                self.document_mapping_info_df["FundName"].unique().tolist()
+            )
+            self.doc_share_name_list = (
+                self.document_mapping_info_df["ShareClassName"].unique().tolist()
+            )
+            self.doc_fund_mapping = self.document_mapping_info_df[
+                ["FundId", "FundName"]
+            ].drop_duplicates()
+            self.doc_fund_class_mapping = self.document_mapping_info_df[
+                ["FundId", "SecId", "ShareClassName", "CurrencyId"]
+            ].drop_duplicates()
+
+        logger.info("Setting provider mapping data")
+        self.provider_mapping_df = self.get_provider_mapping()
+        if len(self.provider_mapping_df) == 0:
+            self.provider_fund_name_list = []
+            self.provider_share_name_list = []
+            self.provider_fund_mapping = pd.DataFrame()
+            self.provider_fund_class_mapping = pd.DataFrame()
+        else:
+            self.provider_fund_name_list = (
+                self.provider_mapping_df["FundName"].unique().tolist()
+            )
+            self.provider_share_name_list = (
+                self.provider_mapping_df["ShareClassName"].unique().tolist()
+            )
+            self.provider_fund_mapping = self.provider_mapping_df[
+                ["FundId", "FundName"]
+            ].drop_duplicates()
+            self.provider_fund_class_mapping = self.provider_mapping_df[
+                ["FundId", "SecId", "ShareClassName", "CurrencyId"]
+            ].drop_duplicates()
+
+    def get_provider_mapping(self):
+        if len(self.document_mapping_info_df) == 0:
+            return pd.DataFrame()
+        provider_id_list = (
+            self.document_mapping_info_df["ProviderId"].unique().tolist()
+        )
+        provider_mapping_list = []
+        for provider_id in provider_id_list:
+            provider_mapping_list.append(query_investment_by_provider(provider_id))
+        provider_mapping_df = pd.concat(provider_mapping_list)
+        provider_mapping_df = provider_mapping_df.drop_duplicates()
+        provider_mapping_df.reset_index(drop=True, inplace=True)
+        return provider_mapping_df
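DataMapping assumes the mapping frame carries the columns read above; a hypothetical single-row frame of the expected shape:

    import pandas as pd

    document_mapping_info_df = pd.DataFrame([{
        "ProviderId": "P001",
        "FundId": "F001",
        "FundName": "Global Equity Fund",
        "SecId": "S001",
        "ShareClassName": "Global Equity Fund Class A",
        "CurrencyId": "USD",
    }])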
+    def mapping_raw_data(self):
+        """
+        Flatten the raw extraction output into one record per (name, datapoint)
+        pair, carrying: doc_id, page_index, raw_name, investment_id,
+        investment_name, investment_type, similarity, datapoint, value.
+        """
+        mapped_data_list = []
+        mapped_fund_cache = {}
+        mapped_share_cache = {}
+        for page_data in self.raw_document_data_list:
+            doc_id = page_data.get("doc_id", "")
+            page_index = page_data.get("page_index", "")
+            raw_data_list = page_data.get("extract_data", {}).get("data", [])
+            for raw_data in raw_data_list:
+                raw_fund_name = raw_data.get("fund_name", "")
+                if raw_fund_name is None or len(raw_fund_name) == 0:
+                    continue
+                # normalize a missing/None share name to "" before length checks
+                raw_share_name = raw_data.get("share_name", "") or ""
+                if len(self.doc_fund_name_list) == 0 and len(self.provider_fund_name_list) == 0:
+                    # no database names to match against: keep the raw names only
+                    if len(raw_share_name) > 0:
+                        integrated_share_name = self.integrate_share_name(raw_fund_name, raw_share_name)
+                        raw_data_keys = list(raw_data.keys())
+                        for datapoint in self.datapoints:
+                            if datapoint in raw_data_keys:
+                                mapped_data = {
+                                    "doc_id": doc_id,
+                                    "page_index": page_index,
+                                    "raw_name": integrated_share_name,
+                                    "investment_id": "",
+                                    "investment_name": "",
+                                    "investment_type": 1,
+                                    "similarity": 0,
+                                }
+                                mapped_data["datapoint"] = datapoint
+                                mapped_data["value"] = raw_data[datapoint]
+                                mapped_data_list.append(mapped_data)
+                    else:
+                        raw_data_keys = list(raw_data.keys())
+                        for datapoint in self.datapoints:
+                            if datapoint in raw_data_keys:
+                                mapped_data = {
+                                    "doc_id": doc_id,
+                                    "page_index": page_index,
+                                    "raw_name": raw_fund_name,
+                                    "investment_id": "",
+                                    "investment_name": "",
+                                    "investment_type": 33,
+                                    "similarity": 0,
+                                }
+                                mapped_data["datapoint"] = datapoint
+                                mapped_data["value"] = raw_data[datapoint]
+                                mapped_data_list.append(mapped_data)
+                else:
+                    raw_name = ""
+                    if raw_share_name is not None and len(raw_share_name) > 0:
+                        raw_name = self.integrate_share_name(raw_fund_name, raw_share_name)
+                        if mapped_share_cache.get(raw_name) is not None:
+                            investment_info = mapped_share_cache[raw_name]
+                        else:
+                            if mapped_fund_cache.get(raw_fund_name) is not None:
+                                fund_info = mapped_fund_cache[raw_fund_name]
+                                fund_id = fund_info["id"]
+                            else:
+                                fund_info = self.matching_with_database(
+                                    raw_fund_name, matching_type="fund"
+                                )
+                                fund_id = fund_info["id"]
+                                mapped_fund_cache[raw_fund_name] = fund_info
+                            investment_info = self.matching_with_database(
+                                raw_name, fund_id, "share"
+                            )
+                            mapped_share_cache[raw_name] = investment_info
+                    elif raw_fund_name is not None and len(raw_fund_name) > 0:
+                        raw_name = raw_fund_name
+                        if mapped_fund_cache.get(raw_fund_name) is not None:
+                            investment_info = mapped_fund_cache[raw_fund_name]
+                        else:
+                            investment_info = self.matching_with_database(
+                                raw_name, matching_type="fund"
+                            )
+                            mapped_fund_cache[raw_fund_name] = investment_info
+                    else:
+                        raw_name = ""
+                        investment_info = {
+                            "id": "",
+                            "legal_name": "",
+                            "investment_type": -1,
+                            "similarity": 0,
+                        }
+
+                    raw_data_keys = list(raw_data.keys())
+                    for datapoint in self.datapoints:
+                        if datapoint in raw_data_keys:
+                            mapped_data = {
+                                "doc_id": doc_id,
+                                "page_index": page_index,
+                                "raw_name": raw_name,
+                                "investment_id": investment_info["id"],
+                                "investment_name": investment_info["legal_name"],
+                                "investment_type": investment_info["investment_type"],
+                                "similarity": investment_info["similarity"],
+                            }
+                            mapped_data["datapoint"] = datapoint
+                            mapped_data["value"] = raw_data[datapoint]
+                            mapped_data_list.append(mapped_data)
+
+        json_data_file = os.path.join(
+            self.output_data_json_folder, f"{self.doc_id}.json"
+        )
+        with open(json_data_file, "w", encoding="utf-8") as f:
+            json.dump(mapped_data_list, f, ensure_ascii=False, indent=4)
+
+        extract_data_df = pd.DataFrame(self.raw_document_data_list)
+        extract_data_df.reset_index(drop=True, inplace=True)
+
+        mapping_data_df = pd.DataFrame(mapped_data_list)
+        mapping_data_df.reset_index(drop=True, inplace=True)
+
+        excel_data_file = os.path.join(
+            self.output_data_excel_folder, f"{self.doc_id}.xlsx"
+        )
+        with pd.ExcelWriter(excel_data_file) as writer:
+            mapping_data_df.to_excel(writer, sheet_name="mapping_data", index=False)
+            extract_data_df.to_excel(writer, sheet_name="extract_data", index=False)
+
+        return mapped_data_list
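Each element of the returned mapped_data_list is flat, one record per (name, datapoint) pair; investment_type is 1 for a share class and 33 for a fund. A hypothetical record:

    {
        "doc_id": "476492237",
        "page_index": 12,
        "raw_name": "Global Equity Fund Class A",
        "investment_id": "S001",
        "investment_name": "Global Equity Fund Class A",
        "investment_type": 1,
        "similarity": 0.92,
        "datapoint": "ter",
        "value": "0.85",
    }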
+    def integrate_share_name(self, raw_fund_name: str, raw_share_name: str):
+        raw_name = ""
+        if raw_share_name is not None and len(raw_share_name) > 0:
+            raw_name = raw_share_name
+            # some share names are very short,
+            # so we need to combine them with the fund name
+            raw_name_splits = raw_name.split()
+            raw_fund_name_splits = raw_fund_name.split()
+            for split in raw_name_splits:
+                if split not in raw_fund_name_splits:
+                    raw_fund_name_splits.append(split)
+            raw_name = " ".join(raw_fund_name_splits)
+        return raw_name
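integrate_share_name merges the two names by appending share-name tokens missing from the fund name; with hypothetical inputs:

    integrate_share_name("Blackrock Global Fund", "Class A Acc")
    # -> "Blackrock Global Fund Class A Acc"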
+    def matching_with_database(
+        self, raw_name: str, parent_id: str = None, matching_type: str = "fund"
+    ):
+        data_info = {"name": raw_name}
+        if len(self.doc_fund_name_list) == 0 and len(self.provider_fund_name_list) == 0:
+            data_info["id"] = ""
+            data_info["legal_name"] = ""
+            if matching_type == "fund":
+                investment_type = 33
+            else:
+                investment_type = 1
+            data_info["investment_type"] = investment_type
+            data_info["similarity"] = 0
+            return data_info
+
+        if matching_type == "fund":
+            doc_compare_name_list = self.doc_fund_name_list
+            doc_compare_mapping = self.doc_fund_mapping
+            provider_compare_name_list = self.provider_fund_name_list
+            provider_compare_mapping = self.provider_fund_mapping
+            compare_name_dp = "FundName"
+            compare_id_dp = "FundId"
+            investment_type = 33
+        else:
+            if parent_id is not None and len(parent_id) > 0:
+                # restrict matching to the share classes under the mapped fund;
+                # document-level share matching is skipped in this branch
+                doc_compare_mapping = None
+                doc_compare_name_list = None
+
+                provider_compare_mapping = self.provider_fund_class_mapping[
+                    self.provider_fund_class_mapping["FundId"] == parent_id
+                ]
+                provider_compare_name_list = (
+                    provider_compare_mapping["ShareClassName"].unique().tolist()
+                )
+            else:
+                doc_compare_name_list = self.doc_share_name_list
+                doc_compare_mapping = self.doc_fund_class_mapping
+                provider_compare_name_list = self.provider_share_name_list
+                provider_compare_mapping = self.provider_fund_class_mapping
+            compare_name_dp = "ShareClassName"
+            compare_id_dp = "SecId"
+            investment_type = 1
+
+        if len(provider_compare_name_list) > 0:
+            if doc_compare_name_list is not None and len(doc_compare_name_list) > 0:
+                max_similarity_name, max_similarity = get_most_similar_name(
+                    raw_name, doc_compare_name_list)
+                if max_similarity is not None and max_similarity >= 0.9:
+                    data_info["id"] = doc_compare_mapping[
+                        doc_compare_mapping[compare_name_dp] == max_similarity_name
+                    ][compare_id_dp].values[0]
+                    data_info["legal_name"] = max_similarity_name
+                    data_info["similarity"] = max_similarity
+
+            if data_info.get("id", None) is None or len(data_info.get("id", "")) == 0:
+                max_similarity_name, max_similarity = get_most_similar_name(
+                    raw_name, provider_compare_name_list
+                )
+                if max_similarity is not None and max_similarity >= 0.5:
+                    data_info["id"] = provider_compare_mapping[
+                        provider_compare_mapping[compare_name_dp] == max_similarity_name
+                    ][compare_id_dp].values[0]
+                    data_info["legal_name"] = max_similarity_name
+                    data_info["similarity"] = max_similarity
+                else:
+                    data_info["id"] = ""
+                    data_info["legal_name"] = ""
+                    data_info["similarity"] = 0
+            data_info["investment_type"] = investment_type
+        else:
+            data_info["id"] = ""
+            data_info["legal_name"] = ""
+            data_info["investment_type"] = investment_type
+            data_info["similarity"] = 0
+        return data_info
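Matching is two-stage: document-level names are tried first with a 0.9 similarity floor, then provider-level names with a 0.5 floor (get_most_similar_name itself returns no name below 0.35). A hypothetical call on a DataMapping instance dm:

    info = dm.matching_with_database("Global Equity Fund", matching_type="fund")
    # e.g. {"name": "Global Equity Fund", "id": "F001",
    #       "legal_name": "Global Equity Fund", "investment_type": 33, "similarity": 1.0}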
r"/data/emea_ar/output/mapping_data/docs/" + self.output_mapping_data_folder = output_mapping_data_folder + os.makedirs(self.output_mapping_data_folder, exist_ok=True) + self.filter_pages = FilterPages( self.doc_id, self.pdf_file, self.document_mapping_info_df ) + self.page_text_dict = self.filter_pages.page_text_dict + self.datapoint_page_info, self.result_details = self.get_datapoint_page_info() + self.datapoints = self.get_datapoints_from_datapoint_page_info() def download_pdf(self) -> str: pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id) @@ -38,30 +52,69 @@ class EMEA_AR_Parsing: datapoint_page_info, result_details = self.filter_pages.start_job() return datapoint_page_info, result_details + def get_datapoints_from_datapoint_page_info(self) -> list: + datapoints = list(self.datapoint_page_info.keys()) + if "doc_id" in datapoints: + datapoints.remove("doc_id") + return datapoints + def extract_data(self, re_run: bool = False) -> list: if not re_run: - output_data_json_folder = os.path.join(self.output_data_folder, "json/") + output_data_json_folder = os.path.join( + self.output_extract_data_folder, "json/" + ) os.makedirs(output_data_json_folder, exist_ok=True) json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json") if os.path.exists(json_file): - logger.info(f"The document: {self.doc_id} has been parsed, loading data from {json_file}") + logger.info( + f"The document: {self.doc_id} has been parsed, loading data from {json_file}" + ) with open(json_file, "r", encoding="utf-8") as f: data_from_gpt = json.load(f) return data_from_gpt - - page_text_dict = self.filter_pages.page_text_dict - datapoint_page_info, result_details = self.get_datapoint_page_info() + data_extraction = DataExtraction( self.doc_id, self.pdf_file, - self.output_data_folder, - page_text_dict, - datapoint_page_info, + self.output_extract_data_folder, + self.page_text_dict, + self.datapoint_page_info, + self.datapoints, self.document_mapping_info_df, ) data_from_gpt = data_extraction.extract_data() return data_from_gpt + def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list: + if not re_run: + output_data_json_folder = os.path.join( + self.output_mapping_data_folder, "json/" + ) + os.makedirs(output_data_json_folder, exist_ok=True) + json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json") + if os.path.exists(json_file): + logger.info( + f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {json_file}" + ) + with open(json_file, "r", encoding="utf-8") as f: + doc_mapping_data = json.load(f) + return doc_mapping_data + """ + doc_id, + datapoints: list, + raw_document_data_list: list, + document_mapping_info_df: pd.DataFrame, + output_data_folder: str, + """ + data_mapping = DataMapping( + self.doc_id, + self.datapoints, + data_from_gpt, + self.document_mapping_info_df, + self.output_mapping_data_folder, + ) + return data_mapping.mapping_raw_data() + def filter_pages(doc_id: str, pdf_folder: str) -> None: logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}") @@ -70,16 +123,39 @@ def filter_pages(doc_id: str, pdf_folder: str) -> None: return datapoint_page_info, result_details -def extract_data(doc_id: str, - pdf_folder: str, - output_data_folder: str, - re_run: bool = False) -> None: +def extract_data( + doc_id: str, pdf_folder: str, output_data_folder: str, re_run: bool = False +) -> None: logger.info(f"Extract EMEA AR data for doc_id: {doc_id}") - emea_ar_parsing = EMEA_AR_Parsing(doc_id, pdf_folder, 
 def filter_pages(doc_id: str, pdf_folder: str) -> None:
     logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
@@ -70,16 +123,39 @@ def filter_pages(doc_id: str, pdf_folder: str) -> None:
     return datapoint_page_info, result_details
 
 
-def extract_data(doc_id: str,
-                 pdf_folder: str,
-                 output_data_folder: str,
-                 re_run: bool = False) -> None:
+def extract_data(
+    doc_id: str, pdf_folder: str, output_data_folder: str, re_run: bool = False
+) -> None:
     logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
-    emea_ar_parsing = EMEA_AR_Parsing(doc_id, pdf_folder, output_data_folder)
+    emea_ar_parsing = EMEA_AR_Parsing(
+        doc_id, pdf_folder, output_extract_data_folder=output_data_folder
+    )
     data_from_gpt = emea_ar_parsing.extract_data(re_run)
     return data_from_gpt
 
 
+def mapping_data(
+    doc_id: str,
+    pdf_folder: str,
+    output_extract_data_folder: str,
+    output_mapping_folder: str,
+    re_run_extract_data: bool = False,
+    re_run_mapping_data: bool = False,
+) -> tuple:
+    logger.info(f"Map EMEA AR data for doc_id: {doc_id}")
+    emea_ar_parsing = EMEA_AR_Parsing(
+        doc_id,
+        pdf_folder,
+        output_extract_data_folder=output_extract_data_folder,
+        output_mapping_data_folder=output_mapping_folder,
+    )
+    doc_data_from_gpt = emea_ar_parsing.extract_data(re_run=re_run_extract_data)
+    doc_mapping_data = emea_ar_parsing.mapping_data(
+        data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
+    )
+    return doc_data_from_gpt, doc_mapping_data
+
+
 def batch_extract_data(
     pdf_folder: str,
     doc_data_excel_file: str = None,
@@ -101,7 +177,7 @@ def batch_extract_data(
         doc_data_df = pd.read_excel(doc_data_excel_file)
         doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
         doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
-
+
     result_list = []
     for pdf_file in tqdm(pdf_files):
         pdf_base_name = os.path.basename(pdf_file)
@@ -109,27 +185,106 @@ def batch_extract_data(
         if doc_list is not None and doc_id not in doc_list:
             continue
         data_from_gpt = extract_data(
-            doc_id=doc_id,
-            pdf_folder=pdf_folder,
+            doc_id=doc_id,
+            pdf_folder=pdf_folder,
             output_data_folder=output_child_folder,
-            re_run=re_run
+            re_run=re_run,
         )
         result_list.extend(data_from_gpt)
 
-    result_df = pd.DataFrame(result_list)
-    result_df.reset_index(drop=True, inplace=True)
+    if special_doc_id_list is None or len(special_doc_id_list) == 0:
+        result_df = pd.DataFrame(result_list)
+        result_df.reset_index(drop=True, inplace=True)
 
-    logger.info(f"Saving the result to {output_total_folder}")
-    os.makedirs(output_total_folder, exist_ok=True)
-    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
-    output_file = os.path.join(
-        output_total_folder,
-        f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
-    )
-    with pd.ExcelWriter(output_file) as writer:
-        result_df.to_excel(writer, index=False, sheet_name="extract_data_info")
+        logger.info(f"Saving the result to {output_total_folder}")
+        os.makedirs(output_total_folder, exist_ok=True)
+        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
+        output_file = os.path.join(
+            output_total_folder,
+            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
+        )
+        with pd.ExcelWriter(output_file) as writer:
+            result_df.to_excel(writer, index=False, sheet_name="extract_data_info")
+
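The module-level helper drives both stages for one document; the two flags control cache reuse per stage independently (paths shown are the defaults from above):

    doc_data, mapped = mapping_data(
        doc_id="476492237",
        pdf_folder=r"/data/emea_ar/pdf/",
        output_extract_data_folder=r"/data/emea_ar/output/extract_data/docs/",
        output_mapping_folder=r"/data/emea_ar/output/mapping_data/docs/",
        re_run_extract_data=False,
        re_run_mapping_data=True,
    )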
+def batch_start_job(
+    pdf_folder: str,
+    doc_data_excel_file: str = None,
+    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
+    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
+    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
+    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
+    special_doc_id_list: list = None,
+    re_run_extract_data: bool = False,
+    re_run_mapping_data: bool = False,
+):
+    pdf_files = glob(pdf_folder + "*.pdf")
+    doc_list = []
+    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
+        doc_list = special_doc_id_list
+    if (
+        len(doc_list) == 0
+        and doc_data_excel_file is not None
+        and len(doc_data_excel_file) > 0
+        and os.path.exists(doc_data_excel_file)
+    ):
+        doc_data_df = pd.read_excel(doc_data_excel_file)
+        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
+        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
+
+    result_extract_data_list = []
+    result_mapping_data_list = []
+    for pdf_file in tqdm(pdf_files):
+        pdf_base_name = os.path.basename(pdf_file)
+        doc_id = pdf_base_name.split(".")[0]
+        # an empty doc_list means "process every PDF in the folder"
+        if len(doc_list) > 0 and doc_id not in doc_list:
+            continue
+        doc_data_from_gpt, doc_mapping_data_list = mapping_data(
+            doc_id=doc_id,
+            pdf_folder=pdf_folder,
+            output_extract_data_folder=output_extract_data_child_folder,
+            output_mapping_folder=output_mapping_child_folder,
+            re_run_extract_data=re_run_extract_data,
+            re_run_mapping_data=re_run_mapping_data,
+        )
+        result_extract_data_list.extend(doc_data_from_gpt)
+        result_mapping_data_list.extend(doc_mapping_data_list)
+
+    if special_doc_id_list is None or len(special_doc_id_list) == 0:
+        result_extract_data_df = pd.DataFrame(result_extract_data_list)
+        result_extract_data_df.reset_index(drop=True, inplace=True)
+
+        result_mapping_data_df = pd.DataFrame(result_mapping_data_list)
+        result_mapping_data_df.reset_index(drop=True, inplace=True)
+
+        logger.info(f"Saving extract data to {output_extract_data_total_folder}")
+        os.makedirs(output_extract_data_total_folder, exist_ok=True)
+        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
+        output_file = os.path.join(
+            output_extract_data_total_folder,
+            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
+        )
+        with pd.ExcelWriter(output_file) as writer:
+            result_extract_data_df.to_excel(
+                writer, index=False, sheet_name="extract_data_info"
+            )
+
+        logger.info(f"Saving mapping data to {output_mapping_total_folder}")
+        os.makedirs(output_mapping_total_folder, exist_ok=True)
+        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
+        output_file = os.path.join(
+            output_mapping_total_folder,
+            f"mapping_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
+        )
+        with pd.ExcelWriter(output_file) as writer:
+            result_mapping_data_df.to_excel(
+                writer, index=False, sheet_name="mapping_data"
+            )
+            result_extract_data_df.to_excel(
+                writer, index=False, sheet_name="extract_data"
+            )
+
+
 def batch_filter_pdf_files(
     pdf_folder: str,
     doc_data_excel_file: str = None,
@@ -335,16 +490,33 @@ if __name__ == "__main__":
     # )
 
     # test_auto_generate_instructions()
-
-    output_child_folder = r"/data/emea_ar/output/extract_data/docs/"
-    output_total_folder = r"/data/emea_ar/output/extract_data/total/"
-    re_run = True
-    batch_extract_data(pdf_folder,
-                       page_filter_ground_truth_file,
-                       output_child_folder,
-                       output_total_folder,
-                       special_doc_id_list,
-                       re_run)
+
+    output_extract_data_child_folder = r"/data/emea_ar/output/extract_data/docs/"
+    output_extract_data_total_folder = r"/data/emea_ar/output/extract_data/total/"
+    re_run_extract_data = False
+    # batch_extract_data(
+    #     pdf_folder,
+    #     page_filter_ground_truth_file,
+    #     output_extract_data_child_folder,
+    #     output_extract_data_total_folder,
+    #     special_doc_id_list,
+    #     re_run,
+    # )
 
     # doc_id = "476492237"
-    # extract_data(doc_id, pdf_folder, output_child_folder, re_run)
+    # extract_data(doc_id, pdf_folder, output_extract_data_child_folder, re_run)
+
+    special_doc_id_list = []
+    output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
+    output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"
+    re_run_mapping_data = True
+    batch_start_job(
+        pdf_folder,
+        page_filter_ground_truth_file,
+        output_extract_data_child_folder,
+        output_mapping_child_folder,
+        output_extract_data_total_folder,
+        output_mapping_total_folder,
+        special_doc_id_list,
+        re_run_extract_data,
+        re_run_mapping_data,
+    )
diff --git a/utils/biz_utils.py b/utils/biz_utils.py
index 8517652..5e7824e 100644
--- a/utils/biz_utils.py
+++ b/utils/biz_utils.py
@@ -1,4 +1,5 @@
 import re
+from copy import deepcopy
 
 def add_slash_to_text_as_regex(text: str):
     if text is None or len(text) == 0:
@@ -19,4 +20,214 @@ def clean_text(text: str) -> str:
     # update the specical character which begin with \u, e.g \u2004 or \u00a0 to be space
     text = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
     text = re.sub(r"( ){2,}", ' ', text.strip())
-    return text
\ No newline at end of file
+    return text
+
+
+def get_most_similar_name(text: str, name_list: list):
+    """
+    Get the most similar name from name_list by Jaccard similarity.
+    """
+    try:
+        copy_fund_name_list = deepcopy(name_list)
+        if text is None or len(text.split()) == 0 or \
+                copy_fund_name_list is None or len(copy_fund_name_list) == 0:
+            return None, None
+
+        copy_fund_name_list = [replace_abbrevation(copy_fund_name)
+                               for copy_fund_name in copy_fund_name_list]
+
+        # get the words common to all names in name_list
+        common_word_list = []
+        if len(name_list) > 1:
+            _, common_word_list = remove_common_word(copy_fund_name_list)
+
+        text = text.strip()
+        text = remove_special_characters(text)
+        text = replace_abbrevation(text)
+        text_splits = text.split()
+        if len(text_splits) == 1:
+            text = split_words_without_space(text)
+            new_splits = text.split()
+        else:
+            new_splits = []
+            for split in text_splits:
+                if len(split) > 1:
+                    new_splits.extend(split_words_without_space(split).split())
+                else:
+                    new_splits.append(split)
+
+        lower_new_splits = [split.lower() for split in new_splits]
+        for word in common_word_list:
+            if word not in lower_new_splits:
+                # drop the common word from the candidate names
+                for i in range(len(copy_fund_name_list)):
+                    temp_splits = copy_fund_name_list[i].split()
+                    for temp in temp_splits:
+                        if remove_special_characters(temp).lower() == word:
+                            copy_fund_name_list[i] = re.sub(
+                                r'\s+', ' ',
+                                copy_fund_name_list[i].replace(temp, ' '))
+
+        for i in range(len(copy_fund_name_list)):
+            temp_splits = copy_fund_name_list[i].split()
+            for temp in temp_splits:
+                if remove_special_characters(temp).lower() in ['fund', 'portfolio', 'class', 'share', 'shares']:
+                    copy_fund_name_list[i] = \
+                        re.sub(r'\s+', ' ', copy_fund_name_list[i].replace(temp, ' '))
+        final_splits = []
+        for split in new_splits:
+            if split.lower() not in ['fund', 'portfolio', 'class', 'share', 'shares']:
+                final_splits.append(split)
+
+        text = ' '.join(final_splits)
+        max_similarity = 0
+        max_similarity_fund_name = None
+        for fund_name, copy_fund_name in zip(name_list, copy_fund_name_list):
+            copy_fund_name = remove_special_characters(copy_fund_name)
+            copy_fund_name = split_words_without_space(copy_fund_name)
+            similarity = get_jacard_similarity(text,
+                                               copy_fund_name,
+                                               need_remove_numeric_characters=False)
+            if similarity > max_similarity:
+                max_similarity = similarity
+                max_similarity_fund_name = fund_name
+            if max_similarity == 1:
+                break
+        if max_similarity < 0.35:
+            return None, max_similarity
+        return max_similarity_fund_name, max_similarity
+    except Exception as e:
+        print(e)
+        return None, 0.0
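A hypothetical call showing the normalization plus Jaccard scoring; abbreviations are expanded ('Cl' to 'Class', 'Acc' to 'Accumulation') and filler words ('Fund', 'Class', ...) are ignored before comparing:

    name, score = get_most_similar_name(
        "Global Equity Cl A Acc",
        ["Global Equity Fund Class A Accumulation", "Global Bond Fund Class B"],
    )
    # -> ("Global Equity Fund Class A Accumulation", 1.0)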
+def remove_common_word(text_list: list):
+    if text_list is None or len(text_list) == 0:
+        return text_list, []
+    new_text_list = []
+    for text in text_list:
+        text = text.lower()
+        text = remove_special_characters(text)
+        text_splits = text.split()
+        while 'fund' in text_splits:
+            text_splits.remove('fund')
+        while 'portfolio' in text_splits:
+            text_splits.remove('portfolio')
+        while 'share' in text_splits:
+            text_splits.remove('share')
+        while 'class' in text_splits:
+            text_splits.remove('class')
+        text = ' '.join(text_splits)
+        new_text_list.append(text)
+    # remove words common to every name, e.g. for 'Blackrock Global Fund' and
+    # 'Blackrock Growth Fund' the common word is 'blackrock' ('fund' is already
+    # stripped), leaving ['global', 'growth']
+    common_word_list = []
+    new_text_splits_list = [text.split() for text in new_text_list]
+    for i in range(len(new_text_splits_list)):
+        for j in range(i + 1, len(new_text_splits_list)):
+            if common_word_list is None or len(common_word_list) == 0:
+                common_word_list = list(
+                    set(new_text_splits_list[i]).intersection(set(new_text_splits_list[j])))
+            else:
+                common_word_list = list(
+                    set(common_word_list).intersection(set(new_text_splits_list[j])))
+    common_word_list = list(set(common_word_list))
+    for i in range(len(new_text_splits_list)):
+        for common_word in common_word_list:
+            if common_word in new_text_splits_list[i]:
+                new_text_splits_list[i].remove(common_word)
+    new_text_list = [' '.join(text_splits)
+                     for text_splits in new_text_splits_list]
+    return new_text_list, common_word_list
+
+
+def split_words_without_space(text: str):
+    """
+    Split words that are missing spaces, e.g. 'BlackrockGlobalFund'
+    becomes 'Blackrock Global Fund'.
+    """
+    if text is None or len(text.strip()) == 0:
+        return ''
+    text = text.strip()
+    # find all words shaped as a capital letter followed by lower-case letters
+    regex = r'[A-Z][a-z]+'
+    word_list = re.findall(regex, text)
+    if len(word_list) > 0:
+        for word in word_list:
+            text = text.replace(word, ' ' + word + ' ')
+        text = re.sub(r'(\s)+', ' ', text)
+    return text.strip()
+
+
+def remove_special_characters(text):
+    text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
+    text = re.sub(r'\s+', ' ', text)
+    text = text.strip()
+    return text
+
+
+def remove_numeric_characters(text):
+    # remove numeric characters
+    text = re.sub(r'\d+', ' ', text)
+    text = re.sub(r'\s+', ' ', text)
+    text = text.strip()
+    return text
+
+
+def get_jacard_similarity(text_left,
+                          text_right,
+                          need_remove_special_characters=True,
+                          need_remove_numeric_characters=True):
+    if need_remove_special_characters:
+        text_left = remove_special_characters(text_left)
+        text_right = remove_special_characters(text_right)
+    if need_remove_numeric_characters:
+        text_left = remove_numeric_characters(text_left)
+        text_right = remove_numeric_characters(text_right)
+    text_left = text_left.lower()
+    text_right = text_right.lower()
+    text_left = text_left.split()
+    text_right = text_right.split()
+    intersection = set(text_left).intersection(set(text_right))
+    union = set(text_left).union(set(text_right))
+    if len(union) > 0:
+        return round(len(intersection) / len(union), 3)
+    else:
+        return 0
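The score is |intersection| / |union| over the lower-cased word sets; a worked example:

    get_jacard_similarity("Global Equity Fund", "Global Equity Portfolio")
    # intersection = {global, equity} (2), union = {global, equity, fund, portfolio} (4)
    # -> 0.5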
+def replace_abbrevation(text: str):
+    if text is None or len(text.strip()) == 0:
+        return text
+    text = text.strip()
+    text_splits = text.split()
+    new_text_splits = []
+    for split in text_splits:
+        if split.lower() in ['acc']:
+            new_text_splits.append('Accumulation')
+        elif split.lower() in ['inc']:
+            new_text_splits.append('Income')
+        elif split.lower() in ['dist']:
+            new_text_splits.append('Distribution')
+        elif split.lower() in ['inv']:
+            new_text_splits.append('Investor')
+        elif split.lower() in ['inst', 'institution']:
+            new_text_splits.append('Institutional')
+        elif split.lower() in ['adm']:
+            new_text_splits.append('Admin')
+        elif split.lower() in ['adv']:
+            new_text_splits.append('Advantage')
+        elif split.lower() in ['hdg', 'hgd', '(h)']:
+            new_text_splits.append('Hedged')
+        elif split.lower() in ['cl']:
+            new_text_splits.append('Class')
+        elif split.lower() in ['ser']:
+            new_text_splits.append('Series')
+        elif split.lower() in ['u.s.']:
+            new_text_splits.append('US')
+        elif split.lower() in ['nc']:
+            new_text_splits.append('no trail')
+        else:
+            new_text_splits.append(split)
+    new_text = ' '.join(new_text_splits)
+    return new_text
\ No newline at end of file
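The expansion is token-by-token, so punctuation-bearing tokens like '(H)' are matched as a whole; a hypothetical input:

    replace_abbrevation("Global Equity Cl A Acc (H)")
    # -> "Global Equity Class A Accumulation Hedged"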