support identifying aus prospectus document category: MIS or Super

Blade He 2025-02-24 15:08:15 -06:00
parent bb6862b179
commit 75ea383354
5 changed files with 110 additions and 14 deletions


@@ -73,9 +73,36 @@ class DataExtraction:
self.datapoint_name_config = self.get_datapoint_name()
self.datapoint_reported_name_config, self.non_english_reported_name_config = \
self.get_datapoint_reported_name()
self.document_category = self.get_document_category()
self.extract_way = extract_way
self.output_image_folder = output_image_folder
# classify an aus_prospectus document as "MIS" or "Super" from its first four pages
def get_document_category(self):
document_category = None
if self.doc_source == "aus_prospectus":
first_4_page_text = ""
for page_index, page_text in self.page_text_dict.items():
# collect text from the first four pages only (page_index 0-3)
if page_index > 3:
break
first_4_page_text += page_text + "\n"
first_4_page_text = clean_text(first_4_page_text)
document_category_prompt_file = os.path.join(self.instruction_folder, "document_category_prompts.json")
with open(document_category_prompt_file, "r", encoding="utf-8") as f:
document_category_prompt = "\n".join(json.load(f).get("prompts", []))
if len(document_category_prompt) > 0:
prompts = f"Context: \n{first_4_page_text}\n\Instructions: \n{document_category_prompt}"
result, with_error = chat(
prompt=prompts, response_format={"type": "json_object"}, max_tokens=1000
)
response = result.get("response", "")
if not with_error:
try:
data = json.loads(response)
document_category = data.get("document_category", None)
except (json.JSONDecodeError, AttributeError):
# leave document_category as None if the response is not valid JSON
pass
return document_category
def get_investment_objective_pages(self):
investment_objective_pages = []
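One hardening note on get_document_category above: chat models sometimes wrap a JSON answer in markdown fences, which json.loads rejects. A minimal sketch of a more forgiving parser (parse_category_response is a hypothetical helper, not part of the diff; the (result, with_error) shape of chat is taken from the code above):

import json
import re

def parse_category_response(response: str):
    # strip optional ```json ... ``` fences that chat models sometimes emit
    cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", response.strip())
    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError:
        return None
    return data.get("document_category") if isinstance(data, dict) else None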


@@ -0,0 +1,13 @@
{
"prompts": [
"In a prospectus for an MIS(Managed Investment Scheme) product youll typically see references to a “responsible entity”, a registration number (ARSN) and disclosures that comply with the Corporations Acts regime for managed investment schemes (e.g. pooling of funds, unit trusts, detailed product disclosures, and rules on redemption).\n",
"In contrast, a prospectus or product disclosure statement for a Super(superannuation) product will refer to superannuation or MySuper, include terms related to compulsory employer contributions, tax concessions, and comply with superannuation-specific legislation and guidelines (for example, those issued by APRA or the ATO).\n",
"In short, look at the headings, statutory references, product descriptions, and regulatory disclaimers: if they discuss “managed investment schemes” or “responsible entities” and related disclosure obligations under the Corporations Act, its an MIS document; if they mention superannuation, MySuper, employer contributions, and similar features, then it belongs to the Super regime.\n",
"• To determine the regime of a document, simply check the beginning pages where the fund is mentioned.",
"• If keywords like Pension, Transition to Retirement (TTR), Super, Income Stream, or Accumulation are found, the document belongs to the Super regime; In this case, the business rules associated with the Super regime should be applied.",
"If these keywords are not present, the document falls under the MIS regime.",
"Please identify whether the document belongs to the Super or MIS regime according to the context, and output answer as JSON format.",
"The example is: {\"document_category\": \"Super\"}\n",
"Answer:\n"
]
}
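The bullet rules above can also be applied without a model call. A minimal sketch of the same keyword heuristic (the function name and naive lowercase substring matching are assumptions; the keyword list comes straight from the prompt text):

SUPER_KEYWORDS = ("pension", "transition to retirement", "ttr", "super", "income stream", "accumulation")

def keyword_document_category(first_pages_text: str) -> str:
    # per the prompt: any Super keyword means Super, otherwise default to MIS
    # (substring matching is deliberately simple; "super" also catches "superannuation")
    text = first_pages_text.lower()
    return "Super" if any(keyword in text for keyword in SUPER_KEYWORDS) else "MIS"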

main.py

@@ -1221,9 +1221,11 @@ def merge_output_data_aus_prospectus(
):
# TODO: merge output data for aus prospectus (planned for 2025-01-16)
data_df = pd.read_excel(data_file_path, sheet_name="total_mapping_data")
data_df.fillna("", inplace=True)
document_mapping_df = pd.read_excel(
document_mapping_file, sheet_name="document_mapping"
)
document_mapping_df.fillna("", inplace=True)
# normalize doc IDs to strings so the merge keys match in both frames
data_df["doc_id"] = data_df["doc_id"].astype(str)
document_mapping_df["DocumentId"] = document_mapping_df["DocumentId"].astype(str)
@@ -1309,7 +1311,7 @@
break
if len(share_class_id) > 0 and data["sec_id"] == share_class_id:
update_key = datapoint
- if len(data[update_key]) == 0:
+ if len(str(data[update_key])) == 0:
data[update_key] = value
if page_index not in data["page_index"]:
data["page_index"].append(page_index)
@@ -1373,15 +1375,69 @@
total_data_df.to_excel(writer, index=False, sheet_name="total_data")
def get_aus_prospectus_document_category():
document_sample_file = (
r"./sample_documents/aus_prospectus_17_documents_sample.txt"
)
with open(document_sample_file, "r", encoding="utf-8") as f:
special_doc_id_list = [doc_id.strip() for doc_id in f.readlines()]
pdf_folder: str = r"/data/aus_prospectus/pdf/"
output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
output_extract_data_child_folder: str = (
r"/data/aus_prospectus/output/extract_data/docs/"
)
output_mapping_child_folder: str = (
r"/data/aus_prospectus/output/mapping_data/docs/"
)
drilldown_folder = r"/data/aus_prospectus/output/drilldown/"
doc_source = "aus_prospectus"
extract_way = "text"
document_category_dict = {}
for doc_id in special_doc_id_list:
# EMEA_AR_Parsing also handles aus_prospectus documents, selected via doc_source
emea_ar_parsing = EMEA_AR_Parsing(
doc_id,
doc_source=doc_source,
pdf_folder=pdf_folder,
output_pdf_text_folder=output_pdf_text_folder,
output_extract_data_folder=output_extract_data_child_folder,
output_mapping_data_folder=output_mapping_child_folder,
extract_way=extract_way,
drilldown_folder=drilldown_folder,
compare_with_provider=False
)
data_extraction = DataExtraction(
doc_source=emea_ar_parsing.doc_source,
doc_id=emea_ar_parsing.doc_id,
pdf_file=emea_ar_parsing.pdf_file,
output_data_folder=emea_ar_parsing.output_extract_data_folder,
page_text_dict=emea_ar_parsing.page_text_dict,
datapoint_page_info=emea_ar_parsing.datapoint_page_info,
datapoints=emea_ar_parsing.datapoints,
document_mapping_info_df=emea_ar_parsing.document_mapping_info_df,
extract_way=extract_way
)
logger.info(f"Document: {doc_id}, category: {data_extraction.document_category}")
document_category_dict[doc_id] = data_extraction.document_category
output_extract_document_category_folder: str = (
r"/data/aus_prospectus/output/document_category/"
)
os.makedirs(output_extract_document_category_folder, exist_ok=True)
output_file = os.path.join(output_extract_document_category_folder, "document_category.json")
with open(output_file, "w", encoding="utf-8") as f:
json.dump(document_category_dict, f, ensure_ascii=False, indent=4)
logger.info(f"Document category: {document_category_dict}")
if __name__ == "__main__":
get_aus_prospectus_document_category()
# test_data_extraction_metrics()
data_file_path = r"/data/aus_prospectus/output/mapping_data/total/mapping_data_info_17_documents_by_text_20250219123515.xlsx"
document_mapping_file_path = r"/data/aus_prospectus/basic_information/17_documents/aus_prospectus_17_documents_mapping.xlsx"
merged_total_data_folder = r'/data/aus_prospectus/output/mapping_data/total/merged/'
os.makedirs(merged_total_data_folder, exist_ok=True)
data_file_base_name = os.path.basename(data_file_path)
output_data_file_path = os.path.join(merged_total_data_folder, "merged_" + data_file_base_name)
merge_output_data_aus_prospectus(data_file_path, document_mapping_file_path, output_data_file_path)
# data_file_path = r"/data/aus_prospectus/output/mapping_data/total/mapping_data_info_17_documents_by_text_20250219123515.xlsx"
# document_mapping_file_path = r"/data/aus_prospectus/basic_information/17_documents/aus_prospectus_17_documents_mapping.xlsx"
# merged_total_data_folder = r'/data/aus_prospectus/output/mapping_data/total/merged/'
# os.makedirs(merged_total_data_folder, exist_ok=True)
# data_file_base_name = os.path.basename(data_file_path)
# output_data_file_path = os.path.join(merged_total_data_folder, "merged_" + data_file_base_name)
# merge_output_data_aus_prospectus(data_file_path, document_mapping_file_path, output_data_file_path)
# doc_source = "aus_prospectus"
# sample_document_list_folder: str = r'./sample_documents/'
@@ -1429,7 +1485,7 @@ if __name__ == "__main__":
# "555377021",
# "555654388",
# ]
# special_doc_id_list: list = ["391080133"]
special_doc_id_list: list = ["391080133"]
pdf_folder: str = r"/data/aus_prospectus/pdf/"
output_pdf_text_folder: str = r"/data/aus_prospectus/output/pdf_text/"
output_extract_data_child_folder: str = (


@@ -1469,11 +1469,11 @@ def prepare_multi_fund_aus_prospectus_document(data_folder: str = r"/data/aus_pr
if __name__ == "__main__":
# pdf_exist()
# prepare_multi_fund_aus_prospectus_document()
- merge_aus_document_prospectus_data(aus_data_folder=r"/data/aus_prospectus/basic_information/biz_rule/",
- aus_document_mapping_file="phase1_document_mapping.xlsx",
- aus_prospectus_data_file="phase1_aus_prospectus_data.xlsx",
+ merge_aus_document_prospectus_data(aus_data_folder=r"/data/aus_prospectus/basic_information/17_documents/",
+ aus_document_mapping_file="aus_prospectus_17_documents_mapping.xlsx",
+ aus_prospectus_data_file="aus_prospectus_data_17_documents_secid.xlsx",
document_mapping_sheet="document_mapping",
- output_file="phase1_aus_document_prospectus.xlsx",
+ output_file="aus_prospectus_17_documents_data.xlsx",
output_sheet="aus_document_prospectus")
folder = r"/data/emea_ar/basic_information/English/sample_doc/emea_11_06_case/"
file_name = "doc_ar_data_for_emea_11_06.xlsx"


@@ -79,7 +79,7 @@ def query_investment_by_provider(company_id: str, rerun=True, output_folder=r"./
def query_data_by_biz_type(biztype: str, para, return_df: bool):
sqlpass_url = "https://api.morningstar.com/sqlpassapi/v1/sql"
sqlpass_url = os.getenv("SQL_PASS_URL")
url = sqlpass_url + "?sqlName={0}&params={1}".format(biztype, str(para))
headers = {"ApiKey": os.getenv("SQL_PASS_KEY")}
if return_df:
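For reference, the request this function assembles might be issued as in the sketch below (whether the API expects GET, and the response shape, are assumptions; only the URL format, the ApiKey header, and the env vars come from the code above):

import os
import requests

def run_sqlpass(biztype: str, para) -> dict:
    # build the same URL shape as query_data_by_biz_type above
    url = os.getenv("SQL_PASS_URL") + "?sqlName={0}&params={1}".format(biztype, str(para))
    response = requests.get(url, headers={"ApiKey": os.getenv("SQL_PASS_KEY")}, timeout=30)
    response.raise_for_status()
    return response.json()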