814 lines
29 KiB
Python
814 lines
29 KiB
Python
import os
|
|
import json
|
|
import pandas as pd
|
|
from glob import glob
|
|
from tqdm import tqdm
|
|
import time
|
|
from utils.logger import logger
|
|
from utils.pdf_download import download_pdf_from_documents_warehouse
|
|
from utils.sql_query_util import query_document_fund_mapping
|
|
from core.page_filter import FilterPages
|
|
from core.data_extraction import DataExtraction
|
|
from core.data_mapping import DataMapping
|
|
from core.metrics import Metrics
|
|
|
|
|
|
class EMEA_AR_Parsing:
    """Per-document driver for the EMEA annual-report parsing pipeline.

    Creating an instance fetches the PDF through
    ``download_pdf_from_documents_warehouse``, loads the document/fund
    mapping with ``query_document_fund_mapping``, creates the output folders
    and runs ``FilterPages`` once.  ``extract_data`` and ``mapping_data`` then
    produce the extraction and mapping results, or reload them from a JSON
    file written by an earlier run.
    """

    def __init__(
        self,
        doc_id: str,
        pdf_folder: str = r"/data/emea_ar/pdf/",
        output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
        output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
        extract_way: str = "text",
    ) -> None:
        """Prepare everything needed to parse document ``doc_id``.

        Args:
            doc_id: Identifier of the document; also the JSON file stem.
            pdf_folder: Folder the PDF is downloaded into (created if absent).
            output_extract_data_folder: Base folder for extraction output;
                empty/None falls back to the default path.
            output_mapping_data_folder: Base folder for mapping output;
                empty/None falls back to the default path.
            extract_way: Extraction mode; empty/None becomes ``"text"``.
                ``"image"`` additionally creates the image output folder.
        """
        self.doc_id = doc_id
        self.pdf_folder = pdf_folder
        os.makedirs(self.pdf_folder, exist_ok=True)
        self.pdf_file = self.download_pdf()
        self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)

        # An empty or missing mode means plain text extraction.
        self.extract_way = extract_way if extract_way else "text"

        self.output_extract_image_folder = None
        if self.extract_way == "image":
            self.output_extract_image_folder = r"/data/emea_ar/output/extract_data/images/"
            os.makedirs(self.output_extract_image_folder, exist_ok=True)

        self.output_extract_data_folder = self._prepare_output_folder(
            output_extract_data_folder,
            r"/data/emea_ar/output/extract_data/docs/",
        )
        self.output_mapping_data_folder = self._prepare_output_folder(
            output_mapping_data_folder,
            r"/data/emea_ar/output/mapping_data/docs/",
        )

        self.filter_pages = FilterPages(
            self.doc_id, self.pdf_file, self.document_mapping_info_df
        )
        self.page_text_dict = self.filter_pages.page_text_dict
        self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
        self.datapoints = self.get_datapoints_from_datapoint_page_info()

    def _prepare_output_folder(self, folder: str, default_folder: str) -> str:
        """Return ``folder`` (or the default) with a ``by_<extract_way>/`` suffix.

        The resulting directory is created when it does not exist yet.
        """
        if not folder:
            folder = default_folder
        if not folder.endswith("/"):
            folder = f"{folder}/"
        # self.extract_way is never empty here, so the suffix is always added.
        folder = f"{folder}by_{self.extract_way}/"
        os.makedirs(folder, exist_ok=True)
        return folder

    def _json_cache_file(self, base_folder: str) -> str:
        """Create ``<base_folder>/json/`` and return this document's JSON path."""
        json_folder = os.path.join(base_folder, "json/")
        os.makedirs(json_folder, exist_ok=True)
        return os.path.join(json_folder, f"{self.doc_id}.json")

    def download_pdf(self) -> str:
        """Fetch the document's PDF into ``self.pdf_folder`` and return its path."""
        return download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)

    def get_datapoint_page_info(self) -> tuple:
        """Run the page filter job and return ``(datapoint_page_info, result_details)``."""
        page_info, details = self.filter_pages.start_job()
        return page_info, details

    def get_datapoints_from_datapoint_page_info(self) -> list:
        """Return the keys of ``self.datapoint_page_info`` except ``"doc_id"``."""
        return [key for key in self.datapoint_page_info.keys() if key != "doc_id"]

    def extract_data(
        self,
        re_run: bool = False,
    ) -> list:
        """Return the extracted data for this document.

        Unless ``re_run`` is set, a JSON file already present under the
        extraction output folder is loaded and returned as-is; otherwise
        ``DataExtraction`` is run.
        """
        if not re_run:
            json_file = self._json_cache_file(self.output_extract_data_folder)
            if os.path.exists(json_file):
                logger.info(
                    f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as cache:
                    return json.load(cache)

        extractor = DataExtraction(
            self.doc_id,
            self.pdf_file,
            self.output_extract_data_folder,
            self.page_text_dict,
            self.datapoint_page_info,
            self.datapoints,
            self.document_mapping_info_df,
            extract_way=self.extract_way,
            output_image_folder=self.output_extract_image_folder,
        )
        return extractor.extract_data()

    def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
        """Return the fund/share mapping for ``data_from_gpt``.

        Unless ``re_run`` is set, a JSON file already present under the
        mapping output folder is loaded and returned; otherwise
        ``DataMapping.mapping_raw_data`` is run on the extracted data.
        """
        if not re_run:
            json_file = self._json_cache_file(self.output_mapping_data_folder)
            if os.path.exists(json_file):
                logger.info(
                    f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as cache:
                    return json.load(cache)

        # DataMapping positional arguments, in order:
        #   doc_id, datapoints, raw_document_data_list,
        #   document_mapping_info_df, output_data_folder
        mapper = DataMapping(
            self.doc_id,
            self.datapoints,
            data_from_gpt,
            self.document_mapping_info_df,
            self.output_mapping_data_folder,
        )
        return mapper.mapping_raw_data()
|
|
|
|
|
|
def filter_pages(doc_id: str, pdf_folder: str) -> tuple:
    """Run the page filter for one document.

    Args:
        doc_id: Identifier of the document to filter.
        pdf_folder: Folder that holds (or receives) the document's PDF.

    Returns:
        ``(datapoint_page_info, result_details)`` as produced by the page
        filter job of ``EMEA_AR_Parsing``.
    """
    logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(doc_id, pdf_folder)
    # The constructor has already run the page filter job and stored its
    # output on the instance, so reuse it instead of running the job twice.
    return emea_ar_parsing.datapoint_page_info, emea_ar_parsing.result_details
|
|
|
|
|
|
def extract_data(
    doc_id: str,
    pdf_folder: str,
    output_data_folder: str,
    extract_way: str = "text",
    re_run: bool = False,
) -> list:
    """Extract the data of a single document.

    Args:
        doc_id: Identifier of the document to process.
        pdf_folder: Folder that holds (or receives) the document's PDF.
        output_data_folder: Base folder for the extraction output.
        extract_way: Extraction mode passed on to ``EMEA_AR_Parsing``.
        re_run: When False, a previously saved JSON result is reused.

    Returns:
        The list returned by ``EMEA_AR_Parsing.extract_data``.
    """
    logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id,
        pdf_folder,
        output_extract_data_folder=output_data_folder,
        extract_way=extract_way,
    )
    return emea_ar_parsing.extract_data(re_run)
|
|
|
|
|
|
def mapping_data(
    doc_id: str,
    pdf_folder: str,
    output_extract_data_folder: str,
    output_mapping_folder: str,
    extract_way: str = "text",
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
) -> tuple:
    """Extract and then map the data of a single document.

    Args:
        doc_id: Identifier of the document to process.
        pdf_folder: Folder that holds (or receives) the document's PDF.
        output_extract_data_folder: Base folder for the extraction output.
        output_mapping_folder: Base folder for the mapping output.
        extract_way: Extraction mode passed on to ``EMEA_AR_Parsing``.
        re_run_extract_data: When False, a saved extraction JSON is reused.
        re_run_mapping_data: When False, a saved mapping JSON is reused.

    Returns:
        ``(doc_data_from_gpt, doc_mapping_data)``: the extraction result and
        the mapping result derived from it.
    """
    logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id,
        pdf_folder,
        output_extract_data_folder=output_extract_data_folder,
        output_mapping_data_folder=output_mapping_folder,
        extract_way=extract_way,
    )
    doc_data_from_gpt = emea_ar_parsing.extract_data(re_run=re_run_extract_data)
    doc_mapping_data = emea_ar_parsing.mapping_data(
        data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
    )
    return doc_data_from_gpt, doc_mapping_data
|
|
|
|
|
|
def batch_extract_data(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run: bool = False,
) -> None:
    """Extract data for every selected PDF in ``pdf_folder``.

    Documents are selected by ``special_doc_id_list`` when it is non-empty,
    otherwise by the rows of ``doc_data_excel_file`` whose ``Checked`` column
    equals 1.  When neither source yields any id, every PDF in the folder is
    processed.

    The combined result is written to a timestamped workbook in
    ``output_total_folder`` only when no explicit id list was given.

    Args:
        pdf_folder: Folder scanned for ``*.pdf`` files.
        doc_data_excel_file: Optional workbook with ``doc_id``/``Checked``.
        output_child_folder: Base folder for per-document output.
        output_total_folder: Folder for the combined workbook.
        extract_way: Extraction mode passed on to ``extract_data``.
        special_doc_id_list: Optional explicit list of document ids.
        re_run: When False, saved per-document results are reused.
    """
    # os.path.join keeps the pattern valid with or without a trailing slash.
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))

    doc_list = list(special_doc_id_list) if special_doc_id_list else []
    if not doc_list and doc_data_excel_file and os.path.exists(doc_data_excel_file):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    wanted_doc_ids = set(doc_list)

    result_list = []
    for pdf_file in tqdm(pdf_files):
        doc_id = os.path.basename(pdf_file).split(".")[0]
        # An empty selection means "no filter"; the former `is not None`
        # check was always true and silently skipped every document.
        if wanted_doc_ids and doc_id not in wanted_doc_ids:
            continue
        data_from_gpt = extract_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_data_folder=output_child_folder,
            extract_way=extract_way,
            re_run=re_run,
        )
        result_list.extend(data_from_gpt)

    if special_doc_id_list:
        # Ad-hoc runs on hand-picked documents do not produce a total file.
        return

    result_df = pd.DataFrame(result_list)
    result_df.reset_index(drop=True, inplace=True)

    logger.info(f"Saving the result to {output_total_folder}")
    os.makedirs(output_total_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_total_folder,
        f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
    )
    with pd.ExcelWriter(output_file) as writer:
        result_df.to_excel(writer, index=False, sheet_name="extract_data_info")
|
|
|
|
|
|
def batch_start_job(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
    force_save_total_data: bool = False,
    calculate_metrics: bool = False,
):
    """Extract and map every selected PDF, then optionally save and score.

    Documents are selected by ``special_doc_id_list`` when it is non-empty,
    otherwise by the rows of ``doc_data_excel_file`` whose ``Checked`` column
    equals 1.  When neither source yields any id, every PDF in the folder is
    processed.

    Combined workbooks are written when ``force_save_total_data`` is set or
    no explicit id list was given.  Metrics are computed from the mapping
    workbook, so they are only available when that workbook was written.

    Args:
        pdf_folder: Folder scanned for ``*.pdf`` files.
        doc_data_excel_file: Optional workbook with ``doc_id``/``Checked``.
        output_extract_data_child_folder: Per-document extraction output.
        output_mapping_child_folder: Per-document mapping output.
        output_extract_data_total_folder: Folder for the extraction workbook.
        output_mapping_total_folder: Folder for the mapping workbook.
        extract_way: Extraction mode passed on to ``mapping_data``.
        special_doc_id_list: Optional explicit list of document ids.
        re_run_extract_data: When False, saved extraction results are reused.
        re_run_mapping_data: When False, saved mapping results are reused.
        force_save_total_data: Write the workbooks even for an explicit list.
        calculate_metrics: Score the mapping workbook against ground truth.
    """
    # os.path.join keeps the pattern valid with or without a trailing slash.
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))

    doc_list = list(special_doc_id_list) if special_doc_id_list else []
    if not doc_list and doc_data_excel_file and os.path.exists(doc_data_excel_file):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    wanted_doc_ids = set(doc_list)

    result_extract_data_list = []
    result_mapping_data_list = []
    for pdf_file in tqdm(pdf_files):
        doc_id = os.path.basename(pdf_file).split(".")[0]
        # An empty selection means "no filter"; the former `is not None`
        # check was always true and silently skipped every document.
        if wanted_doc_ids and doc_id not in wanted_doc_ids:
            continue
        doc_data_from_gpt, doc_mapping_data_list = mapping_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_extract_data_folder=output_extract_data_child_folder,
            output_mapping_folder=output_mapping_child_folder,
            extract_way=extract_way,
            re_run_extract_data=re_run_extract_data,
            re_run_mapping_data=re_run_mapping_data,
        )
        result_extract_data_list.extend(doc_data_from_gpt)
        result_mapping_data_list.extend(doc_mapping_data_list)

    if not force_save_total_data and special_doc_id_list:
        return

    # A frame built from an empty list has no "doc_id" column, so the
    # lookups below would raise KeyError; skip saving instead.
    if not result_extract_data_list:
        logger.info("No extracted data was collected, skip saving total data")
        return

    result_extract_data_df = pd.DataFrame(result_extract_data_list)
    result_extract_data_df.reset_index(drop=True, inplace=True)

    logger.info(f"Saving extract data to {output_extract_data_total_folder}")
    unique_doc_ids = result_extract_data_df["doc_id"].unique().tolist()
    os.makedirs(output_extract_data_total_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_extract_data_total_folder,
        f"extract_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx",
    )
    with pd.ExcelWriter(output_file) as writer:
        result_extract_data_df.to_excel(
            writer, index=False, sheet_name="extract_data_info"
        )

    if not result_mapping_data_list:
        logger.info("No mapping data was collected, skip saving mapping data")
        return

    result_mappingdata_df = pd.DataFrame(result_mapping_data_list)
    result_mappingdata_df.reset_index(drop=True, inplace=True)

    logger.info(f"Saving mapping data to {output_mapping_total_folder}")
    unique_doc_ids = result_mappingdata_df["doc_id"].unique().tolist()
    os.makedirs(output_mapping_total_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_mapping_total_folder,
        f"mapping_data_info_{len(unique_doc_ids)}_documents_by_{extract_way}_{time_stamp}.xlsx",
    )

    doc_mapping_data_in_db = only_output_mapping_data_in_db(result_mappingdata_df)
    with pd.ExcelWriter(output_file) as writer:
        doc_mapping_data_in_db.to_excel(
            writer, index=False, sheet_name="data_in_doc_mapping"
        )
        result_mappingdata_df.to_excel(
            writer, index=False, sheet_name="total_mapping_data"
        )
        result_extract_data_df.to_excel(
            writer, index=False, sheet_name="extract_data"
        )

    if calculate_metrics:
        prediction_sheet_name = "total_mapping_data"
        ground_truth_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
        ground_truth_sheet_name = "mapping_data"
        metrics_output_folder = r"/data/emea_ar/output/metrics/"

        # The same call was previously also made (now disabled) with the
        # data types "data_extraction" and "investment_mapping".
        logger.info(f"Calculating metrics for investment mapping by database document mapping")
        get_metrics(
            "document_mapping_in_db",
            output_file,
            prediction_sheet_name,
            ground_truth_file,
            ground_truth_sheet_name,
            metrics_output_folder,
        )
|
|
|
|
|
|
def only_output_mapping_data_in_db(mapping_data: pd.DataFrame) -> pd.DataFrame:
    """Keep only mapping rows whose investment id is known for the document.

    For every distinct ``doc_id`` the document/fund mapping is queried and
    the rows of that document are kept when their ``investment_id`` appears
    among the mapping's ``FundId`` or ``SecId`` values.

    Args:
        mapping_data: Frame with at least ``doc_id`` and ``investment_id``.

    Returns:
        A new frame with the surviving rows and a fresh 0..n-1 index.  A
        frame without rows yields an empty frame with the same columns.
    """
    kept_frames = []
    for doc_id in mapping_data["doc_id"].unique().tolist():
        doc_rows = mapping_data[mapping_data["doc_id"] == doc_id]

        document_mapping = query_document_fund_mapping(doc_id, rerun=False)
        known_ids = (
            document_mapping["FundId"].unique().tolist()
            + document_mapping["SecId"].unique().tolist()
        )
        kept_frames.append(doc_rows[doc_rows["investment_id"].isin(known_ids)])

    if not kept_frames:
        # pd.concat raises ValueError when given nothing to concatenate.
        return mapping_data.iloc[0:0].reset_index(drop=True)

    result_mapping_data_df = pd.concat(kept_frames)
    result_mapping_data_df.reset_index(drop=True, inplace=True)
    return result_mapping_data_df
|
|
|
|
|
|
def batch_filter_pdf_files(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_folder: str = r"/data/emea_ar/output/filter_pages/",
    special_doc_id_list: list = None,
):
    """Run the page filter on every selected PDF and save the result.

    Documents are selected by ``special_doc_id_list`` when it is non-empty,
    otherwise by the rows of ``doc_data_excel_file`` whose ``Checked`` column
    equals 1.  When neither source yields any id, every PDF in the folder is
    processed.

    The page info and its details are written to a timestamped workbook in
    ``output_folder``.  When no explicit id list was given, page-filter
    metrics are computed against ``doc_data_excel_file``.

    Args:
        pdf_folder: Folder scanned for ``*.pdf`` files.
        doc_data_excel_file: Optional workbook with ``doc_id``/``Checked``;
            also used as the metrics ground truth.
        output_folder: Folder for the result workbook.
        special_doc_id_list: Optional explicit list of document ids.

    Returns:
        ``(missing_error_list, metrics_list, metrics_file)`` when metrics
        were computed, otherwise ``None``.
    """
    # os.path.join keeps the pattern valid with or without a trailing slash.
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))

    doc_list = list(special_doc_id_list) if special_doc_id_list else []
    if not doc_list and doc_data_excel_file and os.path.exists(doc_data_excel_file):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_list = [str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()]
    wanted_doc_ids = set(doc_list)

    result_list = []
    result_details = []
    for pdf_file in tqdm(pdf_files):
        doc_id = os.path.basename(pdf_file).split(".")[0]
        # An empty selection means "no filter"; the former `is not None`
        # check was always true and silently skipped every document.
        if wanted_doc_ids and doc_id not in wanted_doc_ids:
            continue
        doc_datapoint_page_info, doc_result_details = filter_pages(
            doc_id=doc_id, pdf_folder=pdf_folder
        )
        result_list.append(doc_datapoint_page_info)
        result_details.extend(doc_result_details)

    result_df = pd.DataFrame(result_list)
    result_df.reset_index(drop=True, inplace=True)

    result_details_df = pd.DataFrame(result_details)
    result_details_df.reset_index(drop=True, inplace=True)

    logger.info(f"Saving the result to {output_folder}")
    os.makedirs(output_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_folder,
        f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
    )
    with pd.ExcelWriter(output_file) as writer:
        result_df.to_excel(writer, index=False, sheet_name="dp_page_info")
        result_details_df.to_excel(
            writer, index=False, sheet_name="dp_page_info_details"
        )

    # `len(None)` raised TypeError with the default argument; treat a
    # missing list like an empty one.
    if not special_doc_id_list:
        logger.info(f"Calculating metrics for {output_file}")
        metrics_output_folder = r"/data/emea_ar/output/metrics/"
        missing_error_list, metrics_list, metrics_file = get_metrics(
            data_type="page_filter",
            prediction_file=output_file,
            prediction_sheet_name="dp_page_info",
            ground_truth_file=doc_data_excel_file,
            output_folder=metrics_output_folder,
        )
        return missing_error_list, metrics_list, metrics_file
    return None
|
|
|
|
|
|
def get_metrics(
    data_type: str,
    prediction_file: str,
    prediction_sheet_name: str,
    ground_truth_file: str,
    ground_truth_sheet_name: str = None,
    output_folder: str = None,
) -> tuple:
    """Build a ``Metrics`` object and return its non-strict results.

    Args:
        data_type: Kind of evaluation; callers in this file pass
            ``"page_filter"``, ``"data_extraction"`` or
            ``"document_mapping_in_db"``.
        prediction_file: Workbook holding the predictions.
        prediction_sheet_name: Sheet of ``prediction_file`` to read.
        ground_truth_file: Workbook holding the ground truth.
        ground_truth_sheet_name: Optional sheet of ``ground_truth_file``.
        output_folder: Optional folder handed to ``Metrics``.

    Returns:
        ``(missing_error_list, metrics_list, metrics_file)`` as returned by
        ``Metrics.get_metrics(strict_model=False)``.
    """
    metrics = Metrics(
        data_type=data_type,
        prediction_file=prediction_file,
        prediction_sheet_name=prediction_sheet_name,
        ground_truth_file=ground_truth_file,
        ground_truth_sheet_name=ground_truth_sheet_name,
        output_folder=output_folder,
    )
    missing_error_list, metrics_list, metrics_file = metrics.get_metrics(strict_model=False)
    return missing_error_list, metrics_list, metrics_file
|
|
|
|
|
|
def test_auto_generate_instructions():
    """Write example prompt instructions for several datapoint combinations.

    Runs the page filter on a fixed sample document, takes the text of the
    first page listed for the first datapoint that has any page, and writes
    one ``<datapoints>_instructions.txt`` file per datapoint combination.
    """
    doc_id = "402397014"
    pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
    document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
    page_filter = FilterPages(doc_id, pdf_file, document_mapping_info_df)
    page_text_dict = page_filter.page_text_dict
    datapoint_page_info, _details = page_filter.start_job()
    datapoint_list = list(datapoint_page_info.keys())
    datapoint_list.remove("doc_id")

    # NOTE(review): EMEA_AR_Parsing builds DataExtraction with more
    # positional arguments (output folder, datapoints) than are passed
    # here; confirm this call still matches the constructor.
    data_extraction = DataExtraction(
        doc_id, pdf_file, page_text_dict, datapoint_page_info, document_mapping_info_df
    )

    if len(page_text_dict.keys()) == 0:
        return

    # Text of the first page of the first datapoint that has pages, else "".
    page_text = next(
        (
            page_text_dict[datapoint_page_info[datapoint][0]]
            for datapoint in datapoint_list
            if len(datapoint_page_info[datapoint]) > 0
        ),
        "",
    )

    output_folder = r"/data/emea_ar/basic_information/prompts_example/generate_by_config/"
    os.makedirs(output_folder, exist_ok=True)

    datapoint_combinations = (
        ["tor"],
        ["ter"],
        ["ogc"],
        ["performance_fee"],
        ["ter", "ogc"],
        ["ter", "performance_fee"],
        ["ogc", "ter", "performance_fee"],
    )
    for combination in datapoint_combinations:
        instructions_text = data_extraction.get_instructions_by_datapoints(
            page_text, combination
        )
        # File stem is the datapoint names joined by "_", e.g. "ter_ogc".
        file_name = "_".join(combination) + "_instructions.txt"
        with open(
            os.path.join(output_folder, file_name), "w", encoding="utf-8"
        ) as out:
            out.write(instructions_text)
|
|
|
|
|
|
def test_data_extraction_metrics():
    """Compute data-extraction metrics for one saved mapping workbook."""
    # Alternative prediction files used in earlier runs:
    #   /data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_image_20240920033929.xlsx
    #   /data/emea_ar/output/mapping_data/docs/by_text/excel/481475385.xlsx
    missing_error_list, metrics_list, metrics_file = get_metrics(
        data_type="data_extraction",
        prediction_file=r"/data/emea_ar/output/mapping_data/total/mapping_data_info_88_documents_by_text_20240922152517.xlsx",
        prediction_sheet_name="mapping_data",
        ground_truth_file=r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx",
        ground_truth_sheet_name="mapping_data",
        output_folder=r"/data/emea_ar/output/metrics/",
    )
|
|
|
|
|
|
def test_mapping_raw_name():
    """Match one hard-coded raw share name against the database and print it."""
    data_mapping = DataMapping(
        "382366116",
        datapoints=None,
        raw_document_data_list=None,
        document_mapping_info_df=None,
        output_data_folder=r"/data/emea_ar/output/mapping_data/docs/by_text/",
    )
    # Start from an empty cache for this single lookup.
    mapping_info = data_mapping.matching_with_database(
        raw_name="SPARINVEST SICAV - ETHICAL EMERGING MARKETS VALUE EUR I",
        raw_share_name="EUR I",
        parent_id=None,
        matching_type="share",
        process_cache={},
    )
    print(mapping_info)
|
|
|
|
|
|
if __name__ == "__main__":
    # ---- input locations -------------------------------------------------
    # Full corpus alternative: r"/data/emea_ar/pdf/"
    pdf_folder = r"/data/emea_ar/small_pdf/"
    # Passed to batch_start_job as the workbook of documents to process.
    page_filter_ground_truth_file = (
        r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx"
    )

    # ---- output locations ------------------------------------------------
    prediction_output_folder = r"/data/emea_ar/output/filter_pages/"
    metrics_output_folder = r"/data/emea_ar/output/metrics/"
    output_extract_data_child_folder = r"/data/emea_ar/output/extract_data/docs/"
    output_extract_data_total_folder = r"/data/emea_ar/output/extract_data/total/"
    output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
    output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"

    # ---- disabled earlier experiments -----------------------------------
    # batch_filter_pdf_files(
    #     pdf_folder, page_filter_ground_truth_file, prediction_output_folder, special_doc_id_list
    # )
    #
    # data_type = "page_filter"
    # prediction_file = r"/data/emea_ar/output/filter_pages/datapoint_page_info_73_documents_20240903145002.xlsx"
    # missing_error_list, metrics_list, metrics_file = get_metrics(
    #     data_type, prediction_file, page_filter_ground_truth_file, metrics_output_folder
    # )
    #
    # test_auto_generate_instructions()
    #
    # batch_extract_data(
    #     pdf_folder,
    #     page_filter_ground_truth_file,
    #     output_extract_data_child_folder,
    #     output_extract_data_total_folder,
    #     special_doc_id_list,
    #     re_run,
    # )
    #
    # doc_id = "476492237"
    # extract_way = "image"
    # extract_data(doc_id, pdf_folder, output_extract_data_child_folder,
    #              extract_way, re_run_extract_data)

    # ---- document selection ---------------------------------------------
    # Other selections used before:
    # special_doc_id_list = ["505174428", "510326848", "349679479"]
    # special_doc_id_list = ["394778487"]
    # check_mapping_doc_id_list = [
    #     "327956364", "391456740", "391736837", "458359181",
    #     "486383912", "497497599", "529925114", "321733631",
    #     "334718372", "344636875", "362246081", "445256897",
    #     "449623976", "458291624", "478585901", "492121213",
    #     "502821436", "507967525", "481475385", "508854243",
    #     "520879048", "402181770", "463081566", "502693599",
    #     "509845549", "389171486", "323390570", "366179419",
    #     "486378555", "506559375", "479793787", "471641628",
    # ]
    # check_db_mapping_doc_id_list = [
    #     "334584772", "406913630", "407275419", "337937633",
    #     "337293427", "334584772", "404712928", "451063582",
    #     "451878128", "425595958", "536344026", "532422548",
    #     "423418540", "423418395", "532998065", "540307575",
    #     "423395975", "508704368", "481482392", "466580448",
    #     "423365707", "423364758", "422761666", "422760156",
    #     "422760148", "422686965", "492029971", "510300817",
    #     "512745032", "514213638", "527525440", "534535767",
    # ]
    check_db_mapping_doc_id_list = [
        "292989214", "316237292", "321733631", "323390570",
        "327956364", "332223498", "333207452", "334718372",
        "344636875", "362246081", "366179419", "380945052",
        "382366116", "387202452", "389171486", "391456740",
        "391736837", "394778487", "401684600", "402113224",
        "402181770", "402397014", "405803396", "445102363",
        "445256897", "448265376", "449555622", "449623976",
        "458291624", "458359181", "463081566", "469138353",
        "471641628", "476492237", "478585901", "478586066",
        "479042264", "479042269", "479793787", "481475385",
        "483617247", "486378555", "486383912", "492121213",
        "497497599", "502693599",
    ]
    special_doc_id_list = check_db_mapping_doc_id_list

    # ---- run options -----------------------------------------------------
    re_run_extract_data = False
    re_run_mapping_data = True
    force_save_total_data = True
    calculate_metrics = True
    extract_ways = ["text"]

    for extract_way in extract_ways:
        batch_start_job(
            pdf_folder=pdf_folder,
            doc_data_excel_file=page_filter_ground_truth_file,
            output_extract_data_child_folder=output_extract_data_child_folder,
            output_mapping_child_folder=output_mapping_child_folder,
            output_extract_data_total_folder=output_extract_data_total_folder,
            output_mapping_total_folder=output_mapping_total_folder,
            extract_way=extract_way,
            special_doc_id_list=special_doc_id_list,
            re_run_extract_data=re_run_extract_data,
            re_run_mapping_data=re_run_mapping_data,
            force_save_total_data=force_save_total_data,
            calculate_metrics=calculate_metrics,
        )

    # test_data_extraction_metrics()
    # test_mapping_raw_name()
|