# dc-ml-emea-ar/main.py

import os
import json
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm
import time
import fitz
import re
from io import BytesIO
from traceback import print_exc
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from utils.pdf_util import PDFUtil
from utils.biz_utils import add_slash_to_text_as_regex
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.auz_nz.hybrid_solution_script import api_for_fund_matching_call
from core.metrics import Metrics
import certifi
class EMEA_AR_Parsing:
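# Orchestrates parsing of a single annual report / prospectus PDF: download the
# document from the documents warehouse, filter the pages likely to contain each
# datapoint, extract values (by text or image), map raw fund/share names to
# database investments, and optionally drill the values back down to annotated
# locations in the PDF.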
def __init__(
self,
doc_id: str,
doc_source: str = "emea_ar",
pdf_folder: str = r"/data/emea_ar/pdf/",
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
extract_way: str = "text",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
compare_with_provider: bool = True
) -> None:
self.doc_id = doc_id
self.doc_source = doc_source
self.pdf_folder = pdf_folder
os.makedirs(self.pdf_folder, exist_ok=True)
self.compare_with_provider = compare_with_provider
self.pdf_file = self.download_pdf()
self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
if extract_way is None or len(extract_way) == 0:
extract_way = "text"
self.extract_way = extract_way
self.output_extract_image_folder = None
if self.extract_way == "image":
self.output_extract_image_folder = (
r"/data/emea_ar/output/extract_data/images/"
)
os.makedirs(self.output_extract_image_folder, exist_ok=True)
if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
if not output_extract_data_folder.endswith("/"):
output_extract_data_folder = f"{output_extract_data_folder}/"
if extract_way is not None and len(extract_way) > 0:
output_extract_data_folder = (
f"{output_extract_data_folder}by_{extract_way}/"
)
self.output_extract_data_folder = output_extract_data_folder
os.makedirs(self.output_extract_data_folder, exist_ok=True)
if output_mapping_data_folder is None or len(output_mapping_data_folder) == 0:
output_mapping_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
if not output_mapping_data_folder.endswith("/"):
output_mapping_data_folder = f"{output_mapping_data_folder}/"
if extract_way is not None and len(extract_way) > 0:
output_mapping_data_folder = (
f"{output_mapping_data_folder}by_{extract_way}/"
)
self.output_mapping_data_folder = output_mapping_data_folder
os.makedirs(self.output_mapping_data_folder, exist_ok=True)
self.filter_pages = FilterPages(
self.doc_id,
self.pdf_file,
self.document_mapping_info_df,
self.doc_source,
output_pdf_text_folder,
)
self.page_text_dict = self.filter_pages.page_text_dict
self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
self.datapoints = self.get_datapoints_from_datapoint_page_info()
if drilldown_folder is None or len(drilldown_folder) == 0:
drilldown_folder = r"/data/emea_ar/output/drilldown/"
os.makedirs(drilldown_folder, exist_ok=True)
self.drilldown_folder = drilldown_folder
misc_config_file = os.path.join(
f"./configuration/{doc_source}/", "misc_config.json"
)
if os.path.exists(misc_config_file):
with open(misc_config_file, "r", encoding="utf-8") as f:
misc_config = json.load(f)
self.apply_drilldown = misc_config.get("apply_drilldown", False)
else:
self.apply_drilldown = False
def download_pdf(self) -> str:
pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
return pdf_file
def get_datapoint_page_info(self) -> tuple:
datapoint_page_info, result_details = self.filter_pages.start_job()
return datapoint_page_info, result_details
def get_datapoints_from_datapoint_page_info(self) -> list:
datapoints = list(self.datapoint_page_info.keys())
if "doc_id" in datapoints:
datapoints.remove("doc_id")
return datapoints
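# extract_data reuses a cached "<doc_id>.json" from the extract-data folder unless
# re_run is True; otherwise it runs DataExtraction over the filtered pages. It
# returns (data_from_gpt, annotation_list); annotation_list is only populated when
# apply_drilldown is enabled in misc_config.json.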
def extract_data(
self,
re_run: bool = False,
) -> tuple:
found_data = False
if not re_run:
output_data_json_folder = os.path.join(
self.output_extract_data_folder, "json/"
)
os.makedirs(output_data_json_folder, exist_ok=True)
json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
if os.path.exists(json_file):
logger.info(
f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
)
with open(json_file, "r", encoding="utf-8") as f:
data_from_gpt = json.load(f)
found_data = True
if not found_data:
try:
data_extraction = DataExtraction(
self.doc_source,
self.doc_id,
self.pdf_file,
self.output_extract_data_folder,
self.page_text_dict,
self.datapoint_page_info,
self.datapoints,
self.document_mapping_info_df,
extract_way=self.extract_way,
output_image_folder=self.output_extract_image_folder,
)
data_from_gpt = data_extraction.extract_data()
except Exception as e:
logger.error(f"Error: {e}")
print_exc()
data_from_gpt = {"data": []}
# Drilldown data to relevant PDF document
annotation_list = []
if self.apply_drilldown:
try:
annotation_list = self.drilldown_pdf_document(data_from_gpt)
except Exception as e:
logger.error(f"Error: {e}")
return data_from_gpt, annotation_list
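# drilldown_pdf_document turns every extracted value and reported name into a
# drilldown request, locates them in the PDF via PDFUtil.batch_drilldown, and
# persists the resulting annotations as an Excel workbook and a JSON file in the
# drilldown data folder.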
def drilldown_pdf_document(self, data_from_gpt: list) -> list:
logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
pdf_util = PDFUtil(self.pdf_file)
drilldown_data_list = []
for data in data_from_gpt:
doc_id = str(data.get("doc_id", ""))
page_index = data.get("page_index", -1)
if page_index == -1:
continue
extract_data_list = data.get("extract_data", {}).get("data", [])
dp_reported_name_dict = data.get("extract_data", {}).get(
"dp_reported_name", {}
)
highlighted_value_list = []
for extract_data in extract_data_list:
for data_point, value in extract_data.items():
if value in highlighted_value_list:
continue
if data_point in ["ter", "ogc", "performance_fee"]:
continue
drilldown_data = {
"doc_id": doc_id,
"page_index": page_index,
"data_point": data_point,
"parent_text_block": None,
"value": value,
"annotation_attribute": {},
}
drilldown_data_list.append(drilldown_data)
highlighted_value_list.append(value)
for data_point, reported_name in dp_reported_name_dict.items():
if reported_name in highlighted_value_list:
continue
data_point = f"{data_point}_reported_name"
drilldown_data = {
"doc_id": doc_id,
"page_index": page_index,
"data_point": data_point,
"parent_text_block": None,
"value": reported_name,
"annotation_attribute": {},
}
drilldown_data_list.append(drilldown_data)
highlighted_value_list.append(reported_name)
drilldown_result = pdf_util.batch_drilldown(
drilldown_data_list=drilldown_data_list,
output_pdf_folder=self.drilldown_folder,
)
annotation_list = []
if len(drilldown_result) > 0:
logger.info(f"Drilldown PDF document for doc_id: {doc_id} successfully")
annotation_list = drilldown_result.get("annotation_list", [])
for annotation in annotation_list:
annotation["doc_id"] = doc_id
if self.drilldown_folder is not None and len(self.drilldown_folder) > 0:
drilldown_data_folder = os.path.join(self.drilldown_folder, "data/")
os.makedirs(drilldown_data_folder, exist_ok=True)
drilldown_file = os.path.join(
drilldown_data_folder, f"{doc_id}_drilldown.xlsx"
)
drilldown_source_df = pd.DataFrame(drilldown_data_list)
annotation_list_df = pd.DataFrame(annotation_list)
# set drilldown_result_df column order as doc_id, pdf_file, page_index,
# data_point, value, matching_val_area, normalized_bbox
try:
annotation_list_df = annotation_list_df[
[
"doc_id",
"pdf_file",
"page_index",
"data_point",
"value",
"matching_val_area",
"normalized_bbox",
]
]
except Exception as e:
logger.error(f"Error: {e}")
logger.info(f"Writing drilldown data to {drilldown_file}")
try:
with pd.ExcelWriter(drilldown_file) as writer:
drilldown_source_df.to_excel(
writer, index=False, sheet_name="source_data"
)
annotation_list_df.to_excel(
writer, index=False, sheet_name="drilldown_data"
)
except Exception as e:
logger.error(f"Error: {e}")
annotation_list = annotation_list_df.to_dict(orient="records")
try:
drilldown_json_file = os.path.join(
drilldown_data_folder, f"{doc_id}_drilldown.json"
)
with open(drilldown_json_file, "w", encoding="utf-8") as f:
json.dump(annotation_list, f, ensure_ascii=False, indent=4)
except Exception as e:
logger.error(f"Error: {e}")
return annotation_list
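# mapping_data reuses a cached mapping JSON unless re_run is True. For the
# aus_prospectus source the cached result is additionally merged into per-document
# merged_data output; otherwise the raw extraction is mapped through
# DataMapping.mapping_raw_data_entrance().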
def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
if not re_run:
output_data_json_folder = os.path.join(
self.output_mapping_data_folder, "json/"
)
os.makedirs(output_data_json_folder, exist_ok=True)
json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
if os.path.exists(json_file):
logger.info(
f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {json_file}"
)
with open(json_file, "r", encoding="utf-8") as f:
doc_mapping_data = json.load(f)
if self.doc_source == "aus_prospectus":
output_data_folder_splits = output_data_json_folder.split("output")
if len(output_data_folder_splits) == 2:
merged_data_folder = f'{output_data_folder_splits[0]}output/merged_data/docs/'
os.makedirs(merged_data_folder, exist_ok=True)
merged_data_json_folder = os.path.join(merged_data_folder, "json/")
os.makedirs(merged_data_json_folder, exist_ok=True)
merged_data_excel_folder = os.path.join(merged_data_folder, "excel/")
os.makedirs(merged_data_excel_folder, exist_ok=True)
merged_data_file = os.path.join(merged_data_json_folder, f"merged_{self.doc_id}.json")
if os.path.exists(merged_data_file):
with open(merged_data_file, "r", encoding="utf-8") as f:
merged_data_list = json.load(f)
return merged_data_list
else:
data_mapping = DataMapping(
self.doc_id,
self.datapoints,
data_from_gpt,
self.document_mapping_info_df,
self.output_mapping_data_folder,
self.doc_source,
compare_with_provider=self.compare_with_provider
)
merged_data_list = data_mapping.merge_output_data_aus_prospectus(doc_mapping_data,
merged_data_json_folder,
merged_data_excel_folder)
return merged_data_list
else:
return doc_mapping_data
"""
doc_id,
datapoints: list,
raw_document_data_list: list,
document_mapping_info_df: pd.DataFrame,
output_data_folder: str,
"""
data_mapping = DataMapping(
self.doc_id,
self.datapoints,
data_from_gpt,
self.document_mapping_info_df,
self.output_mapping_data_folder,
self.doc_source,
compare_with_provider=self.compare_with_provider
)
return data_mapping.mapping_raw_data_entrance()
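# Minimal usage sketch for a single document (illustrative only; the doc_id below
# is a placeholder and the default folders are assumed to exist):
#
#   parser = EMEA_AR_Parsing("123456789", doc_source="emea_ar")
#   data_from_gpt, annotations = parser.extract_data(re_run=False)
#   mapped = parser.mapping_data(data_from_gpt=data_from_gpt, re_run=False)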
def filter_pages(doc_id: str, pdf_folder: str, doc_source: str) -> tuple:
logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id, doc_source=doc_source, pdf_folder=pdf_folder
)
datapoint_page_info, result_details = emea_ar_parsing.get_datapoint_page_info()
return datapoint_page_info, result_details
def extract_data(
doc_id: str,
doc_source: str,
pdf_folder: str,
output_data_folder: str,
extract_way: str = "text",
re_run: bool = False,
) -> tuple:
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_extract_data_folder=output_data_folder,
extract_way=extract_way,
)
data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run)
return data_from_gpt, annotation_list
def mapping_data(
doc_id: str,
pdf_folder: str,
output_pdf_text_folder: str,
output_extract_data_folder: str,
output_mapping_folder: str,
doc_source: str = "emea_ar",
extract_way: str = "text",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
re_run_extract_data: bool = False,
re_run_mapping_data: bool = False,
) -> tuple:
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_folder,
output_mapping_data_folder=output_mapping_folder,
extract_way=extract_way,
drilldown_folder=drilldown_folder,
compare_with_provider=False
)
doc_data_from_gpt, annotation_list = emea_ar_parsing.extract_data(
re_run=re_run_extract_data
)
doc_mapping_data = emea_ar_parsing.mapping_data(
data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
)
return doc_data_from_gpt, annotation_list, doc_mapping_data
def batch_extract_data(
pdf_folder: str,
doc_source: str = "emea_ar",
doc_data_excel_file: str = None,
output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
extract_way: str = "text",
special_doc_id_list: list = None,
re_run: bool = False,
) -> None:
pdf_files = glob(pdf_folder + "*.pdf")
doc_list = []
if special_doc_id_list is not None and len(special_doc_id_list) > 0:
doc_list = special_doc_id_list
if (
len(doc_list) == 0
and doc_data_excel_file is not None
and len(doc_data_excel_file) > 0
and os.path.exists(doc_data_excel_file)
):
doc_data_df = pd.read_excel(doc_data_excel_file)
doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
result_list = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file)
doc_id = pdf_base_name.split(".")[0]
if doc_list is not None and doc_id not in doc_list:
continue
data_from_gpt, annotation_list = extract_data(
doc_id=doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_data_folder=output_child_folder,
extract_way=extract_way,
re_run=re_run,
)
result_list.extend(data_from_gpt)
if special_doc_id_list is None or len(special_doc_id_list) == 0:
result_df = pd.DataFrame(result_list)
result_df.reset_index(drop=True, inplace=True)
logger.info(f"Saving the result to {output_total_folder}")
os.makedirs(output_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
output_file = os.path.join(
output_total_folder,
f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
)
with pd.ExcelWriter(output_file) as writer:
result_df.to_excel(writer, index=False, sheet_name="extract_data_info")
def batch_start_job(
doc_source: str = "emea_ar",
pdf_folder: str = "/data/emea_ar/pdf/",
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
doc_data_excel_file: str = None,
document_mapping_file: str = None,
output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
extract_way: str = "text",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
special_doc_id_list: list = None,
re_run_extract_data: bool = False,
re_run_mapping_data: bool = False,
force_save_total_data: bool = False,
calculate_metrics: bool = False,
total_data_prefix: str = None,
):
pdf_files = glob(pdf_folder + "*.pdf")
doc_list = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file)
doc_id = pdf_base_name.split(".")[0]
doc_list.append(doc_id)
if special_doc_id_list is not None and len(special_doc_id_list) > 0:
doc_list = special_doc_id_list
if (
len(doc_list) == 0
and doc_data_excel_file is not None
and len(doc_data_excel_file) > 0
and os.path.exists(doc_data_excel_file)
):
doc_data_df = pd.read_excel(doc_data_excel_file)
doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
result_extract_data_list = []
result_mapping_data_list = []
for doc_id in tqdm(doc_list):
try:
doc_data_from_gpt, annotation_list, doc_mapping_data_list = mapping_data(
doc_id=doc_id,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_folder=output_mapping_child_folder,
doc_source=doc_source,
extract_way=extract_way,
drilldown_folder=drilldown_folder,
re_run_extract_data=re_run_extract_data,
re_run_mapping_data=re_run_mapping_data,
)
result_extract_data_list.extend(doc_data_from_gpt)
result_mapping_data_list.extend(doc_mapping_data_list)
except Exception as e:
logger.error(f"Document: {doc_id} met error: {e}")
print_exc()
if force_save_total_data or (
special_doc_id_list is None or len(special_doc_id_list) == 0
):
result_extract_data_df = pd.DataFrame(result_extract_data_list)
result_extract_data_df.reset_index(drop=True, inplace=True)
result_mappingdata_df = pd.DataFrame(result_mapping_data_list)
result_mappingdata_df.reset_index(drop=True, inplace=True)
logger.info(f"Saving extract data to {output_extract_data_total_folder}")
unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist()
os.makedirs(output_extract_data_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
file_name = f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx"
if total_data_prefix is not None and len(total_data_prefix) > 0:
file_name = f"{total_data_prefix}_{file_name}"
output_file = os.path.join(output_extract_data_total_folder, file_name)
with pd.ExcelWriter(output_file) as writer:
result_extract_data_df.to_excel(
writer, index=False, sheet_name="extract_data_info"
)
logger.info(f"Saving mapping data to {output_mapping_total_folder}")
result_mappingdata_df_columns = list(result_mappingdata_df.columns)
doc_id_column = ""
if "doc_id" in result_mappingdata_df_columns:
doc_id_column = "doc_id"
if "DocumentId" in result_mappingdata_df_columns:
doc_id_column = "DocumentId"
if doc_id_column == "":
logger.error(f"Cannot find doc_id column in mapping data")
return
unique_doc_ids = result_mappingdata_df[doc_id_column].unique().tolist()
os.makedirs(output_mapping_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
file_name = f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx"
if total_data_prefix is not None and len(total_data_prefix) > 0:
file_name = f"{total_data_prefix}_{file_name}"
output_file = os.path.join(output_mapping_total_folder, file_name)
# doc_mapping_data_in_db = only_output_mapping_data_in_db(result_mappingdata_df)
with pd.ExcelWriter(output_file) as writer:
# doc_mapping_data_in_db.to_excel(
# writer, index=False, sheet_name="data_in_doc_mapping"
# )
result_mappingdata_df.to_excel(
writer, index=False, sheet_name="total_mapping_data"
)
result_extract_data_df.to_excel(
writer, index=False, sheet_name="extract_data"
)
if calculate_metrics:
prediction_sheet_name = "data_in_doc_mapping"
ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
ground_truth_sheet_name = "mapping_data"
metrics_output_folder = r"/data/emea_ar/output/metrics/"
# logger.info(f"Calculating metrics for data extraction")
# missing_error_list, metrics_list, metrics_file = get_metrics(
# "data_extraction",
# output_file,
# prediction_sheet_name,
# ground_truth_file,
# ground_truth_sheet_name,
# metrics_output_folder,
# )
# logger.info(f"Calculating metrics for investment mapping by actual document mapping")
# missing_error_list, metrics_list, metrics_file = get_metrics(
# "investment_mapping",
# output_file,
# prediction_sheet_name,
# ground_truth_file,
# ground_truth_sheet_name,
# metrics_output_folder,
# )
logger.info(
f"Calculating metrics for investment mapping by database document mapping"
)
missing_error_list, metrics_list, metrics_file = get_metrics(
"document_mapping_in_db",
output_file,
prediction_sheet_name,
ground_truth_file,
ground_truth_sheet_name,
metrics_output_folder,
)
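# only_output_mapping_data_in_db keeps, per document, only the mapping rows whose
# investment_id is registered in the database document/fund mapping (or is empty),
# so totals can be compared against what is actually mapped to the document.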
def only_output_mapping_data_in_db(mapping_data: pd.DataFrame) -> pd.DataFrame:
doc_id_list = mapping_data["doc_id"].unique().tolist()
data_in_mapping_df_list = []
for doc_id in doc_id_list:
doc_mapping_data = mapping_data[mapping_data["doc_id"] == doc_id]
document_mapping = query_document_fund_mapping(doc_id, rerun=False)
fund_id_list = document_mapping["FundId"].unique().tolist()
sec_id_list = document_mapping["SecId"].unique().tolist()
id_list = fund_id_list + sec_id_list
# filter doc_mapping_data by id_list or empty id
filter_doc_mapping_data = doc_mapping_data[
(doc_mapping_data["investment_id"].isin(id_list))
| (doc_mapping_data["investment_id"] == "")
]
data_in_mapping_df_list.append(filter_doc_mapping_data)
result_mapping_data_df = pd.concat(data_in_mapping_df_list)
result_mapping_data_df.reset_index(drop=True, inplace=True)
return result_mapping_data_df
def batch_filter_pdf_files(
pdf_folder: str,
doc_source: str = "emea_ar",
doc_data_excel_file: str = None,
output_folder: str = r"/data/emea_ar/output/filter_pages/",
special_doc_id_list: list = None,
) -> tuple:
pdf_files = glob(pdf_folder + "*.pdf")
doc_list = []
if special_doc_id_list is not None and len(special_doc_id_list) > 0:
doc_list = special_doc_id_list
if (
len(doc_list) == 0
and doc_data_excel_file is not None
and len(doc_data_excel_file) > 0
and os.path.exists(doc_data_excel_file)
):
doc_data_df = pd.read_excel(doc_data_excel_file)
doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
result_list = []
result_details = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file)
doc_id = pdf_base_name.split(".")[0]
if doc_list is not None and doc_id not in doc_list:
continue
doc_datapoint_page_info, doc_result_details = filter_pages(
doc_id=doc_id, pdf_folder=pdf_folder, doc_source=doc_source
)
result_list.append(doc_datapoint_page_info)
result_details.extend(doc_result_details)
result_df = pd.DataFrame(result_list)
result_df.reset_index(drop=True, inplace=True)
result_details_df = pd.DataFrame(result_details)
result_details_df.reset_index(drop=True, inplace=True)
logger.info(f"Saving the result to {output_folder}")
os.makedirs(output_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
output_file = os.path.join(
output_folder,
f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
)
with pd.ExcelWriter(output_file) as writer:
result_df.to_excel(writer, index=False, sheet_name="dp_page_info")
result_details_df.to_excel(
writer, index=False, sheet_name="dp_page_info_details"
)
if special_doc_id_list is None or len(special_doc_id_list) == 0:
logger.info(f"Calculating metrics for {output_file}")
metrics_output_folder = r"/data/emea_ar/output/metrics/"
missing_error_list, metrics_list, metrics_file = get_metrics(
data_type="page_filter",
prediction_file=output_file,
prediction_sheet_name="dp_page_info",
ground_truth_file=doc_data_excel_file,
output_folder=metrics_output_folder,
)
return missing_error_list, metrics_list, metrics_file
def get_metrics(
data_type: str,
prediction_file: str,
prediction_sheet_name: str,
ground_truth_file: str,
ground_truth_sheet_name: str = None,
output_folder: str = None,
) -> tuple:
metrics = Metrics(
data_type=data_type,
prediction_file=prediction_file,
prediction_sheet_name=prediction_sheet_name,
ground_truth_file=ground_truth_file,
ground_truth_sheet_name=ground_truth_sheet_name,
output_folder=output_folder,
)
missing_error_list, metrics_list, metrics_file = metrics.get_metrics(
strict_model=False
)
return missing_error_list, metrics_list, metrics_file
def test_auto_generate_instructions():
"""
doc_id: str,
pdf_file: str,
page_text_dict: dict,
datapoint_page_info: dict,
document_mapping_info_df: pd.DataFrame
"""
doc_id = "402397014"
pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
filter_pages = FilterPages(doc_id, pdf_file, document_mapping_info_df)
page_text_dict = filter_pages.page_text_dict
datapoint_page_info, datapoint_page_info_details = filter_pages.start_job()
datapoint_list = list(datapoint_page_info.keys())
datapoint_list.remove("doc_id")
data_extraction = DataExtraction(
"emear_ar",
doc_id,
pdf_file,
page_text_dict,
datapoint_page_info,
document_mapping_info_df,
)
page_index_list = list(page_text_dict.keys())
if len(page_index_list) > 0:
page_text = ""
for datapoint in datapoint_list:
if len(datapoint_page_info[datapoint]) > 0:
page_index_list = datapoint_page_info[datapoint]
page_text = page_text_dict[page_index_list[0]]
break
output_folder = (
r"/data/emea_ar/basic_information/prompts_example/generate_by_config/"
)
os.makedirs(output_folder, exist_ok=True)
tor_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["tor"]
)
with open(
os.path.join(output_folder, "tor_instructions.txt"), "w", encoding="utf-8"
) as f:
f.write(tor_instructions_text)
ter_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["ter"]
)
with open(
os.path.join(output_folder, "ter_instructions.txt"), "w", encoding="utf-8"
) as f:
f.write(ter_instructions_text)
ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["ogc"]
)
with open(
os.path.join(output_folder, "ogc_instructions.txt"), "w", encoding="utf-8"
) as f:
f.write(ogc_instructions_text)
performance_fee_instructions_text = (
data_extraction.get_instructions_by_datapoints(
page_text, ["performance_fee"]
)
)
with open(
os.path.join(output_folder, "performance_fee_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(performance_fee_instructions_text)
ter_ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["ter", "ogc"]
)
with open(
os.path.join(output_folder, "ter_ogc_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(ter_ogc_instructions_text)
ter_performance_fee_instructions_text = (
data_extraction.get_instructions_by_datapoints(
page_text, ["ter", "performance_fee"]
)
)
with open(
os.path.join(output_folder, "ter_performance_fee_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(ter_performance_fee_instructions_text)
ogc_ter_performance_fee_instructions_text = (
data_extraction.get_instructions_by_datapoints(
page_text, ["ogc", "ter", "performance_fee"]
)
)
with open(
os.path.join(output_folder, "ogc_ter_performance_fee_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(ogc_ter_performance_fee_instructions_text)
def test_data_extraction_metrics():
data_type = "document_mapping_in_db"
# prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx"
prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_51_documents_by_text_20250127104008.xlsx"
# prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx"
prediction_sheet_name = "data_in_doc_mapping"
ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
ground_truth_sheet_name = "mapping_data"
metrics_output_folder = r"/data/emea_ar/output/metrics/"
missing_error_list, metrics_list, metrics_file = get_metrics(
data_type,
prediction_file,
prediction_sheet_name,
ground_truth_file,
ground_truth_sheet_name,
metrics_output_folder,
)
def test_mapping_raw_name():
doc_id = "337293427"
# KBC Bonds Inflation-Linked Bonds Distribution Shares
# KBC Bonds Inflation-Linked Bonds Institutional B Shares
raw_name = "KBC Bonds Inflation-Linked Bonds Institutional B Shares"
raw_share_name = "Institutional B Shares"
output_folder = r"/data/emea_ar/output/mapping_data/docs/by_text/"
data_mapping = DataMapping(
doc_id,
datapoints=None,
raw_document_data_list=None,
document_mapping_info_df=None,
output_data_folder=output_folder,
)
process_cache = {}
mapping_info = data_mapping.matching_with_database(
raw_name=raw_name,
raw_share_name=raw_share_name,
parent_id="FSGBR051XK",
matching_type="share",
process_cache=process_cache,
)
print(mapping_info)
def test_translate_pdf():
from core.data_translate import Translate_PDF
pdf_file = r"/data/emea_ar/pdf/451063582.pdf"
output_folder = r"/data/translate/output/"
translate_pdf = Translate_PDF(pdf_file, output_folder)
translate_pdf.start_job()
def test_replace_abbrevation():
from utils.biz_utils import replace_abbrevation
text_list = [
"M&G European Credit Investment Fund A CHFH Acc",
"M&G European Credit Investment Fund A CHFHInc",
"M&G European Credit Investment Fund A USDHAcc",
"M&G European High Yield Credit Investment Fund E GBPHedgedAcc",
"M&G Sustainable European Credit Investment Fd Cl L GBPH Acc",
"M&G Sustainable Total Return Credit Investment Fd AI HGBPInc",
"M&G Total Return Credit Investment Fund Class WI GBPHedgedInc",
"M&G Total Return Credit Investment Fund Class W GBP HedgedInc",
"M&G Total Return Credit Investment Fund Class P CHF H Acc",
"M&G Total Return Credit Investment Fund P EUR Inc",
]
for text in text_list:
result = replace_abbrevation(text)
logger.info(f"Original text: {text}, replaced text: {result}")
def test_calculate_metrics():
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
mapping_file = r"/data/emea_ar/basic_information/English/sample_doc/emea_doc_with_all_4_dp/doc_ar_data_with_all_4_dp.xlsx"
data_df = pd.read_excel(data_file, sheet_name="data_in_doc_mapping")
data_df = data_df[data_df["check"].isin([0, 1])]
data_df.fillna("", inplace=True)
data_df.reset_index(drop=True, inplace=True)
mapping_df = pd.read_excel(mapping_file, sheet_name="doc_ar_data_in_db")
mapping_fund_id = mapping_df["FundId"].unique().tolist()
mapping_share_id = mapping_df["FundClassId"].unique().tolist()
mapping_id_list = mapping_fund_id + mapping_share_id
# filter data_df whether investment_id in mapping_id_list
filter_data_df = data_df[
(data_df["investment_id"].isin(mapping_id_list))
| (data_df["investment_id"] == "")
]
# Investment mapping data
mapping_metrics = get_sub_metrics(filter_data_df, "investment_mapping")
logger.info(f"Investment mapping metrics: {mapping_metrics}")
# tor data
tor_data_df = filter_data_df[filter_data_df["datapoint"] == "tor"]
tor_metrics = get_sub_metrics(tor_data_df, "tor")
logger.info(f"TOR metrics: {tor_metrics}")
# ter data
ter_data_df = filter_data_df[filter_data_df["datapoint"] == "ter"]
ter_metrics = get_sub_metrics(ter_data_df, "ter")
logger.info(f"TER metrics: {ter_metrics}")
# ogc data
ogc_data_df = filter_data_df[filter_data_df["datapoint"] == "ogc"]
ogc_metrics = get_sub_metrics(ogc_data_df, "ogc")
logger.info(f"OGC metrics: {ogc_metrics}")
# performance_fee data
performance_fee_data_df = filter_data_df[
filter_data_df["datapoint"] == "performance_fee"
]
performance_fee_metrics = get_sub_metrics(
performance_fee_data_df, "performance_fee"
)
logger.info(f"Performance fee metrics: {performance_fee_metrics}")
metrics_df = pd.DataFrame(
[
mapping_metrics,
tor_metrics,
ter_metrics,
ogc_metrics,
performance_fee_metrics,
]
)
metrics_df.reset_index(drop=True, inplace=True)
output_folder = r"/data/emea_ar/ground_truth/data_extraction/verify/"
output_metrics_file = os.path.join(
output_folder,
r"mapping_data_info_30_documents_all_4_datapoints_roughly_metrics.xlsx",
)
with pd.ExcelWriter(output_metrics_file) as writer:
metrics_df.to_excel(writer, index=False, sheet_name="metrics")
def get_sub_metrics(data_df: pd.DataFrame, data_point: str) -> dict:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
gt_list = [1] * len(data_df)
pre_list = data_df["check"].tolist()
# convert pre_list member to be integer
pre_list = [int(pre) for pre in pre_list]
for index, row in data_df.iterrows():
if row["check"] == 0 and len(row["investment_id"].strip()) > 0:
pre_list.append(1)
gt_list.append(0)
# calculate metrics
accuracy = accuracy_score(gt_list, pre_list)
precision = precision_score(gt_list, pre_list)
recall = recall_score(gt_list, pre_list)
f1 = f1_score(gt_list, pre_list)
support = len(data_df)
metrics = {
"DataPoint": data_point,
"F1": f1,
"Precision": precision,
"Recall": recall,
"Accuracy": accuracy,
"Support": support,
}
return metrics
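# replace_rerun_data overwrites the rows of re-run documents in an existing total
# workbook: it drops those doc_ids from every sheet of original_data_file and
# appends the freshly generated rows from new_data_file in their place.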
def replace_rerun_data(new_data_file: str, original_data_file: str):
data_in_doc_mapping_sheet = "data_in_doc_mapping"
total_mapping_data_sheet = "total_mapping_data"
extract_data_sheet = "extract_data"
new_data_in_doc_mapping = pd.read_excel(
new_data_file, sheet_name=data_in_doc_mapping_sheet
)
new_total_mapping_data = pd.read_excel(
new_data_file, sheet_name=total_mapping_data_sheet
)
new_extract_data = pd.read_excel(new_data_file, sheet_name=extract_data_sheet)
document_list = new_data_in_doc_mapping["doc_id"].unique().tolist()
original_data_in_doc_mapping = pd.read_excel(
original_data_file, sheet_name=data_in_doc_mapping_sheet
)
original_total_mapping_data = pd.read_excel(
original_data_file, sheet_name=total_mapping_data_sheet
)
original_extract_data = pd.read_excel(
original_data_file, sheet_name=extract_data_sheet
)
# remove data in original data by document_list
original_data_in_doc_mapping = original_data_in_doc_mapping[
~original_data_in_doc_mapping["doc_id"].isin(document_list)
]
original_total_mapping_data = original_total_mapping_data[
~original_total_mapping_data["doc_id"].isin(document_list)
]
original_extract_data = original_extract_data[
~original_extract_data["doc_id"].isin(document_list)
]
# merge new data to original data
new_data_in_doc_mapping = pd.concat(
[original_data_in_doc_mapping, new_data_in_doc_mapping]
)
new_data_in_doc_mapping.reset_index(drop=True, inplace=True)
new_total_mapping_data = pd.concat(
[original_total_mapping_data, new_total_mapping_data]
)
new_total_mapping_data.reset_index(drop=True, inplace=True)
new_extract_data = pd.concat([original_extract_data, new_extract_data])
new_extract_data.reset_index(drop=True, inplace=True)
with pd.ExcelWriter(original_data_file) as writer:
new_data_in_doc_mapping.to_excel(
writer, index=False, sheet_name=data_in_doc_mapping_sheet
)
new_total_mapping_data.to_excel(
writer, index=False, sheet_name=total_mapping_data_sheet
)
new_extract_data.to_excel(writer, index=False, sheet_name=extract_data_sheet)
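# batch_run_documents either iterates the sample document lists under
# ./sample_documents/ (when no special_doc_id_list is given, writing one total
# workbook per list) or runs batch_start_job once for the explicit doc_id list.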
def batch_run_documents(
doc_source: str = "emea_ar",
special_doc_id_list: list = None,
pdf_folder: str = r"/data/emea_ar/pdf/",
document_mapping_file: str = None,
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
re_run_extract_data: bool = True,
re_run_mapping_data: bool = True,
force_save_total_data: bool = False
):
sample_document_list_folder = r"./sample_documents/"
document_list_files = glob(sample_document_list_folder + "*.txt")
page_filter_ground_truth_file = (
r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx"
)
calculate_metrics = False
extract_way = "text"
# special_doc_id_list = []
if special_doc_id_list is None or len(special_doc_id_list) == 0:
force_save_total_data = True
file_base_name_candidates = []
for document_list_file in document_list_files:
file_base_name = os.path.basename(document_list_file).replace(".txt", "")
if (
file_base_name_candidates is not None
and len(file_base_name_candidates) > 0
and file_base_name not in file_base_name_candidates
):
continue
with open(document_list_file, "r", encoding="utf-8") as f:
doc_id_list = f.readlines()
doc_id_list = [doc_id.strip() for doc_id in doc_id_list]
batch_start_job(
doc_source,
pdf_folder,
output_pdf_text_folder,
page_filter_ground_truth_file,
document_mapping_file,
output_extract_data_child_folder,
output_mapping_child_folder,
output_extract_data_total_folder,
output_mapping_total_folder,
extract_way,
drilldown_folder,
doc_id_list,
re_run_extract_data,
re_run_mapping_data,
force_save_total_data=force_save_total_data,
calculate_metrics=calculate_metrics,
total_data_prefix=file_base_name,
)
else:
batch_start_job(
doc_source,
pdf_folder,
output_pdf_text_folder,
page_filter_ground_truth_file,
document_mapping_file,
output_extract_data_child_folder,
output_mapping_child_folder,
output_extract_data_total_folder,
output_mapping_total_folder,
extract_way,
drilldown_folder,
special_doc_id_list,
re_run_extract_data,
re_run_mapping_data,
force_save_total_data=force_save_total_data,
calculate_metrics=calculate_metrics,
)
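# batch_initial_document only constructs EMEA_AR_Parsing for each doc_id, which
# downloads the PDF, caches the page text and runs the page filter, presumably so
# that later extraction runs start from pre-computed inputs.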
def batch_initial_document(
sample_document_list_folder: str = r"./sample_documents/",
document_list_file: str = "sample_document_complex.txt",
doc_source: str = "emea_ar",
pdf_folder: str = r"/data/emea_ar/pdf/",
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
):
document_list_file_path = os.path.join(
sample_document_list_folder, document_list_file
)
with open(document_list_file_path, "r", encoding="utf-8") as f:
doc_id_list = f.readlines()
doc_id_list = [doc_id.strip() for doc_id in doc_id_list]
for doc_id in tqdm(doc_id_list):
logger.info(f"Start to initial document: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id=doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_data_folder=output_mapping_child_folder,
)
def merge_output_data(
data_file_path: str, document_mapping_file: str, output_data_file_path: str
):
data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data")
document_mapping_df = pd.read_excel(document_mapping_file, sheet_name="doc_date")
# set doc_id to be string type
data_df["doc_id"] = data_df["doc_id"].astype(str)
document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str)
"""
doc_id page_index raw_name datapoint value raw_check comment investment_type investment_id investment_name similarity
553242368 344 Deutsche MSCI World Index Fund tor 61 33 FS0000AY1Y Xtrackers MSCI World Index Fund 0.75
553242368 344 db x-trackers EUR Liquid Corporate 12.5 UCITS ETF - Klasse 1C ter 0.35 1 F000018PY1 Xtrackers EUR Corporate Green Bond UCITS ETF 1C 0.462
"""
doc_id_list = data_df["doc_id"].unique().tolist()
data_point_dict = {
"tor": "TurnoverRatio",
"ter": "NetExpenseRatio",
"ogc": "OngoingCharge",
"performance_fee": "PerformanceFee",
}
total_data_list = []
for doc_id in tqdm(doc_id_list):
doc_data_list = []
doc_data_df = data_df[data_df["doc_id"] == doc_id]
doc_date = str(
document_mapping_df[document_mapping_df["DocumentId"] == doc_id][
"EffectiveDate"
].values[0]
)[0:10]
exist_raw_name_list = []
for index, row in doc_data_df.iterrows():
doc_id = str(row["doc_id"])
page_index = int(row["page_index"])
raw_name = str(row["raw_name"])
datapoint = str(row["datapoint"])
value = row["value"]
investment_type = row["investment_type"]
investment_id = row["investment_id"]
investment_name = row["investment_name"]
exist = False
for exist_raw_name_info in exist_raw_name_list:
exist_raw_name = exist_raw_name_info["raw_name"]
exist_investment_type = exist_raw_name_info["investment_type"]
if (
exist_raw_name == raw_name
and exist_investment_type == investment_type
):
exist = True
break
if not exist:
data = {
"DocumentId": doc_id,
"investment_type": investment_type,
"investment_id": investment_id,
"investment_name": investment_name,
"EffectiveDate": doc_date,
"page_index": [],
"RawName": raw_name,
"NetExpenseRatio": "",
"OngoingCharge": "",
"TurnoverRatio": "",
"PerformanceFee": "",
}
exist_raw_name_list.append(
{"raw_name": raw_name, "investment_type": investment_type}
)
doc_data_list.append(data)
# find data from total_data_list by raw_name
for data in doc_data_list:
if (
data["RawName"] == raw_name
and data["investment_type"] == investment_type
):
update_key = data_point_dict[datapoint]
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
break
total_data_list.extend(doc_data_list)
total_data_df = pd.DataFrame(total_data_list)
total_data_df.fillna("", inplace=True)
with pd.ExcelWriter(output_data_file_path) as writer:
total_data_df.to_excel(writer, index=False, sheet_name="total_data")
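# merge_output_data_aus_prospectus pivots the long-format mapping rows of an
# aus_prospectus run into one wide record per share class (one column per datapoint
# from datapoint_name.json), taking EffectiveDate from the document mapping
# workbook and applying fund-level rows (investment_type == 33) to the matching
# records afterwards.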
def merge_output_data_aus_prospectus(
data_file_path: str, document_mapping_file: str, output_data_file_path: str
):
# TODO: merge output data for aus prospectus, plan to realize it on 2025-01-16
data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data")
data_df.fillna("", inplace=True)
document_mapping_df = pd.read_excel(
document_mapping_file, sheet_name="document_mapping"
)
document_mapping_df.fillna("", inplace=True)
# set doc_id to be string type
data_df["doc_id"] = data_df["doc_id"].astype(str)
document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str)
doc_id_list = data_df["doc_id"].unique().tolist()
datapoint_keyword_config_file = (
r"./configuration/aus_prospectus/datapoint_name.json"
)
with open(datapoint_keyword_config_file, "r", encoding="utf-8") as f:
datapoint_keyword_config = json.load(f)
datapoint_name_list = list(datapoint_keyword_config.keys())
total_data_list = []
for doc_id in tqdm(doc_id_list):
doc_data_list = []
doc_date = str(
document_mapping_df[document_mapping_df["DocumentId"] == doc_id][
"EffectiveDate"
].values[0]
)[0:10]
share_doc_data_df = data_df[
(data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 1)
]
exist_raw_name_list = []
for index, row in share_doc_data_df.iterrows():
doc_id = str(row["doc_id"])
page_index = int(row["page_index"])
raw_fund_name = str(row["raw_fund_name"])
raw_share_name = str(row["raw_share_name"])
raw_name = str(row["raw_name"])
datapoint = str(row["datapoint"])
value = row["value"]
investment_type = row["investment_type"]
share_class_id = row["investment_id"]
share_class_legal_name = row["investment_name"]
fund_id = ""
fund_legal_name = ""
if share_class_id != "":
record_row = document_mapping_df[
document_mapping_df["FundClassId"] == share_class_id
]
if len(record_row) > 0:
fund_id = record_row["FundId"].values[0]
fund_legal_name = record_row["FundLegalName"].values[0]
exist = False
for exist_raw_name_info in exist_raw_name_list:
exist_raw_name = exist_raw_name_info["raw_name"]
exist_investment_type = exist_raw_name_info["investment_type"]
exist_investment_id = exist_raw_name_info["investment_id"]
if (
exist_raw_name == raw_name
and exist_investment_type == investment_type
) or (len(exist_investment_id) > 0 and exist_investment_id == share_class_id):
exist = True
break
if not exist:
data = {
"DocumentId": doc_id,
"raw_fund_name": raw_fund_name,
"raw_share_name": raw_share_name,
"raw_name": raw_name,
"fund_id": fund_id,
"fund_name": fund_legal_name,
"sec_id": share_class_id,
"sec_name": share_class_legal_name,
"EffectiveDate": doc_date,
"page_index": [],
"RawName": raw_name,
}
for datapoint_name in datapoint_name_list:
data[datapoint_name] = ""
exist_raw_name_list.append(
{"raw_name": raw_name, "investment_type": investment_type, "investment_id": share_class_id}
)
doc_data_list.append(data)
# find data from total_data_list by raw_name
for data in doc_data_list:
if data["raw_name"] == raw_name:
update_key = datapoint
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
break
if len(share_class_id) > 0 and data["sec_id"] == share_class_id:
update_key = datapoint
if len(str(data[update_key])) == 0:
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
break
fund_doc_data_df = data_df[
(data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 33)
]
fund_doc_data_df = fund_doc_data_df.fillna("")
for index, row in fund_doc_data_df.iterrows():
doc_id = str(row["doc_id"])
page_index = int(row["page_index"])
raw_fund_name = str(row["raw_fund_name"])
raw_share_name = ""
raw_name = str(row["raw_name"])
datapoint = str(row["datapoint"])
value = row["value"]
fund_id = row["investment_id"]
fund_legal_name = row["investment_name"]
exist = False
if fund_id != "":
for data in doc_data_list:
if (fund_id != "" and data["fund_id"] == fund_id) or (
data["raw_fund_name"] == raw_fund_name
):
update_key = datapoint
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
exist = True
else:
for data in doc_data_list:
if data["raw_name"] == raw_name:
update_key = datapoint
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
exist = True
if not exist:
data = {
"DocumentId": doc_id,
"raw_fund_name": raw_fund_name,
"raw_share_name": "",
"raw_name": raw_name,
"fund_id": fund_id,
"fund_name": fund_legal_name,
"sec_id": "",
"sec_name": "",
"EffectiveDate": doc_date,
"page_index": [page_index],
"RawName": raw_name,
}
for datapoint_name in datapoint_name_list:
data[datapoint_name] = ""
data[datapoint] = value
doc_data_list.append(data)
total_data_list.extend(doc_data_list)
total_data_df = pd.DataFrame(total_data_list)
total_data_df.fillna("", inplace=True)
with pd.ExcelWriter(output_data_file_path) as writer:
total_data_df.to_excel(writer, index=False, sheet_name="total_data")
def get_aus_prospectus_document_category():
document_sample_file = (
r"./sample_documents/aus_prospectus_17_documents_sample.txt"
)
with open(document_sample_file, "r", encoding="utf-8") as f:
special_doc_id_list = [doc_id.strip() for doc_id in f.readlines()]
pdf_folder: str = r"/data/aus_prospectus/pdf/"
output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
output_extract_data_child_folder: str = (
r"/data/aus_prospectus/output/extract_data/docs/"
)
output_mapping_child_folder: str = (
r"/data/aus_prospectus/output/mapping_data/docs/"
)
drilldown_folder = r"/data/aus_prospectus/output/drilldown/"
doc_source = "aus_prospectus"
extract_way = "text"
document_category_dict = {}
for doc_id in special_doc_id_list:
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_data_folder=output_mapping_child_folder,
extract_way=extract_way,
drilldown_folder=drilldown_folder,
compare_with_provider=False
)
data_extraction = DataExtraction(
doc_source=emea_ar_parsing.doc_source,
doc_id=emea_ar_parsing.doc_id,
pdf_file=emea_ar_parsing.pdf_file,
output_data_folder=emea_ar_parsing.output_extract_data_folder,
page_text_dict=emea_ar_parsing.page_text_dict,
datapoint_page_info=emea_ar_parsing.datapoint_page_info,
datapoints=emea_ar_parsing.datapoints,
document_mapping_info_df=emea_ar_parsing.document_mapping_info_df,
extract_way=extract_way
)
logger.info(f"Document: {doc_id}, \ncategory: {data_extraction.document_category}, \nproduction: {data_extraction.document_production}")
document_category_dict[doc_id] = {"category": data_extraction.document_category, "production": data_extraction.document_production}
output_extract_document_category_folder: str = (
r"/data/aus_prospectus/output/document_category/"
)
os.makedirs(output_extract_document_category_folder, exist_ok=True)
document_sample_file_base_name = os.path.basename(document_sample_file).replace(".txt", "").replace("aus_prospectus_", "")
output_file = os.path.join(output_extract_document_category_folder, f"{document_sample_file_base_name}_category_production.json")
with open(output_file, "w", encoding="utf-8") as f:
json.dump(document_category_dict, f, ensure_ascii=False, indent=4)
logger.info(f"Document category and production: {document_category_dict}")
def test_post_adjust_extract_data():
doc_id = "397107472"
pdf_folder: str = r"/data/aus_prospectus/pdf/"
output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
output_extract_data_child_folder: str = (
r"/data/aus_prospectus/output/extract_data/docs/"
)
output_mapping_child_folder: str = (
r"/data/aus_prospectus/output/mapping_data/docs/"
)
drilldown_folder = r"/data/aus_prospectus/output/drilldown/"
doc_source = "aus_prospectus"
extract_way = "text"
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_data_folder=output_mapping_child_folder,
extract_way=extract_way,
drilldown_folder=drilldown_folder,
compare_with_provider=False
)
data_extraction = DataExtraction(doc_source=emea_ar_parsing.doc_source,
doc_id=emea_ar_parsing.doc_id,
pdf_file=emea_ar_parsing.pdf_file,
output_data_folder=emea_ar_parsing.output_extract_data_folder,
page_text_dict=emea_ar_parsing.page_text_dict,
datapoint_page_info=emea_ar_parsing.datapoint_page_info,
datapoints=emea_ar_parsing.datapoints,
document_mapping_info_df=emea_ar_parsing.document_mapping_info_df,
extract_way=extract_way)
data_folder = r"/data/aus_prospectus/output/extract_data/docs/by_text/json/"
data_file = f"{doc_id}.json"
data_file_path = os.path.join(data_folder, data_file)
with open(data_file_path, "r", encoding="utf-8") as f:
data_list = json.load(f)
# data_list = data_extraction.remove_duplicate_data(data_list)
# data_list = data_extraction.post_adjust_for_value_with_production_name(data_list)
data_list = data_extraction.post_supplement_data(data_list)
if __name__ == "__main__":
# test_post_adjust_extract_data()
# get_aus_prospectus_document_category()
# test_data_extraction_metrics()
# data_file_path = r"/data/aus_prospectus/output/mapping_data/total/mapping_data_info_1_documents_by_text_20250226155259.xlsx"
# document_mapping_file_path = r"/data/aus_prospectus/basic_information/biz_rule/phase1_document_mapping.xlsx"
# merged_total_data_folder = r'/data/aus_prospectus/output/mapping_data/total/merged/'
# os.makedirs(merged_total_data_folder, exist_ok=True)
# data_file_base_name = os.path.basename(data_file_path)
# output_data_file_path = os.path.join(merged_total_data_folder, "merged_" + data_file_base_name)
# merge_output_data_aus_prospectus(data_file_path, document_mapping_file_path, output_data_file_path)
os.environ["SSL_CERT_FILE"] = certifi.where()
doc_source = "aus_prospectus"
sample_document_list_folder: str = r'./sample_documents/'
document_list_file: str = "aus_prospectus_29_documents_sample.txt"
pdf_folder: str = r"/data/aus_prospectus/pdf/"
output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
output_extract_data_child_folder: str = r"/data/aus_prospectus/output/extract_data/docs/"
output_mapping_child_folder: str = r"/data/aus_prospectus/output/mapping_data/docs/"
# batch_initial_document(sample_document_list_folder=sample_document_list_folder,
# document_list_file=document_list_file,
# doc_source=doc_source,
# pdf_folder=pdf_folder,
# output_pdf_text_folder=output_pdf_text_folder,
# output_extract_data_child_folder=output_extract_data_child_folder,
# output_mapping_child_folder=output_mapping_child_folder)
# get_aus_prospectus_document_category()
re_run_extract_data = True
re_run_mapping_data = True
force_save_total_data = True
doc_source = "aus_prospectus"
# doc_source = "emea_ar"
if doc_source == "aus_prospectus":
document_sample_file = (
r"./sample_documents/aus_prospectus_46_documents_sample.txt"
)
with open(document_sample_file, "r", encoding="utf-8") as f:
special_doc_id_list = [doc_id.strip() for doc_id in f.readlines()]
document_mapping_file = r"/data/aus_prospectus/basic_information/46_documents/aus_prospectus_46_documents_mapping.xlsx"
# special_doc_id_list = ["521606755", "384508026", "544886057"]
pdf_folder: str = r"/data/aus_prospectus/pdf/"
output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
output_extract_data_child_folder: str = (
r"/data/aus_prospectus/output/extract_data/docs/"
)
output_extract_data_total_folder: str = (
r"/data/aus_prospectus/output/extract_data/total/"
)
output_mapping_child_folder: str = (
r"/data/aus_prospectus/output/mapping_data/docs/"
)
output_mapping_total_folder: str = (
r"/data/aus_prospectus/output/mapping_data/total/"
)
drilldown_folder = r"/data/aus_prospectus/output/drilldown/"
batch_run_documents(
doc_source=doc_source,
special_doc_id_list=special_doc_id_list,
pdf_folder=pdf_folder,
document_mapping_file=document_mapping_file,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_child_folder=output_extract_data_child_folder,
output_extract_data_total_folder=output_extract_data_total_folder,
output_mapping_child_folder=output_mapping_child_folder,
output_mapping_total_folder=output_mapping_total_folder,
drilldown_folder=drilldown_folder,
re_run_extract_data=re_run_extract_data,
re_run_mapping_data=re_run_mapping_data,
force_save_total_data=force_save_total_data
)
elif doc_source == "emea_ar":
special_doc_id_list = ["321733631"]
batch_run_documents(
doc_source=doc_source,
special_doc_id_list=special_doc_id_list,
re_run_extract_data=re_run_extract_data,
re_run_mapping_data=re_run_mapping_data,
force_save_total_data=force_save_total_data
)
# new_data_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_15_documents_by_text_20241121154243.xlsx"
# original_data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
# replace_rerun_data(new_data_file, original_data_file)
# test_calculate_metrics()
# test_replace_abbrevation()
# test_translate_pdf()
# test_mapping_raw_name()
# test_data_extraction_metrics()
# batch_filter_pdf_files(
# pdf_folder, page_filter_ground_truth_file, prediction_output_folder, special_doc_id_list
# )
# data_type = "page_filter"
# prediction_file = r"/data/emea_ar/output/filter_pages/datapoint_page_info_73_documents_20240903145002.xlsx"
# missing_error_list, metrics_list, metrics_file = get_metrics(
# data_type, prediction_file, page_filter_ground_truth_file, metrics_output_folder
# )
# test_auto_generate_instructions()
# batch_extract_data(
# pdf_folder,
# page_filter_ground_truth_file,
# output_extract_data_child_folder,
# output_extract_data_total_folder,
# special_doc_id_list,
# re_run,
# )
# doc_id = "476492237"
# extract_way = "image"
# extract_data(doc_id,
# pdf_folder,
# output_extract_data_child_folder,
# extract_way,
# re_run_extract_data)