import pandas as pd import os from tqdm import tqdm import json from glob import glob import fitz import re import time import traceback import json_repair from utils.logger import logger from utils.pdf_download import download_pdf_from_documents_warehouse from utils.pdf_util import PDFUtil from pdf_table_extraction import PDFTableExtraction def get_unique_docids_from_doc_provider_data(doc_provider_file_path: str): doc_provider_data = pd.read_excel(doc_provider_file_path) # get new data by grouping by docid, and count the number of rows for each docid, # set the new data with 2 columns: docid and provider_count doc_provider_count = ( doc_provider_data.groupby("DocumentId") .size() .reset_index(name="provider_count") ) # sort new data by provider_count in descending order doc_provider_count = doc_provider_count.sort_values( by="provider_count", ascending=False ) # save excel by doc_provider_data and new_data with pd.ExcelWriter(doc_provider_file_path) as writer: doc_provider_data.to_excel( writer, sheet_name="doc_provider_details", index=False ) doc_provider_count.to_excel( writer, sheet_name="doc_provider_count", index=False ) def download_pdf(doc_provider_file_path: str, sheet_name: str, pdf_path: str): document_data = pd.read_excel(doc_provider_file_path, sheet_name=sheet_name) # get all unique docids as list doc_id_list = [ str(doc_id) for doc_id in document_data["DocumentId"].unique().tolist() ] # download pdfs logger.info(f"Start downloading {len(doc_id_list)} pdfs") os.makedirs(pdf_path, exist_ok=True) for doc_id in tqdm.tqdm(doc_id_list): logger.info(f"Downloading pdf for docid: {doc_id}") download_pdf_from_documents_warehouse(pdf_directory=pdf_path, doc_id=doc_id) time.sleep(1) def output_pdf_page_text(pdf_folder: str, output_folder: str): if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder): logger.error(f"Invalid pdf_folder: {pdf_folder}") return if output_folder is None or len(output_folder) == 0: logger.error(f"Invalid output_folder: {output_folder}") return os.makedirs(output_folder, exist_ok=True) pdf_files = glob(os.path.join(pdf_folder, "*.pdf")) logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}") for pdf_file in pdf_files: logger.info(f"Start processing {pdf_file}") pdf_util = PDFUtil(pdf_file) success, text, page_text_dict = pdf_util.extract_text( output_folder=output_folder ) if success: logger.info(f"Successfully extracted text from {pdf_file}") def extract_pdf_table(pdf_folder: str, output_folder: str): if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder): logger.error(f"Invalid pdf_folder: {pdf_folder}") return if output_folder is None or len(output_folder) == 0: logger.error(f"Invalid output_folder: {output_folder}") return os.makedirs(output_folder, exist_ok=True) pdf_files = glob(os.path.join(pdf_folder, "*.pdf")) logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}") for pdf_file in pdf_files: logger.info(f"Start processing {pdf_file}") pdf_table_extraction = PDFTableExtraction(pdf_file, output_folder) pdf_table_extraction.extract_tables() def analyze_json_error(): text_file = r"/data/emea_ar/output/pdf_table_prompts/445877368_4.txt" with open(text_file, "r", encoding="utf-8") as file: text = file.read() json_response = re.search(r"\`\`\`json([\s\S]*)\`\`\`", text) if json_response: json_text = json_response.group(1) json_data = {"tables": []} try: json_data = json.loads(json_text) except: json_data = json_repair.loads(json_text) table_list = json_data.get("tables", []) for table_num, table in enumerate(table_list): table_md_file = os.path.join("/temp/", f"temp_{table_num}.md") table = re.sub(r"(\n)+", "\n", table) with open(table_md_file, "w", encoding="utf-8") as file: file.write(table) def statistics_document( pdf_folder: str, doc_mapping_file_path: str, output_folder: str ): if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder): logger.error(f"Invalid pdf_folder: {pdf_folder}") return if ( doc_mapping_file_path is None or len(doc_mapping_file_path) == 0 or not os.path.exists(doc_mapping_file_path) ): logger.error(f"Invalid doc_mapping_file_path: {doc_mapping_file_path}") return if output_folder is None or len(output_folder) == 0: logger.error(f"Invalid output_folder: {output_folder}") return os.makedirs(output_folder, exist_ok=True) describe_stat_df_list = [] # statistics document mapping information doc_mapping_data = pd.read_excel(doc_mapping_file_path, sheet_name="all_data") # statistics doc_mapping_data for counting FundId count based on DocumentId logger.info( "statistics doc_mapping_data for counting FundId count based on DocumentId" ) doc_fund_id_df = doc_mapping_data[["DocumentId", "FundId"]].drop_duplicates() doc_fund_count = ( doc_fund_id_df.groupby("DocumentId").size().reset_index(name="fund_count") ) # order by fund_count in descending order doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=False) # statistics fund_count in doc_fund_count by describe and transform to DataFrame doc_fund_count_stat_df = get_describe_stat( doc_fund_count, "fund_count", "doc_fund_count" ) describe_stat_df_list.append(doc_fund_count_stat_df) # statistics doc_mapping_data for counting FundClassId count based on DocumentId logger.info( "statistics doc_mapping_data for counting FundClassId count based on DocumentId" ) doc_share_class_id_df = doc_mapping_data[ ["DocumentId", "FundClassId"] ].drop_duplicates() doc_share_class_count = ( doc_share_class_id_df.groupby("DocumentId") .size() .reset_index(name="share_class_count") ) # order by share_class_count in descending order doc_share_class_count = doc_share_class_count.sort_values( by="share_class_count", ascending=False ) # statistics share_class_count in doc_share_class_count by describe and transform to DataFrame doc_share_class_count_stat_df = get_describe_stat( doc_share_class_count, "share_class_count", "doc_share_class_count" ) describe_stat_df_list.append(doc_share_class_count_stat_df) # statistics doc_mapping_data for counting FundId count based on ProviderCompanyId and CompanyName logger.info( "statistics doc_mapping_data for counting FundId count based on ProviderCompanyId and CompanyName" ) provider_fund_id_df = doc_mapping_data[ ["ProviderCompanyId", "CompanyName", "FundId"] ].drop_duplicates() provider_fund_count = ( provider_fund_id_df.groupby(["ProviderCompanyId", "CompanyName"]) .size() .reset_index(name="fund_count") ) # order by fund_count in descending order provider_fund_count = provider_fund_count.sort_values( by="fund_count", ascending=False ) # statistics fund_count in provider_fund_count by describe and transform to DataFrame provider_fund_count_stat_df = get_describe_stat( provider_fund_count, "fund_count", "provider_fund_count" ) describe_stat_df_list.append(provider_fund_count_stat_df) # statistics doc_mapping_data for counting FundClassId count based on ProviderCompanyId logger.info( "statistics doc_mapping_data for counting FundClassId count based on ProviderCompanyId" ) provider_share_class_id_df = doc_mapping_data[ ["ProviderCompanyId", "CompanyName", "FundClassId"] ].drop_duplicates() provider_share_class_count = ( provider_share_class_id_df.groupby(["ProviderCompanyId", "CompanyName"]) .size() .reset_index(name="share_class_count") ) # order by share_class_count in descending order provider_share_class_count = provider_share_class_count.sort_values( by="share_class_count", ascending=False ) # statistics share_class_count in provider_share_class_count by describe and transform to DataFrame provider_share_class_count_stat_df = get_describe_stat( provider_share_class_count, "share_class_count", "provider_share_class_count" ) describe_stat_df_list.append(provider_share_class_count_stat_df) # statistics doc_mapping_data for counting FundClassId count based on FundId and FundLegalName logger.info( "statistics doc_mapping_data for counting FundClassId count based on FundId and FundLegalName" ) fund_share_class_id_df = doc_mapping_data[ ["FundId", "FundLegalName", "FundClassId"] ].drop_duplicates() fund_share_class_count = ( fund_share_class_id_df.groupby(["FundId", "FundLegalName"]) .size() .reset_index(name="share_class_count") ) # order by share_class_count in fund_share_class_count fund_share_class_count = fund_share_class_count.sort_values( by="share_class_count", ascending=False ) # statistics share_class_count in fund_share_class_count by describe and transform to DataFrame fund_share_class_count_stat_df = get_describe_stat( fund_share_class_count, "share_class_count", "fund_share_class_count" ) describe_stat_df_list.append(fund_share_class_count_stat_df) stat_file = os.path.join(output_folder, "doc_mapping_statistics_data.xlsx") # statistics document page number pdf_files = glob(os.path.join(pdf_folder, "*.pdf")) logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}") logger.info("statistics document page number") doc_page_num_list = [] for pdf_file in tqdm(pdf_files): docid = os.path.basename(pdf_file).split(".")[0] doc = fitz.open(pdf_file) page_num = doc.page_count doc_page_num_list.append({"docid": docid, "page_num": page_num}) doc.close() doc_page_num_df = pd.DataFrame(doc_page_num_list) # order by page_num in descending order doc_page_num_df = doc_page_num_df.sort_values(by="page_num", ascending=False) # statistics page_num by describe and transform to DataFrame doc_page_num_stat_df = get_describe_stat( doc_page_num_df, "page_num", "doc_page_num" ) describe_stat_df_list.append(doc_page_num_stat_df) describe_stat_df = pd.concat(describe_stat_df_list) describe_stat_df.reset_index(drop=True, inplace=True) # save statistics data to excel with pd.ExcelWriter(stat_file) as writer: doc_page_num_df.to_excel(writer, sheet_name="doc_page_num", index=False) doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False) doc_share_class_count.to_excel( writer, sheet_name="doc_share_class_count", index=False ) provider_fund_count.to_excel( writer, sheet_name="provider_fund_count", index=False ) provider_share_class_count.to_excel( writer, sheet_name="provider_share_class_count", index=False ) fund_share_class_count.to_excel( writer, sheet_name="fund_share_class_count", index=False ) describe_stat_df.to_excel( writer, sheet_name="all_describe_statistics", index=False ) def statistics_provider_mapping(provider_mapping_data_file: str, output_folder: str): if ( provider_mapping_data_file is None or len(provider_mapping_data_file) == 0 or not os.path.exists(provider_mapping_data_file) ): logger.error( f"Invalid provider_mapping_data_file: {provider_mapping_data_file}" ) return provider_mapping_data = pd.read_excel(provider_mapping_data_file) describe_stat_df_list = [] # statistics provider_mapping_data for counting FundId count based on CompanyId and CompanyName logger.info( "statistics provider_mapping_data for counting FundId count based on CompanyId and CompanyName" ) provider_fund_id_df = provider_mapping_data[ ["CompanyId", "CompanyName", "FundId"] ].drop_duplicates() provider_fund_count = ( provider_fund_id_df.groupby(["CompanyId", "CompanyName"]) .size() .reset_index(name="fund_count") ) # order by fund_count in descending order provider_fund_count = provider_fund_count.sort_values( by="fund_count", ascending=False ) # statistics fund_count in provider_fund_count by describe and transform to DataFrame provider_fund_count_stat_df = get_describe_stat( provider_fund_count, "fund_count", "provider_fund_count" ) describe_stat_df_list.append(provider_fund_count_stat_df) # Get the fund_count sum of all companies all_companies_fund_count_sum = provider_fund_count["fund_count"].sum() top_n_company_fund_count_list = [] # Get the fund_count sum of top 5 companies top_5_companies_fund_count, top_5_companies_fund_count_percent = ( get_top_n_records_count( provider_fund_count, "fund_count", 5, all_companies_fund_count_sum ) ) top_n_company_fund_count_list.append( { "top_n_providers": 5, "fund_count": top_5_companies_fund_count, "percent": top_5_companies_fund_count_percent, } ) logger.info(f"Top 5 companies fund count sum: {top_5_companies_fund_count}") # Get the fund_count sum of top 10 companies top_10_companies_fund_count, top_10_companies_fund_count_percent = ( get_top_n_records_count( provider_fund_count, "fund_count", 10, all_companies_fund_count_sum ) ) top_n_company_fund_count_list.append( { "top_n_providers": 10, "fund_count": top_10_companies_fund_count, "percent": top_10_companies_fund_count_percent, } ) logger.info(f"Top 10 companies fund count sum: {top_10_companies_fund_count}") # Get the fund_count sum of top 50 companies top_50_companies_fund_count, top_50_companies_fund_count_percent = ( get_top_n_records_count( provider_fund_count, "fund_count", 50, all_companies_fund_count_sum ) ) top_n_company_fund_count_list.append( { "top_n_providers": 50, "fund_count": top_50_companies_fund_count, "percent": top_50_companies_fund_count_percent, } ) logger.info(f"Top 50 companies fund count sum: {top_50_companies_fund_count}") # Get the fund_count sum of top 100 companies top_100_companies_fund_count, top_100_companies_fund_count_percent = ( get_top_n_records_count( provider_fund_count, "fund_count", 100, all_companies_fund_count_sum ) ) top_n_company_fund_count_list.append( { "top_n_providers": 100, "fund_count": top_100_companies_fund_count, "percent": top_100_companies_fund_count_percent, } ) top_n_company_fund_count_list.append( { "top_n_providers": len(provider_fund_count), "fund_count": all_companies_fund_count_sum, "percent": 100, } ) logger.info(f"Top 100 companies fund count sum: {top_100_companies_fund_count}") top_n_company_fund_count_df = pd.DataFrame(top_n_company_fund_count_list) # statistics provider_mapping_data for counting FundClassId count based on CompanyId and CompanyName logger.info( "statistics provider_mapping_data for counting SecId count based on CompanyId and CompanyName" ) provider_share_class_id_df = provider_mapping_data[ ["CompanyId", "CompanyName", "SecId"] ].drop_duplicates() provider_share_class_count = ( provider_share_class_id_df.groupby(["CompanyId", "CompanyName"]) .size() .reset_index(name="share_class_count") ) # order by share_class_count in descending order provider_share_class_count = provider_share_class_count.sort_values( by="share_class_count", ascending=False ) # statistics share_class_count in provider_share_class_count by describe and transform to DataFrame provider_share_class_count_stat_df = get_describe_stat( provider_share_class_count, "share_class_count", "provider_share_class_count" ) describe_stat_df_list.append(provider_share_class_count_stat_df) # Get the fund_count sum of all companies all_companies_share_class_count_sum = provider_share_class_count[ "share_class_count" ].sum() top_n_company_share_class_count_list = [] # Get the fund_count sum of top 5 companies top_5_companies_share_class_count, top_5_companies_share_class_count_percent = ( get_top_n_records_count( provider_share_class_count, "share_class_count", 5, all_companies_share_class_count_sum, ) ) top_n_company_share_class_count_list.append( { "top_n_providers": 5, "share_class_count": top_5_companies_share_class_count, "percent": top_5_companies_share_class_count_percent, } ) logger.info( f"Top 5 companies share class count sum: {top_5_companies_share_class_count}" ) # Get the fund_count sum of top 10 companies top_10_companies_share_class_count, top_10_companies_share_class_count_percent = ( get_top_n_records_count( provider_share_class_count, "share_class_count", 10, all_companies_share_class_count_sum, ) ) top_n_company_share_class_count_list.append( { "top_n_providers": 10, "share_class_count": top_10_companies_share_class_count, "percent": top_10_companies_share_class_count_percent, } ) logger.info( f"Top 10 companies share class count sum: {top_10_companies_share_class_count}" ) # Get the fund_count sum of top 50 companies top_50_companies_share_class_count, top_50_companies_share_class_count_percent = ( get_top_n_records_count( provider_share_class_count, "share_class_count", 50, all_companies_share_class_count_sum, ) ) top_n_company_share_class_count_list.append( { "top_n_providers": 50, "share_class_count": top_50_companies_share_class_count, "percent": top_50_companies_share_class_count_percent, } ) logger.info( f"Top 50 companies share class count sum: {top_50_companies_share_class_count}" ) # Get the fund_count sum of top 100 companies top_100_companies_share_class_count, top_100_companies_share_class_count_percent = ( get_top_n_records_count( provider_share_class_count, "share_class_count", 100, all_companies_share_class_count_sum, ) ) top_n_company_share_class_count_list.append( { "top_n_providers": 100, "share_class_count": top_100_companies_share_class_count, "percent": top_100_companies_share_class_count_percent, } ) logger.info( f"Top 100 companies share class count sum: {top_100_companies_share_class_count}" ) top_n_company_share_class_count_list.append( { "top_n_providers": len(provider_share_class_count), "share_class_count": all_companies_share_class_count_sum, "percent": 100, } ) top_n_company_share_class_count_df = pd.DataFrame( top_n_company_share_class_count_list ) # statistics provider_mapping_data for counting SecId count based on FundId and FundLegalName logger.info( "statistics provider_mapping_data for counting SecId count based on FundId and FundLegalName" ) fund_share_class_id_df = provider_mapping_data[ ["FundId", "FundLegalName", "SecId"] ].drop_duplicates() fund_share_class_count = ( fund_share_class_id_df.groupby(["FundId", "FundLegalName"]) .size() .reset_index(name="share_class_count") ) # order by share_class_count in fund_share_class_count fund_share_class_count = fund_share_class_count.sort_values( by="share_class_count", ascending=False ) # statistics share_class_count in fund_share_class_count by describe and transform to DataFrame fund_share_class_count_stat_df = get_describe_stat( fund_share_class_count, "share_class_count", "fund_share_class_count" ) describe_stat_df_list.append(fund_share_class_count_stat_df) describe_stat_df = pd.concat(describe_stat_df_list) describe_stat_df.reset_index(drop=True, inplace=True) stat_file = os.path.join(output_folder, "provider_mapping_data_statistics.xlsx") # save statistics data to excel with pd.ExcelWriter(stat_file) as writer: top_n_company_fund_count_df.to_excel( writer, sheet_name="top_n_provider_fund_count", index=False ) top_n_company_share_class_count_df.to_excel( writer, sheet_name="top_n_provider_share_count", index=False ) provider_fund_count.to_excel( writer, sheet_name="provider_fund_count", index=False ) provider_share_class_count.to_excel( writer, sheet_name="provider_share_count", index=False ) fund_share_class_count.to_excel( writer, sheet_name="fund_share_count", index=False ) describe_stat_df.to_excel( writer, sheet_name="all_describe_statistics", index=False ) def statistics_document_fund_share_count(provider_mapping_data_file: str): if ( provider_mapping_data_file is None or len(provider_mapping_data_file) == 0 or not os.path.exists(provider_mapping_data_file) ): logger.error(f"Invalid file_path: {provider_mapping_data_file}") return describe_stat_df_list = [] # statistics document mapping information doc_mapping_data = pd.read_excel(provider_mapping_data_file, sheet_name="all_data") # statistics doc_mapping_data for counting FundId count based on DocumentId logger.info( "statistics doc_mapping_data for counting FundId count based on DocumentId" ) doc_fund_id_df = doc_mapping_data[["DocumentId", "CompanyId", "CompanyName", "FundId"]].drop_duplicates() doc_fund_count = ( doc_fund_id_df.groupby(["DocumentId", "CompanyId", "CompanyName"]).size().reset_index(name="fund_count") ) # order by fund_count in descending order doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=True) # statistics fund_count in doc_fund_count by describe and transform to DataFrame doc_fund_count_stat_df = get_describe_stat( doc_fund_count, "fund_count", "doc_fund_count" ) describe_stat_df_list.append(doc_fund_count_stat_df) # statistics doc_mapping_data for counting FundClassId count based on DocumentId logger.info( "statistics doc_mapping_data for counting FundClassId count based on DocumentId" ) doc_share_class_id_df = doc_mapping_data[ ["DocumentId", "CompanyId", "CompanyName", "FundClassId"] ].drop_duplicates() doc_share_class_count = ( doc_share_class_id_df.groupby(["DocumentId", "CompanyId", "CompanyName"]) .size() .reset_index(name="share_class_count") ) # order by share_class_count in descending order doc_share_class_count = doc_share_class_count.sort_values( by="share_class_count", ascending=True ) # statistics share_class_count in doc_share_class_count by describe and transform to DataFrame doc_share_class_count_stat_df = get_describe_stat( doc_share_class_count, "share_class_count", "doc_share_class_count" ) describe_stat_df_list.append(doc_share_class_count_stat_df) describe_stat_df = pd.concat(describe_stat_df_list) describe_stat_df.reset_index(drop=True, inplace=True) with pd.ExcelWriter(provider_mapping_data_file) as writer: doc_mapping_data.to_excel(writer, sheet_name="all_data", index=False) doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False) doc_share_class_count.to_excel(writer, sheet_name="doc_share_class_count", index=False) describe_stat_df.to_excel(writer, sheet_name="all_describe_statistics", index=False) def get_top_n_records_count( df: pd.DataFrame, column_name: str, n: int, total_count: int ): top_n_records = df.head(n) top_n_records_count = top_n_records[column_name].sum() top_n_records_count_percent = round((top_n_records_count / total_count) * 100, 2) return top_n_records_count, top_n_records_count_percent def get_describe_stat(df: pd.DataFrame, column_name: str, stat_type_name: str): stat_df = df[column_name].describe().reset_index().T stat_df.columns = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"] stat_df.reset_index(inplace=True) stat_df.rename(columns={"index": "Stat"}, inplace=True) # remove the first row stat_df = stat_df[1:] if stat_type_name is not None: stat_df["Stat_Type"] = stat_type_name stat_df = stat_df[ [ "Stat_Type", "count", "mean", "std", "min", "25%", "50%", "75%", "max", ] ] return stat_df if __name__ == "__main__": doc_provider_file_path = ( r"/data/emea_ar/basic_information/English/latest_provider_ar_document.xlsx" ) doc_mapping_file_path = r"/data/emea_ar/basic_information/English/latest_provider_ar_document_mapping.xlsx" provider_mapping_data_file = ( r"/data/emea_ar/basic_information/English/provider_mapping_data.xlsx" ) doc_mapping_from_top_100_provider_file = ( r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx" ) basic_info_folder = r"/data/emea_ar/basic_information/English/" pdf_folder = r"/data/emea_ar/pdf/" output_folder = r"/data/emea_ar/output/" # get_unique_docids_from_doc_provider_data(doc_provider_file_path) # download_pdf(doc_provider_file_path, 'doc_provider_count', pdf_folder) # output_pdf_page_text(pdf_folder, output_folder) # extract_pdf_table(pdf_folder, output_folder) # analyze_json_error() # statistics_document(pdf_folder, doc_mapping_file_path, basic_info_folder) # statistics_provider_mapping( # provider_mapping_data_file=provider_mapping_data_file, # output_folder=basic_info_folder, # ) statistics_document_fund_share_count(doc_mapping_from_top_100_provider_file)