# dc-ml-emea-ar/main.py
#
# 523 lines
# 20 KiB
# Python

import os
import json
import pandas as pd
from glob import glob
from tqdm import tqdm
import time
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.metrics import Metrics
class EMEA_AR_Parsing:
    """Run page filtering, data extraction and data mapping for one document.

    Constructing an instance downloads the PDF for ``doc_id``, queries the
    document/fund mapping, builds a ``FilterPages`` object and immediately
    runs its ``start_job`` so that the per-datapoint page information is
    available to the extraction and mapping steps.
    """

    def __init__(
        self,
        doc_id: str,
        pdf_folder: str = r"/data/emea_ar/pdf/",
        output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
        output_mapping_data_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    ) -> None:
        """Prepare folders, fetch the PDF and run the page filter.

        Args:
            doc_id: Identifier of the document to process.
            pdf_folder: Folder passed to the PDF download helper; created if
                missing.
            output_extract_data_folder: Folder for extraction output. ``None``
                or an empty string falls back to the default location.
            output_mapping_data_folder: Folder for mapping output. ``None`` or
                an empty string falls back to the default location.
        """
        self.doc_id = doc_id
        self.pdf_folder = pdf_folder
        os.makedirs(self.pdf_folder, exist_ok=True)
        self.pdf_file = self.download_pdf()
        self.document_mapping_info_df = query_document_fund_mapping(doc_id)

        # A missing/empty folder argument is replaced by the standard path.
        self.output_extract_data_folder = (
            output_extract_data_folder or r"/data/emea_ar/output/extract_data/docs/"
        )
        os.makedirs(self.output_extract_data_folder, exist_ok=True)

        self.output_mapping_data_folder = (
            output_mapping_data_folder or r"/data/emea_ar/output/mapping_data/docs/"
        )
        os.makedirs(self.output_mapping_data_folder, exist_ok=True)

        self.filter_pages = FilterPages(
            self.doc_id, self.pdf_file, self.document_mapping_info_df
        )
        self.page_text_dict = self.filter_pages.page_text_dict
        page_info, details = self.get_datapoint_page_info()
        self.datapoint_page_info = page_info
        self.result_details = details
        self.datapoints = self.get_datapoints_from_datapoint_page_info()

    def _existing_json_file(self, base_folder: str):
        """Return the path of ``<base_folder>/json/<doc_id>.json`` if it exists.

        The ``json/`` sub-folder is created when missing. ``None`` is returned
        when no file for this document is present.
        """
        json_folder = os.path.join(base_folder, "json/")
        os.makedirs(json_folder, exist_ok=True)
        candidate = os.path.join(json_folder, f"{self.doc_id}.json")
        return candidate if os.path.exists(candidate) else None

    @staticmethod
    def _read_json(json_file: str):
        """Load and return the content of a UTF-8 encoded JSON file."""
        with open(json_file, "r", encoding="utf-8") as handle:
            return json.load(handle)

    def download_pdf(self) -> str:
        """Download this document's PDF and return what the helper returns."""
        return download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)

    def get_datapoint_page_info(self) -> tuple:
        """Run the page-filter job and return ``(page_info, result_details)``."""
        page_info, details = self.filter_pages.start_job()
        return page_info, details

    def get_datapoints_from_datapoint_page_info(self) -> list:
        """Return the keys of ``datapoint_page_info`` without ``"doc_id"``."""
        return [name for name in self.datapoint_page_info if name != "doc_id"]

    def extract_data(self, re_run: bool = False) -> list:
        """Return the extracted data for this document.

        Args:
            re_run: When ``False`` and a JSON result for this document already
                exists under the extraction output folder, that file is loaded
                and returned. Otherwise a fresh ``DataExtraction`` is run.
        """
        if not re_run:
            cached_file = self._existing_json_file(self.output_extract_data_folder)
            if cached_file is not None:
                logger.info(
                    f"The document: {self.doc_id} has been parsed, loading data from {cached_file}"
                )
                return self._read_json(cached_file)

        extractor = DataExtraction(
            self.doc_id,
            self.pdf_file,
            self.output_extract_data_folder,
            self.page_text_dict,
            self.datapoint_page_info,
            self.datapoints,
            self.document_mapping_info_df,
        )
        return extractor.extract_data()

    def mapping_data(self, data_from_gpt: list, re_run: bool = False) -> list:
        """Return the mapped data for this document.

        Args:
            data_from_gpt: Extraction result handed to ``DataMapping``.
            re_run: When ``False`` and a JSON result for this document already
                exists under the mapping output folder, that file is loaded
                and returned. Otherwise a fresh ``DataMapping`` is run.
        """
        if not re_run:
            cached_file = self._existing_json_file(self.output_mapping_data_folder)
            if cached_file is not None:
                logger.info(
                    f"The fund/ share of this document: {self.doc_id} has been mapped, loading data from {cached_file}"
                )
                return self._read_json(cached_file)

        # Argument order noted in the original code: doc_id, datapoints,
        # raw_document_data_list, document_mapping_info_df, output_data_folder.
        mapper = DataMapping(
            self.doc_id,
            self.datapoints,
            data_from_gpt,
            self.document_mapping_info_df,
            self.output_mapping_data_folder,
        )
        return mapper.mapping_raw_data()
def filter_pages(doc_id: str, pdf_folder: str) -> tuple:
    """Run page filtering for one document and return its results.

    Args:
        doc_id: Identifier of the document to process.
        pdf_folder: Folder handed to ``EMEA_AR_Parsing`` as its PDF folder.

    Returns:
        ``(datapoint_page_info, result_details)`` as stored on the parser
        after its constructor ran the page-filter job.
    """
    logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(doc_id, pdf_folder)
    # The constructor has already run the page-filter job and kept both
    # results; reuse them instead of running the same job a second time.
    return emea_ar_parsing.datapoint_page_info, emea_ar_parsing.result_details
def extract_data(
    doc_id: str, pdf_folder: str, output_data_folder: str, re_run: bool = False
) -> list:
    """Extract data for a single document.

    Args:
        doc_id: Identifier of the document to process.
        pdf_folder: Folder handed to ``EMEA_AR_Parsing`` as its PDF folder.
        output_data_folder: Folder used for the extraction output.
        re_run: Forwarded to ``EMEA_AR_Parsing.extract_data``; when ``False``
            an existing JSON result for the document is reused.

    Returns:
        The value returned by ``EMEA_AR_Parsing.extract_data``.
    """
    logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id, pdf_folder, output_extract_data_folder=output_data_folder
    )
    return emea_ar_parsing.extract_data(re_run)
def mapping_data(
    doc_id: str,
    pdf_folder: str,
    output_extract_data_folder: str,
    output_mapping_folder: str,
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
) -> tuple:
    """Extract and then map the data of a single document.

    Args:
        doc_id: Identifier of the document to process.
        pdf_folder: Folder handed to ``EMEA_AR_Parsing`` as its PDF folder.
        output_extract_data_folder: Folder used for the extraction output.
        output_mapping_folder: Folder used for the mapping output.
        re_run_extract_data: Forwarded as ``re_run`` to the extraction step.
        re_run_mapping_data: Forwarded as ``re_run`` to the mapping step.

    Returns:
        ``(doc_data_from_gpt, doc_mapping_data)``: the extraction result and
        the mapping result for the document.
    """
    # This entry point runs both steps, so the message names both.
    logger.info(f"Extract and map EMEA AR data for doc_id: {doc_id}")
    emea_ar_parsing = EMEA_AR_Parsing(
        doc_id,
        pdf_folder,
        output_extract_data_folder=output_extract_data_folder,
        output_mapping_data_folder=output_mapping_folder,
    )
    doc_data_from_gpt = emea_ar_parsing.extract_data(re_run=re_run_extract_data)
    doc_mapping_data = emea_ar_parsing.mapping_data(
        data_from_gpt=doc_data_from_gpt, re_run=re_run_mapping_data
    )
    return doc_data_from_gpt, doc_mapping_data
def batch_extract_data(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    special_doc_id_list: list = None,
    re_run: bool = False,
) -> None:
    """Extract data for every selected PDF in ``pdf_folder``.

    Document selection:
        * a non-empty ``special_doc_id_list`` restricts the run to those ids;
        * otherwise, if ``doc_data_excel_file`` exists, the run is restricted
          to rows of that workbook whose ``Checked`` column equals 1 (ids are
          read from its ``doc_id`` column);
        * if neither source is available, every PDF in the folder is processed.

    Args:
        pdf_folder: Folder that is searched for ``*.pdf`` files.
        doc_data_excel_file: Optional workbook with ``Checked`` and ``doc_id``
            columns.
        output_child_folder: Per-document extraction output folder.
        output_total_folder: Folder for the combined Excel file.
        special_doc_id_list: Optional explicit list of document ids.
        re_run: Forwarded to ``extract_data``.

    The combined Excel file is only written when no ``special_doc_id_list``
    was supplied.
    """
    # os.path.join works whether or not pdf_folder ends with a separator.
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))

    # None means "no filter"; a (possibly empty) set means "only these ids".
    doc_filter = None
    if special_doc_id_list:
        doc_filter = {str(doc_id) for doc_id in special_doc_id_list}
    elif doc_data_excel_file and os.path.exists(doc_data_excel_file):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_filter = {str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()}

    result_list = []
    for pdf_file in tqdm(pdf_files):
        # The document id is the file name up to its first dot.
        doc_id = os.path.basename(pdf_file).split(".")[0]
        if doc_filter is not None and doc_id not in doc_filter:
            continue
        data_from_gpt = extract_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_data_folder=output_child_folder,
            re_run=re_run,
        )
        result_list.extend(data_from_gpt)

    if not special_doc_id_list:
        result_df = pd.DataFrame(result_list)
        result_df.reset_index(drop=True, inplace=True)

        logger.info(f"Saving the result to {output_total_folder}")
        os.makedirs(output_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_total_folder,
            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_df.to_excel(writer, index=False, sheet_name="extract_data_info")
def batch_start_job(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_extract_data_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_mapping_child_folder: str = r"/data/emea_ar/output/mapping_data/docs/",
    output_extract_data_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    output_mapping_total_folder: str = r"/data/emea_ar/output/mapping_data/total/",
    special_doc_id_list: list = None,
    re_run_extract_data: bool = False,
    re_run_mapping_data: bool = False,
):
    """Extract and map data for every selected PDF in ``pdf_folder``.

    Document selection:
        * a non-empty ``special_doc_id_list`` restricts the run to those ids;
        * otherwise, if ``doc_data_excel_file`` exists, the run is restricted
          to rows of that workbook whose ``Checked`` column equals 1 (ids are
          read from its ``doc_id`` column);
        * if neither source is available, every PDF in the folder is processed.

    Args:
        pdf_folder: Folder that is searched for ``*.pdf`` files.
        doc_data_excel_file: Optional workbook with ``Checked`` and ``doc_id``
            columns.
        output_extract_data_child_folder: Per-document extraction output folder.
        output_mapping_child_folder: Per-document mapping output folder.
        output_extract_data_total_folder: Folder for the combined extraction
            workbook.
        output_mapping_total_folder: Folder for the combined mapping workbook.
        special_doc_id_list: Optional explicit list of document ids.
        re_run_extract_data: Forwarded to the extraction step.
        re_run_mapping_data: Forwarded to the mapping step.

    The combined workbooks are only written when no ``special_doc_id_list``
    was supplied.
    """
    # os.path.join works whether or not pdf_folder ends with a separator.
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))

    # None means "no filter"; a (possibly empty) set means "only these ids".
    doc_filter = None
    if special_doc_id_list:
        doc_filter = {str(doc_id) for doc_id in special_doc_id_list}
    elif doc_data_excel_file and os.path.exists(doc_data_excel_file):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_filter = {str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()}

    result_extract_data_list = []
    result_mapping_data_list = []
    for pdf_file in tqdm(pdf_files):
        # The document id is the file name up to its first dot.
        doc_id = os.path.basename(pdf_file).split(".")[0]
        if doc_filter is not None and doc_id not in doc_filter:
            continue
        doc_data_from_gpt, doc_mapping_data_list = mapping_data(
            doc_id=doc_id,
            pdf_folder=pdf_folder,
            output_extract_data_folder=output_extract_data_child_folder,
            output_mapping_folder=output_mapping_child_folder,
            re_run_extract_data=re_run_extract_data,
            re_run_mapping_data=re_run_mapping_data,
        )
        result_extract_data_list.extend(doc_data_from_gpt)
        result_mapping_data_list.extend(doc_mapping_data_list)

    if not special_doc_id_list:
        result_extract_data_df = pd.DataFrame(result_extract_data_list)
        result_extract_data_df.reset_index(drop=True, inplace=True)

        result_mappingdata_df = pd.DataFrame(result_mapping_data_list)
        result_mappingdata_df.reset_index(drop=True, inplace=True)

        logger.info(f"Saving extract data to {output_extract_data_total_folder}")
        os.makedirs(output_extract_data_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_extract_data_total_folder,
            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data_info"
            )

        logger.info(f"Saving mapping data to {output_mapping_total_folder}")
        os.makedirs(output_mapping_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_mapping_total_folder,
            f"mapping_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        # The mapping workbook carries the extraction data as a second sheet.
        with pd.ExcelWriter(output_file) as writer:
            result_mappingdata_df.to_excel(
                writer, index=False, sheet_name="mapping_data"
            )
            result_extract_data_df.to_excel(
                writer, index=False, sheet_name="extract_data"
            )
def batch_filter_pdf_files(
    pdf_folder: str,
    doc_data_excel_file: str = None,
    output_folder: str = r"/data/emea_ar/output/filter_pages/",
    special_doc_id_list: list = None,
) -> "tuple | None":
    """Run page filtering for every selected PDF and save the results.

    Document selection:
        * a non-empty ``special_doc_id_list`` restricts the run to those ids;
        * otherwise, if ``doc_data_excel_file`` exists, the run is restricted
          to rows of that workbook whose ``Checked`` column equals 1 (ids are
          read from its ``doc_id`` column);
        * if neither source is available, every PDF in the folder is processed.

    Args:
        pdf_folder: Folder that is searched for ``*.pdf`` files.
        doc_data_excel_file: Optional workbook with ``Checked`` and ``doc_id``
            columns; also passed to ``get_metrics`` as the ground-truth file.
        output_folder: Folder for the result workbook.
        special_doc_id_list: Optional explicit list of document ids.

    Returns:
        ``(missing_error_list, metrics_list, metrics_file)`` from
        ``get_metrics`` when no ``special_doc_id_list`` was supplied,
        otherwise ``None``.
    """
    # os.path.join works whether or not pdf_folder ends with a separator.
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))

    # None means "no filter"; a (possibly empty) set means "only these ids".
    doc_filter = None
    if special_doc_id_list:
        doc_filter = {str(doc_id) for doc_id in special_doc_id_list}
    elif doc_data_excel_file and os.path.exists(doc_data_excel_file):
        doc_data_df = pd.read_excel(doc_data_excel_file)
        doc_data_df = doc_data_df[doc_data_df["Checked"] == 1]
        doc_filter = {str(doc_id) for doc_id in doc_data_df["doc_id"].tolist()}

    result_list = []
    result_details = []
    for pdf_file in tqdm(pdf_files):
        # The document id is the file name up to its first dot.
        doc_id = os.path.basename(pdf_file).split(".")[0]
        if doc_filter is not None and doc_id not in doc_filter:
            continue
        doc_datapoint_page_info, doc_result_details = filter_pages(
            doc_id=doc_id, pdf_folder=pdf_folder
        )
        result_list.append(doc_datapoint_page_info)
        result_details.extend(doc_result_details)

    result_df = pd.DataFrame(result_list)
    result_df.reset_index(drop=True, inplace=True)

    result_details_df = pd.DataFrame(result_details)
    result_details_df.reset_index(drop=True, inplace=True)

    logger.info(f"Saving the result to {output_folder}")
    os.makedirs(output_folder, exist_ok=True)
    time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    output_file = os.path.join(
        output_folder,
        f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
    )
    with pd.ExcelWriter(output_file) as writer:
        result_df.to_excel(writer, index=False, sheet_name="dp_page_info")
        result_details_df.to_excel(
            writer, index=False, sheet_name="dp_page_info_details"
        )

    # special_doc_id_list defaults to None, so test truthiness instead of
    # calling len() on it.
    if not special_doc_id_list:
        logger.info(f"Calculating metrics for {output_file}")
        metrics_output_folder = r"/data/emea_ar/output/metrics/"
        missing_error_list, metrics_list, metrics_file = get_metrics(
            data_type="page_filter",
            prediction_file=output_file,
            prediction_sheet_name="dp_page_info",
            ground_truth_file=doc_data_excel_file,
            output_folder=metrics_output_folder,
        )
        return missing_error_list, metrics_list, metrics_file
    return None
def get_metrics(
    data_type: str,
    prediction_file: str,
    prediction_sheet_name: str,
    ground_truth_file: str,
    output_folder: str = None,
) -> tuple:
    """Build a ``Metrics`` object from the arguments and run it.

    Args:
        data_type: Forwarded to ``Metrics`` (this file uses ``"page_filter"``).
        prediction_file: Path of the prediction workbook.
        prediction_sheet_name: Sheet of ``prediction_file`` to evaluate.
        ground_truth_file: Path of the ground-truth workbook.
        output_folder: Forwarded to ``Metrics`` as its output folder.

    Returns:
        ``(missing_error_list, metrics_list, metrics_file)`` as returned by
        ``Metrics.get_metrics``.
    """
    metrics = Metrics(
        data_type=data_type,
        prediction_file=prediction_file,
        prediction_sheet_name=prediction_sheet_name,
        ground_truth_file=ground_truth_file,
        output_folder=output_folder,
    )
    missing_error_list, metrics_list, metrics_file = metrics.get_metrics()
    return missing_error_list, metrics_list, metrics_file
def test_auto_generate_instructions():
    """Write example instruction texts for several datapoint combinations.

    Runs the page filter for one hard-coded document, picks the text of the
    first page listed for the first datapoint that has any pages, and asks
    ``DataExtraction.get_instructions_by_datapoints`` for the instructions of
    each datapoint combination. Every result is written to
    ``<combination joined by "_">_instructions.txt`` in the prompts example
    folder. Nothing is written when the document has no page text.
    """
    doc_id = "402397014"
    pdf_file = f"/data/emea_ar/small_pdf/{doc_id}.pdf"
    document_mapping_info_df = query_document_fund_mapping(doc_id)
    # Local name chosen so it does not shadow the module-level filter_pages().
    page_filter = FilterPages(doc_id, pdf_file, document_mapping_info_df)
    page_text_dict = page_filter.page_text_dict
    datapoint_page_info, _ = page_filter.start_job()
    # Tolerates a missing "doc_id" key instead of raising ValueError.
    datapoint_list = [name for name in datapoint_page_info if name != "doc_id"]

    # Same positional layout as the DataExtraction call in
    # EMEA_AR_Parsing.extract_data: output folder third, datapoints sixth.
    # NOTE(review): the output folder is the default extraction folder used
    # elsewhere in this file - confirm that is acceptable for this helper.
    data_extraction = DataExtraction(
        doc_id,
        pdf_file,
        r"/data/emea_ar/output/extract_data/docs/",
        page_text_dict,
        datapoint_page_info,
        datapoint_list,
        document_mapping_info_df,
    )

    if not page_text_dict:
        return

    # Use the first page of the first datapoint that has pages; the text
    # stays empty when no datapoint has any.
    page_text = ""
    for datapoint in datapoint_list:
        page_index_list = datapoint_page_info[datapoint]
        if len(page_index_list) > 0:
            page_text = page_text_dict[page_index_list[0]]
            break

    output_folder = (
        r"/data/emea_ar/basic_information/prompts_example/generate_by_config/"
    )
    os.makedirs(output_folder, exist_ok=True)

    datapoint_combinations = [
        ["tor"],
        ["ter"],
        ["ogc"],
        ["performance_fee"],
        ["ter", "ogc"],
        ["ter", "performance_fee"],
        ["ogc", "ter", "performance_fee"],
    ]
    for combination in datapoint_combinations:
        instructions_text = data_extraction.get_instructions_by_datapoints(
            page_text, combination
        )
        file_name = "_".join(combination) + "_instructions.txt"
        with open(
            os.path.join(output_folder, file_name), "w", encoding="utf-8"
        ) as f:
            f.write(instructions_text)
if __name__ == "__main__":
    # Input locations.
    pdf_folder = r"/data/emea_ar/small_pdf/"
    page_filter_ground_truth_file = (
        r"/data/emea_ar/ground_truth/page_filter/datapoint_page_info_88_documents.xlsx"
    )

    # Output locations for the page-filter and metrics steps. They are not
    # used by the job started below but are kept for the other entry points.
    prediction_output_folder = r"/data/emea_ar/output/filter_pages/"
    metrics_output_folder = r"/data/emea_ar/output/metrics/"

    # Output locations for the extraction and mapping steps.
    output_extract_data_child_folder = r"/data/emea_ar/output/extract_data/docs/"
    output_extract_data_total_folder = r"/data/emea_ar/output/extract_data/total/"
    output_mapping_child_folder = r"/data/emea_ar/output/mapping_data/docs/"
    output_mapping_total_folder = r"/data/emea_ar/output/mapping_data/total/"

    # Force both steps to run again even if JSON results already exist.
    re_run_extract_data = True
    re_run_mapping_data = True

    # Restrict the run to a single document.
    special_doc_id_list = ["508854243"]

    # Other entry points that can be called from here instead:
    # batch_filter_pdf_files, get_metrics, test_auto_generate_instructions,
    # batch_extract_data and extract_data.
    batch_start_job(
        pdf_folder=pdf_folder,
        doc_data_excel_file=page_filter_ground_truth_file,
        output_extract_data_child_folder=output_extract_data_child_folder,
        output_mapping_child_folder=output_mapping_child_folder,
        output_extract_data_total_folder=output_extract_data_total_folder,
        output_mapping_total_folder=output_mapping_total_folder,
        special_doc_id_list=special_doc_id_list,
        re_run_extract_data=re_run_extract_data,
        re_run_mapping_data=re_run_mapping_data,
    )