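"""EMEA annual-report (AR) parsing pipeline.

Downloads a PDF from the documents warehouse, filters the pages relevant to
each data point, extracts the data with a text or image model (qwen-plus /
qwen-vl-plus by default), and optionally drills the extracted values back
down to their locations in the PDF for annotation.
"""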
import os
import json
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm
import time
import fitz
import re
from io import BytesIO
from traceback import print_exc
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.sql_query_util import query_document_fund_mapping
from utils.pdf_util import PDFUtil
from utils.biz_utils import add_slash_to_text_as_regex
from core.page_filter import FilterPages
from core.data_extraction import DataExtraction
from core.data_mapping import DataMapping
from core.auz_nz.hybrid_solution_script import api_for_fund_matching_call
from core.metrics import Metrics
import certifi

class EMEA_AR_Parsing:
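    """End-to-end parser for a single EMEA annual-report document.

    Construction is side-effecting: it downloads the PDF, runs the page
    filter, and resolves the per-datapoint page info so that extract_data()
    can run immediately afterwards.
    """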
    def __init__(
        self,
        doc_id: str,
        doc_source: str = "emea_ar",
        pdf_folder: str = r"/data/emea_ar/pdf/",
        output_pdf_text_folder: str = r"/data/emea_ar/output/pdf_text/",
        output_extract_data_folder: str = r"/data/emea_ar/output/extract_data/docs/",
        extract_way: str = "text",
        drilldown_folder: str = r"/data/emea_ar/output/drilldown/",
        text_model: str = "qwen-plus",
        image_model: str = "qwen-vl-plus",
    ) -> None:
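        """Set up output folders, download the PDF, and pre-compute page info.

        extract_way selects text- or image-based extraction; when it is
        "image", an image output folder is created for DataExtraction.
        Drilldown is enabled via "apply_drilldown" in the per-source
        misc_config.json.
        """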
        self.doc_id = doc_id
        self.doc_source = doc_source
        self.pdf_folder = pdf_folder
        os.makedirs(self.pdf_folder, exist_ok=True)

        self.pdf_file = self.download_pdf()

        if extract_way is None or len(extract_way) == 0:
            extract_way = "text"
        self.extract_way = extract_way
        self.output_extract_image_folder = None
        if self.extract_way == "image":
            self.output_extract_image_folder = (
                r"/data/emea_ar/output/extract_data/images/"
            )
            os.makedirs(self.output_extract_image_folder, exist_ok=True)

        if output_extract_data_folder is None or len(output_extract_data_folder) == 0:
            output_extract_data_folder = r"/data/emea_ar/output/extract_data/docs/"
        if not output_extract_data_folder.endswith("/"):
            output_extract_data_folder = f"{output_extract_data_folder}/"
        if extract_way is not None and len(extract_way) > 0:
            output_extract_data_folder = (
                f"{output_extract_data_folder}by_{extract_way}/"
            )
        self.output_extract_data_folder = output_extract_data_folder
        os.makedirs(self.output_extract_data_folder, exist_ok=True)

        self.filter_pages = FilterPages(
            self.doc_id,
            self.pdf_file,
            self.doc_source,
            output_pdf_text_folder,
        )
        self.page_text_dict = self.filter_pages.page_text_dict

        self.datapoint_page_info, self.result_details = self.get_datapoint_page_info()
        self.datapoints = self.get_datapoints_from_datapoint_page_info()

        if drilldown_folder is None or len(drilldown_folder) == 0:
            drilldown_folder = r"/data/emea_ar/output/drilldown/"
        os.makedirs(drilldown_folder, exist_ok=True)
        self.drilldown_folder = drilldown_folder
        misc_config_file = os.path.join(
            f"./configuration/{doc_source}/", "misc_config.json"
        )
        if os.path.exists(misc_config_file):
            with open(misc_config_file, "r", encoding="utf-8") as f:
                misc_config = json.load(f)
            self.apply_drilldown = misc_config.get("apply_drilldown", False)
        else:
            self.apply_drilldown = False
        self.text_model = text_model
        self.image_model = image_model

    def download_pdf(self) -> str:
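        """Download this document's PDF into pdf_folder and return its local path."""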
        pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id)
        return pdf_file

    def get_datapoint_page_info(self) -> tuple:
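        """Run the page filter and return (datapoint_page_info, result_details)."""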
        datapoint_page_info, result_details = self.filter_pages.start_job()
        return datapoint_page_info, result_details

    def get_datapoints_from_datapoint_page_info(self) -> list:
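        """Return the datapoint names, dropping the bookkeeping "doc_id" key."""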
        datapoints = list(self.datapoint_page_info.keys())
        if "doc_id" in datapoints:
            datapoints.remove("doc_id")
        return datapoints

    def extract_data(
        self,
        re_run: bool = False,
    ) -> tuple:
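        """Extract datapoints from the document.

        Unless re_run is True, a cached <doc_id>.json under the output
        folder is reused. Returns (data_from_gpt, annotation_list); the
        annotation list is empty when drilldown is disabled or fails.
        """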
        found_data = False
        if not re_run:
            output_data_json_folder = os.path.join(
                self.output_extract_data_folder, "json/"
            )
            os.makedirs(output_data_json_folder, exist_ok=True)
            json_file = os.path.join(output_data_json_folder, f"{self.doc_id}.json")
            if os.path.exists(json_file):
                logger.info(
                    f"The document: {self.doc_id} has been parsed, loading data from {json_file}"
                )
                with open(json_file, "r", encoding="utf-8") as f:
                    data_from_gpt = json.load(f)
                found_data = True

        if not found_data:
            try:
                data_extraction = DataExtraction(
                    self.doc_source,
                    self.doc_id,
                    self.pdf_file,
                    self.output_extract_data_folder,
                    self.page_text_dict,
                    self.datapoint_page_info,
                    self.datapoints,
                    extract_way=self.extract_way,
                    output_image_folder=self.output_extract_image_folder,
                    text_model=self.text_model,
                    image_model=self.image_model,
                )
                data_from_gpt = data_extraction.extract_data()
            except Exception as e:
                logger.error(f"Error: {e}")
                print_exc()
                # Fall back to an empty record list so downstream consumers
                # (drilldown, batch aggregation) always receive a list.
                data_from_gpt = []

        # Drill the extracted values down to their locations in the PDF document
        annotation_list = []
        if self.apply_drilldown:
            try:
                annotation_list = self.drilldown_pdf_document(data_from_gpt)
            except Exception as e:
                logger.error(f"Error: {e}")
        return data_from_gpt, annotation_list

    def drilldown_pdf_document(self, data_from_gpt: list) -> list:
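        """Highlight extracted values in the PDF and persist the annotations.

        Builds one drilldown record per unique extracted value (skipping
        ter, ogc, and performance_fee) plus one per reported name, then
        delegates the matching to PDFUtil.batch_drilldown and writes the
        results to Excel and JSON under <drilldown_folder>/data/.
        """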
logger.info(f"Drilldown PDF document for doc_id: {self.doc_id}")
|
|
pdf_util = PDFUtil(self.pdf_file)
|
|
drilldown_data_list = []
|
|
for data in data_from_gpt:
|
|
doc_id = str(data.get("doc_id", ""))
|
|
page_index = data.get("page_index", -1)
|
|
if page_index == -1:
|
|
continue
|
|
extract_data_list = data.get("extract_data", {}).get("data", [])
|
|
dp_reported_name_dict = data.get("extract_data", {}).get(
|
|
"dp_reported_name", {}
|
|
)
|
|
highlighted_value_list = []
|
|
for extract_data in extract_data_list:
|
|
for data_point, value in extract_data.items():
|
|
if value in highlighted_value_list:
|
|
continue
|
|
if data_point in ["ter", "ogc", "performance_fee"]:
|
|
continue
|
|
drilldown_data = {
|
|
"doc_id": doc_id,
|
|
"page_index": page_index,
|
|
"data_point": data_point,
|
|
"parent_text_block": None,
|
|
"value": value,
|
|
"annotation_attribute": {},
|
|
}
|
|
drilldown_data_list.append(drilldown_data)
|
|
highlighted_value_list.append(value)
|
|
|
|
for data_point, reported_name in dp_reported_name_dict.items():
|
|
if reported_name in highlighted_value_list:
|
|
continue
|
|
data_point = f"{data_point}_reported_name"
|
|
drilldown_data = {
|
|
"doc_id": doc_id,
|
|
"page_index": page_index,
|
|
"data_point": data_point,
|
|
"parent_text_block": None,
|
|
"value": reported_name,
|
|
"annotation_attribute": {},
|
|
}
|
|
drilldown_data_list.append(drilldown_data)
|
|
highlighted_value_list.append(reported_name)
|
|
|
|
drilldown_result = pdf_util.batch_drilldown(
|
|
drilldown_data_list=drilldown_data_list,
|
|
output_pdf_folder=self.drilldown_folder,
|
|
)
|
|
annotation_list = []
|
|
if len(drilldown_result) > 0:
|
|
logger.info(f"Drilldown PDF document for doc_id: {doc_id} successfully")
|
|
annotation_list = drilldown_result.get("annotation_list", [])
|
|
for annotation in annotation_list:
|
|
annotation["doc_id"] = doc_id
|
|
if self.drilldown_folder is not None and len(self.drilldown_folder) > 0:
|
|
drilldown_data_folder = os.path.join(self.drilldown_folder, "data/")
|
|
os.makedirs(drilldown_data_folder, exist_ok=True)
|
|
drilldown_file = os.path.join(
|
|
drilldown_data_folder, f"{doc_id}_drilldown.xlsx"
|
|
)
|
|
|
|
drilldown_source_df = pd.DataFrame(drilldown_data_list)
|
|
annotation_list_df = pd.DataFrame(annotation_list)
|
|
# set drilldown_result_df column order as doc_id, pdf_file, page_index,
|
|
# data_point, value, matching_val_area, normalized_bbox
|
|
try:
|
|
annotation_list_df = annotation_list_df[
|
|
[
|
|
"doc_id",
|
|
"pdf_file",
|
|
"page_index",
|
|
"data_point",
|
|
"value",
|
|
"matching_val_area",
|
|
"normalized_bbox",
|
|
]
|
|
]
|
|
except Exception as e:
|
|
logger.error(f"Error: {e}")
|
|
logger.info(f"Writing drilldown data to {drilldown_file}")
|
|
try:
|
|
with pd.ExcelWriter(drilldown_file) as writer:
|
|
drilldown_source_df.to_excel(
|
|
writer, index=False, sheet_name="source_data"
|
|
)
|
|
annotation_list_df.to_excel(
|
|
writer, index=False, sheet_name="drilldown_data"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error: {e}")
|
|
annotation_list = annotation_list_df.to_dict(orient="records")
|
|
try:
|
|
drilldown_json_file = os.path.join(
|
|
drilldown_data_folder, f"{doc_id}_drilldown.json"
|
|
)
|
|
with open(drilldown_json_file, "w", encoding="utf-8") as f:
|
|
json.dump(annotation_list, f, ensure_ascii=False, indent=4)
|
|
except Exception as e:
|
|
logger.error(f"Error: {e}")
|
|
return annotation_list
|
|
|
|
|
|
def filter_pages(doc_id: str, pdf_folder: str, doc_source: str) -> tuple:
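    """Module-level convenience wrapper around EMEA_AR_Parsing page filtering."""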
logger.info(f"Filter EMEA AR PDF pages for doc_id: {doc_id}")
|
|
emea_ar_parsing = EMEA_AR_Parsing(
|
|
doc_id, doc_source=doc_source, pdf_folder=pdf_folder
|
|
)
|
|
datapoint_page_info, result_details = emea_ar_parsing.get_datapoint_page_info()
|
|
return datapoint_page_info, result_details
|
|
|
|
|
|
def extract_data(
    doc_id: str,
    doc_source: str,
    pdf_folder: str,
    output_data_folder: str,
    extract_way: str = "text",
    re_run: bool = False,
    text_model: str = "qwen-plus",
    image_model: str = "qwen-vl-plus",
) -> tuple:
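    """Parse a single document and return (data_from_gpt, annotation_list)."""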
logger.info(f"Extract EMEA AR data for doc_id: {doc_id}")
|
|
emea_ar_parsing = EMEA_AR_Parsing(
|
|
doc_id,
|
|
doc_source=doc_source,
|
|
pdf_folder=pdf_folder,
|
|
output_extract_data_folder=output_data_folder,
|
|
extract_way=extract_way,
|
|
text_model=text_model,
|
|
image_model=image_model,
|
|
)
|
|
data_from_gpt, annotation_list = emea_ar_parsing.extract_data(re_run)
|
|
return data_from_gpt, annotation_list
|
|
|
|
|
|
def batch_extract_data(
    pdf_folder: str,
    doc_source: str = "emea_ar",
    output_child_folder: str = r"/data/emea_ar/output/extract_data/docs/",
    output_total_folder: str = r"/data/emea_ar/output/extract_data/total/",
    extract_way: str = "text",
    special_doc_id_list: list = None,
    re_run: bool = False,
    text_model: str = "qwen-plus",
    image_model: str = "qwen-vl-plus",
) -> None:
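    """Extract data for every PDF in pdf_folder (or only special_doc_id_list).

    A combined Excel report is written to output_total_folder only for full
    runs, i.e. when no special_doc_id_list is given.
    """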
    pdf_files = glob(pdf_folder + "*.pdf")
    doc_list = []
    if special_doc_id_list is not None and len(special_doc_id_list) > 0:
        doc_list = special_doc_id_list
    else:
        logger.info(
            f"No special doc_id list provided, extracting all documents in {pdf_folder}"
        )

    result_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file)
        doc_id = pdf_base_name.split(".")[0]
        if len(doc_list) > 0 and doc_id not in doc_list:
            continue
        # extract_data returns (data_from_gpt, annotation_list); only the
        # extracted records are aggregated here.
        data_from_gpt, annotation_list = extract_data(
            doc_id=doc_id,
            doc_source=doc_source,
            pdf_folder=pdf_folder,
            output_data_folder=output_child_folder,
            extract_way=extract_way,
            re_run=re_run,
            text_model=text_model,
            image_model=image_model,
        )
        result_list.extend(data_from_gpt)

    if special_doc_id_list is None or len(special_doc_id_list) == 0:
        result_df = pd.DataFrame(result_list)
        result_df.reset_index(drop=True, inplace=True)

        logger.info(f"Saving the result to {output_total_folder}")
        os.makedirs(output_total_folder, exist_ok=True)
        time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        output_file = os.path.join(
            output_total_folder,
            f"extract_data_info_{len(pdf_files)}_documents_{time_stamp}.xlsx",
        )
        with pd.ExcelWriter(output_file) as writer:
            result_df.to_excel(writer, index=False, sheet_name="extract_data_info")


def test_translate_pdf():
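    """Ad-hoc smoke test for the PDF translation job."""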
    from core.data_translate import Translate_PDF

    pdf_file = r"/data/emea_ar/pdf/451063582.pdf"
    output_folder = r"/data/translate/output/"
    translate_pdf = Translate_PDF(pdf_file, output_folder)
    translate_pdf.start_job()


if __name__ == "__main__":
|
|
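    # Use certifi's CA bundle for SSL verification (e.g. for the document
    # warehouse download and model API calls).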
os.environ["SSL_CERT_FILE"] = certifi.where()
|
|
|
|
# doc_source = "aus_prospectus"
|
|
doc_source = "emea_ar"
|
|
re_run = True
|
|
extract_way = "text"
|
|
if doc_source == "aus_prospectus":
|
|
special_doc_id_list = ["412778803", "539266874"]
|
|
pdf_folder: str = r"./data/aus_prospectus/pdf/"
|
|
output_pdf_text_folder: str = r"./data/aus_prospectus/output/pdf_text/"
|
|
output_child_folder: str = (
|
|
r"./data/aus_prospectus/output/extract_data/docs/"
|
|
)
|
|
output_total_folder: str = (
|
|
r"./data/aus_prospectus/output/extract_data/total/"
|
|
)
|
|
elif doc_source == "emea_ar":
|
|
special_doc_id_list = ["514636993"]
|
|
pdf_folder: str = r"./data/emea_ar/pdf/"
|
|
output_child_folder: str = (
|
|
r"./data/emea_ar/output/extract_data/docs/"
|
|
)
|
|
output_total_folder: str = (
|
|
r"./data/emea_ar/output/extract_data/total/"
|
|
)
|
|
else:
|
|
raise ValueError(f"Invalid doc_source: {doc_source}")
|
|
|
|
# text_model = "qwen-plus"
|
|
text_model = "qwen-max"
|
|
image_model = "qwen-vl-plus"
|
|
batch_extract_data(
|
|
pdf_folder=pdf_folder,
|
|
doc_source=doc_source,
|
|
output_child_folder=output_child_folder,
|
|
output_total_folder=output_total_folder,
|
|
extract_way=extract_way,
|
|
special_doc_id_list=special_doc_id_list,
|
|
re_run=re_run,
|
|
text_model=text_model,
|
|
image_model=image_model,
|
|
)
|
|
|
|
|