import pandas as pd
import os
from tqdm import tqdm
import json
from glob import glob
import fitz
import re
import time
import traceback
import json_repair
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.pdf_util import PDFUtil


def get_unique_docids_from_doc_provider_data(doc_provider_file_path: str):
    doc_provider_data = pd.read_excel(doc_provider_file_path)
    # group by DocumentId and count the rows per document,
    # producing a frame with two columns: DocumentId and provider_count
    doc_provider_count = (
        doc_provider_data.groupby("DocumentId")
        .size()
        .reset_index(name="provider_count")
    )
    # sort by provider_count in descending order
    doc_provider_count = doc_provider_count.sort_values(
        by="provider_count", ascending=False
    )
    # save both the detail rows and the count data back to the same Excel file
    with pd.ExcelWriter(doc_provider_file_path) as writer:
        doc_provider_data.to_excel(
            writer, sheet_name="doc_provider_details", index=False
        )
        doc_provider_count.to_excel(
            writer, sheet_name="doc_provider_count", index=False
        )


def download_pdf(doc_provider_file_path: str, sheet_name: str, pdf_path: str, doc_id_column: str = "DocumentId"):
    document_data = pd.read_excel(doc_provider_file_path, sheet_name=sheet_name)
    # get all unique docids as a list of strings
    doc_id_list = [
        str(doc_id) for doc_id in document_data[doc_id_column].unique().tolist()
    ]
    # download pdfs
    logger.info(f"Start downloading {len(doc_id_list)} pdfs")
    os.makedirs(pdf_path, exist_ok=True)
    for doc_id in tqdm(doc_id_list):
        logger.info(f"Downloading pdf for docid: {doc_id}")
        download_pdf_from_documents_warehouse(pdf_directory=pdf_path, doc_id=doc_id)
        time.sleep(1)


def output_pdf_page_text(pdf_folder: str, output_folder: str):
    if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
        logger.error(f"Invalid pdf_folder: {pdf_folder}")
        return
    if output_folder is None or len(output_folder) == 0:
        logger.error(f"Invalid output_folder: {output_folder}")
        return
    os.makedirs(output_folder, exist_ok=True)
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
    logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
    for pdf_file in pdf_files:
        logger.info(f"Start processing {pdf_file}")
        pdf_util = PDFUtil(pdf_file)
        success, text, page_text_dict = pdf_util.extract_text(
            output_folder=output_folder
        )
        if success:
            logger.info(f"Successfully extracted text from {pdf_file}")


def analyze_json_error():
    text_file = r"/data/emea_ar/output/pdf_table_prompts/445877368_4.txt"
    with open(text_file, "r", encoding="utf-8") as file:
        text = file.read()
    json_response = re.search(r"```json([\s\S]*)```", text)
    if json_response:
        json_text = json_response.group(1)
        json_data = {"tables": []}
        try:
            json_data = json.loads(json_text)
        except Exception:
            # fall back to json_repair when the model output is not valid JSON
            json_data = json_repair.loads(json_text)
        table_list = json_data.get("tables", [])
        for table_num, table in enumerate(table_list):
            table_md_file = os.path.join("/temp/", f"temp_{table_num}.md")
            table = re.sub(r"(\n)+", "\n", table)
            with open(table_md_file, "w", encoding="utf-8") as file:
                file.write(table)


def statistics_document(
    pdf_folder: str,
    doc_mapping_file_path: str,
    doc_ar_data_file_path: str,
    mapping_sheet_name: str = "Sheet1",
    ar_data_sheet_name: str = "doc_ar_data_in_db",
    output_folder: str = "/data/emea_ar/basic_information/English/",
    output_file: str = "doc_mapping_statistics_data.xlsx",
):
    if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
        logger.error(f"Invalid pdf_folder: {pdf_folder}")
        return
    if (
        doc_mapping_file_path is None
        or len(doc_mapping_file_path) == 0
        or not os.path.exists(doc_mapping_file_path)
    ):
        logger.error(f"Invalid doc_mapping_file_path: {doc_mapping_file_path}")
        return
    if output_folder is None or len(output_folder) == 0:
        logger.error(f"Invalid output_folder: {output_folder}")
        return
    os.makedirs(output_folder, exist_ok=True)
    describe_stat_df_list = []
    # statistics of document mapping information
    doc_mapping_data = pd.read_excel(doc_mapping_file_path, sheet_name=mapping_sheet_name)
    # count distinct FundId per DocumentId
    logger.info(
        "statistics doc_mapping_data for counting FundId count based on DocumentId"
    )
    doc_fund_id_df = doc_mapping_data[["DocumentId", "FundId"]].drop_duplicates()
    doc_fund_count = (
        doc_fund_id_df.groupby("DocumentId").size().reset_index(name="fund_count")
    )
    # order by fund_count in descending order
    doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=False)
    # summarize fund_count with describe() and convert it to a DataFrame
    doc_fund_count_stat_df = get_describe_stat(
        doc_fund_count, "fund_count", "doc_fund_count"
    )
    describe_stat_df_list.append(doc_fund_count_stat_df)
    # count distinct FundClassId per DocumentId
    logger.info(
        "statistics doc_mapping_data for counting FundClassId count based on DocumentId"
    )
    doc_share_class_id_df = doc_mapping_data[
        ["DocumentId", "FundClassId"]
    ].drop_duplicates()
    doc_share_class_count = (
        doc_share_class_id_df.groupby("DocumentId")
        .size()
        .reset_index(name="share_class_count")
    )
    # order by share_class_count in descending order
    doc_share_class_count = doc_share_class_count.sort_values(
        by="share_class_count", ascending=False
    )
    # summarize share_class_count with describe() and convert it to a DataFrame
    doc_share_class_count_stat_df = get_describe_stat(
        doc_share_class_count, "share_class_count", "doc_share_class_count"
    )
    describe_stat_df_list.append(doc_share_class_count_stat_df)
    # count distinct FundId per CompanyId and CompanyName
    logger.info(
        "statistics doc_mapping_data for counting FundId count based on CompanyId and CompanyName"
    )
    provider_fund_id_df = doc_mapping_data[
        ["CompanyId", "CompanyName", "FundId"]
    ].drop_duplicates()
    provider_fund_count = (
        provider_fund_id_df.groupby(["CompanyId", "CompanyName"])
        .size()
        .reset_index(name="fund_count")
    )
    # order by fund_count in descending order
    provider_fund_count = provider_fund_count.sort_values(
        by="fund_count", ascending=False
    )
    # summarize fund_count with describe() and convert it to a DataFrame
    provider_fund_count_stat_df = get_describe_stat(
        provider_fund_count, "fund_count", "provider_fund_count"
    )
    describe_stat_df_list.append(provider_fund_count_stat_df)
    # count distinct FundClassId per CompanyId and CompanyName
    logger.info(
        "statistics doc_mapping_data for counting FundClassId count based on CompanyId"
    )
    provider_share_class_id_df = doc_mapping_data[
        ["CompanyId", "CompanyName", "FundClassId"]
    ].drop_duplicates()
    provider_share_class_count = (
        provider_share_class_id_df.groupby(["CompanyId", "CompanyName"])
        .size()
        .reset_index(name="share_class_count")
    )
    # order by share_class_count in descending order
    provider_share_class_count = provider_share_class_count.sort_values(
        by="share_class_count", ascending=False
    )
    # summarize share_class_count with describe() and convert it to a DataFrame
    provider_share_class_count_stat_df = get_describe_stat(
        provider_share_class_count, "share_class_count", "provider_share_class_count"
    )
    describe_stat_df_list.append(provider_share_class_count_stat_df)
    # count distinct FundClassId per FundId and FundLegalName
    logger.info(
        "statistics doc_mapping_data for counting FundClassId count based on FundId and FundLegalName"
    )
    fund_share_class_id_df = doc_mapping_data[
        ["FundId", "FundLegalName", "FundClassId"]
    ].drop_duplicates()
    fund_share_class_count = (
        fund_share_class_id_df.groupby(["FundId", "FundLegalName"])
        .size()
        .reset_index(name="share_class_count")
    )
    # order by share_class_count in descending order
    fund_share_class_count = fund_share_class_count.sort_values(
        by="share_class_count", ascending=False
    )
    # summarize share_class_count with describe() and convert it to a DataFrame
    fund_share_class_count_stat_df = get_describe_stat(
        fund_share_class_count, "share_class_count", "fund_share_class_count"
    )
    describe_stat_df_list.append(fund_share_class_count_stat_df)
    stat_file = os.path.join(output_folder, output_file)
    doc_id_list = [
        str(docid) for docid in doc_mapping_data["DocumentId"].unique().tolist()
    ]
    # statistics of document page numbers
    pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
    logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
    logger.info("statistics document page number")
    doc_page_num_list = []
    for pdf_file in tqdm(pdf_files):
        pdf_base_name = os.path.basename(pdf_file).replace(".pdf", "")
        if pdf_base_name not in doc_id_list:
            continue
        docid = os.path.basename(pdf_file).split(".")[0]
        doc = fitz.open(pdf_file)
        page_num = doc.page_count
        doc_page_num_list.append({"DocumentId": docid, "page_num": page_num})
        doc.close()
    doc_page_num_df = pd.DataFrame(doc_page_num_list)
    # order by page_num in descending order
    doc_page_num_df = doc_page_num_df.sort_values(by="page_num", ascending=False)
    # summarize page_num with describe() and convert it to a DataFrame
    doc_page_num_stat_df = get_describe_stat(
        doc_page_num_df, "page_num", "doc_page_num"
    )
    describe_stat_df_list.append(doc_page_num_stat_df)
    describe_stat_df = pd.concat(describe_stat_df_list)
    describe_stat_df.reset_index(drop=True, inplace=True)
    doc_dp_data_df = None
    if doc_ar_data_file_path is not None and os.path.exists(doc_ar_data_file_path):
        doc_ar_data = pd.read_excel(doc_ar_data_file_path, sheet_name=ar_data_sheet_name)
        doc_dp_result = get_document_with_all_4_data_points(None, None, doc_ar_data)
        doc_dp_data_list = []
        for doc_id in doc_id_list:
            doc_id = int(doc_id)
            doc_dp_data = {"DocumentId": doc_id, "tor": 0, "ter": 0, "ogc": 0, "perf_fee": 0}
            if doc_id in doc_dp_result["tor"]:
                doc_dp_data["tor"] = 1
            if doc_id in doc_dp_result["ter"]:
                doc_dp_data["ter"] = 1
            if doc_id in doc_dp_result["ogc"]:
                doc_dp_data["ogc"] = 1
            if doc_id in doc_dp_result["perf_fee"]:
                doc_dp_data["perf_fee"] = 1
            doc_dp_data_list.append(doc_dp_data)
        doc_dp_data_df = pd.DataFrame(doc_dp_data_list)
        doc_dp_data_df = doc_dp_data_df.sort_values(by="DocumentId", ascending=True)
        doc_dp_data_df.reset_index(drop=True, inplace=True)
    # cast every DocumentId column to string before merging
    doc_page_num_df["DocumentId"] = doc_page_num_df["DocumentId"].astype(str)
    doc_fund_count["DocumentId"] = doc_fund_count["DocumentId"].astype(str)
    doc_share_class_count["DocumentId"] = doc_share_class_count["DocumentId"].astype(str)
    if doc_dp_data_df is not None:
        doc_dp_data_df["DocumentId"] = doc_dp_data_df["DocumentId"].astype(str)
    # merge doc_page_num_df with doc_fund_count, doc_share_class_count and
    # doc_dp_data_df on DocumentId
    doc_page_num_df = doc_page_num_df.merge(doc_fund_count, on="DocumentId", how="left")
    doc_page_num_df = doc_page_num_df.merge(doc_share_class_count, on="DocumentId", how="left")
    if doc_dp_data_df is not None:
        doc_page_num_df = doc_page_num_df.merge(doc_dp_data_df, on="DocumentId", how="left")
    # save statistics data to excel
    with pd.ExcelWriter(stat_file) as writer:
        doc_page_num_df.to_excel(writer, sheet_name="doc_level_stats", index=False)
        # doc_dp_data_df.to_excel(writer, sheet_name="doc_dp_data", index=False)
        # doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False)
        # doc_share_class_count.to_excel(
        #     writer, sheet_name="doc_share_class_count", index=False
        # )
        provider_fund_count.to_excel(
            writer, sheet_name="provider_fund_count", index=False
        )
        provider_share_class_count.to_excel(
            writer, sheet_name="provider_share_class_count", index=False
        )
        fund_share_class_count.to_excel(
            writer, sheet_name="fund_share_class_count", index=False
        )
        describe_stat_df.to_excel(
            writer, sheet_name="all_describe_statistics", index=False
        )


def get_document_with_all_4_data_points(folder: str, file_name: str, data: pd.DataFrame):
    if data is None:
        file_path = os.path.join(folder, file_name)
        if os.path.exists(file_path):
            data = pd.read_excel(file_path, sheet_name="doc_ar_data_in_db")
        else:
            logger.error(f"Invalid file path: {file_path}")
            return
    # document ids where noTor is 0
    noTor_0_doc_id_list = data[data["noTor"] == 0]["DocumentId"].unique().tolist()
    # document ids where share_noTer is 0
    share_noTer_0_doc_id_list = data[data["share_noTer"] == 0]["DocumentId"].unique().tolist()
    # document ids where share_noOgc is 0
    share_noOgc_0_doc_id_list = data[data["share_noOgc"] == 0]["DocumentId"].unique().tolist()
    # document ids where share_noPerfFee is 0
    share_noPerfFee_0_doc_id_list = data[data["share_noPerfFee"] == 0]["DocumentId"].unique().tolist()
    logger.info(f"noTor_0_doc_id_list: {len(noTor_0_doc_id_list)}")
    logger.info(f"share_noTer_0_doc_id_list: {len(share_noTer_0_doc_id_list)}")
    logger.info(f"share_noOgc_0_doc_id_list: {len(share_noOgc_0_doc_id_list)}")
    logger.info(f"share_noPerfFee_0_doc_id_list: {len(share_noPerfFee_0_doc_id_list)}")
    all_4_data_points_doc_id_list = list(
        set(noTor_0_doc_id_list)
        & set(share_noTer_0_doc_id_list)
        & set(share_noOgc_0_doc_id_list)
        & set(share_noPerfFee_0_doc_id_list)
    )
    logger.info(f"all_4_data_points_doc_id_list: {len(all_4_data_points_doc_id_list)}")
    result = {
        "tor": noTor_0_doc_id_list,
        "ter": share_noTer_0_doc_id_list,
        "ogc": share_noOgc_0_doc_id_list,
        "perf_fee": share_noPerfFee_0_doc_id_list,
    }
    return result

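# Illustrative sketch (never called): the distinct-count pattern used throughout the
# statistics functions above — drop duplicate (key, value) pairs, group by the key,
# count rows, then summarize with get_describe_stat (defined further below in this
# module). The mapping rows here are made-up sample data, not real document ids.
def _example_distinct_count_per_document():
    mapping = pd.DataFrame(
        {
            "DocumentId": [101, 101, 101, 202, 202],
            "FundId": ["F1", "F1", "F2", "F3", "F3"],
        }
    )
    # duplicates of (DocumentId, FundId) are dropped first, so document 101 counts
    # 2 distinct funds and document 202 counts 1
    fund_count = (
        mapping[["DocumentId", "FundId"]]
        .drop_duplicates()
        .groupby("DocumentId")
        .size()
        .reset_index(name="fund_count")
    )
    fund_count = fund_count.sort_values(by="fund_count", ascending=False)
    # one summary row labelled "example_fund_count" with count/mean/std/min/25%/50%/75%/max
    summary = get_describe_stat(fund_count, "fund_count", "example_fund_count")
    return fund_count, summary
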
def statistics_provider_mapping(provider_mapping_data_file: str, output_folder: str):
    if (
        provider_mapping_data_file is None
        or len(provider_mapping_data_file) == 0
        or not os.path.exists(provider_mapping_data_file)
    ):
        logger.error(
            f"Invalid provider_mapping_data_file: {provider_mapping_data_file}"
        )
        return
    provider_mapping_data = pd.read_excel(provider_mapping_data_file)
    describe_stat_df_list = []
    # count distinct FundId per CompanyId and CompanyName
    logger.info(
        "statistics provider_mapping_data for counting FundId count based on CompanyId and CompanyName"
    )
    provider_fund_id_df = provider_mapping_data[
        ["CompanyId", "CompanyName", "FundId"]
    ].drop_duplicates()
    provider_fund_count = (
        provider_fund_id_df.groupby(["CompanyId", "CompanyName"])
        .size()
        .reset_index(name="fund_count")
    )
    # order by fund_count in descending order
    provider_fund_count = provider_fund_count.sort_values(
        by="fund_count", ascending=False
    )
    # summarize fund_count with describe() and convert it to a DataFrame
    provider_fund_count_stat_df = get_describe_stat(
        provider_fund_count, "fund_count", "provider_fund_count"
    )
    describe_stat_df_list.append(provider_fund_count_stat_df)
    # get the fund_count sum of all companies
    all_companies_fund_count_sum = provider_fund_count["fund_count"].sum()
    top_n_company_fund_count_list = []
    # get the fund_count sum of the top 5 companies
    top_5_companies_fund_count, top_5_companies_fund_count_percent = (
        get_top_n_records_count(
            provider_fund_count, "fund_count", 5, all_companies_fund_count_sum
        )
    )
    top_n_company_fund_count_list.append(
        {
            "top_n_providers": 5,
            "fund_count": top_5_companies_fund_count,
            "percent": top_5_companies_fund_count_percent,
        }
    )
    logger.info(f"Top 5 companies fund count sum: {top_5_companies_fund_count}")
    # get the fund_count sum of the top 10 companies
    top_10_companies_fund_count, top_10_companies_fund_count_percent = (
        get_top_n_records_count(
            provider_fund_count, "fund_count", 10, all_companies_fund_count_sum
        )
    )
    top_n_company_fund_count_list.append(
        {
            "top_n_providers": 10,
            "fund_count": top_10_companies_fund_count,
            "percent": top_10_companies_fund_count_percent,
        }
    )
    logger.info(f"Top 10 companies fund count sum: {top_10_companies_fund_count}")
    # get the fund_count sum of the top 50 companies
    top_50_companies_fund_count, top_50_companies_fund_count_percent = (
        get_top_n_records_count(
            provider_fund_count, "fund_count", 50, all_companies_fund_count_sum
        )
    )
    top_n_company_fund_count_list.append(
        {
            "top_n_providers": 50,
            "fund_count": top_50_companies_fund_count,
            "percent": top_50_companies_fund_count_percent,
        }
    )
    logger.info(f"Top 50 companies fund count sum: {top_50_companies_fund_count}")
    # get the fund_count sum of the top 100 companies
    top_100_companies_fund_count, top_100_companies_fund_count_percent = (
        get_top_n_records_count(
            provider_fund_count, "fund_count", 100, all_companies_fund_count_sum
        )
    )
    top_n_company_fund_count_list.append(
        {
            "top_n_providers": 100,
            "fund_count": top_100_companies_fund_count,
            "percent": top_100_companies_fund_count_percent,
        }
    )
    top_n_company_fund_count_list.append(
        {
            "top_n_providers": len(provider_fund_count),
            "fund_count": all_companies_fund_count_sum,
            "percent": 100,
        }
    )
    logger.info(f"Top 100 companies fund count sum: {top_100_companies_fund_count}")
    top_n_company_fund_count_df = pd.DataFrame(top_n_company_fund_count_list)
    # count distinct SecId per CompanyId and CompanyName
    logger.info(
        "statistics provider_mapping_data for counting SecId count based on CompanyId and CompanyName"
    )
    provider_share_class_id_df = provider_mapping_data[
        ["CompanyId", "CompanyName", "SecId"]
    ].drop_duplicates()
    provider_share_class_count = (
        provider_share_class_id_df.groupby(["CompanyId", "CompanyName"])
        .size()
        .reset_index(name="share_class_count")
    )
    # order by share_class_count in descending order
    provider_share_class_count = provider_share_class_count.sort_values(
        by="share_class_count", ascending=False
    )
    # summarize share_class_count with describe() and convert it to a DataFrame
    provider_share_class_count_stat_df = get_describe_stat(
        provider_share_class_count, "share_class_count", "provider_share_class_count"
    )
    describe_stat_df_list.append(provider_share_class_count_stat_df)
    # get the share_class_count sum of all companies
    all_companies_share_class_count_sum = provider_share_class_count[
        "share_class_count"
    ].sum()
    top_n_company_share_class_count_list = []
    # get the share_class_count sum of the top 5 companies
    top_5_companies_share_class_count, top_5_companies_share_class_count_percent = (
        get_top_n_records_count(
            provider_share_class_count,
            "share_class_count",
            5,
            all_companies_share_class_count_sum,
        )
    )
    top_n_company_share_class_count_list.append(
        {
            "top_n_providers": 5,
            "share_class_count": top_5_companies_share_class_count,
            "percent": top_5_companies_share_class_count_percent,
        }
    )
    logger.info(
        f"Top 5 companies share class count sum: {top_5_companies_share_class_count}"
    )
    # get the share_class_count sum of the top 10 companies
    top_10_companies_share_class_count, top_10_companies_share_class_count_percent = (
        get_top_n_records_count(
            provider_share_class_count,
            "share_class_count",
            10,
            all_companies_share_class_count_sum,
        )
    )
    top_n_company_share_class_count_list.append(
        {
            "top_n_providers": 10,
            "share_class_count": top_10_companies_share_class_count,
            "percent": top_10_companies_share_class_count_percent,
        }
    )
    logger.info(
        f"Top 10 companies share class count sum: {top_10_companies_share_class_count}"
    )
    # get the share_class_count sum of the top 50 companies
    top_50_companies_share_class_count, top_50_companies_share_class_count_percent = (
        get_top_n_records_count(
            provider_share_class_count,
            "share_class_count",
            50,
            all_companies_share_class_count_sum,
        )
    )
    top_n_company_share_class_count_list.append(
        {
            "top_n_providers": 50,
            "share_class_count": top_50_companies_share_class_count,
            "percent": top_50_companies_share_class_count_percent,
        }
    )
    logger.info(
        f"Top 50 companies share class count sum: {top_50_companies_share_class_count}"
    )
    # get the share_class_count sum of the top 100 companies
    top_100_companies_share_class_count, top_100_companies_share_class_count_percent = (
        get_top_n_records_count(
            provider_share_class_count,
            "share_class_count",
            100,
            all_companies_share_class_count_sum,
        )
    )
    top_n_company_share_class_count_list.append(
        {
            "top_n_providers": 100,
            "share_class_count": top_100_companies_share_class_count,
            "percent": top_100_companies_share_class_count_percent,
        }
    )
    logger.info(
        f"Top 100 companies share class count sum: {top_100_companies_share_class_count}"
    )
    top_n_company_share_class_count_list.append(
        {
            "top_n_providers": len(provider_share_class_count),
            "share_class_count": all_companies_share_class_count_sum,
            "percent": 100,
        }
    )
    top_n_company_share_class_count_df = pd.DataFrame(
        top_n_company_share_class_count_list
    )
    # count distinct SecId per FundId and FundLegalName
    logger.info(
        "statistics provider_mapping_data for counting SecId count based on FundId and FundLegalName"
    )
    fund_share_class_id_df = provider_mapping_data[
        ["FundId", "FundLegalName", "SecId"]
    ].drop_duplicates()
    fund_share_class_count = (
        fund_share_class_id_df.groupby(["FundId", "FundLegalName"])
        .size()
        .reset_index(name="share_class_count")
    )
    # order by share_class_count in descending order
    fund_share_class_count = fund_share_class_count.sort_values(
        by="share_class_count", ascending=False
    )
    # summarize share_class_count with describe() and convert it to a DataFrame
    fund_share_class_count_stat_df = get_describe_stat(
        fund_share_class_count, "share_class_count", "fund_share_class_count"
    )
    describe_stat_df_list.append(fund_share_class_count_stat_df)
    describe_stat_df = pd.concat(describe_stat_df_list)
    describe_stat_df.reset_index(drop=True, inplace=True)
    stat_file = os.path.join(output_folder, "provider_mapping_data_statistics.xlsx")
    # save statistics data to excel
    with pd.ExcelWriter(stat_file) as writer:
        top_n_company_fund_count_df.to_excel(
            writer, sheet_name="top_n_provider_fund_count", index=False
        )
        top_n_company_share_class_count_df.to_excel(
            writer, sheet_name="top_n_provider_share_count", index=False
        )
        provider_fund_count.to_excel(
            writer, sheet_name="provider_fund_count", index=False
        )
        provider_share_class_count.to_excel(
            writer, sheet_name="provider_share_count", index=False
        )
        fund_share_class_count.to_excel(
            writer, sheet_name="fund_share_count", index=False
        )
        describe_stat_df.to_excel(
            writer, sheet_name="all_describe_statistics", index=False
        )


def statistics_document_fund_share_count(provider_mapping_data_file: str):
    if (
        provider_mapping_data_file is None
        or len(provider_mapping_data_file) == 0
        or not os.path.exists(provider_mapping_data_file)
    ):
        logger.error(f"Invalid file_path: {provider_mapping_data_file}")
        return
    describe_stat_df_list = []
    # statistics of document mapping information
    doc_mapping_data = pd.read_excel(provider_mapping_data_file, sheet_name="all_data")
    # set noTor to 0 if tor has a value, 1 otherwise
    doc_mapping_data["noTor"] = doc_mapping_data["tor"].apply(
        lambda x: 0 if pd.notna(x) else 1
    )
    # set share_noTer to 0 if share_ter has a value, 1 otherwise
    doc_mapping_data["share_noTer"] = doc_mapping_data["share_ter"].apply(
        lambda x: 0 if pd.notna(x) else 1
    )
    # set share_noOgc to 0 if share_ogc has a value, 1 otherwise
    doc_mapping_data["share_noOgc"] = doc_mapping_data["share_ogc"].apply(
        lambda x: 0 if pd.notna(x) else 1
    )
    # set share_noPerfFee to 0 if share_perfFee has a value, 1 otherwise
    doc_mapping_data["share_noPerfFee"] = doc_mapping_data["share_perfFee"].apply(
        lambda x: 0 if pd.notna(x) else 1
    )
    # count distinct FundId per DocumentId
    logger.info(
        "statistics doc_mapping_data for counting FundId count based on DocumentId"
    )
    doc_fund_id_df = doc_mapping_data[
        ["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundId"]
    ].drop_duplicates()
    doc_fund_count = (
        doc_fund_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"])
        .size()
        .reset_index(name="fund_count")
    )
    # order by fund_count in ascending order
    doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=True)
    # set with_ar_data to True if any of noTor, share_noTer, share_noOgc, share_noPerfFee is 0
    doc_fund_count["with_ar_data"] = False
    for index, row in doc_fund_count.iterrows():
        document_id = row["DocumentId"]
        ar_data = doc_mapping_data[
            (doc_mapping_data["DocumentId"] == document_id)
            & (
                (doc_mapping_data["noTor"] == 0)
                | (doc_mapping_data["share_noTer"] == 0)
                | (doc_mapping_data["share_noOgc"] == 0)
                | (doc_mapping_data["share_noPerfFee"] == 0)
            )
        ]
        if len(ar_data) > 0:
            doc_fund_count.loc[index, "with_ar_data"] = True
    # summarize fund_count with describe() and convert it to a DataFrame
    doc_fund_count_stat_df = get_describe_stat(
        doc_fund_count, "fund_count", "doc_fund_count"
    )
    describe_stat_df_list.append(doc_fund_count_stat_df)
    # count distinct FundClassId per DocumentId
    logger.info(
        "statistics doc_mapping_data for counting FundClassId count based on DocumentId"
    )
    doc_share_class_id_df = doc_mapping_data[
        ["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundClassId"]
    ].drop_duplicates()
    doc_share_class_count = (
        doc_share_class_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"])
        .size()
        .reset_index(name="share_class_count")
    )
    # order by share_class_count in ascending order
    doc_share_class_count = doc_share_class_count.sort_values(
        by="share_class_count", ascending=True
    )
    # set with_ar_data to True if any of noTor, share_noTer, share_noOgc, share_noPerfFee is 0
    doc_share_class_count["with_ar_data"] = False
    for index, row in doc_share_class_count.iterrows():
        document_id = row["DocumentId"]
        ar_data = doc_mapping_data[
            (doc_mapping_data["DocumentId"] == document_id)
            & (
                (doc_mapping_data["noTor"] == 0)
                | (doc_mapping_data["share_noTer"] == 0)
                | (doc_mapping_data["share_noOgc"] == 0)
                | (doc_mapping_data["share_noPerfFee"] == 0)
            )
        ]
        if len(ar_data) > 0:
            doc_share_class_count.loc[index, "with_ar_data"] = True
    # summarize share_class_count with describe() and convert it to a DataFrame
    doc_share_class_count_stat_df = get_describe_stat(
        doc_share_class_count, "share_class_count", "doc_share_class_count"
    )
    describe_stat_df_list.append(doc_share_class_count_stat_df)
    describe_stat_df = pd.concat(describe_stat_df_list)
    describe_stat_df.reset_index(drop=True, inplace=True)
    with pd.ExcelWriter(provider_mapping_data_file) as writer:
        doc_mapping_data.to_excel(writer, sheet_name="all_data", index=False)
        doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False)
        doc_share_class_count.to_excel(writer, sheet_name="doc_share_class_count", index=False)
        describe_stat_df.to_excel(writer, sheet_name="all_describe_statistics", index=False)


def get_top_n_records_count(
    df: pd.DataFrame, column_name: str, n: int, total_count: int
):
    top_n_records = df.head(n)
    top_n_records_count = top_n_records[column_name].sum()
    top_n_records_count_percent = round((top_n_records_count / total_count) * 100, 2)
    return top_n_records_count, top_n_records_count_percent


def get_describe_stat(df: pd.DataFrame, column_name: str, stat_type_name: str):
    stat_df = df[column_name].describe().reset_index().T
    stat_df.columns = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]
    stat_df.reset_index(inplace=True)
    stat_df.rename(columns={"index": "Stat"}, inplace=True)
    # drop the first row, which only holds the original index labels
    stat_df = stat_df[1:]
    if stat_type_name is not None:
        stat_df["Stat_Type"] = stat_type_name
        stat_df = stat_df[
            [
                "Stat_Type",
                "count",
                "mean",
                "std",
                "min",
                "25%",
                "50%",
                "75%",
                "max",
            ]
        ]
    return stat_df


def pickup_document_from_top_100_providers():
    """
    Pick up 100 documents from the top 100 providers.
    The documents have no more than 10 share classes.
    The purpose is to analyze the document structure and content of small documents.
    """
    provider_mapping_data_file = (
        r"/data/emea_ar/basic_information/English/provider_mapping_data_statistics.xlsx"
    )
    top_100_provider_document_file = (
        r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx"
    )
    provider_share_count = pd.read_excel(
        provider_mapping_data_file, sheet_name="provider_share_count"
    )
    # add a share_count_rank column to provider_share_count
    provider_share_count["share_count_rank"] = provider_share_count[
        "share_class_count"
    ].rank(method="min", ascending=False)
    top_100_provider_document_all_data = pd.read_excel(
        top_100_provider_document_file, sheet_name="all_data"
    )
    top_100_provider_document_fund_count = pd.read_excel(
        top_100_provider_document_file, sheet_name="doc_fund_count"
    )
    top_100_provider_document_fund_count.reset_index(drop=True, inplace=True)
    top_100_provider_document_share_count = pd.read_excel(
        top_100_provider_document_file, sheet_name="doc_share_class_count"
    )
    top_100_provider_document_share_count = top_100_provider_document_share_count[
        top_100_provider_document_share_count["with_ar_data"] == True
    ]
    top_100_provider_document_share_count.reset_index(drop=True, inplace=True)
    top_100_provider_document_share_count = pd.merge(
        top_100_provider_document_share_count,
        top_100_provider_document_fund_count,
        on=["DocumentId"],
        how="left",
    )
    top_100_provider_document_share_count = top_100_provider_document_share_count[
        ["DocumentId", "CompanyId_x", "CompanyName_x", "fund_count", "share_class_count"]
    ]
    top_100_provider_document_share_count.rename(
        columns={"CompanyId_x": "CompanyId"}, inplace=True
    )
    # add share_count_rank to top_100_provider_document_share_count by merging with provider_share_count
    top_100_provider_document_share_count = pd.merge(
        top_100_provider_document_share_count,
        provider_share_count,
        on=["CompanyId"],
        how="left",
    )
    # keep columns: DocumentId, CompanyId, CompanyName, fund_count, share_class_count_x, share_count_rank
    top_100_provider_document_share_count = top_100_provider_document_share_count[
        ["DocumentId", "CompanyId", "CompanyName", "fund_count", "share_class_count_x", "share_count_rank"]
    ]
    # rename share_class_count_x to share_class_count and share_count_rank to provider_share_count_rank
    top_100_provider_document_share_count.rename(
        columns={
            "share_class_count_x": "share_class_count",
            "share_count_rank": "provider_share_count_rank",
        },
        inplace=True,
    )
    top_100_provider_document_share_count = top_100_provider_document_share_count.sort_values(
        by=["provider_share_count_rank", "share_class_count"], ascending=True
    )
    # for each provider_share_count_rank (from 1 to 100), randomly pick one document
    # with 1 to 10 share classes
    data_filter = top_100_provider_document_share_count[
        (top_100_provider_document_share_count["share_class_count"] <= 10)
        & (top_100_provider_document_share_count["share_class_count"] >= 1)
    ]
    data_filter = data_filter.sort_values(
        by=["provider_share_count_rank", "share_class_count"], ascending=[True, True]
    )
    unique_rank_list = top_100_provider_document_share_count["provider_share_count_rank"].unique().tolist()
    random_pickup_document_data_list = []
    for rank in unique_rank_list:
        data_filter_rank = data_filter[data_filter["provider_share_count_rank"] == rank]
        if len(data_filter_rank) == 0:
            # fall back to the first document with this rank from the full list
            data_filter_rank = top_100_provider_document_share_count[
                top_100_provider_document_share_count["provider_share_count_rank"] == rank
            ].head(1)
        data_filter_rank = data_filter_rank.sample(n=1, random_state=88)
        random_pickup_document_data_list.append(data_filter_rank)
    random_pickup_document_data = pd.concat(random_pickup_document_data_list)
    # sort by provider_share_count_rank in ascending order
    random_pickup_document_data = random_pickup_document_data.sort_values(
        by="provider_share_count_rank", ascending=True
    )
    random_pickup_document_data.reset_index(drop=True, inplace=True)
    random_pickup_document_mini_data = random_pickup_document_data[
        ["DocumentId", "provider_share_count_rank"]
    ]
    # get the full rows from top_100_provider_document_all_data by merging with random_pickup_document_mini_data
    random_pickup_document_all_data = pd.merge(
        random_pickup_document_mini_data,
        top_100_provider_document_all_data,
        on=["DocumentId"],
        how="left",
    )
    # sort by provider_share_count_rank, FundLegalName, FundClassLegalName in ascending order
    random_pickup_document_all_data = random_pickup_document_all_data.sort_values(
        by=["provider_share_count_rank", "FundLegalName", "FundClassLegalName"],
        ascending=True,
    )
    random_small_document_data_file = (
        r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx"
    )
    with pd.ExcelWriter(random_small_document_data_file) as writer:
        top_100_provider_document_share_count.to_excel(
            writer, sheet_name="all_doc_with_ar_data", index=False
        )
        random_pickup_document_data.to_excel(
            writer, sheet_name="random_small_document", index=False
        )
        random_pickup_document_all_data.to_excel(
            writer, sheet_name="random_small_document_all_data", index=False
        )


def compare_records_count_by_document_id():
    data_from_document = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
    sheet_name = "mapping_data"
    data_from_document_df = pd.read_excel(data_from_document, sheet_name=sheet_name)
    data_from_document_df.rename(columns={"doc_id": "DocumentId"}, inplace=True)
    # count of records per DocumentId extracted from the documents
    document_records_count = (
        data_from_document_df.groupby("DocumentId").size().reset_index(name="records_count")
    )
    data_from_database = r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document_from_DocumentAcquisition.xlsx"
    sheet_name = "random_small_document_all_data"
    data_from_database_df = pd.read_excel(data_from_database, sheet_name=sheet_name)
    database_records_count = (
        data_from_database_df.groupby("DocumentId").size().reset_index(name="records_count")
    )
    # merge document_records_count with database_records_count
    records_count_compare = pd.merge(
        document_records_count,
        database_records_count,
        on=["DocumentId"],
        how="left",
    )
    records_count_compare["records_count_diff"] = (
        records_count_compare["records_count_x"] - records_count_compare["records_count_y"]
    )
    records_count_compare = records_count_compare.sort_values(by="records_count_diff", ascending=False)
    # rename records_count_x to records_count_document and records_count_y to records_count_database
    records_count_compare.rename(
        columns={
            "records_count_x": "records_count_document",
            "records_count_y": "records_count_database",
        },
        inplace=True,
    )
    records_count_compare.reset_index(drop=True, inplace=True)
    records_count_compare_file = (
        r"/data/emea_ar/basic_information/English/records_count_compare_between_document_database_from_DocumentAcquisition.xlsx"
    )
    with pd.ExcelWriter(records_count_compare_file) as writer:
        records_count_compare.to_excel(
            writer, sheet_name="records_count_compare", index=False
        )


def get_document_extracted_share_diff_by_db():
    db_data_file = r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document_from_DocumentAcquisition.xlsx"
    extract_data_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
    doc_mapping_folder = r"/data/emea_ar/output/mapping/document/"
    db_data = pd.read_excel(db_data_file, sheet_name="Sheet1")
    extract_data = pd.read_excel(extract_data_file, sheet_name="mapping_data")
    # optionally keep only rows whose investment_type is 1
    # extract_data = extract_data[extract_data["investment_type"] == 1]
    extract_data.reset_index(drop=True, inplace=True)
    unique_doc_id = extract_data["doc_id"].unique().tolist()
    status_info = {
        1: "WIP",
        5: "Junked",
        3: "AutoSignoff",
        2: "Signoffed",
        10: "Complete",
        20: "AutoDetect",
        21: "Checkduplicate",
        22: "Mapping",
        33: "Not Matched",
        99: "Unknown",
    }
    document_extract_db_compare = []
    for doc_id in unique_doc_id:
        doc_mapping_file = os.path.join(doc_mapping_folder, f"{doc_id}.xlsx")
        if not os.path.exists(doc_mapping_file):
            logger.error(f"Invalid mapping_file: {doc_mapping_file}")
            doc_mapping_share_class_id_df = pd.DataFrame()
        else:
            doc_mapping_data = pd.read_excel(doc_mapping_file)
            doc_mapping_share_class_id_df = doc_mapping_data[["SecId"]].drop_duplicates()
        ar_db_data_doc = db_data[db_data["DocumentId"] == doc_id]
        try:
            masterProcess_status = ar_db_data_doc["MasterProcess_Status"].values[0]
        except Exception as e:
            logger.error(f"Error: {e}")
            masterProcess_status = 99
        masterProcess_status = int(masterProcess_status)
        masterProcess_status_definition = status_info.get(masterProcess_status, "Unknown")
        # keep rows from ar_db_data_doc where any of the 4 data points is present
        ar_db_data_doc = ar_db_data_doc[
            (ar_db_data_doc["noTor"] == 0)
            | (ar_db_data_doc["share_noTer"] == 0)
            | (ar_db_data_doc["share_noOgc"] == 0)
            | (ar_db_data_doc["share_noPerfFee"] == 0)
        ]
        extract_data_doc = extract_data[extract_data["doc_id"] == doc_id]
        # unique raw_name values in extract_data_doc
        unique_raw_name = extract_data_doc["raw_name"].unique().tolist()
        doc_mapping_share_class_count = len(doc_mapping_share_class_id_df)
        extract_share_class_count = len(unique_raw_name)
        extract_vs_doc_share_count_diff = extract_share_class_count - doc_mapping_share_class_count
        db_share_class_count = len(ar_db_data_doc)
        extract_vs_ar_db_share_count_diff = extract_share_class_count - db_share_class_count
        document_extract_db_compare.append({
            "DocumentId": doc_id,
            "status": masterProcess_status,
            "status_definition": masterProcess_status_definition,
            "extract_share_count": extract_share_class_count,
            "doc_share_count": doc_mapping_share_class_count,
            "extract_vs_doc_share_count_diff": extract_vs_doc_share_count_diff,
            "ar_db_share_count": db_share_class_count,
            "extract_vs_ar_db_share_count_diff": extract_vs_ar_db_share_count_diff,
        })
    document_extract_db_compare_df = pd.DataFrame(document_extract_db_compare)
    # output to excel
    document_extract_db_compare_file = (
        r"/data/emea_ar/basic_information/English/document_extract_db_compare.xlsx"
    )
    with pd.ExcelWriter(document_extract_db_compare_file) as writer:
        document_extract_db_compare_df.to_excel(
            writer, sheet_name="document_extract_db_compare", index=False
        )

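# Illustrative sketch (never called): how concat_mapping below is typically driven,
# using the same mapping folder and output file that appear, commented out, in the
# __main__ block at the bottom of this module.
def _example_concat_mapping_usage():
    document_mapping_folder = r"/data/emea_ar/output/mapping/document/"
    all_data_file = r"/data/emea_ar/output/mapping/all_document_mapping.xlsx"
    # reads every per-document mapping workbook and writes one combined workbook
    concat_mapping(document_mapping_folder, all_data_file)
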
def concat_mapping(mapping_folder: str, output_file: str):
    excel_files = glob(os.path.join(mapping_folder, "*.xlsx"))
    logger.info(f"Total {len(excel_files)} excel files found in {mapping_folder}")
    all_data_list = []
    for excel_file in excel_files:
        doc_mapping_data = pd.read_excel(excel_file)
        all_data_list.append(doc_mapping_data)
    all_data = pd.concat(all_data_list)
    all_data.reset_index(drop=True, inplace=True)
    with open(output_file, "wb") as f:
        all_data.to_excel(f, index=False)

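# Illustrative sketch (never called) of the count-to-label conversion used by the two
# calc_typical_doc_metrics functions below: a ground-truth count and a predicted count
# are expanded into binary lists so sklearn's precision/recall can be applied. The
# counts here are made up.
def _example_count_to_binary_metrics():
    from sklearn.metrics import precision_score, recall_score

    gt_count, pred_count = 5, 3
    gt_list = [1] * gt_count                         # 5 expected records
    pred_list = [1] * pred_count                     # 3 records were actually extracted
    pred_list.extend([0] * (gt_count - pred_count))  # pad misses with 0 -> [1, 1, 1, 0, 0]
    # precision = 1.0 (no false positives), recall = 0.6 (3 of 5 found)
    return precision_score(gt_list, pred_list), recall_score(gt_list, pred_list)
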
def calc_typical_doc_metrics_v2():
    """
    Statistics metrics for a typical document.
    1. Fund level datapoint: TOR
    2. Share level datapoints: OGC, TER, Performance fees
    3. Only count records that have a document investment mapping
    """
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    result_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_20_new_emea_documents_sample_Accuracy.xlsx"
    sheet_name = "record_level_Results"
    data = pd.read_excel(result_file, sheet_name=sheet_name)
    data.fillna("", inplace=True)
    # only keep rows where valid is 1
    data = data[data["valid"] == 1]
    fund_raw_data_gt = []
    fund_raw_data_pred = []
    fund_mapping_data_gt = []
    fund_mapping_data_pred = []
    share_raw_data_gt = []
    share_raw_data_pred = []
    share_mapping_data_gt = []
    share_mapping_data_pred = []
    for idx, row in data.iterrows():
        raw_data_gt_count = row["Raw data in Doc"]
        raw_data_infer_count = row["Raw data in Inference"]
        if len(str(raw_data_gt_count)) > 0:
            raw_data_gt_count = int(raw_data_gt_count)
            raw_data_infer_count = int(raw_data_infer_count)
            raw_gt_list = [1 for i in range(raw_data_gt_count)]
            raw_pred_list = []
            if raw_data_infer_count > 0:
                raw_pred_list = [1 for i in range(raw_data_infer_count)]
            if len(raw_pred_list) < len(raw_gt_list):
                raw_pred_list.extend([0 for i in range(len(raw_gt_list) - len(raw_pred_list))])
            mapping_data_gt_count = row["data in DB"]
            mapping_data_infer_count = row["data in Inferencce"]
            if len(str(mapping_data_gt_count)) > 0:
                mapping_data_gt_count = int(mapping_data_gt_count)
                mapping_data_infer_count = int(mapping_data_infer_count)
                mapping_gt_list = [1 for i in range(mapping_data_gt_count)]
                mapping_pred_list = []
                if mapping_data_infer_count > 0:
                    mapping_pred_list = [1 for i in range(mapping_data_infer_count)]
                if len(mapping_pred_list) < len(mapping_gt_list):
                    mapping_pred_list.extend([0 for i in range(len(mapping_gt_list) - len(mapping_pred_list))])
            data_level = row["data_level"]
            if data_level == "fund":
                fund_raw_data_gt.extend(raw_gt_list)
                fund_raw_data_pred.extend(raw_pred_list)
                fund_mapping_data_gt.extend(mapping_gt_list)
                fund_mapping_data_pred.extend(mapping_pred_list)
            else:
                share_raw_data_gt.extend(raw_gt_list)
                share_raw_data_pred.extend(raw_pred_list)
                share_mapping_data_gt.extend(mapping_gt_list)
                share_mapping_data_pred.extend(mapping_pred_list)
    # manually append six false-positive share records (predicted but not in the ground truth)
    share_raw_data_gt.extend([0, 0, 0, 0, 0, 0])
    share_raw_data_pred.extend([1, 1, 1, 1, 1, 1])
    share_mapping_data_gt.extend([0, 0, 0, 0, 0, 0])
    share_mapping_data_pred.extend([1, 1, 1, 1, 1, 1])
    fund_raw_data_accuracy = accuracy_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_raw_data_precision = precision_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_raw_data_recall = recall_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_raw_data_f1 = f1_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_mapping_data_accuracy = accuracy_score(fund_mapping_data_gt, fund_mapping_data_pred)
    fund_mapping_data_precision = precision_score(fund_mapping_data_gt, fund_mapping_data_pred)
    fund_mapping_data_recall = recall_score(fund_mapping_data_gt, fund_mapping_data_pred)
    fund_mapping_data_f1 = f1_score(fund_mapping_data_gt, fund_mapping_data_pred)
    share_raw_data_accuracy = accuracy_score(share_raw_data_gt, share_raw_data_pred)
    share_raw_data_precision = precision_score(share_raw_data_gt, share_raw_data_pred)
    share_raw_data_recall = recall_score(share_raw_data_gt, share_raw_data_pred)
    share_raw_data_f1 = f1_score(share_raw_data_gt, share_raw_data_pred)
    share_mapping_data_accuracy = accuracy_score(share_mapping_data_gt, share_mapping_data_pred)
    share_mapping_data_precision = precision_score(share_mapping_data_gt, share_mapping_data_pred)
    share_mapping_data_recall = recall_score(share_mapping_data_gt, share_mapping_data_pred)
    share_mapping_data_f1 = f1_score(share_mapping_data_gt, share_mapping_data_pred)
    final_data = []
    fund_raw_data_metrics = {
        "title": "Fund_Datapoint_Raw_Data",
        "accuracy": fund_raw_data_accuracy,
        "precision": fund_raw_data_precision,
        "recall": fund_raw_data_recall,
        "f1": fund_raw_data_f1,
        "support": len(fund_raw_data_gt),
    }
    final_data.append(fund_raw_data_metrics)
    logger.info(f"fund_raw_data_accuracy: {fund_raw_data_accuracy}")
    logger.info(f"fund_raw_data_precision: {fund_raw_data_precision}")
    logger.info(f"fund_raw_data_recall: {fund_raw_data_recall}")
    logger.info(f"fund_raw_data_f1: {fund_raw_data_f1}")
    logger.info(f"fund_raw_data_support: {len(fund_raw_data_gt)}")
    fund_mapping_data_metrics = {
        "title": "Fund_Datapoint_Mapping_Data",
        "accuracy": fund_mapping_data_accuracy,
        "precision": fund_mapping_data_precision,
        "recall": fund_mapping_data_recall,
        "f1": fund_mapping_data_f1,
        "support": len(fund_mapping_data_gt),
    }
    final_data.append(fund_mapping_data_metrics)
    logger.info(f"fund_mapping_data_accuracy: {fund_mapping_data_accuracy}")
    logger.info(f"fund_mapping_data_precision: {fund_mapping_data_precision}")
    logger.info(f"fund_mapping_data_recall: {fund_mapping_data_recall}")
    logger.info(f"fund_mapping_data_f1: {fund_mapping_data_f1}")
    logger.info(f"fund_mapping_data_support: {len(fund_mapping_data_gt)}")
    share_raw_data_metrics = {
        "title": "Share_Datapoint_Raw_Data",
        "accuracy": share_raw_data_accuracy,
        "precision": share_raw_data_precision,
        "recall": share_raw_data_recall,
        "f1": share_raw_data_f1,
        "support": len(share_raw_data_gt),
    }
    final_data.append(share_raw_data_metrics)
    logger.info(f"share_raw_data_accuracy: {share_raw_data_accuracy}")
    logger.info(f"share_raw_data_precision: {share_raw_data_precision}")
    logger.info(f"share_raw_data_recall: {share_raw_data_recall}")
    logger.info(f"share_raw_data_f1: {share_raw_data_f1}")
    logger.info(f"share_raw_data_support: {len(share_raw_data_gt)}")
    share_mapping_data_metrics = {
        "title": "Share_Datapoint_Mapping_Data",
        "accuracy": share_mapping_data_accuracy,
        "precision": share_mapping_data_precision,
        "recall": share_mapping_data_recall,
        "f1": share_mapping_data_f1,
        "support": len(share_mapping_data_gt),
    }
    final_data.append(share_mapping_data_metrics)
    logger.info(f"share_mapping_data_accuracy: {share_mapping_data_accuracy}")
    logger.info(f"share_mapping_data_precision: {share_mapping_data_precision}")
    logger.info(f"share_mapping_data_recall: {share_mapping_data_recall}")
    logger.info(f"share_mapping_data_f1: {share_mapping_data_f1}")
    logger.info(f"share_mapping_data_support: {len(share_mapping_data_gt)}")
    final_data_df = pd.DataFrame(final_data)
    # set column order: title, accuracy, f1, precision, recall, support
    final_data_df = final_data_df[["title", "accuracy", "f1", "precision", "recall", "support"]]
    # output to excel
    final_data_file = (
        r"/data/emea_ar/output/metrics/mapping_data_info_20_new_emea_documents_sample_Accuracy_metrics_v2.xlsx"
    )
    with pd.ExcelWriter(final_data_file) as writer:
        final_data_df.to_excel(writer, sheet_name="metrics", index=False)


def calc_typical_doc_metrics_v1():
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    result_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_20_new_emea_documents_sample_Accuracy.xlsx"
    sheet_name = "record_level_Results"
    data = pd.read_excel(result_file, sheet_name=sheet_name)
    data.fillna("", inplace=True)
    fund_raw_data_list = data["Raw Mapping"].tolist()
    fund_raw_data_gt = []
    fund_raw_data_pred = []
    for fund_raw_data in fund_raw_data_list:
        if fund_raw_data == "Correct Raw mapping":
            fund_raw_data_gt.append(1)
            fund_raw_data_pred.append(1)
        elif fund_raw_data == "Incorrect Raw mapping":
            fund_raw_data_gt.append(1)
            fund_raw_data_pred.append(0)
        else:
            pass
    fund_raw_data_accuracy = accuracy_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_raw_data_precision = precision_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_raw_data_recall = recall_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_raw_data_f1 = f1_score(fund_raw_data_gt, fund_raw_data_pred)
    fund_mapping_data_list = data["Share Mapping"].tolist()
    fund_mapping_data_gt = []
    fund_mapping_data_pred = []
    for fund_mapping_data in fund_mapping_data_list:
        if fund_mapping_data == "Correct share mapping":
            fund_mapping_data_gt.append(1)
            fund_mapping_data_pred.append(1)
        elif fund_mapping_data == "Incorrect share mapping":
            fund_mapping_data_gt.append(1)
            fund_mapping_data_pred.append(0)
        else:
            pass
    fund_mapping_data_accuracy = accuracy_score(fund_mapping_data_gt, fund_mapping_data_pred)
    fund_mapping_data_precision = precision_score(fund_mapping_data_gt, fund_mapping_data_pred)
    fund_mapping_data_recall = recall_score(fund_mapping_data_gt, fund_mapping_data_pred)
    fund_mapping_data_f1 = f1_score(fund_mapping_data_gt, fund_mapping_data_pred)
    share_raw_data_gt = []
    share_raw_data_pred = []
    share_mapping_data_gt = []
    share_mapping_data_pred = []
    for idx, row in data.iterrows():
        share_raw_data_infer_count = row["Raw Share in Inference"]
        share_raw_data_gt_count = row["Raw Share in Doc"]
        if share_raw_data_gt_count is not None and len(str(share_raw_data_gt_count)) > 0:
            share_raw_data_gt_count = int(share_raw_data_gt_count)
            share_raw_data_infer_count = int(share_raw_data_infer_count)
            gt_list = [1 for i in range(share_raw_data_gt_count)]
            if share_raw_data_infer_count > 0:
                pred_list = [1 for i in range(share_raw_data_infer_count)]
            else:
                # nothing extracted: replace this row with two false-positive samples
                pred_list = [1, 1]
                gt_list = [0, 0]
            if len(pred_list) < len(gt_list):
                pred_list.extend([0 for i in range(len(gt_list) - len(pred_list))])
            share_raw_data_gt.extend(gt_list)
            share_raw_data_pred.extend(pred_list)
        share_mapping_data_infer_count = row["share in Inferencce"]
        share_mapping_data_gt_count = row["share in DB"]
        if share_mapping_data_gt_count is not None and len(str(share_mapping_data_gt_count)) > 0:
            share_mapping_data_gt_count = int(share_mapping_data_gt_count)
            share_mapping_data_infer_count = int(share_mapping_data_infer_count)
            gt_list = [1 for i in range(share_mapping_data_gt_count)]
            if share_mapping_data_infer_count > 0:
                pred_list = [1 for i in range(share_mapping_data_infer_count)]
            else:
                # nothing extracted: replace this row with two false-positive samples
                pred_list = [1, 1]
                gt_list = [0, 0]
            if len(pred_list) < len(gt_list):
                pred_list.extend([0 for i in range(len(gt_list) - len(pred_list))])
            share_mapping_data_gt.extend(gt_list)
            share_mapping_data_pred.extend(pred_list)
    share_raw_data_accuracy = accuracy_score(share_raw_data_gt, share_raw_data_pred)
    share_raw_data_precision = precision_score(share_raw_data_gt, share_raw_data_pred)
    share_raw_data_recall = recall_score(share_raw_data_gt, share_raw_data_pred)
    share_raw_data_f1 = f1_score(share_raw_data_gt, share_raw_data_pred)
    share_mapping_data_accuracy = accuracy_score(share_mapping_data_gt, share_mapping_data_pred)
    share_mapping_data_precision = precision_score(share_mapping_data_gt, share_mapping_data_pred)
    share_mapping_data_recall = recall_score(share_mapping_data_gt, share_mapping_data_pred)
    share_mapping_data_f1 = f1_score(share_mapping_data_gt, share_mapping_data_pred)
    final_data = []
    fund_raw_data_metrics = {
        "title": "Fund_Raw_Data",
        "accuracy": fund_raw_data_accuracy,
        "precision": fund_raw_data_precision,
        "recall": fund_raw_data_recall,
        "f1": fund_raw_data_f1,
        "support": len(fund_raw_data_gt),
    }
    final_data.append(fund_raw_data_metrics)
    logger.info(f"fund_raw_data_accuracy: {fund_raw_data_accuracy}")
    logger.info(f"fund_raw_data_precision: {fund_raw_data_precision}")
    logger.info(f"fund_raw_data_recall: {fund_raw_data_recall}")
    logger.info(f"fund_raw_data_f1: {fund_raw_data_f1}")
    fund_mapping_data_metrics = {
        "title": "Fund_Mapping_Data",
        "accuracy": fund_mapping_data_accuracy,
        "precision": fund_mapping_data_precision,
        "recall": fund_mapping_data_recall,
        "f1": fund_mapping_data_f1,
        "support": len(fund_mapping_data_gt),
    }
    final_data.append(fund_mapping_data_metrics)
    logger.info(f"fund_mapping_data_accuracy: {fund_mapping_data_accuracy}")
    logger.info(f"fund_mapping_data_precision: {fund_mapping_data_precision}")
    logger.info(f"fund_mapping_data_recall: {fund_mapping_data_recall}")
    logger.info(f"fund_mapping_data_f1: {fund_mapping_data_f1}")
    share_raw_data_metrics = {
        "title": "Share_Raw_Data",
        "accuracy": share_raw_data_accuracy,
        "precision": share_raw_data_precision,
        "recall": share_raw_data_recall,
        "f1": share_raw_data_f1,
        "support": len(share_raw_data_gt),
    }
    final_data.append(share_raw_data_metrics)
    logger.info(f"share_raw_data_accuracy: {share_raw_data_accuracy}")
    logger.info(f"share_raw_data_precision: {share_raw_data_precision}")
    logger.info(f"share_raw_data_recall: {share_raw_data_recall}")
    logger.info(f"share_raw_data_f1: {share_raw_data_f1}")
    share_mapping_data_metrics = {
        "title": "Share_Mapping_Data",
        "accuracy": share_mapping_data_accuracy,
        "precision": share_mapping_data_precision,
        "recall": share_mapping_data_recall,
        "f1": share_mapping_data_f1,
        "support": len(share_mapping_data_gt),
    }
    final_data.append(share_mapping_data_metrics)
    logger.info(f"share_mapping_data_accuracy: {share_mapping_data_accuracy}")
    logger.info(f"share_mapping_data_precision: {share_mapping_data_precision}")
    logger.info(f"share_mapping_data_recall: {share_mapping_data_recall}")
    logger.info(f"share_mapping_data_f1: {share_mapping_data_f1}")
    final_data_df = pd.DataFrame(final_data)
    # set column order: title, accuracy, f1, precision, recall, support
    final_data_df = final_data_df[["title", "accuracy", "f1", "precision", "recall", "support"]]
    # output to excel
    final_data_file = (
        r"/data/emea_ar/output/metrics/mapping_data_info_20_new_emea_documents_sample_Accuracy_metrics.xlsx"
    )
    with pd.ExcelWriter(final_data_file) as writer:
        final_data_df.to_excel(writer, sheet_name="metrics", index=False)


def merge_aus_document_prospectus_data():
    """
    Merge AUS document and prospectus data.
    """
    aus_document_file = r"/data/aus_prospectus/basic_information/from_2024_documents/document_mapping.xlsx"
    aus_prospectus_file = r"/data/aus_prospectus/basic_information/from_2024_documents/aus_prospectus_data.xlsx"
    aus_document_data = pd.read_excel(aus_document_file, sheet_name="document_mapping")
    aus_prospectus_data = pd.read_excel(aus_prospectus_file)
    aus_document_data["DocumentId"] = aus_document_data["DocumentId"].astype(str)
    aus_document_prospectus_data = pd.merge(
        aus_document_data,
        aus_prospectus_data,
        on=["FundClassId", "EffectiveDate"],
        how="left",
    )
    aus_document_prospectus_file = r"/data/aus_prospectus/basic_information/from_2024_documents/aus_document_prospectus.xlsx"
    with pd.ExcelWriter(aus_document_prospectus_file) as writer:
        aus_document_prospectus_data.to_excel(
            writer, sheet_name="aus_document_prospectus", index=False
        )


def pdf_exist():
    data_folder = r"/data/aus_prospectus/basic_information/from_2024_documents/"
    data_file = os.path.join(data_folder, "aus_100_document_prospectus_multi_fund.xlsx")
    percentile_result_df = pd.read_excel(data_file, sheet_name="percentile_result")
    document_id_list = percentile_result_df["DocumentId"].unique().tolist()
    pdf_doc_path = r"/data/aus_prospectus/pdf/"
    for doc_id in document_id_list:
        pdf_file_path = os.path.join(pdf_doc_path, f"{doc_id}.pdf")
        if not os.path.exists(pdf_file_path):
            logger.error(f"pdf file not exist: {pdf_file_path}")
        else:
            logger.info(f"pdf file exist: {pdf_file_path}")

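# Illustrative sketch (never called) of the quantile-bin sampling used by
# prepare_multi_fund_aus_prospectus_document below: fund counts are cut into three
# quantile groups and a fixed number of rows is sampled from each group. The counts
# and per-group sample size here are made up (the real function uses 30/30/40).
def _example_percentile_sampling():
    counts = pd.DataFrame({"DistinctFundCount": [2, 3, 3, 4, 5, 6, 8, 9, 12, 20]})
    quantiles = counts["DistinctFundCount"].quantile([0, 0.3, 0.6, 1])
    counts["Percentile_Group"] = pd.cut(
        counts["DistinctFundCount"],
        bins=[quantiles[0], quantiles[0.3], quantiles[0.6], quantiles[1]],
        labels=["0-30", "30-60", "60-100"],
        include_lowest=True,
    )
    # take up to 2 rows from each group
    sampled = pd.concat(
        counts[counts["Percentile_Group"] == group].sample(
            n=min(2, (counts["Percentile_Group"] == group).sum()), random_state=42
        )
        for group in ["0-30", "30-60", "60-100"]
    )
    return sampled.reset_index(drop=True)
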
def prepare_multi_fund_aus_prospectus_document():
    data_folder = r"/data/aus_prospectus/basic_information/from_2024_documents/"
    document_mapping_file = os.path.join(data_folder, "document_mapping.xlsx")
    document_data_file = os.path.join(data_folder, "aus_document_prospectus.xlsx")
    document_mapping_df = pd.read_excel(document_mapping_file, sheet_name="document_mapping")
    document_fund_count_df = pd.read_excel(document_mapping_file, sheet_name="document_fund_count")
    document_data_df = pd.read_excel(document_data_file, sheet_name="aus_document_prospectus")
    document_data_df.fillna("", inplace=True)
    # keep rows from document_data_df where SecurityName is not an empty string
    document_data_df = document_data_df[document_data_df["SecurityName"] != ""]
    document_id_list = document_data_df["DocumentId"].unique().tolist()
    # keep documents with fund count > 1
    document_fund_count_df = document_fund_count_df[document_fund_count_df["DocumentId"].isin(document_id_list)]
    document_fund_count_df = document_fund_count_df[document_fund_count_df["DistinctFundCount"] > 1]
    document_fund_count_df = document_fund_count_df.sort_values(by="DistinctFundCount", ascending=False)
    # calculate percentiles
    percentiles = [0, 0.3, 0.6, 1]
    quantile_values = document_fund_count_df["DistinctFundCount"].quantile(percentiles)
    # group by percentile
    bins = [quantile_values[0], quantile_values[0.3], quantile_values[0.6], quantile_values[1]]
    document_fund_count_df["Percentile_Group"] = pd.cut(
        document_fund_count_df["DistinctFundCount"],
        bins=bins,
        labels=["0-30", "30-60", "60-100"],
        include_lowest=True,
    )
    # sample documents from each percentile group
    percentile_result = pd.DataFrame()
    for group, count in zip(["0-30", "30-60", "60-100"], [30, 30, 40]):
        group_df = document_fund_count_df[document_fund_count_df["Percentile_Group"] == group]
        sampled_df = group_df.sample(n=min(len(group_df), count), random_state=42)
        percentile_result = pd.concat([percentile_result, sampled_df], ignore_index=True)
    percentile_result.reset_index(drop=True, inplace=True)
    document_id_list = percentile_result["DocumentId"].unique().tolist()
    final_document_mapping_df = document_mapping_df[document_mapping_df["DocumentId"].isin(document_id_list)]
    # order by DocumentId, FundLegalName, FundClassLegalName
    final_document_mapping_df = final_document_mapping_df.sort_values(
        by=["DocumentId", "FundLegalName", "FundClassLegalName"], ascending=True
    )
    final_document_mapping_df.reset_index(drop=True, inplace=True)
    # get CompanyId, CompanyName from final_document_mapping_df
    final_document_provider_df = final_document_mapping_df[["CompanyId", "CompanyName"]].drop_duplicates()
    # order by CompanyName
    final_document_provider_df = final_document_provider_df.sort_values(by="CompanyName", ascending=True)
    final_document_provider_df.reset_index(drop=True, inplace=True)
    final_document_data_df = document_data_df[document_data_df["DocumentId"].isin(document_id_list)]
    # order by DocumentId, FundLegalName, FundClassLegalName
    final_document_data_df = final_document_data_df.sort_values(
        by=["DocumentId", "FundLegalName", "FundClassLegalName"], ascending=True
    )
    final_document_data_df.reset_index(drop=True, inplace=True)
    output_file = os.path.join(data_folder, "aus_100_document_prospectus_multi_fund.xlsx")
    with pd.ExcelWriter(output_file) as writer:
        final_document_mapping_df.to_excel(
            writer, sheet_name="document_mapping", index=False
        )
        final_document_provider_df.to_excel(
            writer, sheet_name="document_provider", index=False
        )
        final_document_data_df.to_excel(
            writer, sheet_name="aus_document_data", index=False
        )
        percentile_result.to_excel(
            writer, sheet_name="percentile_result", index=False
        )
    output_sample_document_file = os.path.join(
        r"./sample_documents/", "aus_prospectus_100_documents_multi_fund_sample.txt"
    )
    # output document ids to a txt file
    with open(output_sample_document_file, "w") as f:
        for doc_id in document_id_list:
            f.write(f"{doc_id}\n")


if __name__ == "__main__":
    # pdf_exist()
    prepare_multi_fund_aus_prospectus_document()
    # merge_aus_document_prospectus_data()
    folder = r"/data/emea_ar/basic_information/English/sample_doc/emea_11_06_case/"
    file_name = "doc_ar_data_for_emea_11_06.xlsx"
    # get_document_with_all_4_data_points(folder, file_name, None)
    # calc_typical_doc_metrics_v1()
    # calc_typical_doc_metrics_v2()
    doc_provider_file_path = (
        r"/data/emea_ar/basic_information/English/latest_provider_ar_document.xlsx"
    )
    doc_ar_data_file_path = r"/data/emea_ar/basic_information/English/latest_provider_ar_document_mapping.xlsx"
    provider_mapping_data_file = (
        r"/data/emea_ar/basic_information/English/provider_mapping_data.xlsx"
    )
    doc_mapping_from_top_100_provider_file = (
        r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx"
    )
    basic_info_folder = r"/data/emea_ar/basic_information/English/"
    pdf_folder = r"/data/emea_ar/pdf/"
    output_folder = r"/data/emea_ar/output/"
    # get_unique_docids_from_doc_provider_data(doc_provider_file_path)
    # download_pdf(doc_provider_file_path, 'doc_provider_count', pdf_folder)
    # pdf_folder = r"/data/emea_ar/small_pdf/"
    output_folder = r"/data/emea_ar/small_pdf_txt/"
    random_small_document_data_file = (
        r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx"
    )
    doc_provider_file_path = r"/data/emea_ar/basic_information/English/sample_doc/emea_final_typical_case/Final list of EMEA documents.xlsx"
    pdf_folder = r"/data/emea_ar/pdf/"
    # download_pdf(
    #     doc_provider_file_path=doc_provider_file_path,
    #     sheet_name="Sheet1",
    #     doc_id_column="Document Id",
    #     pdf_path=pdf_folder)
    pdf_folder = r"/data/aus_prospectus/pdf/"
    output_folder = r"/data/aus_prospectus/pdf_txt/"
    # output_pdf_page_text(pdf_folder, output_folder)
    # extract_pdf_table(pdf_folder, output_folder)
    # analyze_json_error()
    latest_top_100_provider_ar_data_file = r"/data/emea_ar/basic_information/English/top_100_provider_latest_document_most_mapping/lux_english_ar_from_top_100_provider_latest_document_with_most_mappings.xlsx"
    # download_pdf(latest_top_100_provider_ar_data_file,
    #              'latest_ar_document_most_mapping',
    #              pdf_folder)
    doc_ar_data_file_path = r"/data/emea_ar/basic_information/sample_documents/sample_doc/sample_documents_12_11/doc_ar_data_12_11.xlsx"
    doc_mapping_data_file_path = r"/data/emea_ar/basic_information/sample_documents/sample_doc/sample_documents_12_11/doc_mapping_12_11.xlsx"
    output_data_folder = r"/data/emea_ar/basic_information/sample_documents/sample_doc/sample_documents_12_11/"
    output_file = "doc_ar_data_sample_documents_12_11_statistics.xlsx"
    # pdf_folder = r"/data/aus_prospectus/pdf/"
    # doc_ar_data_file_path = None
    # doc_mapping_data_file_path = r"/data/aus_prospectus/basic_information/from_2024_documents/aus_100_document_prospectus_multi_fund.xlsx"
    # output_data_folder = r"/data/aus_prospectus/basic_information/from_2024_documents/"
    # output_file = "aus_100_document_prospectus_multi_fund_statistics.xlsx"
    # statistics_document(pdf_folder=pdf_folder,
    #                     doc_mapping_file_path=doc_mapping_data_file_path,
    #                     doc_ar_data_file_path=doc_ar_data_file_path,
    #                     mapping_sheet_name="document_mapping",
    #                     ar_data_sheet_name="aus_document_data",
    #                     output_folder=output_data_folder,
    #                     output_file=output_file)
    # get_document_extracted_share_diff_by_db()
    # statistics_provider_mapping(
    #     provider_mapping_data_file=provider_mapping_data_file,
    #     output_folder=basic_info_folder,
    # )
    # statistics_document_fund_share_count(doc_mapping_from_top_100_provider_file)
    # pickup_document_from_top_100_providers()
    # compare_records_count_by_document_id()
    # document_mapping_folder = r"/data/emea_ar/output/mapping/document/"
    # all_data_file = r"/data/emea_ar/output/mapping/all_document_mapping.xlsx"
    # concat_mapping(document_mapping_folder, all_data_file)
    # provider_mapping_folder = r"/data/emea_ar/output/mapping/provider/"
    # all_data_file = r"/data/emea_ar/output/mapping/all_provider_mapping.xlsx"
    # concat_mapping(provider_mapping_folder, all_data_file)