# dc-ml-emea-ar/main.py


import os
import json
import pandas as pd
from glob import glob
from tqdm import tqdm
import time
import fitz
import re
from io import BytesIO
from traceback import print_exc
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from utils.pdf_util import PDFUtil
from utils.biz_utils import add_slash_to_text_as_regex
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.metrics import Metrics


class EMEA_AR_Parsing:
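    """End-to-end parsing pipeline for one EMEA annual-report (AR) PDF:
    download the document, filter candidate pages per datapoint, extract
    the data, drill the values back into the PDF, and map them to
    funds/share classes.
    """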
    def __init__(
        self,
        doc_id: str,
        pdf_folder: str = r"/data/emea_ar/pdf/",
        output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
        output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
        extract_way: str = "text",
        drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
    ) -> None:
        self.doc_id = doc_id
        self.pdf_folder = pdf_folder
        os.makedirs(self.pdf_folder, exist_ok=True)
        self.pdf_file = self.download_pdf()
        self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
        if extract_way is None or len(extract_way) == 0:
            extract_way = "text"
        self.extract_way = extract_way
        self.output_extract_image_folder = None
        if self.extract_way == "image":
            self.output_extract_image_folder = (
                r"/data/emea_ar/output/extract_data/images/"
            )
            os.makedirs(self.output_extract_image_folder, exist_ok=True)
        if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
            output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
        if not output_extract_data_folder.endswith("/"):
            output_extract_data_folder = f"{output_extract_data_folder}/"
        if extract_way is not None and len(extract_way) > 0:
            output_extract_data_folder = (
                f"{output_extract_data_folder}by_{extract_way}/"
            )
        self.output_extract_data_folder = output_extract_data_folder
        os.makedirs(self.output_extract_data_folder, exist_ok=True)
        if output_mapping_data_folder is None or len(output_mapping_data_folder) == 0:
            output_mapping_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
        if not output_mapping_data_folder.endswith("/"):
            output_mapping_data_folder = f"{output_mapping_data_folder}/"
        if extract_way is not None and len(extract_way) > 0:
            output_mapping_data_folder = (
                f"{output_mapping_data_folder}by_{extract_way}/"
            )
        self.output_mapping_data_folder = output_mapping_data_folder
        os.makedirs(self.output_mapping_data_folder, exist_ok=True)
        self.filter_pages = FilterPages(
            self.doc_id, self.pdf_file, self.document_mapping_info_df
        )
        self.page_text_dict = self.filter_pages.page_text_dict
        self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
        self.datapoints = self.get_datapoints_from_datapoint_page_info()
        if drilldown_folder is None or len(drilldown_folder) == 0:
            drilldown_folder = r"/data/emea_ar/output/drilldown/"
        os.makedirs(drilldown_folder, exist_ok=True)
        self.drilldown_folder = drilldown_folder

    def download_pdf(self) -> str:
        pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
        return pdf_file

    def get_datapoint_page_info(self) -> tuple:
        datapoint_page_info, result_details = self.filter_pages.start_job()
        return datapoint_page_info, result_details

    def get_datapoints_from_datapoint_page_info(self) -> list:
        datapoints = list(self.datapoint_page_info.keys())
        if "doc_id" in datapoints:
            datapoints.remove("doc_id")
        return datapoints

    def extract_data(
        self,
        re_run: bool = False,
    ) -> tuple:
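        """Extract datapoint values for this document via DataExtraction,
        reusing the cached JSON output unless re_run is True, then drill
        the values back into the PDF.
        Returns (data_from_gpt, annotation_list).
        """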
        found_data = False
        if not re_run:
            output_data_json_folder = os.path.join(
                self.output_extract_data_folder, "json/"
            )
            os.makedirs(output_data_json_folder, exist_ok=True)
            json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
            if os.path.exists(json_file):
                logger.info(
                    f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as f:
                    data_from_gpt = json.load(f)
                found_data = True
        if not found_data:
            try:
                data_extraction = DataExtraction(
                    self.doc_id,
                    self.pdf_file,
                    self.output_extract_data_folder,
                    self.page_text_dict,
                    self.datapoint_page_info,
                    self.datapoints,
                    self.document_mapping_info_df,
                    extract_way=self.extract_way,
                    output_image_folder=self.output_extract_image_folder,
                )
                data_from_gpt = data_extraction.extract_data()
            except Exception as e:
                logger.error(f"Error: {e}")
                # keep the type consistent: downstream code expects a list of
                # per-page result dicts
                data_from_gpt = []
        # Drilldown data to relevant PDF document
        annotation_list = self.drilldown_pdf_document(data_from_gpt)
        return data_from_gpt, annotation_list

    def drilldown_pdf_document(self, data_from_gpt: list) -> list:
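        """Locate each extracted value (and reported fund name) on its source
        page via PDFUtil.batch_drilldown, write the annotation data to an
        Excel file in the drilldown folder, and return the annotation list.
        """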
logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
pdf_util = PDFUtil(self.pdf_file)
drilldown_data_list = []
2024-11-08 17:22:35 +00:00
for data in data_from_gpt:
doc_id = str(data.get("doc_id", ""))
2024-11-08 17:22:35 +00:00
page_index = data.get("page_index", -1)
if page_index == -1:
continue
extract_data_list = data.get("extract_data", {}).get("data", [])
dp_reported_name_dict = data.get("extract_data", {}).get("dp_reported_name", {})
highlighted_value_list = []
for extract_data in extract_data_list:
for data_point, value in extract_data.items():
if value in highlighted_value_list:
continue
if data_point in ["ter", "ogc", "performance_fee"]:
continue
drilldown_data = {
"doc_id": doc_id,
"page_index": page_index,
"data_point": data_point,
"parent_text_block": None,
"value": value,
"annotation_attribute": {}
}
drilldown_data_list.append(drilldown_data)
2024-11-08 17:22:35 +00:00
highlighted_value_list.append(value)
for data_point, reported_name in dp_reported_name_dict.items():
if reported_name in highlighted_value_list:
continue
data_point = f"{data_point}_reported_name"
drilldown_data = {
"doc_id": doc_id,
"page_index": page_index,
"data_point": data_point,
"parent_text_block": None,
"value": reported_name,
"annotation_attribute": {}
}
drilldown_data_list.append(drilldown_data)
2024-11-08 17:22:35 +00:00
highlighted_value_list.append(reported_name)
drilldown_result = pdf_util.batch_drilldown(drilldown_data_list=drilldown_data_list,
output_pdf_folder=self.drilldown_folder)
annotation_list = []
if len(drilldown_result) > 0:
logger.info(f"Drilldown PDF document for doc_id: {doc_id} successfully")
annotation_list = drilldown_result.get("annotation_list", [])
for annotation in annotation_list:
annotation["doc_id"] = doc_id
if self.drilldown_folder is not None and len(self.drilldown_folder) > 0:
drilldown_data_folder = os.path.join(self.drilldown_folder, "data/")
os.makedirs(drilldown_data_folder, exist_ok=True)
drilldown_file = os.path.join(drilldown_data_folder, f"{doc_id}_drilldown.xlsx")
drilldown_source_df = pd.DataFrame(drilldown_data_list)
annotation_list_df = pd.DataFrame(annotation_list)
# set drilldown_result_df column order as doc_id, pdf_file, page_index,
# data_point, value, matching_val_area, normalized_bbox
annotation_list_df = annotation_list_df[["doc_id", "pdf_file", "page_index",
"data_point", "value", "matching_val_area", "normalized_bbox"]]
logger.info(f"Writing drilldown data to {drilldown_file}")
try:
with pd.ExcelWriter(drilldown_file) as writer:
drilldown_source_df.to_excel(writer, index=False, sheet_name="source_data")
annotation_list_df.to_excel(writer, index=False, sheet_name="drilldown_data")
except Exception as e:
logger.error(f"Error: {e}")
annotation_list = annotation_list_df.to_dict(orient="records")
return annotation_list

    def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
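        """Map the raw extracted fund/share names to database entities via
        DataMapping, reusing the cached JSON output unless re_run is True.
        """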
        if not re_run:
            output_data_json_folder = os.path.join(
                self.output_mapping_data_folder, "json/"
            )
            os.makedirs(output_data_json_folder, exist_ok=True)
            json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
            if os.path.exists(json_file):
                logger.info(
                    f"The fund/share data of this document: {self.doc_id} has been mapped, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as f:
                    doc_mapping_data = json.load(f)
                return doc_mapping_data
        # DataMapping arguments: doc_id, datapoints: list,
        # raw_document_data_list: list, document_mapping_info_df: pd.DataFrame,
        # output_data_folder: str
        data_mapping = DataMapping(
            self.doc_id,
            self.datapoints,
            data_from_gpt,
            self.document_mapping_info_df,
            self.output_mapping_data_folder,
        )
        return data_mapping.mapping_raw_data()


def filter_pages(doc_id: str, pdf_folder: str) -> tuple:
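    """Run only the page-filtering step for one document and return
    (datapoint_page_info, result_details)."""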
logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(doc_id, pdf_folder)
datapoint_page_info, result_details = emea_ar_parsing.get_datapoint_page_info()
return datapoint_page_info, result_details


def extract_data(
    doc_id: str,
    pdf_folder: str,
    output_data_folder: str,
    extract_way: str = "text",
    re_run: bool = False,
) -> tuple:
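    """Convenience wrapper: build an EMEA_AR_Parsing instance for one document
    and run its extraction step. Returns (data_from_gpt, annotation_list)."""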
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
pdf_folder,
2024-09-19 21:29:26 +00:00
output_extract_data_folder=output_data_folder,
extract_way=extract_way,
)
data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run)
return data_from_gpt, annotation_list


def mapping_data(
    doc_id: str,
    pdf_folder: str,
    output_extract_data_folder: str,
    output_mapping_folder: str,
    extract_way: str = "text",
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
) -> tuple:
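    """Convenience wrapper: extract data for one document, then map it.
    Returns (doc_data_from_gpt, annotation_list, doc_mapping_data)."""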
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
pdf_folder,
output_extract_data_folder=output_extract_data_folder,
output_mapping_data_folder=output_mapping_folder,
2024-09-19 21:29:26 +00:00
extract_way=extract_way,
)
doc_data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run=re_run_extract_data)
doc_mapping_data = emea_ar_parsing.mapping_data(
data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
)
return doc_data_from_gpt, annotation_list, doc_mapping_data


def batch_extract_data(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run: bool = False,
) -> None:
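    """Extract data for every PDF in pdf_folder (optionally restricted to
    special_doc_id_list or to the checked doc_ids in doc_data_excel_file)
    and, for full runs, save the combined result to one Excel file."""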
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if doc_list is not None and doc_id not in doc_list:
            continue
        # extract_data returns (data_from_gpt, annotation_list); unpack it
        data_from_gpt, annotation_list = extract_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_data_folder=output_child_folder,
            extract_way=extract_way,
            re_run=re_run,
        )
        result_list.extend(data_from_gpt)
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        result_df = pd.DataFrame(result_list)
        result_df.reset_index(drop=True, inplace=True)
        logger.info(f"Saving the result to {output_total_folder}")
        os.makedirs(output_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_total_folder,
            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_df.to_excel(writer, index=False, sheet_name="extract_data_info")


def batch_start_job(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
    force_save_total_data: bool = False,
    calculate_metrics: bool = False,
):
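    """Run extraction and mapping for a batch of documents, save the combined
    extract/mapping results to Excel, and optionally calculate metrics
    against the ground-truth file."""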
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        doc_list.append(doc_id)
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_extract_data_list = []
    result_mapping_data_list = []
    for doc_id in tqdm(doc_list):
        doc_data_from_gpt, annotation_list, doc_mapping_data_list = mapping_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_extract_data_folder=output_extract_data_child_folder,
            output_mapping_folder=output_mapping_child_folder,
            extract_way=extract_way,
            re_run_extract_data=re_run_extract_data,
            re_run_mapping_data=re_run_mapping_data,
        )
        result_extract_data_list.extend(doc_data_from_gpt)
        result_mapping_data_list.extend(doc_mapping_data_list)
    if force_save_total_data or (
        special_doc_id_list is None or len(special_doc_id_list) == 0
    ):
        result_extract_data_df = pd.DataFrame(result_extract_data_list)
        result_extract_data_df.reset_index(drop=True, inplace=True)
        result_mapping_data_df = pd.DataFrame(result_mapping_data_list)
        result_mapping_data_df.reset_index(drop=True, inplace=True)
        logger.info(f"Saving extract data to {output_extract_data_total_folder}")
        unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist()
        os.makedirs(output_extract_data_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_extract_data_total_folder,
            f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data_info"
            )
        logger.info(f"Saving mapping data to {output_mapping_total_folder}")
        unique_doc_ids = result_mapping_data_df["doc_id"].unique().tolist()
        os.makedirs(output_mapping_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_mapping_total_folder,
            f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx",
        )
        doc_mapping_data_in_db = only_output_mapping_data_in_db(result_mapping_data_df)
        with pd.ExcelWriter(output_file) as writer:
            doc_mapping_data_in_db.to_excel(
                writer, index=False, sheet_name="data_in_doc_mapping"
            )
            result_mapping_data_df.to_excel(
                writer, index=False, sheet_name="total_mapping_data"
            )
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data"
            )
        if calculate_metrics:
            prediction_sheet_name = "total_mapping_data"
            ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
            ground_truth_sheet_name = "mapping_data"
            metrics_output_folder = r"/data/emea_ar/output/metrics/"
            # logger.info("Calculating metrics for data extraction")
            # missing_error_list, metrics_list, metrics_file = get_metrics(
            #     "data_extraction",
            #     output_file,
            #     prediction_sheet_name,
            #     ground_truth_file,
            #     ground_truth_sheet_name,
            #     metrics_output_folder,
            # )
            # logger.info("Calculating metrics for investment mapping by actual document mapping")
            # missing_error_list, metrics_list, metrics_file = get_metrics(
            #     "investment_mapping",
            #     output_file,
            #     prediction_sheet_name,
            #     ground_truth_file,
            #     ground_truth_sheet_name,
            #     metrics_output_folder,
            # )
            logger.info("Calculating metrics for investment mapping by database document mapping")
            missing_error_list, metrics_list, metrics_file = get_metrics(
                "document_mapping_in_db",
                output_file,
                prediction_sheet_name,
                ground_truth_file,
                ground_truth_sheet_name,
                metrics_output_folder,
            )


def only_output_mapping_data_in_db(mapping_data: pd.DataFrame) -> pd.DataFrame:
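    """Keep only the mapping rows whose investment_id belongs to the
    document's fund/share mapping in the database (or is empty)."""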
    doc_id_list = mapping_data["doc_id"].unique().tolist()
    data_in_mapping_df_list = []
    for doc_id in doc_id_list:
        doc_mapping_data = mapping_data[mapping_data["doc_id"] == doc_id]
        document_mapping = query_document_fund_mapping(doc_id, rerun=False)
        fund_id_list = document_mapping["FundId"].unique().tolist()
        sec_id_list = document_mapping["SecId"].unique().tolist()
        id_list = fund_id_list + sec_id_list
        # filter doc_mapping_data by id_list or empty id
        filter_doc_mapping_data = doc_mapping_data[
            (doc_mapping_data["investment_id"].isin(id_list))
            | (doc_mapping_data["investment_id"] == "")
        ]
        data_in_mapping_df_list.append(filter_doc_mapping_data)
    result_mapping_data_df = pd.concat(data_in_mapping_df_list)
    result_mapping_data_df.reset_index(drop=True, inplace=True)
    return result_mapping_data_df


def batch_filter_pdf_files(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_folder: str = r"/data/emea_ar/output/filter_pages/",
    special_doc_id_list: list = None,
) -> tuple:
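    """Run the page filter for a batch of documents, save the page-info
    results to Excel, and calculate page-filter metrics for full runs."""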
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    if (
        len(doc_list) == 0
        and doc_data_excel_file is not None
        and len(doc_data_excel_file) > 0
        and os.path.exists(doc_data_excel_file)
    ):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    result_list = []
    result_details = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if doc_list is not None and doc_id not in doc_list:
            continue
        doc_datapoint_page_info, doc_result_details = filter_pages(
            doc_id=doc_id, pdf_folder=pdf_folder
        )
        result_list.append(doc_datapoint_page_info)
        result_details.extend(doc_result_details)
    result_df = pd.DataFrame(result_list)
    result_df.reset_index(drop=True, inplace=True)
    result_details_df = pd.DataFrame(result_details)
    result_details_df.reset_index(drop=True, inplace=True)
    logger.info(f"Saving the result to {output_folder}")
    os.makedirs(output_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_folder,
        f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
    )
    with pd.ExcelWriter(output_file) as writer:
        result_df.to_excel(writer, index=False, sheet_name="dp_page_info")
        result_details_df.to_excel(
            writer, index=False, sheet_name="dp_page_info_details"
        )
    # metrics are only calculated for full runs; also guard against a None
    # special_doc_id_list so the return values are always defined
    missing_error_list, metrics_list, metrics_file = [], [], None
    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        logger.info(f"Calculating metrics for {output_file}")
        metrics_output_folder = r"/data/emea_ar/output/metrics/"
        missing_error_list, metrics_list, metrics_file = get_metrics(
            data_type="page_filter",
            prediction_file=output_file,
            prediction_sheet_name="dp_page_info",
            ground_truth_file=doc_data_excel_file,
            output_folder=metrics_output_folder,
        )
    return missing_error_list, metrics_list, metrics_file


def get_metrics(
    data_type: str,
    prediction_file: str,
    prediction_sheet_name: str,
    ground_truth_file: str,
    ground_truth_sheet_name: str = None,
    output_folder: str = None,
) -> tuple:
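    """Compare a prediction file against a ground-truth file via Metrics and
    return (missing_error_list, metrics_list, metrics_file)."""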
    metrics = Metrics(
        data_type=data_type,
        prediction_file=prediction_file,
        prediction_sheet_name=prediction_sheet_name,
        ground_truth_file=ground_truth_file,
        ground_truth_sheet_name=ground_truth_sheet_name,
        output_folder=output_folder,
    )
    missing_error_list, metrics_list, metrics_file = metrics.get_metrics(strict_model=False)
    return missing_error_list, metrics_list, metrics_file


def test_auto_generate_instructions():
    """
    DataExtraction arguments:
        doc_id: str,
        pdf_file: str,
        page_text_dict: dict,
        datapoint_page_info: dict,
        document_mapping_info_df: pd.DataFrame
    """
    doc_id = "402397014"
    pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
    document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
    filter_pages = FilterPages(doc_id, pdf_file, document_mapping_info_df)
    page_text_dict = filter_pages.page_text_dict
    datapoint_page_info, datapoint_page_info_details = filter_pages.start_job()
    datapoint_list = list(datapoint_page_info.keys())
    datapoint_list.remove("doc_id")
    data_extraction = DataExtraction(
        doc_id, pdf_file, page_text_dict, datapoint_page_info, document_mapping_info_df
    )
    page_index_list = list(page_text_dict.keys())
    # default to an empty page text so the calls below are always defined
    page_text = ""
    if len(page_index_list) > 0:
        for datapoint in datapoint_list:
            if len(datapoint_page_info[datapoint]) > 0:
                page_index_list = datapoint_page_info[datapoint]
                page_text = page_text_dict[page_index_list[0]]
                break
    output_folder = (
        r"/data/emea_ar/basic_information/prompts_example/generate_by_config/"
    )
    os.makedirs(output_folder, exist_ok=True)
    tor_instructions_text = data_extraction.get_instructions_by_datapoints(
        page_text, ["tor"]
    )
    with open(
        os.path.join(output_folder, "tor_instructions.txt"), "w", encoding="utf-8"
    ) as f:
        f.write(tor_instructions_text)
    ter_instructions_text = data_extraction.get_instructions_by_datapoints(
        page_text, ["ter"]
    )
    with open(
        os.path.join(output_folder, "ter_instructions.txt"), "w", encoding="utf-8"
    ) as f:
        f.write(ter_instructions_text)
    ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
        page_text, ["ogc"]
    )
    with open(
        os.path.join(output_folder, "ogc_instructions.txt"), "w", encoding="utf-8"
    ) as f:
        f.write(ogc_instructions_text)
    performance_fee_instructions_text = (
        data_extraction.get_instructions_by_datapoints(
            page_text, ["performance_fee"]
        )
    )
    with open(
        os.path.join(output_folder, "performance_fee_instructions.txt"),
        "w",
        encoding="utf-8",
    ) as f:
        f.write(performance_fee_instructions_text)
    ter_ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
        page_text, ["ter", "ogc"]
    )
    with open(
        os.path.join(output_folder, "ter_ogc_instructions.txt"),
        "w",
        encoding="utf-8",
    ) as f:
        f.write(ter_ogc_instructions_text)
    ter_performance_fee_instructions_text = (
        data_extraction.get_instructions_by_datapoints(
            page_text, ["ter", "performance_fee"]
        )
    )
    with open(
        os.path.join(output_folder, "ter_performance_fee_instructions.txt"),
        "w",
        encoding="utf-8",
    ) as f:
        f.write(ter_performance_fee_instructions_text)
    ogc_ter_performance_fee_instructions_text = (
        data_extraction.get_instructions_by_datapoints(
            page_text, ["ogc", "ter", "performance_fee"]
        )
    )
    with open(
        os.path.join(output_folder, "ogc_ter_performance_fee_instructions.txt"),
        "w",
        encoding="utf-8",
    ) as f:
        f.write(ogc_ter_performance_fee_instructions_text)


def test_data_extraction_metrics():
    data_type = "data_extraction"
    # prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx"
    prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_text_20240922152517.xlsx"
    # prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx"
    prediction_sheet_name = "mapping_data"
    ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
    ground_truth_sheet_name = "mapping_data"
    metrics_output_folder = r"/data/emea_ar/output/metrics/"
    missing_error_list, metrics_list, metrics_file = get_metrics(
        data_type,
        prediction_file,
        prediction_sheet_name,
        ground_truth_file,
        ground_truth_sheet_name,
        metrics_output_folder,
    )


def test_mapping_raw_name():
    doc_id = "337293427"
    # KBC Bonds Inflation-Linked Bonds Distribution Shares
    # KBC Bonds Inflation-Linked Bonds Institutional B Shares
    raw_name = "KBC Bonds Inflation-Linked Bonds Institutional B Shares"
    raw_share_name = "Institutional B Shares"
    output_folder = r"/data/emea_ar/output/mapping_data/docs/by_text/"
    data_mapping = DataMapping(
        doc_id,
        datapoints=None,
        raw_document_data_list=None,
        document_mapping_info_df=None,
        output_data_folder=output_folder,
    )
    process_cache = {}
    mapping_info = data_mapping.matching_with_database(
        raw_name=raw_name,
        raw_share_name=raw_share_name,
        parent_id="FSGBR051XK",
        matching_type="share",
        process_cache=process_cache,
    )
    print(mapping_info)


def test_translate_pdf():
    from core.data_translate import Translate_PDF

    pdf_file = r"/data/emea_ar/pdf/451063582.pdf"
    output_folder = r"/data/translate/output/"
    translate_pdf = Translate_PDF(pdf_file, output_folder)
    translate_pdf.start_job()


def test_replace_abbrevation():
    from utils.biz_utils import replace_abbrevation

    text_list = [
        "M&G European Credit Investment Fund A CHFH Acc",
        "M&G European Credit Investment Fund A CHFHInc",
        "M&G European Credit Investment Fund A USDHAcc",
        "M&G European High Yield Credit Investment Fund E GBPHedgedAcc",
        "M&G Sustainable European Credit Investment Fd Cl L GBPH Acc",
        "M&G Sustainable Total Return Credit Investment Fd AI HGBPInc",
        "M&G Total Return Credit Investment Fund Class WI GBPHedgedInc",
        "M&G Total Return Credit Investment Fund Class W GBP HedgedInc",
        "M&G Total Return Credit Investment Fund Class P CHF H Acc",
        "M&G Total Return Credit Investment Fund P EUR Inc",
    ]
    for text in text_list:
        result = replace_abbrevation(text)
        logger.info(f"Original text: {text}, replaced text: {result}")


def test_calculate_metrics():
    data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
    mapping_file = r"/data/emea_ar/basic_information/English/sample_doc/emea_doc_with_all_4_dp/doc_ar_data_with_all_4_dp.xlsx"
    data_df = pd.read_excel(data_file, sheet_name="data_in_doc_mapping")
    data_df = data_df[data_df["check"].isin([0, 1])]
    data_df.fillna("", inplace=True)
    data_df.reset_index(drop=True, inplace=True)
    mapping_df = pd.read_excel(mapping_file, sheet_name="doc_ar_data_in_db")
    mapping_fund_id = mapping_df["FundId"].unique().tolist()
    mapping_share_id = mapping_df["FundClassId"].unique().tolist()
    mapping_id_list = mapping_fund_id + mapping_share_id
    # keep rows whose investment_id is in mapping_id_list or empty
    filter_data_df = data_df[
        (data_df["investment_id"].isin(mapping_id_list))
        | (data_df["investment_id"] == "")
    ]
    # Investment mapping data
    mapping_metrics = get_sub_metrics(filter_data_df, "investment_mapping")
    logger.info(f"Investment mapping metrics: {mapping_metrics}")
    # tor data
    tor_data_df = filter_data_df[filter_data_df["datapoint"] == "tor"]
    tor_metrics = get_sub_metrics(tor_data_df, "tor")
    logger.info(f"TOR metrics: {tor_metrics}")
    # ter data
    ter_data_df = filter_data_df[filter_data_df["datapoint"] == "ter"]
    ter_metrics = get_sub_metrics(ter_data_df, "ter")
    logger.info(f"TER metrics: {ter_metrics}")
    # ogc data
    ogc_data_df = filter_data_df[filter_data_df["datapoint"] == "ogc"]
    ogc_metrics = get_sub_metrics(ogc_data_df, "ogc")
    logger.info(f"OGC metrics: {ogc_metrics}")
    # performance_fee data
    performance_fee_data_df = filter_data_df[filter_data_df["datapoint"] == "performance_fee"]
    performance_fee_metrics = get_sub_metrics(performance_fee_data_df, "performance_fee")
    logger.info(f"Performance fee metrics: {performance_fee_metrics}")
    metrics_df = pd.DataFrame(
        [mapping_metrics, tor_metrics, ter_metrics, ogc_metrics, performance_fee_metrics]
    )
    metrics_df.reset_index(drop=True, inplace=True)
    output_folder = r"/data/emea_ar/ground_truth/data_extraction/verify/"
    output_metrics_file = os.path.join(
        output_folder,
        r"mapping_data_info_30_documents_all_4_datapoints_roughly_metrics.xlsx",
    )
    with pd.ExcelWriter(output_metrics_file) as writer:
        metrics_df.to_excel(writer, index=False, sheet_name="metrics")


def get_sub_metrics(data_df: pd.DataFrame, data_point: str) -> dict:
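    """Compute accuracy/precision/recall/F1 for one datapoint.

    Every verified row is a positive ground-truth sample (gt=1) with the
    manual "check" flag as the prediction; rows flagged wrong (check=0)
    that still carry an investment_id are additionally counted as false
    positives (gt=0, pre=1).
    """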
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    gt_list = [1] * len(data_df)
    pre_list = data_df["check"].tolist()
    # convert the check flags to integers
    pre_list = [int(pre) for pre in pre_list]
    for index, row in data_df.iterrows():
        if row["check"] == 0 and len(row["investment_id"].strip()) > 0:
            pre_list.append(1)
            gt_list.append(0)
    # calculate metrics
    accuracy = accuracy_score(gt_list, pre_list)
    precision = precision_score(gt_list, pre_list)
    recall = recall_score(gt_list, pre_list)
    f1 = f1_score(gt_list, pre_list)
    support = len(data_df)
    metrics = {
        "DataPoint": data_point,
        "F1": f1,
        "Precision": precision,
        "Recall": recall,
        "Accuracy": accuracy,
        "Support": support,
    }
    return metrics


def replace_rerun_data(new_data_file: str, original_data_file: str):
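    """Replace the rows of rerun documents in original_data_file with the
    corresponding rows from new_data_file, across all three result sheets."""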
    data_in_doc_mapping_sheet = "data_in_doc_mapping"
    total_mapping_data_sheet = "total_mapping_data"
    extract_data_sheet = "extract_data"
    new_data_in_doc_mapping = pd.read_excel(new_data_file, sheet_name=data_in_doc_mapping_sheet)
    new_total_mapping_data = pd.read_excel(new_data_file, sheet_name=total_mapping_data_sheet)
    new_extract_data = pd.read_excel(new_data_file, sheet_name=extract_data_sheet)
    document_list = new_data_in_doc_mapping["doc_id"].unique().tolist()
    original_data_in_doc_mapping = pd.read_excel(original_data_file, sheet_name=data_in_doc_mapping_sheet)
    original_total_mapping_data = pd.read_excel(original_data_file, sheet_name=total_mapping_data_sheet)
    original_extract_data = pd.read_excel(original_data_file, sheet_name=extract_data_sheet)
    # remove data in original data by document_list
    original_data_in_doc_mapping = original_data_in_doc_mapping[~original_data_in_doc_mapping["doc_id"].isin(document_list)]
    original_total_mapping_data = original_total_mapping_data[~original_total_mapping_data["doc_id"].isin(document_list)]
    original_extract_data = original_extract_data[~original_extract_data["doc_id"].isin(document_list)]
    # merge new data to original data
    new_data_in_doc_mapping = pd.concat([original_data_in_doc_mapping, new_data_in_doc_mapping])
    new_data_in_doc_mapping.reset_index(drop=True, inplace=True)
    new_total_mapping_data = pd.concat([original_total_mapping_data, new_total_mapping_data])
    new_total_mapping_data.reset_index(drop=True, inplace=True)
    new_extract_data = pd.concat([original_extract_data, new_extract_data])
    new_extract_data.reset_index(drop=True, inplace=True)
    with pd.ExcelWriter(original_data_file) as writer:
        new_data_in_doc_mapping.to_excel(writer, index=False, sheet_name=data_in_doc_mapping_sheet)
        new_total_mapping_data.to_excel(writer, index=False, sheet_name=total_mapping_data_sheet)
        new_extract_data.to_excel(writer, index=False, sheet_name=extract_data_sheet)


def batch_run_documents():
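    """Entry point for batch runs: pick a document id list (historical run
    lists are kept below as comments) and run batch_start_job for each
    extract way."""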
    # special_doc_id_list = ["505174428", "510326848", "349679479"]
    # check_mapping_doc_id_list = [
    #     "327956364",
    #     "391456740",
    #     "391736837",
    #     "458359181",
    #     "486383912",
    #     "497497599",
    #     "529925114",
    #     "321733631",
    #     "334718372",
    #     "344636875",
    #     "362246081",
    #     "445256897",
    #     "449623976",
    #     "458291624",
    #     "478585901",
    #     "492121213",
    #     "502821436",
    #     "507967525",
    #     "481475385",
    #     "508854243",
    #     "520879048",
    #     "402181770",
    #     "463081566",
    #     "502693599",
    #     "509845549",
    #     "389171486",
    #     "323390570",
    #     "366179419",
    #     "486378555",
    #     "506559375",
    #     "479793787",
    #     "471641628",
    # ]
    # English documents with ground truth
    # check_db_mapping_doc_id_list = [
    #     "292989214",
    #     "316237292",
    #     "321733631",
    #     "323390570",
    #     "327956364",
    #     "332223498",
    #     "333207452",
    #     "334718372",
    #     "344636875",
    #     "362246081",
    #     "366179419",
    #     "380945052",
    #     "382366116",
    #     "387202452",
    #     "389171486",
    #     "391456740",
    #     "391736837",
    #     "394778487",
    #     "401684600",
    #     "402113224",
    #     "402181770",
    #     "402397014",
    #     "405803396",
    #     "445102363",
    #     "445256897",
    #     "448265376",
    #     "449555622",
    #     "449623976",
    #     "458291624",
    #     "458359181",
    #     "463081566",
    #     "469138353",
    #     "471641628",
    #     "476492237",
    #     "478585901",
    #     "478586066",
    #     "479042264",
    #     "479042269",
    #     "479793787",
    #     "481475385",
    #     "483617247",
    #     "486378555",
    #     "486383912",
    #     "492121213",
    #     "497497599",
    #     "502693599"
    # ]
    # Documents in EMEA Case 1.docx
    # check_db_mapping_doc_id_list = [
    #     "424976833",
    #     "425480144",
    #     "427637151",
    #     "429564034",
    #     "429950833",
    #     "430240853",
    #     "431073795",
    #     "434710819",
    #     "434851173",
    #     "434902020",
    #     "434924914",
    #     "435128656",
    #     "440029306",
    #     "466371135",
    #     "466528487",
    #     "466859621",
    #     "466860852",
    #     "467595142",
    #     "467788879",
    #     "470515549"
    # ]
    # documents in New EMEA Documents sample.xlsx as typical documents
    # """
    # Below 9 documents can't get data by keywords or ChatGPT
    # 526747539,
    # 534112077,
    # 535798742,
    # 536299372,
    # 539566148,
    # 541343431,
    # 541923319,
    # 543243585,
    # 543243654
    # """
    # check_db_mapping_doc_id_list = [
    #     "511052670",
    #     "520733219",
    #     "524306810",
    #     "526747539",
    #     "528783089",
    #     "532422720",
    #     "532438210",
    #     "534112077",
    #     "534538571",
    #     "534538682",
    #     "535798742",
    #     "536299372",
    #     "539566148",
    #     "539604165",
    #     "540056900",
    #     "541343431",
    #     "541669780",
    #     "541669996",
    #     "541670397",
    #     "541923319",
    #     "542335994",
    #     "543243585",
    #     "543243654",
    #     "543244170",
    #     "543519140",
    #     "543519615",
    #     "543628379",
    #     "543809340",
    #     "543944737"
    # ]
    # documents in Final list of EMEA documents.xlsx as typical documents
    # check_db_mapping_doc_id_list = [
    #     "532500349",
    #     "535324239",
    #     "532442891",
    #     "543243650",
    #     "528588598",
    #     "532437639",
    #     "527525440",
    #     "534987291",
    #     "534112055",
    #     "533482585",
    #     "544208174",
    #     "534547266",
    #     "544713166",
    #     "526463547",
    #     "534535569",
    #     "534106067",
    #     "532486560",
    #     "532781760",
    #     "533727067",
    #     "527256381",
    #     "533392425",
    #     "532179676",
    #     "534300608",
    #     "539233950",
    #     # "533727908",
    #     "532438414",
    #     "533681744",
    #     "537654645",
    #     "533594905",
    #     "537926443",
    #     "533499655",
    #     "533862814",
    #     "544918611",
    #     "539087870",
    #     "536343790"
    # ]
    # document samples 2024-11-06
    # check_db_mapping_doc_id_list = [
    #     "546483469",
    #     "546375582",
    #     "546375575",
    #     "546375576",
    #     "546375577",
    #     "546375568",
    #     "546371033",
    #     "546632761",
    #     "546632544",
    #     "546632464",
    #     "546724583",
    #     "546724552",
    #     "546694677",
    #     "546660422",
    #     "546638908",
    #     "546632845",
    #     "546105299",
    #     "546085481",
    #     "546078693",
    #     "546078650",
    #     "546289930",
    #     "546289910",
    #     "542967371",
    #     "542798238",
    #     "546048730",
    #     "546048143",
    #     "546047619",
    #     "546047528",
    #     "546046730",
    #     "546919329"
    # ]
    # document samples: 30 documents, all with 4 data points
    # check_db_mapping_doc_id_list = [
    #     "479742284",
    #     "501380497",
    #     "501380553",
    #     "501380775",
    #     "501380801",
    #     "501600428",
    #     "501600429",
    #     "501600541",
    #     "501600549",
    #     "503659548",
    #     "506326520",
    #     "507720522",
    #     "507928179",
    #     "508981020",
    #     "509133771",
    #     "509743502",
    #     "514636951",
    #     "514636952",
    #     "514636953",
    #     "514636954",
    #     "514636955",
    #     "514636957",
    #     "514636958",
    #     "514636959",
    #     "514636985",
    #     "514636988",
    #     "514636990",
    #     "514636993",
    #     "514636994",
    #     "539794746",
    # ]
    # Sample documents with special cases
    check_db_mapping_doc_id_list = [
        "334584772",
        "406913630",
        "407275419",
        "337937633",
        "337293427",
        "404712928",
        "451063582",
        "451878128",
        "425595958",
        "536344026",
        "532422548",
        "423418540",
        "423418395",
        "532998065",
        "540307575",
        "423395975",
        "508704368",
        "481482392",
        "466580448",
        "423365707",
        "423364758",
        "422761666",
        "422760156",
        "422760148",
        "422686965",
        "492029971",
        "510300817",
        "512745032",
        "514213638",
        "527525440",
        "534535767",
    ]
    special_doc_id_list = check_db_mapping_doc_id_list
    # special_doc_id_list = ["407275419", "425595958", "451063582", "451878128"]
    pdf_folder = r"/data/emea_ar/pdf/"
    page_filter_ground_truth_file = (
        r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx"
    )
    output_extract_data_child_folder = r"/data/emea_ar/output/extract_data/docs/"
    output_extract_data_total_folder = r"/data/emea_ar/output/extract_data/total/"
    output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
    output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"
    re_run_extract_data = False
    re_run_mapping_data = False
    force_save_total_data = True
    calculate_metrics = False
    extract_ways = ["text"]
    for extract_way in extract_ways:
        batch_start_job(
            pdf_folder,
            page_filter_ground_truth_file,
            output_extract_data_child_folder,
            output_mapping_child_folder,
            output_extract_data_total_folder,
            output_mapping_total_folder,
            extract_way,
            special_doc_id_list,
            re_run_extract_data,
            re_run_mapping_data,
            force_save_total_data=force_save_total_data,
            calculate_metrics=calculate_metrics,
        )


if __name__ == "__main__":
    batch_run_documents()
    # new_data_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_15_documents_by_text_20241121154243.xlsx"
    # original_data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
    # replace_rerun_data(new_data_file, original_data_file)
    # test_calculate_metrics()
    # test_replace_abbrevation()
    # test_translate_pdf()
    # test_mapping_raw_name()
    # test_data_extraction_metrics()
    # batch_filter_pdf_files(
    #     pdf_folder, page_filter_ground_truth_file, prediction_output_folder, special_doc_id_list
    # )
    # data_type = "page_filter"
    # prediction_file = r"/data/emea_ar/output/filter_pages/datapoint_page_info_73_documents_20240903145002.xlsx"
    # missing_error_list, metrics_list, metrics_file = get_metrics(
    #     data_type, prediction_file, page_filter_ground_truth_file, metrics_output_folder
    # )
    # test_auto_generate_instructions()
    # batch_extract_data(
    #     pdf_folder,
    #     page_filter_ground_truth_file,
    #     output_extract_data_child_folder,
    #     output_extract_data_total_folder,
    #     special_doc_id_list,
    #     re_run,
    # )
    # doc_id = "476492237"
    # extract_way = "image"
    # extract_data(doc_id,
    #              pdf_folder,
    #              output_extract_data_child_folder,
    #              extract_way,
    #              re_run_extract_data)