Support auto-mapping fund/share by raw names.

Blade He 2024-09-09 17:34:53 -05:00
parent 878383a72c
commit 0887608719
4 changed files with 801 additions and 56 deletions

core/data_extraction.py

@@ -19,6 +19,7 @@ class DataExtraction:
output_data_folder: str,
page_text_dict: dict,
datapoint_page_info: dict,
datapoints: list,
document_mapping_info_df: pd.DataFrame,
) -> None:
self.doc_id = doc_id
@@ -43,8 +44,7 @@ class DataExtraction:
self.document_mapping_info_df = document_mapping_info_df
self.datapoint_page_info = datapoint_page_info
self.page_nums_with_datapoints = self.get_page_nums_from_datapoint_page_info()
self.datapoints = datapoints
self.datapoints = self.get_datapoints_from_datapoint_page_info()
self.instructions_config = self.get_instructions_config()
self.datapoint_level_config = self.get_datapoint_level()
self.datapoint_name_config = self.get_datapoint_name()
@@ -72,12 +72,6 @@ class DataExtraction:
success, text, page_text_dict = pdf_util.extract_text()
return page_text_dict
def get_datapoints_from_datapoint_page_info(self) -> list:
datapoints = list(self.datapoint_page_info.keys())
if "doc_id" in datapoints:
datapoints.remove("doc_id")
return datapoints
def get_page_nums_from_datapoint_page_info(self) -> list:
page_nums_with_datapoints = []
for datapoint, page_nums in self.datapoint_page_info.items():
@@ -218,6 +212,8 @@ class DataExtraction:
data = json_repair.loads(response)
except:
data = {"data": []}
data = self.validate_data(data)
data_dict = {"doc_id": self.doc_id}
data_dict["page_index"] = page_num
data_dict["datapoints"] = ", ".join(page_datapoints)
@@ -227,6 +223,49 @@ class DataExtraction:
data_dict["extract_data"] = data
return data_dict
def validate_data(self, extract_data_info: dict) -> dict:
"""
Validate extracted records against two rules:
1. Every record must include a fund name.
2. Records carrying share-level datapoints must also include a share name.
"""
data_list = extract_data_info.get("data", [])
if len(data_list) == 0:
return extract_data_info
remove_list = []
for data in data_list:
if data.get("fund name", "") == "":
remove_list.append(data)
keys = list(data.keys())
for key in keys:
if self.datapoint_level_config.get(key, "") == "share_level":
if data.get("share name", "") == "":
remove_list.append(data)
break
for remove_data in remove_list:
if remove_data in data_list:
data_list.remove(remove_data)
# update "fund name" to be "fund_name"
# update "share name" to be "share_name"
new_data_list = []
for data in data_list:
new_data = {}
fund_name = data.get("fund name", "")
if fund_name != "":
new_data["fund_name"] = fund_name
share_name = data.get("share name", "")
if share_name != "":
new_data["share_name"] = share_name
for key, value in data.items():
if key not in ["fund name", "share name"]:
new_data[key] = value
new_data_list.append(new_data)
extract_data_info["data"] = new_data_list
return extract_data_info
def get_datapoints_by_page_num(self, page_num: int) -> list:
datapoints = []
for datapoint in self.datapoints:
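A minimal sketch of the filtering and renaming that validate_data performs, using a hypothetical page payload ("ongoing_charge" is an assumed share_level datapoint; the "fund name"/"share name" keys come from the model output and are renamed to fund_name/share_name):

# Hypothetical illustration of validate_data's rules; names and values are invented.
raw = {
    "data": [
        {"fund name": "Global Equity Fund", "share name": "Class A Acc", "ongoing_charge": "0.85%"},
        {"fund name": "", "ongoing_charge": "1.10%"},  # dropped: no fund name
        {"fund name": "Global Bond Fund", "share name": "", "ongoing_charge": "0.45%"},  # dropped: share-level value without share name
    ]
}
# Expected result of validate_data(raw):
# {"data": [{"fund_name": "Global Equity Fund", "share_name": "Class A Acc", "ongoing_charge": "0.85%"}]}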

core/data_mapping.py (new file)

@ -0,0 +1,323 @@
import os
import json
import json_repair
import re
import fitz
import pandas as pd
from utils.gpt_utils import chat
from utils.pdf_util import PDFUtil
from utils.biz_utils import get_most_similar_name
from utils.sql_query_util import (
query_document_fund_mapping,
query_investment_by_provider,
)
from utils.logger import logger
from utils.biz_utils import add_slash_to_text_as_regex, clean_text
class DataMapping:
def __init__(
self,
doc_id,
datapoints: list,
raw_document_data_list: list,
document_mapping_info_df: pd.DataFrame,
output_data_folder: str,
):
self.doc_id = doc_id
self.datapoints = datapoints
self.raw_document_data_list = raw_document_data_list
if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
self.document_mapping_info_df = query_document_fund_mapping(doc_id)
else:
self.document_mapping_info_df = document_mapping_info_df
if output_data_folder is None or len(output_data_folder) == 0:
output_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
os.makedirs(output_data_folder, exist_ok=True)
self.output_data_json_folder = os.path.join(output_data_folder, "json/")
os.makedirs(self.output_data_json_folder, exist_ok=True)
self.output_data_excel_folder = os.path.join(output_data_folder, "excel/")
os.makedirs(self.output_data_excel_folder, exist_ok=True)
self.set_mapping_data_by_db(self.document_mapping_info_df)
def set_mapping_data_by_db(self, document_mapping_info_df: pd.DataFrame):
logger.info("Setting document mapping data")
if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
self.document_mapping_info_df = query_document_fund_mapping(self.doc_id)
else:
self.document_mapping_info_df = document_mapping_info_df
if len(self.document_mapping_info_df) == 0:
self.doc_fund_name_list = []
self.doc_share_name_list = []
self.doc_fund_mapping = pd.DataFrame()
self.doc_fund_class_mapping = pd.DataFrame()
else:
self.doc_fund_name_list = (
self.document_mapping_info_df["FundName"].unique().tolist()
)
self.doc_share_name_list = (
self.document_mapping_info_df["ShareClassName"].unique().tolist()
)
self.doc_fund_mapping = self.document_mapping_info_df[
["FundId", "FundName"]
].drop_duplicates()
self.doc_fund_class_mapping = self.document_mapping_info_df[
["FundId", "SecId", "ShareClassName", "CurrencyId"]
].drop_duplicates()
logger.info("Setting provider mapping data")
self.provider_mapping_df = self.get_provider_mapping()
if len(self.provider_mapping_df) == 0:
self.provider_fund_name_list = []
self.provider_share_name_list = []
self.provider_fund_mapping = pd.DataFrame()
self.provider_fund_class_mapping = pd.DataFrame()
else:
self.provider_fund_name_list = (
self.provider_mapping_df["FundName"].unique().tolist()
)
self.provider_share_name_list = (
self.provider_mapping_df["ShareClassName"].unique().tolist()
)
self.provider_fund_mapping = self.provider_mapping_df[
["FundId", "FundName"]
].drop_duplicates()
self.provider_fund_class_mapping = self.provider_mapping_df[
["FundId", "SecId", "ShareClassName", "CurrencyId"]
].drop_duplicates()
def get_provider_mapping(self):
if len(self.document_mapping_info_df) == 0:
return pd.DataFrame()
provider_id_list = (
self.document_mapping_info_df["ProviderId"].unique().tolist()
)
provider_mapping_list = []
for provider_id in provider_id_list:
provider_mapping_list.append(query_investment_by_provider(provider_id))
provider_mapping_df = pd.concat(provider_mapping_list)
provider_mapping_df = provider_mapping_df.drop_duplicates()
provider_mapping_df.reset_index(drop=True, inplace=True)
return provider_mapping_df
def mapping_raw_data(self):
"""
Produce one mapped row per datapoint with the fields:
doc_id, page_index, raw_name, investment_id, investment_name,
investment_type, similarity, datapoint, value
"""
mapped_data_list = []
mapped_fund_cache = {}
mapped_share_cache = {}
for page_data in self.raw_document_data_list:
doc_id = page_data.get("doc_id", "")
page_index = page_data.get("page_index", "")
raw_data_list = page_data.get("extract_data", {}).get("data", [])
for raw_data in raw_data_list:
raw_fund_name = raw_data.get("fund_name", "")
if raw_fund_name is None or len(raw_fund_name) == 0:
continue
raw_share_name = raw_data.get("share_name", "")
if len(self.doc_fund_name_list) == 0 and len(self.provider_fund_name_list) == 0:
if len(raw_share_name) > 0:
integrated_share_name = self.integrate_share_name(raw_fund_name, raw_share_name)
raw_data_keys = list(raw_data.keys())
for datapoint in self.datapoints:
if datapoint in raw_data_keys:
mapped_data = {
"doc_id": doc_id,
"page_index": page_index,
"raw_name": integrated_share_name,
"investment_id": "",
"investment_name": "",
"investment_type": 1,
"similarity": 0
}
mapped_data["datapoint"] = datapoint
mapped_data["value"] = raw_data[datapoint]
mapped_data_list.append(mapped_data)
else:
raw_data_keys = list(raw_data.keys())
for datapoint in self.datapoints:
if datapoint in raw_data_keys:
mapped_data = {
"doc_id": doc_id,
"page_index": page_index,
"raw_name": raw_fund_name,
"investment_id": "",
"investment_name": "",
"investment_type": 33,
"similarity": 0
}
mapped_data["datapoint"] = datapoint
mapped_data["value"] = raw_data[datapoint]
mapped_data_list.append(mapped_data)
else:
raw_name = ""
if raw_share_name is not None and len(raw_share_name) > 0:
raw_name = self.integrate_share_name(raw_fund_name, raw_share_name)
if mapped_share_cache.get(raw_name) is not None:
investment_info = mapped_share_cache[raw_name]
else:
if mapped_fund_cache.get(raw_fund_name) is not None:
fund_info = mapped_fund_cache[raw_fund_name]
fund_id = fund_info["id"]
else:
fund_info = self.matching_with_database(
raw_fund_name, "fund"
)
fund_id = fund_info["id"]
mapped_fund_cache[raw_fund_name] = fund_info
investment_info = self.matching_with_database(
raw_name, fund_id, "share"
)
mapped_share_cache[raw_name] = investment_info
elif raw_fund_name is not None and len(raw_fund_name) > 0:
raw_name = raw_fund_name
if mapped_fund_cache.get(raw_fund_name) is not None:
investment_info = mapped_fund_cache[raw_fund_name]
else:
investment_info = self.matching_with_database(
raw_name, "fund"
)
mapped_fund_cache[raw_fund_name] = investment_info
else:
raw_name = ""
investment_info = {
"id": "",
"legal_name": "",
"investment_type": -1,
"similarity": 0
}
raw_data_keys = list(raw_data.keys())
for datapoint in self.datapoints:
if datapoint in raw_data_keys:
mapped_data = {
"doc_id": doc_id,
"page_index": page_index,
"raw_name": raw_name,
"investment_id": investment_info["id"],
"investment_name": investment_info["legal_name"],
"investment_type": investment_info["investment_type"],
"similarity": investment_info["similarity"]
}
mapped_data["datapoint"] = datapoint
mapped_data["value"] = raw_data[datapoint]
mapped_data_list.append(mapped_data)
json_data_file = os.path.join(
self.output_data_json_folder, f"{self.doc_id}.json"
)
with open(json_data_file, "w", encoding="utf-8") as f:
json.dump(mapped_data_list, f, ensure_ascii=False, indent=4)
extract_data_df = pd.DataFrame(self.raw_document_data_list)
extract_data_df.reset_index(drop=True, inplace=True)
mapping_data_df = pd.DataFrame(mapped_data_list)
mapping_data_df.reset_index(drop=True, inplace=True)
excel_data_file = os.path.join(
self.output_data_excel_folder, f"{self.doc_id}.xlsx"
)
with pd.ExcelWriter(excel_data_file) as writer:
mapping_data_df.to_excel(writer, sheet_name="mapping_data", index=False)
extract_data_df.to_excel(writer, sheet_name="extract_data", index=False)
return mapped_data_list
def integrate_share_name(self, raw_fund_name: str, raw_share_name: str):
raw_name = ""
if raw_share_name is not None and len(raw_share_name) > 0:
raw_name = raw_share_name
# some share names are very short,
# so we need to combine with fund name
raw_name_splits = raw_name.split()
raw_fund_name_splits = raw_fund_name.split()
for split in raw_name_splits:
if split not in raw_fund_name_splits:
raw_fund_name_splits.append(split)
raw_name = " ".join(raw_fund_name_splits)
return raw_name
def matching_with_database(
self, raw_name: str, parent_id: str = None, matching_type: str = "fund"
):
if len(self.doc_fund_name_list) == 0 and len(self.provider_fund_name_list) == 0:
data_info = {"name": raw_name}
data_info["id"] = ""
data_info["legal_name"] = ""
if matching_type == "fund":
investment_type = 33
else:
investment_type = 1
data_info["investment_type"] = investment_type
data_info["similarity"] = 0
return data_info
if matching_type == "fund":
doc_compare_name_list = self.doc_fund_name_list
doc_compare_mapping = self.doc_fund_mapping
provider_compare_name_list = self.provider_fund_name_list
provider_compare_mapping = self.provider_fund_mapping
compare_name_dp = "FundName"
compare_id_dp = "FundId"
investment_type = 33
else:
if parent_id is not None and len(parent_id) > 0:
# a parent FundId is known: match only against that fund's provider share classes
doc_compare_mapping = None
doc_compare_name_list = None
provider_compare_mapping = self.provider_fund_class_mapping[
self.provider_fund_class_mapping["FundId"] == parent_id
]
provider_compare_name_list = (
provider_compare_mapping["ShareClassName"].unique().tolist()
)
else:
doc_compare_name_list = self.doc_share_name_list
doc_compare_mapping = self.doc_fund_class_mapping
provider_compare_name_list = self.provider_share_name_list
provider_compare_mapping = self.provider_fund_class_mapping
compare_name_dp = "ShareClassName"
compare_id_dp = "SecId"
investment_type = 1
data_info = {"name": raw_name}
if len(provider_compare_name_list) > 0:
if doc_compare_name_list is not None and len(doc_compare_name_list) > 0:
max_similarity_name, max_similarity = get_most_similar_name(
raw_name, doc_compare_name_list)
if max_similarity is not None and max_similarity >= 0.9:
data_info["id"] = doc_compare_mapping[
doc_compare_mapping[compare_name_dp] == max_similarity_name
][compare_id_dp].values[0]
data_info["legal_name"] = max_similarity_name
data_info["similarity"] = max_similarity
if data_info.get("id", None) is None or len(data_info.get("id", "")) == 0:
max_similarity_name, max_similarity = get_most_similar_name(
raw_name, provider_compare_name_list
)
if max_similarity is not None and max_similarity >= 0.5:
data_info["id"] = provider_compare_mapping[
provider_compare_mapping[compare_name_dp] == max_similarity_name
][compare_id_dp].values[0]
data_info["legal_name"] = max_similarity_name
data_info["similarity"] = max_similarity
else:
data_info["id"] = ""
data_info["legal_name"] = ""
data_info["similarity"] = 0
data_info["investment_type"] = investment_type
else:
data_info["id"] = ""
data_info["legal_name"] = ""
data_info["investment_type"] = investment_type
data_info["similarity"] = 0
return data_info
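A hypothetical sketch of the rows mapping_raw_data emits once a share class has been matched (ids, page index, and values are invented; investment_type is 1 for share classes and 33 for funds, and similarity is the Jaccard score of the matched name):

# One extracted record such as
#   {"fund_name": "Global Equity Fund", "share_name": "Class A Acc", "ongoing_charge": "0.85%"}
# would yield one row per requested datapoint, e.g.:
mapped_row = {
    "doc_id": "476492237",
    "page_index": 12,                               # hypothetical page
    "raw_name": "Global Equity Fund Class A Acc",   # fund name plus unique share-name words (integrate_share_name)
    "investment_id": "F00000XXXX",                  # hypothetical SecId from the provider mapping
    "investment_name": "Global Equity Fund A Acc",
    "investment_type": 1,                           # 1 = share class, 33 = fund
    "similarity": 0.92,
    "datapoint": "ongoing_charge",
    "value": "0.85%",
}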

main.py

@@ -9,26 +9,40 @@ from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.metrics import Metrics
class EMEA_AR_Parsing:
def __init__(
self,
doc_id: str,
pdf_folder: str = r"/data/emea_ar/pdf/",
output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
) -> None:
self.doc_id = doc_id
self.pdf_folder = pdf_folder
os.makedirs(self.pdf_folder, exist_ok=True)
self.pdf_file = self.download_pdf()
self.document_mapping_info_df = query_document_fund_mapping(doc_id)
if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
self.output_extract_data_folder = output_extract_data_folder
os.makedirs(self.output_extract_data_folder, exist_ok=True)
if output_mapping_data_folder is None or len(output_mapping_data_folder) == 0:
output_mapping_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
self.output_mapping_data_folder = output_mapping_data_folder
os.makedirs(self.output_mapping_data_folder, exist_ok=True)
self.filter_pages = FilterPages(
self.doc_id, self.pdf_file, self.document_mapping_info_df
)
self.page_text_dict = self.filter_pages.page_text_dict
self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
self.datapoints = self.get_datapoints_from_datapoint_page_info()
def download_pdf(self) -> str:
pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
@@ -38,30 +52,69 @@ class EMEA_AR_Parsing:
datapoint_page_info, result_details = self.filter_pages.start_job()
return datapoint_page_info, result_details
def get_datapoints_from_datapoint_page_info(self) -> list:
datapoints = list(self.datapoint_page_info.keys())
if "doc_id" in datapoints:
datapoints.remove("doc_id")
return datapoints
def extract_data(self, re_run: bool = False) -> list:
if not re_run:
output_data_json_folder = os.path.join(
self.output_extract_data_folder, "json/"
)
os.makedirs(output_data_json_folder, exist_ok=True)
json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
if os.path.exists(json_file):
logger.info(
f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
)
with open(json_file, "r", encoding="utf-8") as f:
data_from_gpt = json.load(f)
return data_from_gpt
page_text_dict = self.filter_pages.page_text_dict
datapoint_page_info, result_details = self.get_datapoint_page_info()
data_extraction = DataExtraction(
self.doc_id,
self.pdf_file,
self.output_extract_data_folder,
self.page_text_dict,
self.datapoint_page_info,
self.datapoints,
self.document_mapping_info_df,
)
data_from_gpt = data_extraction.extract_data()
return data_from_gpt
def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
if not re_run:
output_data_json_folder = os.path.join(
self.output_mapping_data_folder, "json/"
)
os.makedirs(output_data_json_folder, exist_ok=True)
json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
if os.path.exists(json_file):
logger.info(
f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {json_file}"
)
with open(json_file, "r", encoding="utf-8") as f:
doc_mapping_data = json.load(f)
return doc_mapping_data
"""
doc_id,
datapoints: list,
raw_document_data_list: list,
document_mapping_info_df: pd.DataFrame,
output_data_folder: str,
"""
data_mapping = DataMapping(
self.doc_id,
self.datapoints,
data_from_gpt,
self.document_mapping_info_df,
self.output_mapping_data_folder,
)
return data_mapping.mapping_raw_data()
def filter_pages(doc_id: str, pdf_folder: str) -> None:
logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
@@ -70,16 +123,39 @@ def filter_pages(doc_id: str, pdf_folder: str) -> None:
return datapoint_page_info, result_details
def extract_data(
doc_id: str, pdf_folder: str, output_data_folder: str, re_run: bool = False
) -> None:
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id, pdf_folder, output_extract_data_folder=output_data_folder
)
data_from_gpt = emea_ar_parsing.extract_data(re_run)
return data_from_gpt
def mapping_data(
doc_id: str,
pdf_folder: str,
output_extract_data_folder: str,
output_mapping_folder: str,
re_run_extract_data: bool = False,
re_run_mapping_data: bool = False,
) -> None:
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
pdf_folder,
output_extract_data_folder=output_extract_data_folder,
output_mapping_data_folder=output_mapping_folder,
)
doc_data_from_gpt = emea_ar_parsing.extract_data(re_run=re_run_extract_data)
doc_mapping_data = emea_ar_parsing.mapping_data(
data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
)
return doc_data_from_gpt, doc_mapping_data
def batch_extract_data(
pdf_folder: str,
doc_data_excel_file: str = None,
@@ -112,10 +188,11 @@ def batch_extract_data(
doc_id=doc_id,
pdf_folder=pdf_folder,
output_data_folder=output_child_folder,
re_run=re_run,
)
result_list.extend(data_from_gpt)
if special_doc_id_list is None or len(special_doc_id_list) == 0:
result_df = pd.DataFrame(result_list)
result_df.reset_index(drop=True, inplace=True)
@@ -130,6 +207,84 @@ def batch_extract_data(
result_df.to_excel(writer, index=False, sheet_name="extract_data_info")
def batch_start_job(
pdf_folder: str,
doc_data_excel_file: str = None,
output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
special_doc_id_list: list = None,
re_run_extract_data: bool = False,
re_run_mapping_data: bool = False,
):
pdf_files = glob(pdf_folder + "*.pdf")
doc_list = []
if special_doc_id_list is not None and len(special_doc_id_list) > 0:
doc_list = special_doc_id_list
if (
len(doc_list) == 0
and doc_data_excel_file is not None
and len(doc_data_excel_file) > 0
and os.path.exists(doc_data_excel_file)
):
doc_data_df = pd.read_excel(doc_data_excel_file)
doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
result_extract_data_list = []
result_mapping_data_list = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file)
doc_id = pdf_base_name.split(".")[0]
if doc_list is not None and doc_id not in doc_list:
continue
doc_data_from_gpt, doc_mapping_data_list = mapping_data(
doc_id=doc_id,
pdf_folder=pdf_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_folder=output_mapping_child_folder,
re_run_extract_data=re_run_extract_data,
re_run_mapping_data=re_run_mapping_data,
)
result_extract_data_list.extend(doc_data_from_gpt)
result_mapping_data_list.extend(doc_mapping_data_list)
if special_doc_id_list is None or len(special_doc_id_list) == 0:
result_extract_data_df = pd.DataFrame(result_extract_data_list)
result_extract_data_df.reset_index(drop=True, inplace=True)
result_mappingdata_df = pd.DataFrame(result_mapping_data_list)
result_mappingdata_df.reset_index(drop=True, inplace=True)
logger.info(f"Saving extract data to {output_extract_data_total_folder}")
os.makedirs(output_extract_data_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
output_file = os.path.join(
output_extract_data_total_folder,
f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
)
with pd.ExcelWriter(output_file) as writer:
result_extract_data_df.to_excel(
writer, index=False, sheet_name="extract_data_info"
)
logger.info(f"Saving mapping data to {output_mapping_total_folder}")
os.makedirs(output_mapping_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
output_file = os.path.join(
output_mapping_total_folder,
f"mapping_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
)
with pd.ExcelWriter(output_file) as writer:
result_mappingdata_df.to_excel(
writer, index=False, sheet_name="mapping_data"
)
result_extract_data_df.to_excel(
writer, index=False, sheet_name="extract_data"
)
def batch_filter_pdf_files(
pdf_folder: str,
doc_data_excel_file: str = None,
@@ -336,15 +491,32 @@ if __name__ == "__main__":
# test_auto_generate_instructions()
output_child_folder = r"/data/emea_ar/output/extract_data/docs/" output_extract_data_child_folder = r"/data/emea_ar/output/extract_data/docs/"
output_total_folder = r"/data/emea_ar/output/extract_data/total/" output_extract_data_total_folder = r"/data/emea_ar/output/extract_data/total/"
re_run = True re_run_extract_data = False
batch_extract_data(pdf_folder, # batch_extract_data(
page_filter_ground_truth_file, # pdf_folder,
output_child_folder, # page_filter_ground_truth_file,
output_total_folder, # output_extract_data_child_folder,
special_doc_id_list, # output_extract_data_total_folder,
re_run) # special_doc_id_list,
# re_run,
# )
# doc_id = "476492237" # doc_id = "476492237"
# extract_data(doc_id, pdf_folder, output_child_folder, re_run) # extract_data(doc_id, pdf_folder, output_extract_data_child_folder, re_run)
special_doc_id_list = []
output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"
re_run_mapping_data = True
batch_start_job(
pdf_folder,
page_filter_ground_truth_file,
output_extract_data_child_folder,
output_mapping_child_folder,
output_extract_data_total_folder,
output_mapping_total_folder,
special_doc_id_list,
re_run_extract_data,
re_run_mapping_data,
)
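For a single document, the new extraction-plus-mapping flow can be exercised with the mapping_data helper defined above; a minimal sketch using the defaults from this commit (the doc id is the example already referenced in the commented-out code):

# Hypothetical single-document run of the new flow.
doc_data, mapped_rows = mapping_data(
    doc_id="476492237",
    pdf_folder=r"/data/emea_ar/pdf/",
    output_extract_data_folder=r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_folder=r"/data/emea_ar/output/mapping_data/docs/",
    re_run_extract_data=False,
    re_run_mapping_data=True,
)
# mapped_rows is the list of dicts also written to
# /data/emea_ar/output/mapping_data/docs/json/476492237.json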

utils/biz_utils.py

@@ -1,4 +1,5 @@
import re
from copy import deepcopy
def add_slash_to_text_as_regex(text: str):
if text is None or len(text) == 0:
@@ -20,3 +21,213 @@ def clean_text(text: str) -> str:
text = re.sub(r"\\u[A-Z0-9a-z]{4}", ' ', text)
text = re.sub(r"( ){2,}", ' ', text.strip())
return text
def get_most_similar_name(text: str, name_list: list):
"""
Get the most similar name from name_list by Jaccard similarity
"""
try:
copy_fund_name_list = deepcopy(name_list)
if text is None or len(text.split()) == 0 or \
copy_fund_name_list is None or len(copy_fund_name_list) == 0:
return None, None
copy_fund_name_list = [replace_abbrevation(copy_fund_name) for copy_fund_name
in copy_fund_name_list]
# get common words in fund_name_list
common_word_list = []
if len(name_list) > 1:
_, common_word_list = remove_common_word(copy_fund_name_list)
text = text.strip()
text = remove_special_characters(text)
text = replace_abbrevation(text)
text_splits = text.split()
if len(text_splits) == 1:
text = split_words_without_space(text)
new_splits = text.split()
else:
new_splits = []
for split in text_splits:
if len(split) > 1:
new_splits.extend(split_words_without_space(split).split())
else:
new_splits.append(split)
lower_new_splits = [split.lower() for split in new_splits]
for word in common_word_list:
if word not in lower_new_splits:
# remove word in fund_name_list
for i in range(len(copy_fund_name_list)):
temp_splits = copy_fund_name_list[i].split()
for temp in temp_splits:
if remove_special_characters(temp).lower() == word:
copy_fund_name_list[i] = re.sub(r'\s+', ' ',
copy_fund_name_list[i].replace(temp, ' '))
for i in range(len(copy_fund_name_list)):
temp_splits = copy_fund_name_list[i].split()
for temp in temp_splits:
if remove_special_characters(temp).lower() in ['fund', 'portfolio', 'class', 'share', 'shares']:
copy_fund_name_list[i] = \
re.sub(r'\s+', ' ', copy_fund_name_list[i].replace(temp, ' '))
final_splits = []
for split in new_splits:
if split.lower() not in ['fund', 'portfolio', 'class', 'share', 'shares']:
final_splits.append(split)
text = ' '.join(final_splits)
max_similarity = 0
max_similarity_fund_name = None
for fund_name, copy_fund_name in zip(name_list, copy_fund_name_list):
copy_fund_name = remove_special_characters(copy_fund_name)
copy_fund_name = split_words_without_space(copy_fund_name)
similarity = get_jacard_similarity(text,
copy_fund_name,
need_remove_numeric_characters=False)
if similarity > max_similarity:
max_similarity = similarity
max_similarity_fund_name = fund_name
if max_similarity == 1:
break
if max_similarity < 0.35:
return None, max_similarity
return max_similarity_fund_name, max_similarity
except Exception as e:
print(e)
return None, 0.0
def remove_common_word(text_list: list):
if text_list is None or len(text_list) == 0:
return text_list, []
new_text_list = []
for text in text_list:
text = text.lower()
text = remove_special_characters(text)
text_splits = text.split()
while 'fund' in text_splits:
text_splits.remove('fund')
while 'portfolio' in text_splits:
text_splits.remove('portfolio')
while 'share' in text_splits:
text_splits.remove('share')
while 'class' in text_splits:
text_splits.remove('class')
text = ' '.join(text_splits)
new_text_list.append(text)
# remove words shared by all names in new_text_list, e.g. for 'Blackrock Global Fund' and 'Blackrock Growth Fund' the common words 'Blackrock' and 'Fund' are dropped
# and the result is ['Global', 'Growth']
common_word_list = []
new_text_splits_list = [text.split() for text in new_text_list]
for i in range(len(new_text_splits_list)):
for j in range(i+1, len(new_text_splits_list)):
if common_word_list is None or len(common_word_list) == 0:
common_word_list = list(
set(new_text_splits_list[i]).intersection(set(new_text_splits_list[j])))
else:
common_word_list = list(
set(common_word_list).intersection(set(new_text_splits_list[j])))
common_word_list = list(set(common_word_list))
for i in range(len(new_text_splits_list)):
for common_word in common_word_list:
if common_word in new_text_splits_list[i]:
new_text_splits_list[i].remove(common_word)
new_text_list = [' '.join(text_splits)
for text_splits in new_text_splits_list]
return new_text_list, common_word_list
def split_words_without_space(text: str):
"""
Insert spaces into run-together words, e.g. 'BlackrockGlobalFund' becomes 'Blackrock Global Fund'
"""
if text is None or len(text.strip()) == 0:
return ''
text = text.strip()
# splits = text.split()
# if len(splits) > 1:
# return text
# find all words with capital letter + lower letter
regex = r'[A-Z][a-z]+'
word_list = re.findall(regex, text)
if len(word_list) > 0:
for word in word_list:
text = text.replace(word, ' ' + word + ' ')
text = re.sub(r'(\s)+', ' ', text)
return text.strip()
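A quick sketch of the splitter on a run-together name (note it returns a single re-spaced string, not a list):

print(split_words_without_space("BlackrockGlobalFund"))  # "Blackrock Global Fund"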
def remove_special_characters(text):
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
text = re.sub(r'\s+', ' ', text)
text = text.strip()
return text
def remove_numeric_characters(text):
# remove numeric characters
text = re.sub(r'\d+', ' ', text)
text = re.sub(r'\s+', ' ', text)
text = text.strip()
return text
def get_jacard_similarity(text_left,
text_right,
need_remove_special_characters=True,
need_remove_numeric_characters=True):
if need_remove_special_characters:
text_left = remove_special_characters(text_left)
text_right = remove_special_characters(text_right)
if need_remove_numeric_characters:
text_left = remove_numeric_characters(text_left)
text_right = remove_numeric_characters(text_right)
text_left = text_left.lower()
text_right = text_right.lower()
text_left = text_left.split()
text_right = text_right.split()
intersection = set(text_left).intersection(set(text_right))
union = set(text_left).union(set(text_right))
if len(union) > 0:
return round(len(intersection) / len(union), 3)
else:
return 0
def replace_abbrevation(text: str):
if text is None or len(text.strip()) == 0:
return text
text = text.strip()
text_splits = text.split()
new_text_splits = []
for split in text_splits:
if split.lower() in ['acc']:
new_text_splits.append('Accumulation')
elif split.lower() in ['inc']:
new_text_splits.append('Income')
elif split.lower() in ['dist']:
new_text_splits.append('Distribution')
elif split.lower() in ['inv']:
new_text_splits.append('Investor')
elif split.lower() in ['inst', 'institution']:
new_text_splits.append('Institutional')
elif split.lower() in ['adm']:
new_text_splits.append('Admin')
elif split.lower() in ['adv']:
new_text_splits.append('Advantage')
elif split.lower() in ['hdg', 'hgd', '(h)']:
new_text_splits.append('Hedged')
elif split.lower() in ['cl']:
new_text_splits.append('Class')
elif split.lower() in ['ser']:
new_text_splits.append('Series')
elif split.lower() in ['u.s.']:
new_text_splits.append('US')
elif split.lower() in ['nc']:
new_text_splits.append('no trail')
else:
new_text_splits.append(split)
new_text = ' '.join(new_text_splits)
return new_text
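A short worked example of these helpers, assuming they are imported from utils.biz_utils as elsewhere in this commit (the outputs shown are what the code above computes for these hypothetical names):

from utils.biz_utils import (
    get_jacard_similarity,
    get_most_similar_name,
    replace_abbrevation,
)

# Jaccard similarity = |intersection| / |union| of the word sets:
# {"global", "equity", "a"} vs {"global", "equity", "fund", "a"} -> 3 / 4
print(get_jacard_similarity("Global Equity A", "Global Equity Fund A"))  # 0.75

# Common share-class abbreviations are expanded before comparison.
print(replace_abbrevation("Class A Acc"))  # Class A Accumulation

# Returns (best_name, similarity), or (None, similarity) when the best
# Jaccard score falls below the 0.35 floor.
name, score = get_most_similar_name(
    "Global Equity Acc", ["Global Equity Fund Accumulation", "Global Bond Fund Income"]
)
# expected: ("Global Equity Fund Accumulation", 1.0)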