# dc-ml-emea-ar/main.py
import os
import json
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm
import time
import fitz
import re
from io import BytesIO
from traceback import print_exc
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from utils.pdf_util import PDFUtil
from utils.biz_utils import add_slash_to_text_as_regex
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.auz_nz.hybrid_solution_script import api_for_fund_matching_call
from core.metrics import Metrics
class EMEA_AR_Parsing:
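"""End-to-end parser for one EMEA annual-report (AR) document: downloads the
PDF, filters relevant pages, extracts datapoints (tor/ter/ogc/performance_fee),
optionally drills the extracted values back down to PDF coordinates, and maps
raw fund/share names to database investments."""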
def __init__(
self,
doc_id: str,
doc_source: str = "emea_ar",
pdf_folder: str = r"/data/emea_ar/pdf/",
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
extract_way: str = "text",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
compare_with_provider: bool = True
) -> None:
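"""Note: construction is eager: the PDF is downloaded, the document-to-fund
mapping is queried, page text is extracted via FilterPages and the
datapoint/page info is computed before any extract/mapping method is called."""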
self.doc_id = doc_id
self.doc_source = doc_source
self.pdf_folder = pdf_folder
os.makedirs(self.pdf_folder, exist_ok=True)
self.compare_with_provider = compare_with_provider
self.pdf_file = self.download_pdf()
self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
if extract_way is None or len(extract_way) == 0:
extract_way = "text"
self.extract_way = extract_way
self.output_extract_image_folder = None
if self.extract_way == "image":
self.output_extract_image_folder = (
r"/data/emea_ar/output/extract_data/images/"
)
os.makedirs(self.output_extract_image_folder, exist_ok=True)
if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
if not output_extract_data_folder.endswith("/"):
output_extract_data_folder = f"{output_extract_data_folder}/"
if extract_way is not None and len(extract_way) > 0:
output_extract_data_folder = (
f"{output_extract_data_folder}by_{extract_way}/"
)
self.output_extract_data_folder = output_extract_data_folder
os.makedirs(self.output_extract_data_folder, exist_ok=True)
if output_mapping_data_folder is None or len(output_mapping_data_folder) == 0:
output_mapping_data_folder = r"/data/emea_ar/output/mapping_data/docs/"
if not output_mapping_data_folder.endswith("/"):
output_mapping_data_folder = f"{output_mapping_data_folder}/"
if extract_way is not None and len(extract_way) > 0:
output_mapping_data_folder = (
f"{output_mapping_data_folder}by_{extract_way}/"
)
self.output_mapping_data_folder = output_mapping_data_folder
os.makedirs(self.output_mapping_data_folder, exist_ok=True)
self.filter_pages = FilterPages(
self.doc_id,
self.pdf_file,
self.document_mapping_info_df,
self.doc_source,
output_pdf_text_folder,
)
self.page_text_dict = self.filter_pages.page_text_dict
self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
self.datapoints = self.get_datapoints_from_datapoint_page_info()
if drilldown_folder is None or len(drilldown_folder) == 0:
drilldown_folder = r"/data/emea_ar/output/drilldown/"
os.makedirs(drilldown_folder, exist_ok=True)
self.drilldown_folder = drilldown_folder
misc_config_file = os.path.join(
f"./configuration/{doc_source}/", "misc_config.json"
)
if os.path.exists(misc_config_file):
with open(misc_config_file, "r", encoding="utf-8") as f:
misc_config = json.load(f)
self.apply_drilldown = misc_config.get("apply_drilldown", False)
else:
self.apply_drilldown = False
def download_pdf(self) -> str:
pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
return pdf_file
def get_datapoint_page_info(self) -> tuple:
datapoint_page_info, result_details = self.filter_pages.start_job()
return datapoint_page_info, result_details
def get_datapoints_from_datapoint_page_info(self) -> list:
datapoints = list(self.datapoint_page_info.keys())
if "doc_id" in datapoints:
datapoints.remove("doc_id")
return datapoints
def extract_data(
self,
re_run: bool = False,
) -> tuple:
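"""Extract datapoint values for this document.

Reuses the cached JSON result from a previous run unless re_run is True,
otherwise calls DataExtraction. Returns (data_from_gpt, annotation_list);
annotation_list is only populated when drilldown is enabled in misc_config.json."""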
found_data = False
if not re_run:
output_data_json_folder = os.path.join(
self.output_extract_data_folder, "json/"
)
os.makedirs(output_data_json_folder, exist_ok=True)
json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
if os.path.exists(json_file):
logger.info(
f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
)
with open(json_file, "r", encoding="utf-8") as f:
data_from_gpt = json.load(f)
found_data = True
if not found_data:
try:
data_extraction = DataExtraction(
self.doc_source,
self.doc_id,
self.pdf_file,
self.output_extract_data_folder,
self.page_text_dict,
self.datapoint_page_info,
self.datapoints,
self.document_mapping_info_df,
extract_way=self.extract_way,
output_image_folder=self.output_extract_image_folder,
)
data_from_gpt = data_extraction.extract_data()
except Exception as e:
logger.error(f"Error: {e}")
data_from_gpt = {"data": []}
# Drilldown data to relevant PDF document
annotation_list = []
if self.apply_drilldown:
try:
annotation_list = self.drilldown_pdf_document(data_from_gpt)
except Exception as e:
logger.error(f"Error: {e}")
return data_from_gpt, annotation_list
def drilldown_pdf_document(self, data_from_gpt: list) -> list:
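"""Highlight each extracted value and reported name in the source PDF via
PDFUtil.batch_drilldown, write the drilldown source/result data to Excel and
JSON under drilldown_folder, and return the resulting annotation list."""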
logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
pdf_util = PDFUtil(self.pdf_file)
drilldown_data_list = []
for data in data_from_gpt:
doc_id = str(data.get("doc_id", ""))
page_index = data.get("page_index", -1)
if page_index == -1:
continue
extract_data_list = data.get("extract_data", {}).get("data", [])
dp_reported_name_dict = data.get("extract_data", {}).get(
"dp_reported_name", {}
)
highlighted_value_list = []
for extract_data in extract_data_list:
for data_point, value in extract_data.items():
if value in highlighted_value_list:
continue
if data_point in ["ter", "ogc", "performance_fee"]:
continue
drilldown_data = {
"doc_id": doc_id,
"page_index": page_index,
"data_point": data_point,
"parent_text_block": None,
"value": value,
"annotation_attribute": {},
}
drilldown_data_list.append(drilldown_data)
highlighted_value_list.append(value)
for data_point, reported_name in dp_reported_name_dict.items():
if reported_name in highlighted_value_list:
continue
data_point = f"{data_point}_reported_name"
drilldown_data = {
"doc_id": doc_id,
"page_index": page_index,
"data_point": data_point,
"parent_text_block": None,
"value": reported_name,
"annotation_attribute": {},
}
drilldown_data_list.append(drilldown_data)
highlighted_value_list.append(reported_name)
drilldown_result = pdf_util.batch_drilldown(
drilldown_data_list=drilldown_data_list,
output_pdf_folder=self.drilldown_folder,
)
annotation_list = []
if len(drilldown_result) > 0:
logger.info(f"Drilldown PDF document for doc_id: {doc_id} successfully")
annotation_list = drilldown_result.get("annotation_list", [])
for annotation in annotation_list:
annotation["doc_id"] = doc_id
if self.drilldown_folder is not None and len(self.drilldown_folder) > 0:
drilldown_data_folder = os.path.join(self.drilldown_folder, "data/")
os.makedirs(drilldown_data_folder, exist_ok=True)
drilldown_file = os.path.join(
drilldown_data_folder, f"{doc_id}_drilldown.xlsx"
)
drilldown_source_df = pd.DataFrame(drilldown_data_list)
annotation_list_df = pd.DataFrame(annotation_list)
# set drilldown_result_df column order as doc_id, pdf_file, page_index,
# data_point, value, matching_val_area, normalized_bbox
try:
annotation_list_df = annotation_list_df[
[
"doc_id",
"pdf_file",
"page_index",
"data_point",
"value",
"matching_val_area",
"normalized_bbox",
]
]
except Exception as e:
logger.error(f"Error: {e}")
logger.info(f"Writing drilldown data to {drilldown_file}")
try:
with pd.ExcelWriter(drilldown_file) as writer:
drilldown_source_df.to_excel(
writer, index=False, sheet_name="source_data"
)
annotation_list_df.to_excel(
writer, index=False, sheet_name="drilldown_data"
)
except Exception as e:
logger.error(f"Error: {e}")
annotation_list = annotation_list_df.to_dict(orient="records")
try:
drilldown_json_file = os.path.join(
drilldown_data_folder, f"{doc_id}_drilldown.json"
)
with open(drilldown_json_file, "w", encoding="utf-8") as f:
json.dump(annotation_list, f, ensure_ascii=False, indent=4)
except Exception as e:
logger.error(f"Error: {e}")
return annotation_list
def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
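"""Map the raw extracted fund/share names to database investments.

Reuses the cached mapping JSON when available and re_run is False; otherwise
delegates to DataMapping.mapping_raw_data_entrance()."""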
if not re_run:
output_data_json_folder = os.path.join(
self.output_mapping_data_folder, "json/"
)
os.makedirs(output_data_json_folder, exist_ok=True)
json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
if os.path.exists(json_file):
logger.info(
f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {json_file}"
)
with open(json_file, "r", encoding="utf-8") as f:
doc_mapping_data = json.load(f)
return doc_mapping_data
"""
doc_id,
datapoints: list,
raw_document_data_list: list,
document_mapping_info_df: pd.DataFrame,
output_data_folder: str,
"""
data_mapping = DataMapping(
self.doc_id,
self.datapoints,
data_from_gpt,
self.document_mapping_info_df,
self.output_mapping_data_folder,
self.doc_source,
compare_with_provider=self.compare_with_provider
)
return data_mapping.mapping_raw_data_entrance()
def filter_pages(doc_id: str, pdf_folder: str, doc_source: str) -> tuple:
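"""Run only the page-filter step for one document and return
(datapoint_page_info, result_details)."""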
logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id, doc_source=doc_source, pdf_folder=pdf_folder
)
datapoint_page_info, result_details = emea_ar_parsing.get_datapoint_page_info()
return datapoint_page_info, result_details
def extract_data(
doc_id: str,
doc_source: str,
pdf_folder: str,
output_data_folder: str,
extract_way: str = "text",
re_run: bool = False,
) -> tuple:
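"""Run the extraction step for one document and return
(data_from_gpt, annotation_list)."""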
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_extract_data_folder=output_data_folder,
extract_way=extract_way,
)
data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run)
return data_from_gpt, annotation_list
def mapping_data(
doc_id: str,
pdf_folder: str,
output_pdf_text_folder: str,
output_extract_data_folder: str,
output_mapping_folder: str,
doc_source: str = "emea_ar",
extract_way: str = "text",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
re_run_extract_data: bool = False,
re_run_mapping_data: bool = False,
) -> tuple:
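"""Run extraction followed by investment mapping for one document and return
(doc_data_from_gpt, annotation_list, doc_mapping_data)."""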
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_folder,
output_mapping_data_folder=output_mapping_folder,
extract_way=extract_way,
drilldown_folder=drilldown_folder,
compare_with_provider=False
)
doc_data_from_gpt, annotation_list = emea_ar_parsing.extract_data(
re_run=re_run_extract_data
)
doc_mapping_data = emea_ar_parsing.mapping_data(
data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
)
return doc_data_from_gpt, annotation_list, doc_mapping_data
def batch_extract_data(
pdf_folder: str,
doc_source: str = "emea_ar",
doc_data_excel_file: str = None,
output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
extract_way: str = "text",
special_doc_id_list: list = None,
re_run: bool = False,
) -> None:
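"""Extract data for every PDF in pdf_folder, optionally restricted to
special_doc_id_list or to documents marked Checked == 1 in doc_data_excel_file,
and, when no explicit document list is given, save the combined result to a
timestamped Excel file under output_total_folder."""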
pdf_files = glob(pdf_folder + "*.pdf")
doc_list = []
if special_doc_id_list is not None and len(special_doc_id_list) > 0:
doc_list = special_doc_id_list
if (
len(doc_list) == 0
and doc_data_excel_file is not None
and len(doc_data_excel_file) > 0
and os.path.exists(doc_data_excel_file)
):
doc_data_df = pd.read_excel(doc_data_excel_file)
doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
result_list = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file)
doc_id = pdf_base_name.split(".")[0]
if doc_list and doc_id not in doc_list:  # only filter when an explicit document list was provided
continue
data_from_gpt, annotation_list = extract_data(
doc_id=doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_data_folder=output_child_folder,
extract_way=extract_way,
re_run=re_run,
)
result_list.extend(data_from_gpt)
if special_doc_id_list is None or len(special_doc_id_list) == 0:
result_df = pd.DataFrame(result_list)
result_df.reset_index(drop=True, inplace=True)
logger.info(f"Saving the result to {output_total_folder}")
os.makedirs(output_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
output_file = os.path.join(
output_total_folder,
f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
)
with pd.ExcelWriter(output_file) as writer:
result_df.to_excel(writer, index=False, sheet_name="extract_data_info")
def batch_start_job(
doc_source: str = "emea_ar",
pdf_folder: str = "/data/emea_ar/pdf/",
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
doc_data_excel_file: str = None,
document_mapping_file: str = None,
output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
extract_way: str = "text",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
special_doc_id_list: list = None,
re_run_extract_data: bool = False,
re_run_mapping_data: bool = False,
force_save_total_data: bool = False,
calculate_metrics: bool = False,
total_data_prefix: str = None,
):
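"""End-to-end batch job: run extraction and mapping for each selected document,
optionally aggregate the per-document results into timestamped Excel files,
optionally merge aus_prospectus output with its document mapping file, and
optionally compute metrics against the ground-truth workbook."""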
pdf_files = glob(pdf_folder + "*.pdf")
doc_list = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file)
doc_id = pdf_base_name.split(".")[0]
doc_list.append(doc_id)
if special_doc_id_list is not None and len(special_doc_id_list) > 0:
doc_list = special_doc_id_list
if (
len(doc_list) == 0
and doc_data_excel_file is not None
and len(doc_data_excel_file) > 0
and os.path.exists(doc_data_excel_file)
):
doc_data_df = pd.read_excel(doc_data_excel_file)
doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
result_extract_data_list = []
result_mapping_data_list = []
for doc_id in tqdm(doc_list):
try:
doc_data_from_gpt, annotation_list, doc_mapping_data_list = mapping_data(
doc_id=doc_id,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_folder=output_mapping_child_folder,
doc_source=doc_source,
extract_way=extract_way,
drilldown_folder=drilldown_folder,
re_run_extract_data=re_run_extract_data,
re_run_mapping_data=re_run_mapping_data,
)
result_extract_data_list.extend(doc_data_from_gpt)
result_mapping_data_list.extend(doc_mapping_data_list)
except Exception as e:
logger.error(f"Document: {doc_id} met error: {e}")
print_exc()
if force_save_total_data or (
special_doc_id_list is None or len(special_doc_id_list) == 0
):
result_extract_data_df = pd.DataFrame(result_extract_data_list)
result_extract_data_df.reset_index(drop=True, inplace=True)
result_mappingdata_df = pd.DataFrame(result_mapping_data_list)
result_mappingdata_df.reset_index(drop=True, inplace=True)
logger.info(f"Saving extract data to {output_extract_data_total_folder}")
unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist()
os.makedirs(output_extract_data_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
file_name = f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx"
if total_data_prefix is not None and len(total_data_prefix) > 0:
file_name = f"{total_data_prefix}_{file_name}"
output_file = os.path.join(output_extract_data_total_folder, file_name)
with pd.ExcelWriter(output_file) as writer:
result_extract_data_df.to_excel(
writer, index=False, sheet_name="extract_data_info"
)
logger.info(f"Saving mapping data to {output_mapping_total_folder}")
unique_doc_ids = result_mappingdata_df["doc_id"].unique().tolist()
os.makedirs(output_mapping_total_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
file_name = f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx"
if total_data_prefix is not None and len(total_data_prefix) > 0:
file_name = f"{total_data_prefix}_{file_name}"
output_file = os.path.join(output_mapping_total_folder, file_name)
doc_mapping_data_in_db = only_output_mapping_data_in_db(result_mappingdata_df)
with pd.ExcelWriter(output_file) as writer:
doc_mapping_data_in_db.to_excel(
writer, index=False, sheet_name="data_in_doc_mapping"
)
result_mappingdata_df.to_excel(
writer, index=False, sheet_name="total_mapping_data"
)
result_extract_data_df.to_excel(
writer, index=False, sheet_name="extract_data"
)
if (
doc_source == "aus_prospectus"
and document_mapping_file is not None
and len(document_mapping_file) > 0
and os.path.exists(document_mapping_file)
):
try:
merged_total_data_folder = os.path.join(
output_mapping_total_folder, "merged/"
)
os.makedirs(merged_total_data_folder, exist_ok=True)
data_file_base_name = os.path.basename(output_file)
output_merged_data_file_path = os.path.join(
merged_total_data_folder, "merged_" + data_file_base_name
)
merge_output_data_aus_prospectus(
output_file, document_mapping_file, output_merged_data_file_path
)
except Exception as e:
logger.error(f"Error: {e}")
if calculate_metrics:
prediction_sheet_name = "data_in_doc_mapping"
ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
ground_truth_sheet_name = "mapping_data"
metrics_output_folder = r"/data/emea_ar/output/metrics/"
# logger.info(f"Calculating metrics for data extraction")
# missing_error_list, metrics_list, metrics_file = get_metrics(
# "data_extraction",
# output_file,
# prediction_sheet_name,
# ground_truth_file,
# ground_truth_sheet_name,
# metrics_output_folder,
# )
# logger.info(f"Calculating metrics for investment mapping by actual document mapping")
# missing_error_list, metrics_list, metrics_file = get_metrics(
# "investment_mapping",
# output_file,
# prediction_sheet_name,
# ground_truth_file,
# ground_truth_sheet_name,
# metrics_output_folder,
# )
logger.info(
f"Calculating metrics for investment mapping by database document mapping"
)
missing_error_list, metrics_list, metrics_file = get_metrics(
"document_mapping_in_db",
output_file,
prediction_sheet_name,
ground_truth_file,
ground_truth_sheet_name,
metrics_output_folder,
)
def only_output_mapping_data_in_db(mapping_data: pd.DataFrame) -> pd.DataFrame:
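"""Keep only the mapping rows whose investment_id belongs to the document's
fund/share universe in the database (or rows with an empty investment_id)."""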
doc_id_list = mapping_data["doc_id"].unique().tolist()
data_in_mapping_df_list = []
for doc_id in doc_id_list:
doc_mapping_data = mapping_data[mapping_data["doc_id"] == doc_id]
document_mapping = query_document_fund_mapping(doc_id, rerun=False)
fund_id_list = document_mapping["FundId"].unique().tolist()
sec_id_list = document_mapping["SecId"].unique().tolist()
id_list = fund_id_list + sec_id_list
# filter doc_mapping_data by id_list or empty id
filter_doc_mapping_data = doc_mapping_data[
(doc_mapping_data["investment_id"].isin(id_list))
| (doc_mapping_data["investment_id"] == "")
]
data_in_mapping_df_list.append(filter_doc_mapping_data)
result_mapping_data_df = pd.concat(data_in_mapping_df_list)
result_mapping_data_df.reset_index(drop=True, inplace=True)
return result_mapping_data_df
def batch_filter_pdf_files(
pdf_folder: str,
doc_source: str = "emea_ar",
doc_data_excel_file: str = None,
output_folder: str = r"/data/emea_ar/output/filter_pages/",
special_doc_id_list: list = None,
) -> None:
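"""Run the page-filter step for a batch of PDFs, save the per-datapoint page
info to Excel and, when no explicit document list is given, compute page-filter
metrics against doc_data_excel_file."""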
pdf_files = glob(pdf_folder + "*.pdf")
doc_list = []
if special_doc_id_list is not None and len(special_doc_id_list) > 0:
doc_list = special_doc_id_list
if (
len(doc_list) == 0
and doc_data_excel_file is not None
and len(doc_data_excel_file) > 0
and os.path.exists(doc_data_excel_file)
):
doc_data_df = pd.read_excel(doc_data_excel_file)
doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
result_list = []
result_details = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file)
doc_id = pdf_base_name.split(".")[0]
if doc_list and doc_id not in doc_list:  # only filter when an explicit document list was provided
continue
doc_datapoint_page_info, doc_result_details = filter_pages(
doc_id=doc_id, pdf_folder=pdf_folder, doc_source=doc_source
)
result_list.append(doc_datapoint_page_info)
result_details.extend(doc_result_details)
result_df = pd.DataFrame(result_list)
result_df.reset_index(drop=True, inplace=True)
result_details_df = pd.DataFrame(result_details)
result_details_df.reset_index(drop=True, inplace=True)
logger.info(f"Saving the result to {output_folder}")
os.makedirs(output_folder, exist_ok=True)
time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
output_file = os.path.join(
output_folder,
f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
)
with pd.ExcelWriter(output_file) as writer:
result_df.to_excel(writer, index=False, sheet_name="dp_page_info")
result_details_df.to_excel(
writer, index=False, sheet_name="dp_page_info_details"
)
if special_doc_id_list is None or len(special_doc_id_list) == 0:
logger.info(f"Calculating metrics for {output_file}")
metrics_output_folder = r"/data/emea_ar/output/metrics/"
missing_error_list, metrics_list, metrics_file = get_metrics(
data_type="page_filter",
prediction_file=output_file,
prediction_sheet_name="dp_page_info",
ground_truth_file=doc_data_excel_file,
output_folder=metrics_output_folder,
)
return missing_error_list, metrics_list, metrics_file
def get_metrics(
data_type: str,
prediction_file: str,
prediction_sheet_name: str,
ground_truth_file: str,
ground_truth_sheet_name: str = None,
output_folder: str = None,
) -> tuple:
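"""Thin wrapper around Metrics: compare a prediction workbook with a
ground-truth workbook and return (missing_error_list, metrics_list, metrics_file)."""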
metrics = Metrics(
data_type=data_type,
prediction_file=prediction_file,
prediction_sheet_name=prediction_sheet_name,
ground_truth_file=ground_truth_file,
ground_truth_sheet_name=ground_truth_sheet_name,
output_folder=output_folder,
)
missing_error_list, metrics_list, metrics_file = metrics.get_metrics(
strict_model=False
)
return missing_error_list, metrics_list, metrics_file
def test_auto_generate_instructions():
"""
doc_id: str,
pdf_file: str,
page_text_dict: dict,
datapoint_page_info: dict,
document_mapping_info_df: pd.DataFrame
"""
doc_id = "402397014"
pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
filter_pages = FilterPages(doc_id, pdf_file, document_mapping_info_df)
page_text_dict = filter_pages.page_text_dict
datapoint_page_info, datapoint_page_info_details = filter_pages.start_job()
datapoint_list = list(datapoint_page_info.keys())
datapoint_list.remove("doc_id")
data_extraction = DataExtraction(
"emear_ar",
doc_id,
pdf_file,
page_text_dict,
datapoint_page_info,
document_mapping_info_df,
)
page_index_list = list(page_text_dict.keys())
if len(page_index_list) > 0:
page_text = ""
for datapoint in datapoint_list:
if len(datapoint_page_info[datapoint]) > 0:
page_index_list = datapoint_page_info[datapoint]
page_text = page_text_dict[page_index_list[0]]
break
output_folder = (
r"/data/emea_ar/basic_information/prompts_example/generate_by_config/"
)
os.makedirs(output_folder, exist_ok=True)
tor_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["tor"]
)
with open(
os.path.join(output_folder, "tor_instructions.txt"), "w", encoding="utf-8"
) as f:
f.write(tor_instructions_text)
ter_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["ter"]
)
with open(
os.path.join(output_folder, "ter_instructions.txt"), "w", encoding="utf-8"
) as f:
f.write(ter_instructions_text)
ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["ogc"]
)
with open(
os.path.join(output_folder, "ogc_instructions.txt"), "w", encoding="utf-8"
) as f:
f.write(ogc_instructions_text)
performance_fee_instructions_text = (
data_extraction.get_instructions_by_datapoints(
page_text, ["performance_fee"]
)
)
with open(
os.path.join(output_folder, "performance_fee_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(performance_fee_instructions_text)
ter_ogc_instructions_text = data_extraction.get_instructions_by_datapoints(
page_text, ["ter", "ogc"]
)
with open(
os.path.join(output_folder, "ter_ogc_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(ter_ogc_instructions_text)
ter_performance_fee_instructions_text = (
data_extraction.get_instructions_by_datapoints(
page_text, ["ter", "performance_fee"]
)
)
with open(
os.path.join(output_folder, "ter_performance_fee_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(ter_performance_fee_instructions_text)
ogc_ter_performance_fee_instructions_text = (
data_extraction.get_instructions_by_datapoints(
page_text, ["ogc", "ter", "performance_fee"]
)
)
with open(
os.path.join(output_folder, "ogc_ter_performance_fee_instructions.txt"),
"w",
encoding="utf-8",
) as f:
f.write(ogc_ter_performance_fee_instructions_text)
def test_data_extraction_metrics():
data_type = "document_mapping_in_db"
# prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx"
prediction_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_51_documents_by_text_20250127104008.xlsx"
# prediction_file = r"/data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx"
prediction_sheet_name = "data_in_doc_mapping"
ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
ground_truth_sheet_name = "mapping_data"
metrics_output_folder = r"/data/emea_ar/output/metrics/"
missing_error_list, metrics_list, metrics_file = get_metrics(
data_type,
prediction_file,
prediction_sheet_name,
ground_truth_file,
ground_truth_sheet_name,
metrics_output_folder,
)
def test_mapping_raw_name():
doc_id = "337293427"
# KBC Bonds Inflation-Linked Bonds Distribution Shares
# KBC Bonds Inflation-Linked Bonds Institutional B Shares
raw_name = "KBC Bonds Inflation-Linked Bonds Institutional B Shares"
raw_share_name = "Institutional B Shares"
output_folder = r"/data/emea_ar/output/mapping_data/docs/by_text/"
data_mapping = DataMapping(
doc_id,
datapoints=None,
raw_document_data_list=None,
document_mapping_info_df=None,
output_data_folder=output_folder,
)
process_cache = {}
mapping_info = data_mapping.matching_with_database(
raw_name=raw_name,
raw_share_name=raw_share_name,
parent_id="FSGBR051XK",
matching_type="share",
process_cache=process_cache,
)
print(mapping_info)
def test_translate_pdf():
from core.data_translate import Translate_PDF
pdf_file = r"/data/emea_ar/pdf/451063582.pdf"
output_folder = r"/data/translate/output/"
translate_pdf = Translate_PDF(pdf_file, output_folder)
translate_pdf.start_job()
def test_replace_abbrevation():
from utils.biz_utils import replace_abbrevation
text_list = [
"M&G European Credit Investment Fund A CHFH Acc",
"M&G European Credit Investment Fund A CHFHInc",
"M&G European Credit Investment Fund A USDHAcc",
"M&G European High Yield Credit Investment Fund E GBPHedgedAcc",
"M&G Sustainable European Credit Investment Fd Cl L GBPH Acc",
"M&G Sustainable Total Return Credit Investment Fd AI HGBPInc",
"M&G Total Return Credit Investment Fund Class WI GBPHedgedInc",
"M&G Total Return Credit Investment Fund Class W GBP HedgedInc",
"M&G Total Return Credit Investment Fund Class P CHF H Acc",
"M&G Total Return Credit Investment Fund P EUR Inc",
]
for text in text_list:
result = replace_abbrevation(text)
logger.info(f"Original text: {text}, replaced text: {result}")
def test_calculate_metrics():
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
mapping_file = r"/data/emea_ar/basic_information/English/sample_doc/emea_doc_with_all_4_dp/doc_ar_data_with_all_4_dp.xlsx"
data_df = pd.read_excel(data_file, sheet_name="data_in_doc_mapping")
data_df = data_df[data_df["check"].isin([0, 1])]
data_df.fillna("", inplace=True)
data_df.reset_index(drop=True, inplace=True)
mapping_df = pd.read_excel(mapping_file, sheet_name="doc_ar_data_in_db")
mapping_fund_id = mapping_df["FundId"].unique().tolist()
mapping_share_id = mapping_df["FundClassId"].unique().tolist()
mapping_id_list = mapping_fund_id + mapping_share_id
# filter data_df whether investment_id in mapping_id_list
filter_data_df = data_df[
(data_df["investment_id"].isin(mapping_id_list))
| (data_df["investment_id"] == "")
]
# Investment mapping data
mapping_metrics = get_sub_metrics(filter_data_df, "investment_mapping")
logger.info(f"Investment mapping metrics: {mapping_metrics}")
# tor data
tor_data_df = filter_data_df[filter_data_df["datapoint"] == "tor"]
tor_metrics = get_sub_metrics(tor_data_df, "tor")
logger.info(f"TOR metrics: {tor_metrics}")
# ter data
ter_data_df = filter_data_df[filter_data_df["datapoint"] == "ter"]
ter_metrics = get_sub_metrics(ter_data_df, "ter")
logger.info(f"TER metrics: {ter_metrics}")
# ogc data
ogc_data_df = filter_data_df[filter_data_df["datapoint"] == "ogc"]
ogc_metrics = get_sub_metrics(ogc_data_df, "ogc")
logger.info(f"OGC metrics: {ogc_metrics}")
# performance_fee data
performance_fee_data_df = filter_data_df[
filter_data_df["datapoint"] == "performance_fee"
]
performance_fee_metrics = get_sub_metrics(
performance_fee_data_df, "performance_fee"
)
logger.info(f"Performance fee metrics: {performance_fee_metrics}")
metrics_df = pd.DataFrame(
[
mapping_metrics,
tor_metrics,
ter_metrics,
ogc_metrics,
performance_fee_metrics,
]
)
metrics_df.reset_index(drop=True, inplace=True)
output_folder = r"/data/emea_ar/ground_truth/data_extraction/verify/"
output_metrics_file = os.path.join(
output_folder,
r"mapping_data_info_30_documents_all_4_datapoints_roughly_metrics.xlsx",
)
with pd.ExcelWriter(output_metrics_file) as writer:
metrics_df.to_excel(writer, index=False, sheet_name="metrics")
def get_sub_metrics(data_df: pd.DataFrame, data_point: str) -> dict:
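"""Compute rough accuracy/precision/recall/F1 for one datapoint from the manual
"check" column: every reviewed row counts as a positive ground-truth label, and
rows marked wrong (check == 0) that still carry an investment_id are appended
as additional false-positive predictions."""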
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
gt_list = [1] * len(data_df)
pre_list = data_df["check"].tolist()
# convert pre_list member to be integer
pre_list = [int(pre) for pre in pre_list]
for index, row in data_df.iterrows():
if row["check"] == 0 and len(row["investment_id"].strip()) > 0:
pre_list.append(1)
gt_list.append(0)
# calculate metrics
accuracy = accuracy_score(gt_list, pre_list)
precision = precision_score(gt_list, pre_list)
recall = recall_score(gt_list, pre_list)
f1 = f1_score(gt_list, pre_list)
support = len(data_df)
metrics = {
"DataPoint": data_point,
"F1": f1,
"Precision": precision,
"Recall": recall,
"Accuracy": accuracy,
"Support": support,
}
return metrics
def replace_rerun_data(new_data_file: str, original_data_file: str):
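"""Replace the rows of the documents present in new_data_file inside
original_data_file (for all three sheets) and rewrite original_data_file in place."""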
data_in_doc_mapping_sheet = "data_in_doc_mapping"
total_mapping_data_sheet = "total_mapping_data"
extract_data_sheet = "extract_data"
new_data_in_doc_mapping = pd.read_excel(
new_data_file, sheet_name=data_in_doc_mapping_sheet
)
new_total_mapping_data = pd.read_excel(
new_data_file, sheet_name=total_mapping_data_sheet
)
new_extract_data = pd.read_excel(new_data_file, sheet_name=extract_data_sheet)
document_list = new_data_in_doc_mapping["doc_id"].unique().tolist()
original_data_in_doc_mapping = pd.read_excel(
original_data_file, sheet_name=data_in_doc_mapping_sheet
)
original_total_mapping_data = pd.read_excel(
original_data_file, sheet_name=total_mapping_data_sheet
)
original_extract_data = pd.read_excel(
original_data_file, sheet_name=extract_data_sheet
)
# remove data in original data by document_list
original_data_in_doc_mapping = original_data_in_doc_mapping[
~original_data_in_doc_mapping["doc_id"].isin(document_list)
]
original_total_mapping_data = original_total_mapping_data[
~original_total_mapping_data["doc_id"].isin(document_list)
]
original_extract_data = original_extract_data[
~original_extract_data["doc_id"].isin(document_list)
]
# merge new data to original data
new_data_in_doc_mapping = pd.concat(
[original_data_in_doc_mapping, new_data_in_doc_mapping]
)
new_data_in_doc_mapping.reset_index(drop=True, inplace=True)
new_total_mapping_data = pd.concat(
[original_total_mapping_data, new_total_mapping_data]
)
new_total_mapping_data.reset_index(drop=True, inplace=True)
new_extract_data = pd.concat([original_extract_data, new_extract_data])
new_extract_data.reset_index(drop=True, inplace=True)
with pd.ExcelWriter(original_data_file) as writer:
new_data_in_doc_mapping.to_excel(
writer, index=False, sheet_name=data_in_doc_mapping_sheet
)
new_total_mapping_data.to_excel(
writer, index=False, sheet_name=total_mapping_data_sheet
)
new_extract_data.to_excel(writer, index=False, sheet_name=extract_data_sheet)
def batch_run_documents(
doc_source: str = "emea_ar",
special_doc_id_list: list = None,
pdf_folder: str = r"/data/emea_ar/pdf/",
document_mapping_file: str = None,
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
):
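"""Batch driver: when special_doc_id_list is empty, iterate over the document
list files under ./sample_documents/ and start one batch job per file;
otherwise run a single batch job for the given document ids."""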
sample_document_list_folder = r"./sample_documents/"
document_list_files = glob(sample_document_list_folder + "*.txt")
page_filter_ground_truth_file = (
r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx"
)
re_run_extract_data = True
re_run_mapping_data = True
force_save_total_data = True
calculate_metrics = False
extract_way = "text"
# special_doc_id_list = []
if special_doc_id_list is None or len(special_doc_id_list) == 0:
force_save_total_data = True
file_base_name_candidates = []
for document_list_file in document_list_files:
file_base_name = os.path.basename(document_list_file).replace(".txt", "")
if (
file_base_name_candidates is not None
and len(file_base_name_candidates) > 0
and file_base_name not in file_base_name_candidates
):
continue
with open(document_list_file, "r", encoding="utf-8") as f:
doc_id_list = f.readlines()
doc_id_list = [doc_id.strip() for doc_id in doc_id_list]
batch_start_job(
doc_source,
pdf_folder,
output_pdf_text_folder,
page_filter_ground_truth_file,
document_mapping_file,
output_extract_data_child_folder,
output_mapping_child_folder,
output_extract_data_total_folder,
output_mapping_total_folder,
extract_way,
drilldown_folder,
doc_id_list,
re_run_extract_data,
re_run_mapping_data,
force_save_total_data=force_save_total_data,
calculate_metrics=calculate_metrics,
total_data_prefix=file_base_name,
)
else:
batch_start_job(
doc_source,
pdf_folder,
output_pdf_text_folder,
page_filter_ground_truth_file,
document_mapping_file,
output_extract_data_child_folder,
output_mapping_child_folder,
output_extract_data_total_folder,
output_mapping_total_folder,
extract_way,
drilldown_folder,
special_doc_id_list,
re_run_extract_data,
re_run_mapping_data,
force_save_total_data=force_save_total_data,
calculate_metrics=calculate_metrics,
)
def batch_initial_document(
sample_document_list_folder: str = r"./sample_documents/",
document_list_file: str = "sample_document_complex.txt",
doc_source: str = "emea_ar",
pdf_folder: str = r"/data/emea_ar/pdf/",
output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
):
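"""Warm-up pass: instantiate EMEA_AR_Parsing for each document id in the sample
list so that the PDF download, page filtering and output folder setup run up front."""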
document_list_file_path = os.path.join(
sample_document_list_folder, document_list_file
)
with open(document_list_file_path, "r", encoding="utf-8") as f:
doc_id_list = f.readlines()
doc_id_list = [doc_id.strip() for doc_id in doc_id_list]
for doc_id in tqdm(doc_id_list):
logger.info(f"Start to initial document: {doc_id}")
emea_ar_parsing = EMEA_AR_Parsing(
doc_id=doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_data_folder=output_mapping_child_folder,
)
def merge_output_data(
data_file_path: str, document_mapping_file: str, output_data_file_path: str
):
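"""Pivot the per-datapoint mapping rows of each document into one row per
investment (raw name), filling TurnoverRatio/NetExpenseRatio/OngoingCharge/
PerformanceFee columns plus the document EffectiveDate, and save to Excel."""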
data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data")
document_mapping_df = pd.read_excel(document_mapping_file, sheet_name="doc_date")
# set doc_id to be string type
data_df["doc_id"] = data_df["doc_id"].astype(str)
document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str)
"""
doc_id page_index raw_name datapoint value raw_check comment investment_type investment_id investment_name similarity
553242368 344 Deutsche MSCI World Index Fund tor 61 33 FS0000AY1Y Xtrackers MSCI World Index Fund 0.75
553242368 344 db x-trackers EUR Liquid Corporate 12.5 UCITS ETF - Klasse 1C ter 0.35 1 F000018PY1 Xtrackers EUR Corporate Green Bond UCITS ETF 1C 0.462
"""
doc_id_list = data_df["doc_id"].unique().tolist()
data_point_dict = {
"tor": "TurnoverRatio",
"ter": "NetExpenseRatio",
"ogc": "OngoingCharge",
"performance_fee": "PerformanceFee",
}
total_data_list = []
for doc_id in tqdm(doc_id_list):
doc_data_list = []
doc_data_df = data_df[data_df["doc_id"] == doc_id]
doc_date = str(
document_mapping_df[document_mapping_df["DocumentId"] == doc_id][
"EffectiveDate"
].values[0]
)[0:10]
exist_raw_name_list = []
for index, row in doc_data_df.iterrows():
doc_id = str(row["doc_id"])
page_index = int(row["page_index"])
raw_name = str(row["raw_name"])
datapoint = str(row["datapoint"])
value = row["value"]
investment_type = row["investment_type"]
investment_id = row["investment_id"]
investment_name = row["investment_name"]
exist = False
for exist_raw_name_info in exist_raw_name_list:
exist_raw_name = exist_raw_name_info["raw_name"]
exist_investment_type = exist_raw_name_info["investment_type"]
if (
exist_raw_name == raw_name
and exist_investment_type == investment_type
):
exist = True
break
if not exist:
data = {
"DocumentId": doc_id,
"investment_type": investment_type,
"investment_id": investment_id,
"investment_name": investment_name,
"EffectiveDate": doc_date,
"page_index": [],
"RawName": raw_name,
"NetExpenseRatio": "",
"OngoingCharge": "",
"TurnoverRatio": "",
"PerformanceFee": "",
}
exist_raw_name_list.append(
{"raw_name": raw_name, "investment_type": investment_type}
)
doc_data_list.append(data)
# find data from total_data_list by raw_name
for data in doc_data_list:
if (
data["RawName"] == raw_name
and data["investment_type"] == investment_type
):
update_key = data_point_dict[datapoint]
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
break
total_data_list.extend(doc_data_list)
total_data_df = pd.DataFrame(total_data_list)
total_data_df.fillna("", inplace=True)
with pd.ExcelWriter(output_data_file_path) as writer:
total_data_df.to_excel(writer, index=False, sheet_name="total_data")
def merge_output_data_aus_prospectus(
data_file_path: str, document_mapping_file: str, output_data_file_path: str
):
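"""aus_prospectus variant of merge_output_data: pivot share-level rows
(investment_type == 1) and fund-level rows (investment_type == 33) into one row
per investment, with one column per configured datapoint name, and save to Excel."""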
# TODO: merge output data for aus_prospectus, planned for implementation on 2025-01-16
data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data")
document_mapping_df = pd.read_excel(
document_mapping_file, sheet_name="document_mapping"
)
# set doc_id to be string type
data_df["doc_id"] = data_df["doc_id"].astype(str)
document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str)
doc_id_list = data_df["doc_id"].unique().tolist()
datapoint_keyword_config_file = (
r"./configuration/aus_prospectus/datapoint_name.json"
)
with open(datapoint_keyword_config_file, "r", encoding="utf-8") as f:
datapoint_keyword_config = json.load(f)
datapoint_name_list = list(datapoint_keyword_config.keys())
total_data_list = []
for doc_id in tqdm(doc_id_list):
doc_data_list = []
doc_date = str(
document_mapping_df[document_mapping_df["DocumentId"] == doc_id][
"EffectiveDate"
].values[0]
)[0:10]
share_doc_data_df = data_df[
(data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 1)
]
exist_raw_name_list = []
for index, row in share_doc_data_df.iterrows():
doc_id = str(row["doc_id"])
page_index = int(row["page_index"])
raw_fund_name = str(row["raw_fund_name"])
raw_share_name = str(row["raw_share_name"])
raw_name = str(row["raw_name"])
datapoint = str(row["datapoint"])
value = row["value"]
investment_type = row["investment_type"]
share_class_id = row["investment_id"]
share_class_legal_name = row["investment_name"]
fund_id = ""
fund_legal_name = ""
if share_class_id != "":
record_row = document_mapping_df[
document_mapping_df["FundClassId"] == share_class_id
]
if len(record_row) > 0:
fund_id = record_row["FundId"].values[0]
fund_legal_name = record_row["FundLegalName"].values[0]
exist = False
for exist_raw_name_info in exist_raw_name_list:
exist_raw_name = exist_raw_name_info["raw_name"]
exist_investment_type = exist_raw_name_info["investment_type"]
if (
exist_raw_name == raw_name
and exist_investment_type == investment_type
):
exist = True
break
if not exist:
data = {
"DocumentId": doc_id,
"raw_fund_name": raw_fund_name,
"raw_share_name": raw_share_name,
"raw_name": raw_name,
"fund_id": fund_id,
"fund_name": fund_legal_name,
"sec_id": share_class_id,
"sec_name": share_class_legal_name,
"EffectiveDate": doc_date,
"page_index": [],
"RawName": raw_name,
}
for datapoint_name in datapoint_name_list:
data[datapoint_name] = ""
exist_raw_name_list.append(
{"raw_name": raw_name, "investment_type": investment_type}
)
doc_data_list.append(data)
# find data from total_data_list by raw_name
for data in doc_data_list:
if data["raw_name"] == raw_name:
update_key = datapoint
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
break
fund_doc_data_df = data_df[
(data_df["doc_id"] == doc_id) & (data_df["investment_type"] == 33)
]
fund_doc_data_df = fund_doc_data_df.fillna("")  # avoid in-place fillna on a filtered slice
for index, row in fund_doc_data_df.iterrows():
doc_id = str(row["doc_id"])
page_index = int(row["page_index"])
raw_fund_name = str(row["raw_fund_name"])
raw_share_name = ""
raw_name = str(row["raw_name"])
datapoint = str(row["datapoint"])
value = row["value"]
fund_id = row["investment_id"]
fund_legal_name = row["investment_name"]
exist = False
if fund_id != "":
for data in doc_data_list:
if (fund_id != "" and data["fund_id"] == fund_id) or (
data["raw_fund_name"] == raw_fund_name
):
update_key = datapoint
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
exist = True
else:
for data in doc_data_list:
if data["raw_name"] == raw_name:
update_key = datapoint
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
exist = True
if not exist:
data = {
"DocumentId": doc_id,
"raw_fund_name": raw_fund_name,
"raw_share_name": "",
"raw_name": raw_name,
"fund_id": fund_id,
"fund_name": fund_legal_name,
"sec_id": "",
"sec_name": "",
"EffectiveDate": doc_date,
"page_index": [page_index],
"RawName": raw_name,
}
for datapoint_name in datapoint_name_list:
data[datapoint_name] = ""
data[datapoint] = value
doc_data_list.append(data)
total_data_list.extend(doc_data_list)
total_data_df = pd.DataFrame(total_data_list)
total_data_df.fillna("", inplace=True)
with pd.ExcelWriter(output_data_file_path) as writer:
total_data_df.to_excel(writer, index=False, sheet_name="total_data")
if __name__ == "__main__":
# test_data_extraction_metrics()
# data_file_path = r"/data/aus_prospectus/output/mapping_data/total/mapping_data_info_11_documents_by_text_20250116220811.xlsx"
# document_mapping_file_path = r"/data/aus_prospectus/basic_information/11_documents/document_mapping.xlsx"
# merged_total_data_folder = r'/data/aus_prospectus/output/mapping_data/total/merged/'
# os.makedirs(merged_total_data_folder, exist_ok=True)
# data_file_base_name = os.path.basename(data_file_path)
# output_data_file_path = os.path.join(merged_total_data_folder, "merged_" + data_file_base_name)
# merge_output_data_aus_prospectus(data_file_path, document_mapping_file_path, output_data_file_path)
# doc_source = "aus_prospectus"
# sample_document_list_folder: str = r'./sample_documents/'
# document_list_file: str = "aus_prospectus_100_documents_multi_fund_sample.txt"
# pdf_folder: str = r"/data/aus_prospectus/pdf/"
# output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
# output_extract_data_child_folder: str = r"/data/aus_prospectus/output/extract_data/docs/"
# output_mapping_child_folder: str = r"/data/aus_prospectus/output/mapping_data/docs/"
# batch_initial_document(sample_document_list_folder=sample_document_list_folder,
# document_list_file=document_list_file,
# doc_source=doc_source,
# pdf_folder=pdf_folder,
# output_pdf_text_folder=output_pdf_text_folder,
# output_extract_data_child_folder=output_extract_data_child_folder,
# output_mapping_child_folder=output_mapping_child_folder)
# special_doc_id_list = ["553242411"]
doc_source = "aus_prospectus"
if doc_source == "aus_prospectus":
# document_sample_file = (
# r"./sample_documents/aus_prospectus_100_documents_multi_fund_sample.txt"
# )
document_sample_file = (
r"./sample_documents/aus_prospectus_17_documents_sample.txt"
)
with open(document_sample_file, "r", encoding="utf-8") as f:
special_doc_id_list = [doc_id.strip() for doc_id in f.readlines()]
# document_mapping_file = r"/data/aus_prospectus/basic_information/from_2024_documents/aus_100_document_prospectus_multi_fund.xlsx"
document_mapping_file = r"/data/aus_prospectus/basic_information/17_documents/aus_prospectus_17_documents_mapping.xlsx"
# special_doc_id_list: list = [
# "539790009",
# "542300403",
# "542301117",
# "542306317",
# "547567013",
# "552505237",
# "552505278",
# "554431052",
# "554851189",
# "555377021",
# "555654388",
# ]
special_doc_id_list: list = ["377377369"]
pdf_folder: str = r"/data/aus_prospectus/pdf/"
output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
output_extract_data_child_folder: str = (
r"/data/aus_prospectus/output/extract_data/docs/"
)
output_extract_data_total_folder: str = (
r"/data/aus_prospectus/output/extract_data/total/"
)
output_mapping_child_folder: str = (
r"/data/aus_prospectus/output/mapping_data/docs/"
)
output_mapping_total_folder: str = (
r"/data/aus_prospectus/output/mapping_data/total/"
)
drilldown_folder = r"/data/aus_prospectus/output/drilldown/"
batch_run_documents(
doc_source=doc_source,
special_doc_id_list=special_doc_id_list,
pdf_folder=pdf_folder,
document_mapping_file=document_mapping_file,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_child_folder=output_extract_data_child_folder,
output_extract_data_total_folder=output_extract_data_total_folder,
output_mapping_child_folder=output_mapping_child_folder,
output_mapping_total_folder=output_mapping_total_folder,
drilldown_folder=drilldown_folder,
)
elif doc_source == "emea_ar":
special_doc_id_list = [
"292989214",
"316237292",
"321733631",
"323390570",
"327956364",
"333207452",
"334718372",
"344636875",
"362246081",
"366179419",
"380945052",
"382366116",
"387202452",
"389171486",
"391456740",
"391736837",
"394778487",
"401684600",
"402113224",
"402181770",
"402397014",
"405803396",
"445102363",
"445256897",
"448265376",
"449555622",
"449623976",
"458291624",
"458359181",
"463081566",
"469138353",
"471641628",
"476492237",
"478585901",
"478586066",
"479042264",
"479793787",
"481475385",
"483617247",
"486378555",
"486383912",
"492121213",
"497497599",
"502693599",
"502821436",
"503194284",
"506559375",
"507967525",
"508854243",
"509845549",
"520879048",
"529925114",
]
# special_doc_id_list = ["532438210"]
batch_run_documents(
doc_source=doc_source, special_doc_id_list=special_doc_id_list
)
# new_data_file = r"/data/emea_ar/output/mapping_data/total/mapping_data_info_15_documents_by_text_20241121154243.xlsx"
# original_data_file = r"/data/emea_ar/ground_truth/data_extraction/verify/mapping_data_info_30_documents_all_4_datapoints_20241106_verify_mapping.xlsx"
# replace_rerun_data(new_data_file, original_data_file)
# test_calculate_metrics()
# test_replace_abbrevation()
# test_translate_pdf()
# test_mapping_raw_name()
# test_data_extraction_metrics()
# batch_filter_pdf_files(
# pdf_folder, page_filter_ground_truth_file, prediction_output_folder, special_doc_id_list
# )
# data_type = "page_filter"
# prediction_file = r"/data/emea_ar/output/filter_pages/datapoint_page_info_73_documents_20240903145002.xlsx"
# missing_error_list, metrics_list, metrics_file = get_metrics(
# data_type, prediction_file, page_filter_ground_truth_file, metrics_output_folder
# )
# test_auto_generate_instructions()
# batch_extract_data(
# pdf_folder,
# page_filter_ground_truth_file,
# output_extract_data_child_folder,
# output_extract_data_total_folder,
# special_doc_id_list,
# re_run,
# )
# doc_id = "476492237"
# extract_way = "image"
# extract_data(doc_id,
# pdf_folder,
# output_extract_data_child_folder,
# extract_way,
# re_run_extract_data)