import os
import json
import json_repair
import re
import fitz
import pandas as pd
from traceback import print_exc
from utils.gpt_utils import chat
from utils.pdf_util import PDFUtil
from utils.sql_query_util import query_document_fund_mapping, query_investment_by_provider
from utils.logger import logger
from utils.biz_utils import add_slash_to_text_as_regex, clean_text, \
    get_most_similar_name, remove_abundant_data, replace_special_table_header
from utils.similarity import Similarity

class DataExtraction:
    def __init__(
        self,
        doc_source: str,
        doc_id: str,
        pdf_file: str,
        output_data_folder: str,
        page_text_dict: dict,
        datapoint_page_info: dict,
        datapoints: list,
        document_mapping_info_df: pd.DataFrame,
        extract_way: str = "text",
        output_image_folder: str = None,
    ) -> None:
        self.doc_source = doc_source
        self.doc_id = doc_id
        self.pdf_file = pdf_file
        self.configuration_folder = f"./configuration/{doc_source}/"
        self.instruction_folder = f"./instructions/{doc_source}/"
        if output_data_folder is None or len(output_data_folder) == 0:
            output_data_folder = r"/data/emea_ar/output/extract_data/docs/"
        os.makedirs(output_data_folder, exist_ok=True)

        self.output_data_json_folder = os.path.join(output_data_folder, "json/")
        os.makedirs(self.output_data_json_folder, exist_ok=True)

        self.output_data_excel_folder = os.path.join(output_data_folder, "excel/")
        os.makedirs(self.output_data_excel_folder, exist_ok=True)

        if page_text_dict is None or len(page_text_dict.keys()) == 0:
            self.page_text_dict = self.get_pdf_page_text_dict()
        else:
            self.page_text_dict = page_text_dict
        if document_mapping_info_df is None or len(document_mapping_info_df) == 0:
            self.document_mapping_info_df = query_document_fund_mapping(doc_id, rerun=False)
        else:
            self.document_mapping_info_df = document_mapping_info_df

        self.fund_name_list = self.document_mapping_info_df["FundName"].unique().tolist()

        # get document type by DocumentType in self.document_mapping_info_df
        self.document_type = int(self.document_mapping_info_df["DocumentType"].iloc[0])
        self.investment_objective_pages = []
        if self.document_type == 1:
            self.investment_objective_pages = self.get_investment_objective_pages()

        self.provider_mapping_df = self.get_provider_mapping()
        if len(self.provider_mapping_df) == 0:
            self.provider_fund_name_list = []
        else:
            self.provider_fund_name_list = (
                self.provider_mapping_df["FundName"].unique().tolist()
            )
        self.document_category, self.document_production = self.get_document_category_production()
        self.datapoint_page_info = self.get_datapoint_page_info(datapoint_page_info)
        self.page_nums_with_datapoints = self.get_page_nums_from_datapoint_page_info()
        self.datapoints = datapoints
        self.instructions_config = self.get_instructions_config()
        self.datapoint_level_config = self.get_datapoint_level()
        self.datapoint_type_config = self.get_datapoint_type()
        self.datapoint_name_config = self.get_datapoint_name()
        self.replace_table_header_config = self.get_replace_table_header_config()
        self.special_datapoint_feature_config = self.get_special_datapoint_feature_config()
        self.special_datapoint_feature = self.init_special_datapoint_feature()

        self.datapoint_reported_name_config, self.non_english_reported_name_config = \
            self.get_datapoint_reported_name()
        self.extract_way = extract_way
        self.output_image_folder = output_image_folder

    def get_special_datapoint_feature_config(self) -> dict:
        special_datapoint_feature_config_file = os.path.join(self.configuration_folder, "special_datapoint_feature.json")
        if not os.path.exists(special_datapoint_feature_config_file):
            return {}
        special_datapoint_feature_config = {}
        with open(special_datapoint_feature_config_file, "r", encoding="utf-8") as f:
            special_datapoint_feature_config = json.load(f)

        return special_datapoint_feature_config

    def init_special_datapoint_feature(self) -> dict:
        special_datapoint_feature = {}
        if self.special_datapoint_feature_config is None or \
                len(list(self.special_datapoint_feature_config.keys())) == 0:
            return special_datapoint_feature
        for feature in list(self.special_datapoint_feature_config.keys()):
            special_datapoint_feature[feature] = {"page_index": []}
        return special_datapoint_feature

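    # Illustrative sketch (assumption, not the config actually shipped with this repo):
    # special_datapoint_feature.json is read as a dict keyed by feature name, and each
    # feature is tracked at runtime with the pages on which it was seen, e.g.
    #
    #   config file: {"management_fee_including_performance_fee": {...}}
    #   runtime dict: {"management_fee_including_performance_fee": {"page_index": []}}
    #
    # post_management_fee_exclude_performance_fee() later reads the "page_index" list
    # for the "management_fee_including_performance_fee" feature.
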
    def get_document_category_production(self):
        document_category = None
        document_production = None
        if self.doc_source == "aus_prospectus":
            first_4_page_text = ""
            for page_index, page_text in self.page_text_dict.items():
                if page_index > 3:
                    break
                first_4_page_text += page_text + "\n"
            first_4_page_text = clean_text(first_4_page_text)
            document_category_prompt_file = os.path.join(self.instruction_folder, "document_category_prompts.json")
            with open(document_category_prompt_file, "r", encoding="utf-8") as f:
                document_category_prompt = "\n".join(json.load(f).get("prompts", []))
            if len(document_category_prompt) > 0:
                prompts = f"Context: \n{first_4_page_text}\n\nInstructions: \n{document_category_prompt}"
                result, with_error = chat(
                    prompt=prompts, response_format={"type": "json_object"}, max_tokens=1000
                )
                response = result.get("response", "")
                if not with_error:
                    try:
                        data = json.loads(response)
                        document_category = data.get("document_category", None)
                        document_production = data.get("document_production", None)
                    except:
                        pass

        return document_category, document_production

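    # The document-category prompt is expected to return a JSON object whose keys match the
    # .get() calls above; a minimal illustrative response (the values are examples only):
    #
    #   {"document_category": "MIS", "document_production": "Example Super Product"}
    #
    # "MIS" is the category that later disables administration_fees extraction.
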
    def get_objective_fund_name(self, page_text: str) -> str:
        fund_name = ""
        if self.doc_source == "aus_prospectus":
            objective_fund_name_prompts_file = os.path.join(self.instruction_folder, "objective_fund_name_prompts.json")
            if not os.path.exists(objective_fund_name_prompts_file):
                return fund_name
            with open(objective_fund_name_prompts_file, "r", encoding="utf-8") as f:
                objective_fund_name_prompt = "\n".join(json.load(f).get("prompts", []))
            if len(objective_fund_name_prompt) > 0:
                prompts = f"Context: \n{page_text}\n\nInstructions: \n{objective_fund_name_prompt}"
                result, with_error = chat(
                    prompt=prompts, response_format={"type": "json_object"}, max_tokens=1000
                )
                response = result.get("response", "")
                if not with_error:
                    try:
                        data = json.loads(response)
                        fund_name = data.get("fund_name", "")
                    except:
                        pass
        return fund_name

    def get_datapoint_page_info(self, datapoint_page_info: dict) -> dict:
        """
        If the document source is aus_prospectus and the document category is MIS,
        remove administration_fees and total_annual_dollar_based_charges from datapoint_page_info.
        """
        # document_category can be None if the category prompt failed, so guard before .upper()
        if self.doc_source == "aus_prospectus" and self.document_category is not None \
                and self.document_category.upper() == "MIS":
            if "administration_fees" in list(datapoint_page_info.keys()):
                datapoint_page_info.pop("administration_fees")
            if "total_annual_dollar_based_charges" in list(datapoint_page_info.keys()):
                datapoint_page_info.pop("total_annual_dollar_based_charges")
        return datapoint_page_info

    def get_investment_objective_pages(self):
        investment_objective_pages = []
        if self.document_type == 1:
            objective_strategy_regex_config_file = os.path.join(self.configuration_folder, "objective_strategy_regex.json")
            with open(objective_strategy_regex_config_file, "r", encoding="utf-8") as f:
                objective_strategy_regex_config = json.load(f)
            objective_start_regex = objective_strategy_regex_config.get("objective_strategy", {}).get("start", "")

            if objective_start_regex is not None and len(objective_start_regex) > 0:
                for page_index, text in self.page_text_dict.items():
                    if re.search(objective_start_regex, text, re.I):
                        investment_objective_pages.append(page_index)
        if len(investment_objective_pages) > 0:
            investment_objective_pages.sort()
        return investment_objective_pages

    def get_datapoint_reported_name(self):
        language_config_file = os.path.join(self.configuration_folder, "language.json")
        self.language_config = {}
        with open(language_config_file, "r", encoding="utf-8") as file:
            self.language_config = json.load(file)

        self.language_id = self.document_mapping_info_df["Language"].iloc[0]
        self.language = self.language_config.get(self.language_id, None)

        datapoint_reported_name_config_file = os.path.join(self.configuration_folder, "datapoint_reported_name.json")
        all_datapoint_reported_name = {}
        with open(datapoint_reported_name_config_file, "r", encoding="utf-8") as file:
            all_datapoint_reported_name = json.load(file)

        non_english_reported_name_config = {}
        datapoint_reported_name_config = {}
        common_language = "english"
        for datapoint, language_reported_name in all_datapoint_reported_name.items():
            reported_name_list = language_reported_name.get(common_language, [])

            if self.language != "english":
                reported_name_list.extend(language_reported_name.get(self.language, []))
                non_english_reported_name_config[datapoint] = language_reported_name.get(self.language, [])
            # remove duplicate reported name
            reported_name_list = list(set(reported_name_list))
            # sort the reported name
            reported_name_list.sort()
            datapoint_reported_name_config[datapoint] = reported_name_list
        return datapoint_reported_name_config, non_english_reported_name_config

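    # Assumed layout of datapoint_reported_name.json (illustrative values, not the real
    # configuration): each datapoint maps languages to the labels it may be reported under,
    # and the document language decides which lists are merged above.
    #
    #   {
    #       "management_fee": {
    #           "english": ["Management fee", "Management costs"],
    #           "german": ["Verwaltungsgebühr"]
    #       }
    #   }
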
    def get_provider_mapping(self):
        if len(self.document_mapping_info_df) == 0:
            return pd.DataFrame()
        provider_id_list = (
            self.document_mapping_info_df["ProviderId"].unique().tolist()
        )
        provider_mapping_list = []
        for provider_id in provider_id_list:
            provider_mapping_list.append(query_investment_by_provider(provider_id, rerun=False))
        provider_mapping_df = pd.concat(provider_mapping_list)
        provider_mapping_df = provider_mapping_df.drop_duplicates()
        provider_mapping_df.reset_index(drop=True, inplace=True)
        return provider_mapping_df

    def get_pdf_image_base64(self, page_index: int) -> dict:
        pdf_util = PDFUtil(self.pdf_file)
        return pdf_util.extract_image_from_page(page_index=page_index,
                                                output_folder=self.output_image_folder)

    def get_instructions_config(self) -> dict:
        instructions_config_file = os.path.join(self.instruction_folder, "data_extraction_prompts_config.json")
        with open(instructions_config_file, "r", encoding="utf-8") as f:
            instructions_config = json.load(f)
        return instructions_config

    def get_datapoint_level(self) -> dict:
        datapoint_level_file = os.path.join(self.configuration_folder, "datapoint_level.json")
        with open(datapoint_level_file, "r", encoding="utf-8") as f:
            datapoint_level = json.load(f)
        return datapoint_level

    def get_datapoint_type(self) -> dict:
        datapoint_type_file = os.path.join(self.configuration_folder, "datapoint_type.json")
        with open(datapoint_type_file, "r", encoding="utf-8") as f:
            datapoint_type = json.load(f)
        return datapoint_type

    def get_datapoint_name(self) -> dict:
        datapoint_name_file = os.path.join(self.configuration_folder, "datapoint_name.json")
        with open(datapoint_name_file, "r", encoding="utf-8") as f:
            datapoint_name = json.load(f)
        return datapoint_name

    def get_replace_table_header_config(self) -> list:
        replace_table_header_file = os.path.join(self.configuration_folder, "replace_table_header.json")
        if os.path.exists(replace_table_header_file):
            with open(replace_table_header_file, "r", encoding="utf-8") as f:
                replace_table_header_config = json.load(f).get("details", [])
            return replace_table_header_config
        else:
            return []

    def get_pdf_page_text_dict(self) -> dict:
        pdf_util = PDFUtil(self.pdf_file)
        success, text, page_text_dict = pdf_util.extract_text()
        return page_text_dict

    def get_page_nums_from_datapoint_page_info(self) -> list:
        page_nums_with_datapoints = []
        for datapoint, page_nums in self.datapoint_page_info.items():
            if datapoint == "doc_id":
                continue
            page_nums_with_datapoints.extend(page_nums)
        page_nums_with_datapoints = list(set(page_nums_with_datapoints))
        # sort the page numbers
        page_nums_with_datapoints.sort()
        return page_nums_with_datapoints

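    # datapoint_page_info (supplied by the upstream page-classification step) is assumed to
    # look like the example below: each datapoint maps to the 0-based pages it was detected
    # on, plus a "doc_id" entry that is skipped when collecting page numbers.
    #
    #   {"doc_id": "431073795", "management_fee": [12, 13], "benchmark_name": [8]}
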
    def extract_data(self) -> dict:
        logger.info(f"Extracting data from document {self.doc_id}, extract way: {self.extract_way}")
        if self.extract_way == "text":
            data_list = self.extract_data_by_text()
        elif self.extract_way == "image":
            data_list = self.extract_data_by_image()
        else:
            data_list = self.extract_data_by_text()
        if self.doc_source == "aus_prospectus":
            data_list = self.post_supplement_data(data_list)
        # data_list = remove_abundant_data(data_list)
        self.output_data_to_file(data_list)
        return data_list

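    # Minimal usage sketch (paths and ids are placeholders, not real data):
    #
    #   extractor = DataExtraction(
    #       doc_source="aus_prospectus",
    #       doc_id="123456",
    #       pdf_file="/path/to/doc.pdf",
    #       output_data_folder="/data/output/extract_data/docs/",
    #       page_text_dict={},              # empty -> text is re-extracted from the PDF
    #       datapoint_page_info={"doc_id": "123456", "management_fee": [12]},
    #       datapoints=["management_fee"],
    #       document_mapping_info_df=None,  # None -> looked up via query_document_fund_mapping
    #       extract_way="text",
    #   )
    #   data_list = extractor.extract_data()
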
    def post_supplement_data(self, data_list: list) -> list:
        """
        Each entry of data_list has the shape produced by extract_data_by_page_text:
        data_dict = {"doc_id": self.doc_id}
        data_dict["page_index"] = page_num
        data_dict["datapoints"] = ", ".join(page_datapoints)
        data_dict["page_text"] = page_text
        data_dict["instructions"] = instructions
        data_dict["raw_answer"] = response
        data_dict["extract_data"] = data
        data_dict["extract_way"] = original_way
        data_dict["prompt_token"] = result.get("prompt_token", 0)
        data_dict["completion_token"] = result.get("completion_token", 0)
        data_dict["total_token"] = result.get("total_token", 0)
        """
        data_list = self.check_benchmark(data_list)
        data_list = self.supplement_ttr_pension(data_list)
        data_list = self.align_fund_share_name(data_list)
        data_list = self.supplement_minimum_initial_investment(data_list)
        data_list = self.check_total_annual_dollar_based_charges(data_list)
        data_list, datapoint_list_with_production_name = self.post_adjust_for_value_with_production_name(data_list)
        data_list = self.remove_duplicate_data(data_list)
        if "management_fee" not in datapoint_list_with_production_name and "management_fee_and_costs" not in datapoint_list_with_production_name:
            data_list, adjust = self.post_management_fee_exclude_performance_fee(data_list)
            if not adjust:
                data_list = self.post_adjust_management_fee_costs(data_list)

        data_list = self.check_administration_fees(data_list)
        return data_list

    def check_benchmark(self, data_list: list):
        """
        Remove illegal benchmark data
        e.g.
        annual growth in dividends received from the underlying companies
        The fund's composite benchmark is shown on page 11
        benchmark
        A range of published indices
        composite benchmark
        fund's composite benchmark

        The rules are:
        1. If it starts with a letter that is not upper case, remove it
        2. If it starts with "A range", "The fund" or "CPI ", remove it
        3. If it ends with "benchmark" (case-insensitive), remove it
        """
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            remove_items = []
            for data_item in data:
                keys = list(data_item.keys())
                if "benchmark_name" not in keys:
                    continue
                benchmark_name = data_item.get("benchmark_name", "")
                # guard against empty / non-string values before indexing benchmark_name[0]
                if not isinstance(benchmark_name, str) or len(benchmark_name) == 0:
                    data_item.pop("benchmark_name")
                elif benchmark_name.startswith("A range") or benchmark_name.startswith("The fund") or \
                        benchmark_name.startswith("CPI "):
                    data_item.pop("benchmark_name")
                elif benchmark_name[0].isalpha() and not benchmark_name[0].isupper():
                    data_item.pop("benchmark_name")
                elif benchmark_name.lower().endswith("benchmark"):
                    data_item.pop("benchmark_name")
                else:
                    pass

                # re-read the keys after the pops above, keeping only datapoint keys
                keys = [key for key in list(data_item.keys()) if key not in ["fund_name", "share_name"]]
                if len(keys) == 0:
                    remove_items.append(data_item)

            for remove_item in remove_items:
                if remove_item in extract_data["data"]:
                    extract_data["data"].remove(remove_item)
        return data_list

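    # Illustrative effect of check_benchmark (example values only):
    #   {"fund_name": "Fund A", "benchmark_name": "composite benchmark"}  -> benchmark_name dropped (ends with "benchmark")
    #   {"fund_name": "Fund A", "benchmark_name": "a range of indices"}   -> dropped (starts with a lower-case letter)
    #   {"fund_name": "Fund A", "benchmark_name": "S&P/ASX 200 Index"}    -> kept
    # Items left with nothing but fund_name / share_name are removed from the page's data list.
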
    def align_fund_share_name(self, data_list: list):
        """
        Align the fund name and share name to be the same format
        """
        if self.document_production is None or len(self.document_production) == 0:
            return data_list
        fund_name_list = []
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            for data_item in data:
                fund_name = data_item.get("fund_name", "")
                if len(fund_name) == 0:
                    continue
                if fund_name not in fund_name_list:
                    fund_name_list.append(fund_name)
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            for data_item in data:
                fund_name = data_item.get("fund_name", "")
                share_name = data_item.get("share_name", "")
                if len(fund_name) == 0:
                    continue
                for c_fund_name in fund_name_list:
                    if c_fund_name == fund_name:
                        continue
                    if len(fund_name) < len(c_fund_name) and c_fund_name.endswith(fund_name):
                        if c_fund_name.replace(fund_name, "").strip() in self.document_production:
                            data_item["fund_name"] = c_fund_name
                            if share_name == fund_name:
                                data_item["share_name"] = c_fund_name
                            break
        return data_list

    def supplement_ttr_pension(self, data_list: list):
        """
        If fund names ending with "TTR" and "Pension" exist alongside the same fund name
        without the "TTR"/"Pension" suffix, supplement the data of the plain fund name
        to the "TTR" and "Pension" variants.
        """
        ttr_fund_name_list = []
        pension_fund_name_list = []
        exist_ttr = False
        exist_pension = False
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])

            for data_item in data:
                keys = list(data_item.keys())
                fund_name = data_item.get("fund_name", "")
                if len(fund_name) == 0:
                    continue
                share_name = data_item.get("share_name", "")

                updated_fund_name = self.update_pension_ttr_fund_name(fund_name)
                if updated_fund_name != fund_name:
                    fund_name = updated_fund_name
                    data_item["fund_name"] = fund_name
                updated_share_name = self.update_pension_ttr_fund_name(share_name)
                if updated_share_name != share_name:
                    share_name = updated_share_name
                    data_item["share_name"] = share_name

                fund_name_splits = fund_name.split()
                if fund_name_splits[-1] == "TTR" and fund_name not in ttr_fund_name_list:
                    ttr_fund_name_list.append(fund_name)
                    exist_ttr = True
                if fund_name_splits[-1] == "Pension" and fund_name not in pension_fund_name_list:
                    pension_fund_name_list.append(fund_name)
                    exist_pension = True
        if exist_ttr and exist_pension:
            for data_dict in data_list:
                extract_data = data_dict.get("extract_data", {})
                data = extract_data.get("data", [])
                new_item_list = []
                remove_item_list = []
                for data_item in data:
                    fund_name = data_item.get("fund_name", "")
                    share_name = data_item.get("share_name", "")
                    if len(fund_name) == 0:
                        continue
                    fund_name_splits = fund_name.split()
                    if fund_name_splits[-1] == "TTR" or fund_name_splits[-1] == "Pension":
                        continue
                    keys = [key for key in list(data_item.keys())
                            if key not in ["fund_name", "share_name"]]
                    for ttr_fund_name in ttr_fund_name_list:
                        ttr_pure_fund_name = ttr_fund_name.replace(" TTR", "")
                        if fund_name == ttr_pure_fund_name:
                            new_fund_name = f"{fund_name} TTR"
                            if share_name == fund_name:
                                share_name = new_fund_name
                            new_item = {"fund_name": new_fund_name, "share_name": share_name}
                            for key in keys:
                                new_item[key] = data_item.get(key, "")
                            new_item_list.append(new_item)
                            if data_item not in remove_item_list:
                                remove_item_list.append(data_item)
                            break
                    for pension_fund_name in pension_fund_name_list:
                        pension_pure_fund_name = pension_fund_name.replace(" Pension", "")
                        if fund_name == pension_pure_fund_name:
                            new_fund_name = f"{fund_name} Pension"
                            if share_name == fund_name:
                                share_name = new_fund_name
                            new_item = {"fund_name": new_fund_name, "share_name": share_name}
                            for key in keys:
                                new_item[key] = data_item.get(key, "")
                            new_item_list.append(new_item)
                            if data_item not in remove_item_list:
                                remove_item_list.append(data_item)
                            break
                for remove_item in remove_item_list:
                    if remove_item in data:
                        data.remove(remove_item)
                data.extend(new_item_list)
        return data_list

    def update_pension_ttr_fund_name(self, investment_name: str):
        pension_prefix_list = ["retirement account", "account-based pension", "account based pension"]
        ttr_prefix_list = ["transition to retirement account", "pre-retirement pension", "pre retirement pension"]
        investment_name_lower = investment_name.lower()
        for pension_prefix in pension_prefix_list:
            if investment_name_lower.startswith(pension_prefix) and investment_name_lower != pension_prefix:
                pension_prefix_split = pension_prefix.split()
                investment_name = " ".join(investment_name.split()[len(pension_prefix_split):]) + " Pension"
                break
        for ttr_prefix in ttr_prefix_list:
            if investment_name_lower.startswith(ttr_prefix) and investment_name_lower != ttr_prefix:
                ttr_prefix_split = ttr_prefix.split()
                investment_name = " ".join(investment_name.split()[len(ttr_prefix_split):]) + " TTR"
                break
        return investment_name

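    # Example behaviour of update_pension_ttr_fund_name (illustrative names):
    #   "Retirement Account Growth"                 -> "Growth Pension"
    #   "Transition to Retirement Account Balanced" -> "Balanced TTR"
    #   "Balanced"                                  -> unchanged (no recognised prefix)
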
    def check_administration_fees(self, data_list: list):
        """
        If the document source is aus_prospectus and the document category is MIS,
        remove the administration fees from data_list.
        """
        # document_category can be None if the category prompt failed, so guard before .upper()
        if self.doc_source == "aus_prospectus" and self.document_category is not None \
                and self.document_category.upper() == "MIS":
            for data_dict in data_list:
                extract_data = data_dict.get("extract_data", {})
                data = extract_data.get("data", [])
                remove_items = []
                for data_item in data:
                    keys = list(data_item.keys())
                    if "administration_fees" in keys:
                        data_item.pop("administration_fees")
                    if "total_annual_dollar_based_charges" in keys:
                        data_item.pop("total_annual_dollar_based_charges")
                    keys = [key for key in list(data_item.keys()) if key not in ["fund_name", "share_name"]]
                    if len(keys) == 0:
                        remove_items.append(data_item)

                for remove_item in remove_items:
                    try:
                        if remove_item in extract_data["data"]:
                            extract_data["data"].remove(remove_item)
                    except:
                        pass
        return data_list

    def check_total_annual_dollar_based_charges(self, data_list: list):
        """
        If total_annual_dollar_based_charges is found and is divisible by 52 or 12,
        set the fund name and share name to the document production name.
        """
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            found = False
            for data_item in data:
                keys = list(data_item.keys())
                fund_name = data_item.get("fund_name", "")
                share_name = data_item.get("share_name", "")
                if len(fund_name) == 0:
                    continue
                if "total_annual_dollar_based_charges" in keys:
                    value = data_item.get("total_annual_dollar_based_charges", -1)
                    # the extracted value may come back as a string, so coerce before dividing
                    try:
                        value = float(value)
                    except (TypeError, ValueError):
                        continue
                    value_divide_52 = value / 52
                    value_divide_12 = value / 12
                    if (value_divide_52 == round(value_divide_52, 4)) or \
                            (value_divide_12 == round(value_divide_12, 4)):
                        data_item["fund_name"] = self.document_production
                        data_item["share_name"] = self.document_production
                        found = True
                        break
            if found:
                break
        return data_list

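    # Worked example of the check above (illustrative figures): a dollar-based charge of
    # 520.0 gives 520.0 / 52 = 10.0, and 10.0 == round(10.0, 4), so the row is treated as a
    # product-level charge and re-labelled with the document production name. Note the test
    # as written accepts any quotient with at most four decimal places, not strict divisibility.
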
    def post_adjust_for_value_with_production_name(self, data_list: list):
        """
        If some datapoint is reported against the production name, then each fund/share class
        in the same document should carry the same value for that datapoint.
        """
        raw_name_dict = self.get_raw_name_dict(data_list)
        raw_name_list = list(raw_name_dict.keys())
        raw_name_as_production_name = None
        for raw_name in raw_name_list:
            if self.is_production_name(raw_name):
                raw_name_as_production_name = raw_name
                break
        datapoint_list_with_production_name = []
        if raw_name_as_production_name is None:
            return data_list, datapoint_list_with_production_name

        raw_name_dict.pop(raw_name_as_production_name)

        for data_dict in data_list:
            # if data_dict.get("page_index", -1) > 9:
            #     break
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            remove_item_list = []
            new_dp_item_list = []
            for data_item in data:
                keys = list(data_item.keys())
                fund_name = data_item.get("fund_name", "")
                share_name = data_item.get("share_name", "")
                raw_name = self.get_raw_name(fund_name, share_name)
                if self.is_production_name(raw_name):
                    dp_keys = [key for key in keys if key not in ["fund_name",
                                                                  "share_name",
                                                                  "management_fee_and_costs",
                                                                  "management_fee",
                                                                  "buy_spread",
                                                                  "sell_spread"]]
                    for dp_key in dp_keys:
                        if dp_key not in datapoint_list_with_production_name:
                            datapoint_list_with_production_name.append(dp_key)
                    remove_item_list.append(data_item)
                    for v_raw_name, v_dict in raw_name_dict.items():
                        v_fund_name = v_dict.get("fund_name", "")
                        v_share_name = v_dict.get("share_name", "")
                        if len(share_name) > 0:
                            new_dp_item = {"fund_name": v_fund_name, "share_name": v_share_name}
                        else:
                            new_dp_item = {"fund_name": v_fund_name}
                        for dp_key in dp_keys:
                            new_dp_item[dp_key] = data_item.get(dp_key, "")
                        new_dp_item["source"] = "from_production_name"
                        new_dp_item_list.append(new_dp_item)
            for remove_item in remove_item_list:
                if remove_item in extract_data["data"]:
                    extract_data["data"].remove(remove_item)
            if len(new_dp_item_list) > 0:
                extract_data["data"].extend(new_dp_item_list)

        if len(datapoint_list_with_production_name) == 0:
            return data_list, datapoint_list_with_production_name
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            remove_item_list = []
            for data_item in data:
                source = data_item.get("source", "")
                if source == "from_production_name":
                    continue
                keys = list(data_item.keys())
                dp_keys = [key for key in keys if key not in ["fund_name", "share_name"]]
                for dp_key in dp_keys:
                    if dp_key in datapoint_list_with_production_name:
                        data_item.pop(dp_key)
                keys = list(data_item.keys())
                dp_keys = [key for key in keys if key not in ["fund_name", "share_name"]]
                if len(dp_keys) == 0:
                    remove_item_list.append(data_item)
            for remove_item in remove_item_list:
                if remove_item in extract_data["data"]:
                    extract_data["data"].remove(remove_item)
        return data_list, datapoint_list_with_production_name

    def is_production_name(self, text: str):
        # document_production may be None if the category prompt failed; treat that as no match
        if self.document_production is None or len(self.document_production) == 0:
            return False
        if text.lower() in self.document_production.lower():
            return True
        similarity_util = Similarity()
        similarity = similarity_util.edit_distance_similarity(text, self.document_production)
        if similarity > 0.93:
            return True
        return False

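    # Illustrative match (assumes edit_distance_similarity returns a 0-1 normalised score,
    # as the 0.93 threshold above implies): a raw name such as "Example Super  Plan" that
    # differs from a document_production of "Example Super Plan" by a single character
    # would score above 0.93 and be treated as the product name.
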
    def remove_duplicate_data(self, data_list: list):
        """
        The purpose is to remove duplicate data across different pages.
        Reason:
        1. Some pdf documents repeat the same data on multiple pages.
        2. Usually, the first occurrence is the latest data, and the others are older data.
        3. That's why we need to remove the duplicate data on the later pages.
        """
        handled_data_dict_list = []
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            for data_item in data:
                keys = list(data_item.keys())
                fund_name = data_item.get("fund_name", "")
                share_name = data_item.get("share_name", "")
                raw_name = self.get_raw_name(fund_name, share_name)
                dp_keys = [key for key in keys if key not in ["fund_name", "share_name"]]
                # sort the keys
                dp_keys.sort()
                additional_dp_keys = [dp_key for dp_key in dp_keys
                                      if dp_key not in ["management_fee", "management_fee_and_costs"]]
                if len(additional_dp_keys) == 0:
                    continue
                for c_data_dict in data_list:
                    if c_data_dict in handled_data_dict_list:
                        continue
                    if c_data_dict == data_dict:
                        continue
                    c_extract_data = c_data_dict.get("extract_data", {})
                    c_data = c_extract_data.get("data", [])
                    remove_c_items = []
                    for c_data_item in c_data:
                        c_keys = list(c_data_item.keys())
                        c_fund_name = c_data_item.get("fund_name", "")
                        c_share_name = c_data_item.get("share_name", "")
                        c_raw_name = self.get_raw_name(c_fund_name, c_share_name)
                        if raw_name != c_raw_name:
                            continue
                        c_dp_keys = [key for key in c_keys if key not in ["fund_name", "share_name"]]
                        c_dp_keys.sort()
                        if dp_keys == c_dp_keys:
                            remove_c_items.append(c_data_item)
                    for remove_c_item in remove_c_items:
                        if remove_c_item in c_data:
                            c_data.remove(remove_c_item)
            handled_data_dict_list.append(data_dict)
        return data_list

    def get_raw_name(self, fund_name: str, share_name: str) -> str:
        raw_name = ""
        if len(fund_name) == 0:
            return raw_name
        if fund_name == share_name:
            raw_name = fund_name
        elif len(share_name) > 0 and share_name.startswith(fund_name):
            raw_name = share_name
        else:
            raw_name = f"{fund_name} {share_name}".strip()
        return raw_name

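    # Examples of get_raw_name (illustrative names):
    #   ("Fund A", "Fund A")          -> "Fund A"
    #   ("Fund A", "Fund A Class B")  -> "Fund A Class B"   (share name already contains the fund name)
    #   ("Fund A", "Class B")         -> "Fund A Class B"   (concatenated)
    #   ("Fund A", "")                -> "Fund A"
    #   ("", "Class B")               -> ""                 (no fund name, no raw name)
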
    def get_raw_name_dict(self, data_list: list) -> dict:
        raw_name_dict = {}
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            for data_item in data:
                fund_name = data_item.get("fund_name", "")
                share_name = data_item.get("share_name", "")
                raw_name = self.get_raw_name(fund_name, share_name)
                if len(raw_name) == 0:
                    continue
                # if isinstance(self.document_production, str) and \
                #         raw_name.lower() in self.document_production.lower():
                #     continue
                if raw_name not in list(raw_name_dict.keys()):
                    raw_name_dict[raw_name] = {"fund_name": fund_name, "share_name": share_name}
        return raw_name_dict

    def post_management_fee_exclude_performance_fee(self, data_list: list):
        adjust = False
        management_fee_index_list = self.special_datapoint_feature.get("management_fee_including_performance_fee", {}).\
            get("page_index", [])
        if len(management_fee_index_list) == 0:
            return data_list, adjust
        min_page_index = min(management_fee_index_list)
        performance_fee_item_list = []
        for data_dict in data_list:
            page_index = data_dict.get("page_index", -1)
            if page_index <= min_page_index:
                continue
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            for data_item in data:
                keys = list(data_item.keys())
                share_name = data_item.get("share_name", "")
                if len(share_name) == 0:
                    continue
                if "performance_fee_costs" in keys:
                    performance_fee_item_list.append(data_item)

        for data_dict in data_list:
            page_index = data_dict.get("page_index", -1)
            if page_index not in management_fee_index_list:
                continue
            extract_data = data_dict.get("extract_data", {})
            management_fee_data_list = extract_data.get("data", [])
            for management_fee_data in management_fee_data_list:
                keys = list(management_fee_data.keys())
                fund_name = management_fee_data.get("fund_name", "")
                share_name = management_fee_data.get("share_name", "")
                if len(fund_name) == 0 or len(share_name) == 0:
                    continue
                if "management_fee_and_costs" in keys:
                    management_fee_and_costs = management_fee_data.get("management_fee_and_costs", -1)
                    try:
                        management_fee_and_costs = float(management_fee_and_costs)
                    except:
                        management_fee_and_costs = -1
                    if management_fee_and_costs != -1:
                        for performance_fee_item in performance_fee_item_list:
                            pf_fund_name = performance_fee_item.get("fund_name", "")
                            pf_share_name = performance_fee_item.get("share_name", "")
                            if pf_fund_name == fund_name and pf_share_name == share_name:
                                performance_fee_costs = performance_fee_item.get("performance_fee_costs", -1)
                                try:
                                    performance_fee_costs = float(performance_fee_costs)
                                except:
                                    performance_fee_costs = -1
                                if performance_fee_costs != -1:
                                    management_fee_data["management_fee_and_costs"] = management_fee_and_costs - performance_fee_costs
                                    management_fee_data["management_fee"] = management_fee_data["management_fee_and_costs"]
                                    management_fee_data["source"] = f"subtract_performance_fee_{performance_fee_costs}"
                                    adjust = True
                                break
        return data_list, adjust

    def post_adjust_management_fee_costs(self, data_list: list):
        """
        Adjust the management fee and the management fee and costs.
        The management fee and costs may be disclosed on earlier pages,
        while the management fee is disclosed on later pages.
        Business rule: if the management fee cannot be found but the management fee and costs is found,
        the management fee should be the same as the management fee and costs;
        if the management fee and costs cannot be found but the management fee is found,
        the management fee and costs should be the same as the management fee.
        This function adjusts both values accordingly.
        """
        management_fee_costs_list = []
        management_fee_list = []
        complex_rule_keywords = "Recoverable expenses \nEstimated other indirect costs"
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            exist_complex_rule_keywords = False
            page_text = data_dict.get("page_text", "")
            if complex_rule_keywords in page_text:
                exist_complex_rule_keywords = True
            data = extract_data.get("data", [])
            for data_item in data:
                keys = list(data_item.keys())
                fund_name = data_item.get("fund_name", "")
                share_name = data_item.get("share_name", "")
                if fund_name == "" or share_name == "":
                    continue
                if "management_fee" in keys:
                    management_fee = data_item.get("management_fee", -1)
                    if management_fee != -1:
                        found = False
                        for mf in management_fee_list:
                            mf_fund_name = mf.get("fund_name", "")
                            mf_share_name = mf.get("share_name", "")
                            # if (mf_fund_name == fund_name and mf_share_name == share_name) or \
                            #         (len(mf_fund_name) > 0 and len(mf_share_name) > 0 and mf_fund_name == mf_share_name and
                            #          (mf_share_name.endswith(share_name) or share_name.endswith(mf_share_name))):
                            if (mf_fund_name == fund_name and mf_share_name == share_name):
                                if exist_complex_rule_keywords and \
                                        ("interposed_vehicle_performance_fee_cost" in keys or "recoverable_expenses" in keys):
                                    mf["management_fee"] = management_fee
                                    found = True
                                    break
                                else:
                                    mf_value = mf.get("management_fee", -1)
                                    if mf_value != -1 and mf_value >= management_fee:
                                        mf["management_fee"] = management_fee
                                    found = True
                                    break
                        if not found:
                            management_fee_list.append({"fund_name": fund_name,
                                                        "share_name": share_name,
                                                        "management_fee": management_fee})
                if "management_fee_and_costs" in keys:
                    management_fee_costs = data_item.get("management_fee_and_costs", -1)
                    if management_fee_costs != -1:
                        found = False
                        for mfc in management_fee_costs_list:
                            mfc_fund_name = mfc.get("fund_name", "")
                            mfc_share_name = mfc.get("share_name", "")
                            # if (mfc_fund_name == fund_name and mfc_share_name == share_name) or \
                            #         (len(mfc_fund_name) > 0 and len(mfc_share_name) > 0 and mfc_fund_name == mfc_share_name and
                            #          (mfc_share_name.endswith(share_name) or share_name.endswith(mfc_share_name))):
                            if (mfc_fund_name == fund_name and mfc_share_name == share_name):
                                if exist_complex_rule_keywords and \
                                        ("interposed_vehicle_performance_fee_cost" in keys or "recoverable_expenses" in keys):
                                    mfc["management_fee_and_costs"] = management_fee_costs
                                    found = True
                                    break
                                else:
                                    mfc_value = mfc.get("management_fee_and_costs", -1)
                                    if mfc_value != -1 and mfc_value <= management_fee_costs:
                                        mfc["management_fee_and_costs"] = management_fee_costs
                                    found = True
                                    break
                        if not found:
                            management_fee_costs_list.append({"fund_name": fund_name,
                                                              "share_name": share_name,
                                                              "management_fee_and_costs": management_fee_costs})
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            for data_item in data:
                keys = list(data_item.keys())
                fund_name = data_item.get("fund_name", "")
                share_name = data_item.get("share_name", "")
                if fund_name == "" or share_name == "":
                    continue
                if "management_fee" in keys:
                    for mf in management_fee_list:
                        if mf.get("fund_name", "") == fund_name and mf.get("share_name", "") == share_name:
                            data_item["management_fee"] = mf.get("management_fee", -1)
                            break
                if "management_fee_and_costs" in keys:
                    for mfc in management_fee_costs_list:
                        if mfc.get("fund_name", "") == fund_name and mfc.get("share_name", "") == share_name:
                            data_item["management_fee_and_costs"] = mfc.get("management_fee_and_costs", -1)
                            break
        return data_list

    def supplement_minimum_initial_investment(self, data_list: list):
        """
        The minimum initial investment should be the same for every fund/share class in the same document.
        This function supplements the minimum initial investment to each fund/share class in the same document.
        """
        exist_minimum_initial_investment = False
        minimum_initial_investment = -1
        mii_dict = None
        for data_dict in data_list:
            extract_data = data_dict.get("extract_data", {})
            data = extract_data.get("data", [])
            remove_items = []
            for data_item in data:
                keys = list(data_item.keys())
                if "minimum_initial_investment" in keys:
                    exist_minimum_initial_investment = True
                    if minimum_initial_investment == -1:
                        minimum_initial_investment = data_item.get("minimum_initial_investment", -1)
                    if mii_dict is None:
                        mii_dict = data_dict
                    remove_items.append(data_item)
            for data_item in remove_items:
                try:
                    data_dict["extract_data"]["data"].remove(data_item)
                except:
                    pass
        if exist_minimum_initial_investment and minimum_initial_investment != -1:
            # get all of the funds in data_list
            fund_name_list = []
            for iter_data_dict in data_list:
                extract_data = iter_data_dict.get("extract_data", {})
                data = extract_data.get("data", [])
                for data_item in data:
                    keys = list(data_item.keys())
                    if "fund_name" in keys:
                        fund_name = data_item.get("fund_name", "")
                        if len(fund_name) > 0 and fund_name not in fund_name_list:
                            fund_name_list.append(fund_name)
            # rewrite mii_dict, set each fund name with the same minimum_initial_investment value
            new_mii_data_list = []
            for fund_name in fund_name_list:
                new_data_dict = {"fund_name": fund_name, "minimum_initial_investment": minimum_initial_investment}
                new_mii_data_list.append(new_data_dict)

            mii_dict["extract_data"]["data"].extend(new_mii_data_list)
        return data_list

    def extract_data_by_text(self) -> dict:
        """
        keys are
        doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name, raw_share_name, share_id, share_name
        """
        data_list = []
        pdf_page_count = len(self.page_text_dict.keys())
        handled_page_num_list = []

        previous_page_num = -1
        previous_page_datapoints = []
        previous_page_fund_name = None
        for page_num, page_text in self.page_text_dict.items():
            # if page_num not in [37, 38]:
            #     continue
            if page_num in handled_page_num_list:
                continue
            page_datapoints = self.get_datapoints_by_page_num(page_num)
            if len(page_datapoints) == 0:
                continue
            if previous_page_num == page_num - 1 and \
                    previous_page_datapoints == page_datapoints and \
                    previous_page_fund_name is not None:
                # Transfer the previous page's fund name to be the prefix of the page text.
                # The purpose is to recover the fund name when the first records have no fund name.
                # example document: 431073795, page index 1727 to 1728
                logger.info(f"Transfer previous page fund name: {previous_page_fund_name} to be the prefix of page text")
                page_text = f"\nThe last fund name of previous PDF page: {previous_page_fund_name}\n{page_text}"
            else:
                previous_page_fund_name = None

            page_text = replace_special_table_header(self.replace_table_header_config, page_text)
            extract_data = self.extract_data_by_page(
                page_num,
                page_text,
                page_datapoints,
                need_exclude=False,
                exclude_data=None,
                previous_page_last_fund=previous_page_fund_name
            )
            if extract_data is None:
                continue
            data_list.append(extract_data)

            page_data_list = extract_data.get("extract_data", {}).get("data", [])
            if len(page_data_list) > 0:
                previous_page_num = page_num
                previous_page_fund_name = page_data_list[-1].get("fund_name", "")
                previous_page_datapoints = page_datapoints
            extract_way = extract_data.get("extract_way", "text")
            current_page_data_count = len(page_data_list)
            if current_page_data_count > 0:
                count = 1
                # Some pdf documents spread the same data across multiple pages,
                # and the next page may lack a table header with datapoint keywords.
                # The purpose is to try to get data from the next page.
                if extract_way == "text":
                    current_text = page_text
                else:
                    current_text = ""

                while True:
                    try:
                        next_page_num = page_num + count
                        if next_page_num >= pdf_page_count:
                            break
                        if self.document_type == 1 and next_page_num in self.investment_objective_pages:
                            break
                        next_datapoints = page_datapoints
                        if next_page_num in self.page_nums_with_datapoints:
                            should_continue = False
                            next_datapoints = self.get_datapoints_by_page_num(next_page_num)
                            if len(next_datapoints) == 0:
                                should_continue = True
                            else:
                                for next_datapoint in next_datapoints:
                                    if self.doc_source == "aus_prospectus":
                                        if next_datapoint in page_datapoints:
                                            should_continue = False
                                            break
                                    else:
                                        if next_datapoint not in page_datapoints:
                                            should_continue = True
                                            break
                            if should_continue:
                                next_datapoints.extend(page_datapoints)
                                # remove duplicate datapoints
                                next_datapoints = list(set(next_datapoints))
                            if not should_continue:
                                break
                        if extract_way == "text":
                            next_page_text = self.page_text_dict.get(next_page_num, "")
                            with_same_structure_table = self.is_next_page_with_same_structure_table(
                                current_text, next_page_text
                            )
                            if not with_same_structure_table:
                                break
                            next_page_text = replace_special_table_header(self.replace_table_header_config,
                                                                          next_page_text)
                            target_text = current_text + next_page_text
                        else:
                            target_text = ""

                        # try to get data by current page_datapoints
                        logger.info(f"Try to get data from next page {next_page_num}")
                        next_page_extract_data = self.extract_data_by_page(
                            next_page_num,
                            target_text,
                            next_datapoints,
                            need_exclude=True,
                            exclude_data=page_data_list,
                            previous_page_last_fund=previous_page_fund_name
                        )
                        if next_page_extract_data is None:
                            break
                        next_page_data_list = next_page_extract_data.get(
                            "extract_data", {}
                        ).get("data", [])
                        # print(f"next_page_data_list: {next_page_num}")
                        # print(next_page_data_list)
                        if next_page_data_list is not None and len(next_page_data_list) > 0:
                            for current_page_data in page_data_list:
                                if current_page_data in next_page_data_list:
                                    next_page_data_list.remove(current_page_data)
                            if len(next_page_data_list) == 0:
                                break
                            next_page_extract_data["extract_data"][
                                "data"
                            ] = next_page_data_list
                            data_list.append(next_page_extract_data)
                            previous_page_num = next_page_num
                            previous_page_fund_name = next_page_data_list[-1].get("fund_name", "")
                            handled_page_num_list.append(next_page_num)
                            exist_current_page_datapoint = False
                            for next_page_data in next_page_data_list:
                                for page_datapoint in page_datapoints:
                                    if page_datapoint in list(next_page_data.keys()):
                                        exist_current_page_datapoint = True
                                        break
                                if exist_current_page_datapoint:
                                    break
                            if not exist_current_page_datapoint:
                                break
                        else:
                            data_list.append(next_page_extract_data)
                            break
                        count += 1
                    except Exception as e:
                        logger.error(f"Error in extracting data from next page: {e}")
                        break

        # self.output_data_to_file(data_list)
        return data_list

    def is_next_page_with_same_structure_table(self, current_page_text: str, next_page_text: str) -> bool:
        with_same_structure_table = False
        compare_table_structure_prompts_file = os.path.join(self.instruction_folder, "compare_table_structure_prompts.json")
        with open(compare_table_structure_prompts_file, "r", encoding="utf-8") as f:
            compare_table_structure_prompts = "\n".join(json.load(f).get("prompts", []))
        if len(compare_table_structure_prompts) > 0:
            prompts = f"Context: \ncurrent page contents:\n{current_page_text}\nnext page contents:\n{next_page_text}\nInstructions:\n{compare_table_structure_prompts}\n"
            result, with_error = chat(
                prompt=prompts, response_format={"type": "json_object"}, max_tokens=100
            )
            response = result.get("response", "")
            if not with_error:
                try:
                    data = json.loads(response)
                    answer = data.get("answer", "No")
                    if answer.lower() == "yes":
                        with_same_structure_table = True
                except:
                    pass

        return with_same_structure_table

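    # The comparison prompt is expected to reply with a JSON object of the form
    # {"answer": "Yes"} or {"answer": "No"} (the key matches the .get("answer", "No") above);
    # only a "yes" answer (case-insensitive) lets the next page's text be merged with the
    # current page for re-extraction.
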
    def extract_data_by_image(self) -> dict:
        """
        keys are
        doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name, raw_share_name, share_id, share_name
        """
        data_list = []
        pdf_page_count = len(self.page_text_dict.keys())
        handled_page_num_list = []
        for page_num, page_text in self.page_text_dict.items():
            if page_num in handled_page_num_list:
                continue
            page_datapoints = self.get_datapoints_by_page_num(page_num)
            if len(page_datapoints) == 0:
                continue

            extract_data = self.extract_data_by_page_image(page_num=page_num,
                                                           page_datapoints=page_datapoints)
            data_list.append(extract_data)

            page_data_list = extract_data.get("extract_data", {}).get("data", [])

            current_page_data_count = len(page_data_list)
            if current_page_data_count > 0:
                count = 1

                while count < 3:
                    try:
                        next_page_num = page_num + count
                        if next_page_num >= pdf_page_count:
                            break
                        next_datapoints = page_datapoints
                        if next_page_num in self.page_nums_with_datapoints:
                            should_continue = False
                            next_datapoints = self.get_datapoints_by_page_num(next_page_num)
                            if len(next_datapoints) == 0:
                                should_continue = True
                            else:
                                for next_datapoint in next_datapoints:
                                    if next_datapoint not in page_datapoints:
                                        should_continue = True
                                        break
                                next_datapoints.extend(page_datapoints)
                                # remove duplicate datapoints
                                next_datapoints = list(set(next_datapoints))
                            if not should_continue:
                                break
                        # try to get data by current page_datapoints
                        next_page_extract_data = self.extract_data_by_page_image(
                            page_num=next_page_num,
                            page_datapoints=next_datapoints,
                            need_extract_text=False
                        )
                        next_page_data_list = next_page_extract_data.get(
                            "extract_data", {}
                        ).get("data", [])

                        if next_page_data_list is not None and len(next_page_data_list) > 0:
                            data_list.append(next_page_extract_data)
                            handled_page_num_list.append(next_page_num)
                            exist_current_page_datapoint = False
                            for next_page_data in next_page_data_list:
                                for page_datapoint in page_datapoints:
                                    if page_datapoint in list(next_page_data.keys()):
                                        exist_current_page_datapoint = True
                                        break
                                if exist_current_page_datapoint:
                                    break
                            if not exist_current_page_datapoint:
                                break
                        else:
                            break
                        count += 1
                    except Exception as e:
                        logger.error(f"Error in extracting data from next page: {e}")
                        break

        # self.output_data_to_file(data_list)

        return data_list

    def output_data_to_file(self, data_list: list) -> None:
        json_data_file = os.path.join(
            self.output_data_json_folder, f"{self.doc_id}.json"
        )
        with open(json_data_file, "w", encoding="utf-8") as f:
            json.dump(data_list, f, ensure_ascii=False, indent=4)

        data_df = pd.DataFrame(data_list)
        data_df.reset_index(drop=True, inplace=True)
        excel_data_file = os.path.join(
            self.output_data_excel_folder, f"{self.doc_id}.xlsx"
        )
        with pd.ExcelWriter(excel_data_file) as writer:
            data_df.to_excel(writer, sheet_name="extract_data", index=False)

    def extract_data_by_page(
            self,
            page_num: int,
            page_text: str,
            page_datapoints: list,
            need_exclude: bool = False,
            exclude_data: list = None,
            previous_page_last_fund: str = None) -> dict:
        # If we can't find a numeric value, e.g. 1.25 or 3,88,
        # apply Vision ChatGPT to extract data
        exist_data_point_value_text = False
        for datapoint in page_datapoints:
            if self.datapoint_type_config.get(datapoint, "") == "text":
                exist_data_point_value_text = True
                break
        exist_numeric_value = False
        special_code_all = []
        page_text_line_count = 100
        if not exist_data_point_value_text:
            special_code_regex = r"\x10|\x11|\x12|\x13|\x14|\x15|\x16|\x17|\x18|\x19|\x1a|\x1b|\x1c|\x1d|\x1e|\x1f"
            special_code_all = [code for code in re.findall(special_code_regex, page_text)
                                if code != "\n"]
            page_text_line_count = len(page_text.split("\n"))
            numeric_regex = r"\d+(\.|\,)\d+"
            exist_numeric_value = (re.search(numeric_regex, page_text) is not None)
        if not exist_data_point_value_text and (not exist_numeric_value or
                                                page_text_line_count < 3 or
                                                len(special_code_all) > 100):
            if self.doc_source != "aus_prospectus":
                logger.info(f"Can't find numeric value in page {page_num}, apply Vision ChatGPT to extract data")
                return self.extract_data_by_page_image(
                    page_num=page_num,
                    page_datapoints=page_datapoints,
                    need_exclude=False,
                    exclude_data=None,
                    previous_page_last_fund=previous_page_last_fund,
                    need_extract_text=False)
            else:
                logger.info(f"Can't find numeric value in page {page_num}, skip to extract data")
                return None
        else:
            return self.extract_data_by_page_text(
                page_num=page_num,
                page_text=page_text,
                page_datapoints=page_datapoints,
                need_exclude=need_exclude,
                exclude_data=exclude_data,
                previous_page_last_fund=previous_page_last_fund
            )

    def extract_data_by_page_text(
            self,
            page_num: int,
            page_text: str,
            page_datapoints: list,
            need_exclude: bool = False,
            exclude_data: list = None,
            previous_page_last_fund: str = None,
            original_way: str = "text"
    ) -> dict:
        """
        keys are
        doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name, raw_share_name, share_id, share_name
        """
        logger.info(f"Extracting data from page {page_num}")
        if self.document_type == 1:
            # pre_context = f"The document type is prospectus. \nThe fund names in this document are {', '.join(self.fund_name_list)}."
            # if pre_context in page_text:
            #     page_text = page_text.replace(pre_context, "\n").strip()
            pre_context = ""
            if len(self.investment_objective_pages) > 0:
                # Get the page number of the most recent investment objective above the current page.
                diff_pages = [page_num - investment_objective_page for investment_objective_page
                              in self.investment_objective_pages
                              if investment_objective_page <= page_num]
                if len(diff_pages) > 0 and diff_pages[-1] < 5 and diff_pages[-1] > 0:
                    top_nearest_investment_objective_page = self.investment_objective_pages[len(diff_pages) - 1]
                    top_nearest_investment_objective_text = self.page_text_dict.get(top_nearest_investment_objective_page, "")

                    if top_nearest_investment_objective_text in page_text and \
                            top_nearest_investment_objective_text != page_text:
                        page_text = page_text.replace(top_nearest_investment_objective_text, "").strip()
                    pre_context_fund_name = self.get_objective_fund_name(top_nearest_investment_objective_text)
                    if pre_context_fund_name is not None and len(pre_context_fund_name) > 0:
                        pre_context = f"\nThe fund name for most recent investment objective page text is: \n{pre_context_fund_name}.\n"
            # If the previous investment objective text can't be found, add the fund name as the prefix of the page text
            page_text = f"{pre_context}\n{page_text}".strip()

        instructions = self.get_instructions_by_datapoints(
            page_text,
            page_datapoints,
            need_exclude,
            exclude_data,
            extract_way="text"
        )
        result, with_error = chat(
            prompt=instructions, response_format={"type": "json_object"}
        )
        response = result.get("response", "")
        if with_error:
            logger.error(f"Error in extracting tables from page")
            data_dict = {"doc_id": self.doc_id}
            data_dict["page_index"] = page_num
            data_dict["datapoints"] = ", ".join(page_datapoints)
            data_dict["page_text"] = page_text
            data_dict["instructions"] = instructions
            data_dict["raw_answer"] = response
            data_dict["extract_data"] = {"data": []}
            data_dict["extract_way"] = original_way
            data_dict["prompt_token"] = result.get("prompt_token", 0)
            data_dict["completion_token"] = result.get("completion_token", 0)
            data_dict["total_token"] = result.get("total_token", 0)
            return data_dict
        try:
            data = json.loads(response)
        except:
            try:
                # If an error occurs, the output may have been truncated (over the 4K-token output limit),
                # so the context used to be split into two parts and queried separately.
                # Attention: after deploying the ChatGPT-4o 2024-08-16 version, the max output length is 16K,
                # so there is no need to split the context.
                # data = self.chat_by_split_context(
                #     page_text, page_datapoints, need_exclude, exclude_data
                # )
                # fall back to json_repair when the response is not valid JSON
                data = json_repair.loads(response)
            except:
                data = {"data": []}
        try:
            if self.doc_source == "emea_ar":
                data = self.validate_emea_ar_data(extract_data_info=data,
                                                  page_text=page_text,
                                                  previous_page_last_fund=previous_page_last_fund)
            elif self.doc_source == "aus_prospectus":
                data = self.validate_aus_prospectus_data(extract_data_info=data,
                                                         page_text=page_text,
                                                         page_num=page_num,
                                                         previous_page_last_fund=previous_page_last_fund)
            else:
                pass
        except:
            pass

        data_dict = {"doc_id": self.doc_id}
        data_dict["page_index"] = page_num
        data_dict["datapoints"] = ", ".join(page_datapoints)
        data_dict["page_text"] = page_text
        data_dict["instructions"] = instructions
        data_dict["raw_answer"] = response
        data_dict["extract_data"] = data
        data_dict["extract_way"] = original_way
        data_dict["prompt_token"] = result.get("prompt_token", 0)
        data_dict["completion_token"] = result.get("completion_token", 0)
        data_dict["total_token"] = result.get("total_token", 0)
        return data_dict

    def extract_data_by_page_image(
            self,
            page_num: int,
            page_datapoints: list,
            need_exclude: bool = False,
            exclude_data: list = None,
            previous_page_last_fund: str = None,
            need_extract_text: bool = False
    ) -> dict:
        """
        keys are
        doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name, raw_share_name, share_id, share_name
        """
        if need_extract_text:
            logger.info(f"Extracting data from page {page_num} with extracting text as a single step.")
            page_text = self.get_image_text(page_num)
            if page_text is None or len(page_text) == 0:
                data_dict = {"doc_id": self.doc_id}
                data_dict["page_index"] = page_num
                data_dict["datapoints"] = ", ".join(page_datapoints)
                data_dict["page_text"] = ""
                data_dict["instructions"] = ""
                data_dict["raw_answer"] = ""
                data_dict["extract_data"] = {"data": []}
                data_dict["extract_way"] = "image"
                data_dict["prompt_token"] = 0
                data_dict["completion_token"] = 0
                data_dict["total_token"] = 0
                return data_dict
            else:
                if previous_page_last_fund is not None and len(previous_page_last_fund) > 0:
                    logger.info(f"Transfer previous page fund name: {previous_page_last_fund} to be the prefix of page text")
                    page_text = f"\nThe last fund name of previous PDF page: {previous_page_last_fund}\n{page_text}"
                return self.extract_data_by_page_text(
                    page_num=page_num,
                    page_text=page_text,
                    page_datapoints=page_datapoints,
                    need_exclude=need_exclude,
                    exclude_data=exclude_data,
                    previous_page_last_fund=previous_page_last_fund,
                    original_way="image"
                )
        else:
            logger.info(f"Extracting data from page {page_num} without extracting text as a single step.")
            return self.extract_data_by_pure_image(
                page_num=page_num,
                page_datapoints=page_datapoints,
                need_exclude=need_exclude,
                exclude_data=exclude_data,
                previous_page_last_fund=previous_page_last_fund
            )

    def extract_data_by_pure_image(
        self,
        page_num: int,
        page_datapoints: list,
        need_exclude: bool = False,
        exclude_data: list = None,
        previous_page_last_fund: str = None
    ) -> dict:
        """
        keys are
        doc_id, page_index, datapoint, value, raw_fund_name, fund_id, fund_name, raw_share_name, share_id, share_name
        """
        image_base64 = self.get_pdf_image_base64(page_num)
        instructions = self.get_instructions_by_datapoints(
            previous_page_last_fund,
            page_datapoints,
            need_exclude=need_exclude,
            exclude_data=exclude_data,
            extract_way="image"
        )
        result, with_error = chat(
            prompt=instructions, response_format={"type": "json_object"}, image_base64=image_base64
        )
        response = result.get("response", "")
        if with_error:
            logger.error(f"Error in extracting data from page {page_num}")
            data_dict = {"doc_id": self.doc_id}
            data_dict["page_index"] = page_num
            data_dict["datapoints"] = ", ".join(page_datapoints)
            data_dict["page_text"] = ""
            data_dict["instructions"] = instructions
            data_dict["raw_answer"] = response
            data_dict["extract_data"] = {"data": []}
            data_dict["extract_way"] = "image"
            data_dict["prompt_token"] = result.get("prompt_token", 0)
            data_dict["completion_token"] = result.get("completion_token", 0)
            data_dict["total_token"] = result.get("total_token", 0)
            return data_dict
        try:
            data = json.loads(response)
        except:
            try:
                data = json_repair.loads(response)
            except:
                data = {"data": []}
        try:
            if self.doc_source == "emea_ar":
                data = self.validate_emea_ar_data(data, None, previous_page_last_fund)
            elif self.doc_source == "aus_prospectus":
                data = self.validate_aus_prospectus_data(data, None, page_num, previous_page_last_fund)
            else:
                pass
        except:
            pass

        data_dict = {"doc_id": self.doc_id}
        data_dict["page_index"] = page_num
        data_dict["datapoints"] = ", ".join(page_datapoints)
        data_dict["page_text"] = ""
        data_dict["instructions"] = instructions
        data_dict["raw_answer"] = response
        data_dict["extract_data"] = data
        data_dict["extract_way"] = "image"
        data_dict["prompt_token"] = result.get("prompt_token", 0)
        data_dict["completion_token"] = result.get("completion_token", 0)
        data_dict["total_token"] = result.get("total_token", 0)
        return data_dict

    def get_image_text(self, page_num: int) -> str:
        image_base64 = self.get_pdf_image_base64(page_num)
        instructions = self.instructions_config.get("get_image_text", "\n")
        logger.info(f"Get text from image of page {page_num}")
        result, with_error = chat(
            prompt=instructions, response_format={"type": "json_object"}, image_base64=image_base64
        )
        response = result.get("response", "")
        text = ""
        if with_error:
            logger.error(f"Can't get text from the image of page {page_num}")
        data = {}
        try:
            data = json.loads(response)
        except:
            try:
                data = json_repair.loads(response)
            except:
                pass
        text = data.get("text", "")
        # print(text)
        return text

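    # get_image_text expects the model to answer with a JSON object of the form
    # {"text": "<full page text>"}; anything else yields an empty string.
    # Hedged example of the parsing step only (the response value is made up):
    # json.loads('{"text": "Fund 1\\nClass A 1.25%"}').get("text", "") -> "Fund 1\nClass A 1.25%"
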
    def validate_emea_ar_data(self,
                              extract_data_info: dict,
                              page_text: str,
                              previous_page_last_fund: str = None) -> dict:
        """
        Validate data by the rules
        1. Each data should be with fund name
        2. For share level data, it should be with share name
        """
        data_list = extract_data_info.get("data", [])
        if len(data_list) == 0:
            return extract_data_info

        remove_list = []
        performance_fee_regex = r"Amount\s+of\s+the\s+performance\s+fees|Performance\s+Fees\s+amounts|Performance\s+fees\s+amounts|Commissioni\s+di\s+performance|Performance\s+Fee\s+|Performance\s+fees\s+charged"
        nav_regex = r"based\s+on\s+(the\s+)?NAV|on\s+the\s+Share\s+Class\s+NAV|NAV\s+of\s+performance\s+fee|of\s+the\s+average\s+Net\s+Asset\s+Value|Attivi\s+in\s+gestione|Performance\s+Fee\s+of\s+NAV\s+in|share\s+class\s+dealing\s+NAV"
        if page_text is not None and len(page_text) > 0:
            performance_fee_search = re.search(performance_fee_regex, page_text)
            nav_search = re.search(nav_regex, page_text)
        else:
            performance_fee_search = None
            nav_search = None
        for data in data_list:
            if (data.get("performance_fee", None) is not None and
                    performance_fee_search is not None and
                    nav_search is not None):
                data.pop("performance_fee")
            keys = [key for key in list(data.keys())
                    if key not in ["fund name", "share name"]]
            if len(keys) == 0:
                remove_list.append(data)
                continue
            raw_fund_name = data.get("fund name", "").strip()
            if raw_fund_name == "":
                remove_list.append(data)
                continue

            # Clean fund name start
            if previous_page_last_fund is not None and len(previous_page_last_fund) > 0:
                previous_page_last_fund = previous_page_last_fund.strip()
                if raw_fund_name.startswith(previous_page_last_fund) and raw_fund_name != previous_page_last_fund:
                    modified_fund_name = raw_fund_name.replace(previous_page_last_fund, "").strip()
                    if len(modified_fund_name.split()) > 1:
                        raw_fund_name = modified_fund_name
            raw_fund_name = self.get_fund_name(raw_fund_name, "Fund")
            raw_fund_name = self.get_fund_name(raw_fund_name, "Bond")

            remove_prefix_list = ["Market Specific Equity Sub-Funds",
                                  "International and Regional Equity Sub-Funds",
                                  "Equity Sub-Funds"]
            for remove_item in remove_prefix_list:
                if raw_fund_name.startswith(remove_item):
                    raw_fund_name = raw_fund_name.replace(remove_item, "").strip()

            data["fund name"] = raw_fund_name
            # Clean fund name end

            keys = list(data.keys())
            for key in keys:
                if self.datapoint_level_config.get(key, "") == "share_level":
                    if data.get("share name", "") == "":
                        include_key_words = False
                        if key == "ter" and page_text is not None and len(page_text) > 0:
                            ter_regex = r"TER\s+in\s+\%|TER\s*\%"
                            ter_search = re.search(ter_regex, page_text)
                            if ter_search is not None:
                                include_key_words = True
                        if not include_key_words:
                            is_share_name = self.check_fund_name_as_share(raw_fund_name)
                            if not is_share_name:
                                remove_list.append(data)
                                break
                            data["share name"] = raw_fund_name
                if data.get(key, "") == "":
                    data.pop(key)
        for remove_data in remove_list:
            if remove_data in data_list:
                data_list.remove(remove_data)
        # check performance_fee
        for data in data_list:
            performance_fee = data.get("performance_fee", None)
            if performance_fee is not None:
                try:
                    performance_fee = float(performance_fee)
                    if (performance_fee > 3 and performance_fee % 2.5 == 0) or \
                            performance_fee > 10:
                        data.pop("performance_fee")
                except:
                    data.pop("performance_fee")
        remove_list = []
        for data in data_list:
            keys = [key for key in list(data.keys())
                    if key not in ["fund name", "share name"]]
            if len(keys) == 0:
                remove_list.append(data)
        for remove_data in remove_list:
            if remove_data in data_list:
                data_list.remove(remove_data)
        # update "fund name" to be "fund_name"
        # update "share name" to be "share_name"
        new_data_list = []
        multi_over_3_share_regex = r"([A-Z]{1,}\,\s){3,}"
        exist_multi_over_3_share = False
        for data in data_list:
            raw_fund_name = data.get("fund name", "").strip()
            if len(raw_fund_name) == 0:
                continue
            raw_share_name = data.get("share name", "")
            if not exist_multi_over_3_share:
                multi_over_3_share_search = re.search(multi_over_3_share_regex, raw_share_name)
                if multi_over_3_share_search is not None:
                    exist_multi_over_3_share = True
            if exist_multi_over_3_share:
                share_name_list = self.split_multi_share_name(raw_share_name)
            else:
                share_name_list = [raw_share_name]
            if len(share_name_list) > 0:
                for share_name in share_name_list:
                    new_data = {}
                    new_data["fund_name"] = raw_fund_name
                    if share_name != "":
                        new_data["share_name"] = share_name
                    ter = data.get("ter", None)
                    if ter is not None:
                        new_data["ter"] = ter
                    performance_fee = data.get("performance fees", None)
                    if performance_fee is not None:
                        new_data["performance_fee"] = performance_fee

                    for key, value in data.items():
                        if key not in ["fund name", "share name", "ter", "performance fees"]:
                            new_data[key] = value
                    new_data_list.append(new_data)

        extract_data_info["data"] = new_data_list
        return extract_data_info

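    # Hedged illustration of what validate_emea_ar_data does to one extracted row
    # (input/output values are made up, behaviour follows the code above):
    # in:  {"fund name": "Global Equity Fund", "share name": "Class A, B, C, D", "ter": 1.1}
    # out: [{"fund_name": "Global Equity Fund", "share_name": "Class A", "ter": 1.1},
    #       {"fund_name": "Global Equity Fund", "share_name": "Class B", "ter": 1.1},
    #       {"fund_name": "Global Equity Fund", "share_name": "Class C", "ter": 1.1},
    #       {"fund_name": "Global Equity Fund", "share_name": "Class D", "ter": 1.1}]
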
    def validate_aus_prospectus_data(self,
                                     extract_data_info: dict,
                                     page_text: str,
                                     page_num: int,
                                     previous_page_last_fund: str = None) -> dict:
        data_list = extract_data_info.get("data", [])
        if len(data_list) == 0:
            return extract_data_info
        remove_list = []
        for data in data_list:
            raw_fund_name = data.get("fund name", "").strip()
            if raw_fund_name == "":
                remove_list.append(data)
                continue

            # Clean fund name start
            if previous_page_last_fund is not None and len(previous_page_last_fund) > 0:
                previous_page_last_fund = previous_page_last_fund.strip()
                if raw_fund_name.startswith(previous_page_last_fund) and raw_fund_name != previous_page_last_fund:
                    modified_fund_name = raw_fund_name.replace(previous_page_last_fund, "").strip()
                    if len(modified_fund_name.split()) > 1:
                        raw_fund_name = modified_fund_name
            data["fund name"] = raw_fund_name
        for remove_data in remove_list:
            if remove_data in data_list:
                data_list.remove(remove_data)

        new_data_list = []
        multi_over_3_share_regex = r"([A-Z]{1,}\,\s){3,}"
        exist_multi_over_3_share = False
        for data in data_list:
            raw_fund_name = data.get("fund name", "").strip()
            if len(raw_fund_name) == 0:
                continue
            raw_share_name = data.get("share name", "")
            if not exist_multi_over_3_share:
                multi_over_3_share_search = re.search(multi_over_3_share_regex, raw_share_name)
                if multi_over_3_share_search is not None:
                    exist_multi_over_3_share = True
            if exist_multi_over_3_share:
                share_name_list = self.split_multi_share_name(raw_share_name)
            else:
                share_name_list = [raw_share_name]
            if len(share_name_list) > 0:
                for share_name in share_name_list:
                    new_data = {}
                    new_data["fund_name"] = raw_fund_name
                    if share_name != "":
                        new_data["share_name"] = share_name
                    for key, value in data.items():
                        if key not in ["fund name", "share name"]:
                            new_data[key] = value
                    new_data_list.append(new_data)
        extract_data_info["data"] = new_data_list
        if page_text is not None and len(page_text) > 0:
            self.set_datapoint_feature_properties(new_data_list, page_text, page_num)
        return extract_data_info

    def set_datapoint_feature_properties(self, data_list: list, page_text: str, page_num: int) -> None:
        for feature, properties in self.special_datapoint_feature_config.items():
            regex_text_list = properties.get("regex_text", [])
            if len(regex_text_list) == 0:
                continue
            effective_datapoints = properties.get("effective_datapoints", [])
            if len(effective_datapoints) == 0:
                continue
            provider_ids = properties.get("provider_ids", [])
            if len(provider_ids) > 0:
                is_current_provider = False
                doc_provider_list = self.document_mapping_info_df["ProviderId"].unique().tolist()
                if len(doc_provider_list) > 0:
                    for provider in provider_ids:
                        if provider in doc_provider_list:
                            is_current_provider = True
                            break
                if not is_current_provider:
                    continue
            exclude_datapoints = properties.get("exclude_datapoints", [])

            exist_effective_datapoints = False
            exist_exclude_datapoints = False
            for data_item in data_list:
                datapoints = [datapoint for datapoint in list(data_item.keys())
                              if datapoint in effective_datapoints]
                if len(datapoints) > 0:
                    exist_effective_datapoints = True
                datapoints = [datapoint for datapoint in list(data_item.keys())
                              if datapoint in exclude_datapoints]
                if len(datapoints) > 0:
                    exist_exclude_datapoints = True
                if exist_effective_datapoints and exist_exclude_datapoints:
                    break

            if not exist_effective_datapoints:
                continue
            if exist_exclude_datapoints:
                continue
            found_regex_text = False
            for regex_text in regex_text_list:
                regex_search = re.search(regex_text, page_text)
                if regex_search is not None:
                    found_regex_text = True
                    break
            if found_regex_text:
                if self.special_datapoint_feature[feature].get("page_index", None) is None:
                    self.special_datapoint_feature[feature]["page_index"] = []
                self.special_datapoint_feature[feature]["page_index"].append(page_num)

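    # Hedged sketch of a special_datapoint_feature_config entry that the method above can
    # consume (the keys follow the .get() calls in the code; names and values are invented):
    # {
    #     "buy_spread_in_text": {
    #         "regex_text": [r"buy/sell\s+spread"],
    #         "effective_datapoints": ["buy_spread"],
    #         "exclude_datapoints": ["sell_spread"],
    #         "provider_ids": ["0C00001234"]
    #     }
    # }
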
    def split_multi_share_name(self, raw_share_name: str) -> list:
        """
        Some documents, e.g. 481482392, have multiple share names in one table header,
        e.g. "Class A, B, E, M, N, P, R, U".
        For this case, the share name needs to be split into ["Class A", "Class B", "Class E",
        "Class M", "Class N", "Class P", "Class R", "Class U"]
        """
        multi_over_2_share_regex = r"([A-Z]{1,}\,\s){2,}"
        multi_over_2_share_search = re.search(multi_over_2_share_regex, raw_share_name)
        share_name_list = [raw_share_name]
        if multi_over_2_share_search is not None:
            multi_share_splits = [share_name.strip() for share_name in raw_share_name.split(",")
                                  if len(share_name.strip()) > 0]
            first_share_name = multi_share_splits[0]
            first_share_name_split = first_share_name.split()
            share_name_prefix = None
            if len(first_share_name_split) == 2:
                share_name_prefix = first_share_name_split[0]
                if share_name_prefix is not None and len(share_name_prefix) > 0:
                    new_share_name_list = []
                    for split in multi_share_splits:
                        if split == first_share_name:
                            new_share_name_list.append(split)
                        else:
                            new_share_name_list.append(f"{share_name_prefix} {split}")
                    share_name_list = new_share_name_list
                else:
                    share_name_list = multi_share_splits
            else:
                share_name_list = multi_share_splits
        return share_name_list

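    # Hedged example of split_multi_share_name (the inputs are invented, the behaviour follows the code above):
    # split_multi_share_name("Class A, B, E, M") -> ["Class A", "Class B", "Class E", "Class M"]
    # split_multi_share_name("Class A Acc")      -> ["Class A Acc"]  (no multi-share pattern, returned as-is)
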
    def get_fund_name(self, fund_name: str, fund_feature: str):
        if not fund_name.endswith(fund_feature):
            return fund_name
        # append a space so the split does not break words that merely contain the feature, e.g. "Funds"
        fund_feature = fund_feature + " "
        fund_name_split = fund_name.split(fund_feature)
        if len(fund_name_split) > 1:
            last_fund = fund_name_split[-1].strip()
            if len(last_fund) == 0:
                last_fund = fund_name_split[-2].strip()
            fund_name = f"{last_fund} {fund_feature}"
        return fund_name

    def check_fund_name_as_share(self, fund_name: str) -> bool:
        """
        Check if the fund name is actually a share class name
        """
        if len(fund_name) == 0:
            return False
        share_name_list = self.document_mapping_info_df["ShareClassName"].unique().tolist()
        if len(share_name_list) == 0:
            return False
        max_similarity_name, max_similarity = get_most_similar_name(
            text=fund_name,
            name_list=share_name_list,
            share_name=None,
            fund_name=None,
            matching_type="share",
            process_cache=None)
        if max_similarity >= 0.8:
            return True
        return False

    def get_datapoints_by_page_num(self, page_num: int) -> list:
        datapoints = []

        for datapoint in self.datapoints:
            if page_num in self.datapoint_page_info.get(datapoint, []):
                datapoints.append(datapoint)
        return datapoints

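    # Hedged example: assuming self.datapoints == ["ter", "ogc"] and
    # self.datapoint_page_info == {"ter": [3, 7], "ogc": [7]},
    # get_datapoints_by_page_num(7) returns ["ter", "ogc"] while
    # get_datapoints_by_page_num(3) returns ["ter"].
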
    def get_instructions_by_datapoints(
        self,
        page_text: str,
        datapoints: list,
        need_exclude: bool = False,
        exclude_data: list = None,
        extract_way: str = "text",
    ) -> str:
        """
        Get instructions to extract data from the page by the datapoints
        Below are the instruction sections:
            summary: string
            reported_name by datapoints: dict
            data_business_features: dict
                common: list
                investment_level by datapoints: dict
                data_value_range by datapoints: dict
                special_rule by datapoints: dict
            special_cases: dict
                common: list
                    title
                    contents
                special_case by datapoints: list
                    title
                    contents
            output_requirement
                common: list
                fund_level: list
                share_level: dict
                    fund_name: list
                    share_name: list
                    ogc_value: list
                    ter_value: list
                    performance_fee_value: list
            end
        """
        instructions = []
        if extract_way == "text":
            instructions = [f"Context:\n{page_text}\n\nInstructions:\n"]

        datapoint_name_list = []
        for datapoint in datapoints:
            datapoint_name = self.datapoint_name_config.get(datapoint, "")
            datapoint_name_list.append(datapoint_name)

        if extract_way == "text":
            summary = self.instructions_config.get("summary", "\n")
        elif extract_way == "image":
            summary = self.instructions_config.get("summary_image", "\n")
            if page_text is not None and len(page_text) > 0:
                logger.info(f"Transfer previous page fund name: {page_text} to be the prefix of page text")
                summary += f"\nThe last fund name of previous PDF page: {page_text}\n"
                summary += "If could find the fund name for the first data point value, please ignore this fund name.\n"
        else:
            summary = self.instructions_config.get("summary", "\n")

        instructions.append(summary.format(", ".join(datapoint_name_list)))
        instructions.append("\n")

        if extract_way == "image":
            image_features = self.instructions_config.get("image_features", [])
            instructions.extend(image_features)
            instructions.append("\n")

        instructions.append("## Datapoints Reported name\n")
        instructions.append("Please look for relevant reported names and similar variations in the context.\n")
        reported_name_info_in_instructions = self.instructions_config.get("reported_name", {})
        for datapoint in datapoints:
            reported_name_list = self.datapoint_reported_name_config.get(datapoint, [])
            if len(reported_name_list) == 0:
                reported_name = reported_name_info_in_instructions.get(datapoint, "")
            else:
                joined_reported_name = ", ".join(reported_name_list)
                datapoint_name = datapoint
                if datapoint_name == "performance_fee":
                    datapoint_name = "performance fees"
                else:
                    datapoint_name = self.datapoint_name_config.get(datapoint_name, "")
                    if len(datapoint_name) == 0:
                        datapoint_name = datapoint.upper()
                reported_name = f"The {datapoint_name} reported name could be:\n{joined_reported_name}"

            instructions.append(reported_name)
            instructions.append("\n")
        instructions.append("\n")

        if self.language != "english":
            """
            "multilingual_reported_name": {
                "describe": "Please be careful to extract relevant data by different reported names from multilingual Context.",
                "regular_example_template": "{datapoint} Example {number}:\nLanguage: {language}\n---Context Start-----\n{fund_name}\n{share_name}\n{reported_name}\n{value}\n---Context End-----\nAnswer: {answer}",
                "special_example_template_none": "{datapoint} Example {number}:\nLanguage: {language}\nValue is belong to \"-, *, **, N/A, N/A%, N/A %, NONE\", ignore it\n---Context Start-----\n{fund_name}\n{share_name}\n{reported_name} 2)\n-\n---Context End-----\nAnswer: {answer}",
                "value_examples": ["1,98", "3.25", "2.16", "1,73", "4,53"],
                "fund_example": "Fund 1",
                "share_example": "Share 1"
            }
            """
            multilingual_reported_name_config = self.instructions_config.get("multilingual_reported_name", {})
            describe = multilingual_reported_name_config.get("describe", "")
            regular_example_template = multilingual_reported_name_config.get("regular_example_template", "")
            special_example_template_none = multilingual_reported_name_config.get("special_example_template_none", "")
            value_examples = multilingual_reported_name_config.get("value_examples", [])
            fund_example = multilingual_reported_name_config.get("fund_example", "")
            share_example = multilingual_reported_name_config.get("share_example", "")
            instructions.append("Multilingual reported name:\n")
            instructions.append(f"{describe}\n")

            # upper-case the first character of the language name
            language = self.language[0].upper() + self.language[1:]
            for datapoint in datapoints:
                mul_reported_name_list = self.non_english_reported_name_config.get(datapoint, [])
                # deduplicate the reported name list
                mul_reported_name_list = list(set(mul_reported_name_list))
                if len(mul_reported_name_list) == 0:
                    continue
                datapoint_name = datapoint
                if datapoint_name == "performance_fee":
                    datapoint_name = "performance fees"
                else:
                    datapoint_name = datapoint_name.upper()
                example_count = 1
                none_value_example_count = 0
                for mul_reported_name in mul_reported_name_list:
                    if datapoint in ["ter", "performance_fee"] and example_count >= 3:
                        break
                    value = value_examples[example_count % len(value_examples)]
                    answer = {"fund name": fund_example,
                              "share name": share_example,
                              datapoint: float(value.replace(",", "."))}
                    # convert the answer to a JSON string
                    answer = json.dumps(answer, ensure_ascii=False)
                    example = regular_example_template.format(
                        datapoint=datapoint_name,
                        number=example_count,
                        language=language,
                        fund_name=fund_example,
                        share_name=share_example,
                        reported_name=mul_reported_name,
                        value=value,
                        answer=answer,
                    )
                    instructions.append(example)
                    instructions.append("\n")
                    instructions.append("\n")

                    example_count += 1
                    if len(mul_reported_name.split()) > 1:
                        if none_value_example_count != 2:
                            none_value_example = special_example_template_none.format(
                                datapoint=datapoint_name,
                                number=example_count,
                                language=language,
                                fund_name=fund_example,
                                share_name=share_example,
                                reported_name=mul_reported_name,
                                answer=json.dumps({}, ensure_ascii=False)
                            )
                            instructions.append(none_value_example)
                            instructions.append("\n")
                            instructions.append("\n")
                            example_count += 1
                            none_value_example_count += 1

        instructions.append("\n")
        instructions.append("## Data business features\n")
        data_business_features = self.instructions_config.get(
            "data_business_features", {}
        )
        common = "\n".join(data_business_features.get("common", []))
        instructions.append(common)
        instructions.append("\n")

        instructions.append("## Datapoints investment level\n")
        investment_level_info = data_business_features.get("investment_level", {})
        for datapoint in datapoints:
            investment_level = investment_level_info.get(datapoint, "")
            instructions.append(investment_level)
            instructions.append("\n")
        instructions.append("\n")

        instructions.append("## Datapoints value range\n")
        data_value_range_info = data_business_features.get("data_value_range", {})
        for datapoint in datapoints:
            data_value_range = data_value_range_info.get(datapoint, "")
            instructions.append(data_value_range)
            instructions.append("\n")
        instructions.append("\n")

        special_rule_info = data_business_features.get("special_rule", {})
        # The reason for applying special_rule_by_keywords is:
        # 1. The special rules are very complex and their prompts are very long.
        # 2. Loading them by keywords avoids overly long prompts for simple cases.
        complex_special_rule_config = data_business_features.get("sepcial_rule_by_keywords", {})
        with_special_rule_title = False
        found_sub_datapoints = []
        datapoint_special_rule = {}
        for datapoint in datapoints:
            # If some complex special rule was already found together with its sub datapoints,
            # the relevant rule does not need to be loaded again.
            if datapoint in found_sub_datapoints:
                continue
            find_complex_special_rule = False
            if page_text is not None and len(page_text) > 0:
                complex_special_rule_list = complex_special_rule_config.get(datapoint, [])
                for complex_special_rule in complex_special_rule_list:
                    complex_keywords = complex_special_rule.get("keywords", [])
                    if len(complex_keywords) == 0:
                        continue
                    # keywords can be plain text or a regex
                    keywords_is_regex = complex_special_rule.get("keywords_is_regex", False)
                    exist_keywords = False
                    for special_keywords in complex_keywords:
                        if keywords_is_regex:
                            if re.search(special_keywords, page_text) is not None:
                                exist_keywords = True
                                break
                        else:
                            special_keywords_regex = add_slash_to_text_as_regex(special_keywords)
                            if special_keywords in page_text or \
                                    re.search(special_keywords_regex, page_text) is not None:
                                exist_keywords = True
                                break
                    if exist_keywords:
                        complex_prompts_list = complex_special_rule.get("prompts", [])
                        if len(complex_prompts_list) > 0:
                            if not with_special_rule_title:
                                instructions.append("## Special rule\n")
                                with_special_rule_title = True
                            complex_prompts = "\n".join(complex_prompts_list)
                            instructions.append(complex_prompts)
                            instructions.append("\n\n")
                            find_complex_special_rule = True
                            # When a complex special rule is found, collect its sub datapoints
                            # and add them to the found_sub_datapoints list.
                            sub_datapoints = complex_special_rule.get("sub_datapoints", [])
                            if len(sub_datapoints) > 0:
                                found_sub_datapoints.extend(sub_datapoints)
            if find_complex_special_rule:
                continue
            special_rule_list = special_rule_info.get(datapoint, [])
            if len(special_rule_list) > 0:
                datapoint_special_rule[datapoint] = special_rule_list
        if len(list(datapoint_special_rule.keys())) > 0:
            for datapoint, special_rule_list in datapoint_special_rule.items():
                if datapoint in found_sub_datapoints:
                    continue
                if not with_special_rule_title:
                    instructions.append("## Special rule\n")
                    with_special_rule_title = True
                special_rule = "\n".join(special_rule_list)
                instructions.append(special_rule)
                instructions.append("\n\n")

        instructions.append("\n")

        instructions.append("## Special cases\n")
        special_cases = self.instructions_config.get("special_cases", {})
        special_cases_common_list = special_cases.get("common", [])
        special_cases_number = 1
        for special_cases_common in special_cases_common_list:
            title = special_cases_common.get("title", "")
            title = f"{special_cases_number}. {title} "
            special_cases_number += 1
            instructions.append(title)
            instructions.append("\n")
            contents_list = special_cases_common.get("contents", [])
            contents = "\n".join(contents_list)
            instructions.append(contents)
            instructions.append("\n")

        for datapoint in datapoints:
            special_case_list = special_cases.get(datapoint, [])
            for special_case in special_case_list:
                title = special_case.get("title", "")
                title = f"{special_cases_number}. {title} "
                special_cases_number += 1
                instructions.append(title)
                instructions.append("\n")
                contents_list = special_case.get("contents", [])
                contents = "\n".join(contents_list)
                instructions.append(contents)
                instructions.append("\n")

        instructions.append("## Output requirement\n")
        output_requirement = self.instructions_config.get("output_requirement", {})
        output_requirement_common_list = output_requirement.get("common", [])
        instructions.append("\n".join(output_requirement_common_list))
        instructions.append("\n")

        fund_datapoint_value_example = {}
        fund_level_config = output_requirement.get("fund_level", {})

        share_datapoint_value_example = {}
        share_level_config = output_requirement.get("share_level", {})

        example_list = []
        dp_reported_name_config = output_requirement.get("dp_reported_name", {})
        dp_reported_name = {}
        for datapoint in datapoints:
            investment_level = self.datapoint_level_config.get(datapoint, "")
            if investment_level == "fund_level":
                # fund_level_example_list = output_requirement.get("fund_level", [])
                # for example in fund_level_example_list:
                #     try:
                #         sub_example_list = json.loads(example)
                #     except:
                #         sub_example_list = json_repair.loads(example)
                #     example_list.extend(sub_example_list)
                fund_datapoint_value_example[datapoint] = fund_level_config.get(
                    f"{datapoint}_value", []
                )
            elif investment_level == "share_level":
                share_datapoint_value_example[datapoint] = share_level_config.get(
                    f"{datapoint}_value", []
                )
            dp_reported_name[datapoint] = dp_reported_name_config.get(datapoint, "")

        instructions.append("Example:\n")

        fund_datapoint_list = list(fund_datapoint_value_example.keys())
        if len(fund_datapoint_list) > 0:
            fund_name_example_list = fund_level_config.get("fund_name", [])
            for index in range(len(fund_name_example_list)):
                example_dict = {
                    "fund name": fund_name_example_list[index],
                }
                for fund_datapoint in fund_datapoint_list:
                    fund_datapoint_values = fund_datapoint_value_example[fund_datapoint]
                    if index < len(fund_datapoint_values):
                        example_dict[fund_datapoint] = fund_datapoint_values[index]
                example_list.append(example_dict)

        share_datapoint_list = list(share_datapoint_value_example.keys())
        if len(share_datapoint_list) > 0:
            fund_name_example_list = share_level_config.get("fund_name", [])
            share_name_example_list = share_level_config.get("share_name", [])

            for index in range(len(fund_name_example_list)):
                example_dict = {
                    "fund name": fund_name_example_list[index],
                    "share name": share_name_example_list[index],
                }
                for share_datapoint in share_datapoint_list:
                    share_datapoint_values = share_datapoint_value_example[share_datapoint]
                    if index < len(share_datapoint_values):
                        example_dict[share_datapoint] = share_datapoint_values[index]
                example_list.append(example_dict)
        example_data = {"data": example_list, "dp_reported_name": dp_reported_name}
        instructions.append(json.dumps(example_data, ensure_ascii=False, indent=4))
        instructions.append("\n")
        instructions.append("\n")

        end_list = self.instructions_config.get("end", [])
        instructions.append("\n".join(end_list))
        instructions.append("\n")

        if need_exclude and exclude_data is not None and isinstance(exclude_data, list):
            instructions.append("Please exclude below data from output:\n")
            instructions.append(json.dumps(exclude_data, ensure_ascii=False, indent=4))
            instructions.append("\n")
        instructions.append("\n")
        instructions.append("Answer:\n")

        instructions_text = "".join(instructions)
        return instructions_text

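    # The prompt assembled by get_instructions_by_datapoints ends with an output example of the form
    # (hedged sketch; the concrete names and values come from the instructions configuration):
    # {
    #     "data": [{"fund name": "Fund 1", "share name": "Class A", "ter": 1.25}],
    #     "dp_reported_name": {"ter": "Total Expense Ratio"}
    # }
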
    # def chat_by_split_context(self,
    #                           page_text: str,
    #                           page_datapoints: list,
    #                           need_exclude: bool,
    #                           exclude_data: list) -> list:
    #     """
    #     If an error occurs, split the context into two parts and try to get data from each part
    #     Relevant document: 503194284, page index 147
    #     """
    #     try:
    #         logger.info(f"Split context to get data to fix issue which output length is over 4K tokens")
    #         split_context = re.split(r"\n", page_text)
    #         split_context = [text.strip() for text in split_context
    #                          if len(text.strip()) > 0]
    #         if len(split_context) < 10:
    #             return {"data": []}
    #
    #         split_context_len = len(split_context)
    #         top_10_context = split_context[:10]
    #         rest_context = split_context[10:]
    #         header = "\n".join(top_10_context)
    #         half_len = split_context_len // 2
    #         # the line at half_len should not start with a number
    #         # reverse iterate the list by half_len
    #         half_len_list = [i for i in range(half_len)]
    #
    #         fund_name_line = ""
    #         half_line = rest_context[half_len].strip()
    #         max_similarity_fund_name, max_similarity = get_most_similar_name(
    #             half_line, self.provider_fund_name_list, matching_type="fund"
    #         )
    #         if max_similarity < 0.2:
    #             # get the fund name line text from the first half
    #             for index in reversed(half_len_list):
    #                 line_text = rest_context[index].strip()
    #                 if len(line_text) == 0:
    #                     continue
    #                 line_text_split = line_text.split()
    #                 if len(line_text_split) < 3:
    #                     continue
    #                 first_word = line_text_split[0]
    #                 if first_word.lower() == "class":
    #                     continue
    #
    #                 max_similarity_fund_name, max_similarity = get_most_similar_name(
    #                     line_text, self.provider_fund_name_list, matching_type="fund"
    #                 )
    #                 if max_similarity >= 0.2:
    #                     fund_name_line = line_text
    #                     break
    #         else:
    #             fund_name_line = half_line
    #             half_len += 1
    #         if fund_name_line == "":
    #             return {"data": []}
    #
    #         logger.info(f"Split first part from 0 to {half_len}")
    #         split_first_part = "\n".join(split_context[:half_len])
    #         first_part = '\n'.join(split_first_part)
    #         first_instructions = self.get_instructions_by_datapoints(
    #             first_part, page_datapoints, need_exclude, exclude_data, extract_way="text"
    #         )
    #         response, with_error = chat(
    #             first_instructions, response_format={"type": "json_object"}
    #         )
    #         first_part_data = {"data": []}
    #         if not with_error:
    #             try:
    #                 first_part_data = json.loads(response)
    #             except:
    #                 first_part_data = json_repair.loads(response)
    #
    #         logger.info(f"Split second part from {half_len} to {split_context_len}")
    #         split_second_part = "\n".join(split_context[half_len:])
    #         second_part = header + "\n" + fund_name_line + "\n" + split_second_part
    #         second_instructions = self.get_instructions_by_datapoints(
    #             second_part, page_datapoints, need_exclude, exclude_data, extract_way="text"
    #         )
    #         response, with_error = chat(
    #             second_instructions, response_format={"type": "json_object"}
    #         )
    #         second_part_data = {"data": []}
    #         if not with_error:
    #             try:
    #                 second_part_data = json.loads(response)
    #             except:
    #                 second_part_data = json_repair.loads(response)
    #
    #         first_part_data_list = first_part_data.get("data", [])
    #         logger.info(f"First part data count: {len(first_part_data_list)}")
    #         second_part_data_list = second_part_data.get("data", [])
    #         logger.info(f"Second part data count: {len(second_part_data_list)}")
    #         for first_data in first_part_data_list:
    #             if first_data in second_part_data_list:
    #                 second_part_data_list.remove(first_data)
    #             else:
    #                 # if the first part data has the same fund name and share name,
    #                 # remove the second part data
    #                 first_data_dp = [key for key in list(first_data.keys())
    #                                  if key not in ["fund name", "share name"]]
    #                 # order the data points
    #                 first_data_dp.sort()
    #                 first_fund_name = first_data.get("fund name", "")
    #                 first_share_name = first_data.get("share name", "")
    #                 if len(first_fund_name) > 0 and len(first_share_name) > 0:
    #                     remove_second_list = []
    #                     for second_data in second_part_data_list:
    #                         second_fund_name = second_data.get("fund name", "")
    #                         second_share_name = second_data.get("share name", "")
    #                         if first_fund_name == second_fund_name and \
    #                                 first_share_name == second_share_name:
    #                             second_data_dp = [key for key in list(second_data.keys())
    #                                               if key not in ["fund name", "share name"]]
    #                             second_data_dp.sort()
    #                             if first_data_dp == second_data_dp:
    #                                 remove_second_list.append(second_data)
    #                     for remove_second in remove_second_list:
    #                         if remove_second in second_part_data_list:
    #                             second_part_data_list.remove(remove_second)
    #
    #         data_list = first_part_data_list + second_part_data_list
    #         extract_data = {"data": data_list}
    #         return extract_data
    #     except Exception as e:
    #         logger.error(f"Error in split context: {e}")
    #         return {"data": []}