diff --git a/.gitignore b/.gitignore index cf4b335..34c5c08 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /log /utils/__pycache__ /__pycache__/*.pyc +/core/__pycache__/*.pyc diff --git a/configuration/datapoint_keyword.json b/configuration/datapoint_keyword.json new file mode 100644 index 0000000..49fe551 --- /dev/null +++ b/configuration/datapoint_keyword.json @@ -0,0 +1,468 @@ +{ + "ISIN": { + "english": [ + "ISIN", + "ISIN Code" + ], + "spanish": [ + "ISIN", + "ISIN Code" + ], + "german": [ + "ISIN", + "ISIN Code" + ], + "dutch": [ + "ISIN", + "ISIN Code" + ], + "french": [ + "ISIN", + "ISIN Code" + ], + "finnish": [ + "ISIN", + "ISIN Code" + ], + "swedish": [ + "ISIN", + "ISIN Code" + ], + "danish": [ + "ISIN", + "ISIN Code" + ], + "norwegian": [ + "ISIN", + "ISIN Code" + ], + "lithuanian": [ + "ISIN", + "ISIN Code" + ], + "polish": [ + "ISIN", + "ISIN Code" + ], + "latvian": [ + "ISIN", + "ISIN Code" + ], + "india(keywords)": [ + "ISIN", + "ISIN Code" + ], + "estonian": [ + "ISIN", + "ISIN Code" + ], + "malay": [ + "ISIN", + "ISIN Code" + ], + "italian": [ + "ISIN", + "ISIN Code" + ], + "portuguese": [ + "ISIN", + "ISIN Code" + ] + }, + "ter": { + "english": [ + "Synthetic TER", + "Fund TER", + "TER", + "T.E.R", + "TER_REF", + "Total Expense Ratio", + "Total Expense Ratios", + "Expense ratio", + "Total Fund Charge", + "Gross Expense Ratio", + "Gross Expense Ratios", + "all-in-fee", + "all-in fee", + "all in fee", + "Total Net Expense Ratio", + "Total Net Expense Ratios", + "Total Operating Expense", + "Expense Ratio", + "Expense Ratio -Direct", + "Expense Ratio -Regular", + "month End Expense ratio", + "expense ratio", + "Expenses Ratios", + "Weighted AverageExpense Ratio", + "expenses ratio", + "Total Expense as % of AAuM", + "Total Expenses as a % of AAuM", + "Recurring Expenses as a percentage to Average Net Assets", + "Total Expenses as % of AAuM", + "Income and Expenditure", + "Expenditure at Plan level as %", + "Total Expenses Inclusive of Management Fees of Direct Plan", + "Total Expenses Inclusive of Management Fees of Regular Plan" + ], + "spanish": [ + "Rácio da despesa total", + "Ratio Total de Gastos", + "TER", + "Ratio de gastos totales" + ], + "german": [ + "Gesamtgebühren", + "Kostenpauschale", + "Gesamtkostenquote", + "Gesamtaufwandsquoten", + "Betriebskostenquote des Fonds", + "TER", + "Total Expense Ratio", + "Total Expense Ratios" + ], + "dutch": [ + "Totale-kostenpercentage", + "Totale Kostenratio", + "TKR", + "Total Expense Ratio", + "Totale kostenpercentage", + "Totaal kostenpercentage" + ], + "french": [ + "Total des frais sur encours", + "TFE", + "Ratios des charges totales", + "Frais sur encours", + "RCT", + "Total des frais sur encours", + "TER", + "Ratio des dépenses totales", + "Ratio de dépenses totales" + ], + "finnish": [ + "palkkiot yhteensä", + "total expence ratio" + ], + "swedish": [ + "Totalkostnadsandel", + "TER" + ], + "danish": [ + "Administrationsomk", + "Omkostningsprocent" + ], + "norwegian": [ + "TER", + "Kostnadsratio" + ], + "lithuanian": [ + "Bendrųjų išlaidų koeficientas", + "Bendrasis metinis išlaidų rodikli" + ], + "polish": [ + "Współczynnik kosztów całkowitych", + "WKC" + ], + "latvian": [ + "Kopējo izdevumu koeficients", + "KIK" + ], + "india(keywords)": [ + "Expenditure", + "expense ratio", + "ratio of", + "gross expense" + ], + "estonian": [ + "Kogukulude suhe", + "Kogukulude suhe aasta lõikes" + ], + "malay": [ + "NPP", + "Nisbah Perbelanjaan Pengurusan" + ], + "italian": [ + "TER", + "Total Expense Ratio", + 
"Coefficienti di spesa totale", + "Coefficiente di spesa totale" + ], + "portuguese": [ + "Taxa Global de custos", + "Quocientes de Despesa Total", + "Totale kostenpercentage" + ], + "hungarian": [ + "Összes ráfordítás aránya" + ] + }, + "tor": { + "english": [ + "TOR", + "Turnover*", + "Turnover", + "Turnover Ratio", + "Turnover Rate", + "Portfolio Turnover", + "Portfolio turnover ratio", + "Portfolio turnover rate", + "PTR", + "Annual Portfolio Turnover Ratio" + ], + "india": [ + "Aggregate Value of Purchase and Sale", + "The aggregate value of investments", + "The aggregate value of purchases", + "the aggregate of sales", + "Aggregate value of investments purchased and sold", + "The aggregate value of purchases and sales" + ], + "spanish": [ + "Rotación de la Cartera", + "Índice de rotación de la cartera", + "Ratio de rotación de la cartera" + ], + "german": [ + "Rotation", + "Umschlagshaufigkeit", + "Portfolioumschlagshäufigkeit", + "Umschlagshäufigkeit", + "PTR", + "Portfolio Turnover Rate", + "Portefeuilleumsatz", + "Portfolio Turnover Ratio", + "Umsatz", + "Portfolioumschlagsra", + "Umschlagkennziffer", + "Portfolioumschlag", + "Portfolioumschlagsrate" + ], + "dutch": [ + "Omloopsnelheid", + "Omloopfactor", + "Omloopsnelheid", + "Turnover rate", + "Rotatie van de portefeuille", + "POF", + "Turnover ratio" + ], + "french": [ + "taux de rotation", + "Taux de rotation du portefeuille", + "Rotation de portefeuille", + "Ratio de rotation du portefeuille" + ], + "finnish": [ + "salkun kiertonopeus" + ], + "swedish": [ + "Omsättningshastighet" + ], + "danish": [ + "Omsætningshastighed" + ], + "norwegian": [ + "Omløpshastighet" + ], + "india(keywords)": [ + "Aggregate value" + ], + "malay": [ + "PGP", + "Pusing Ganti Portfolio" + ], + "italian": [ + "Tasso di movimentazione del portafoglio", + "Turnover del portafoglio", + "Indice di Rotazione del Portafoglio" + ], + "portuguese": [ + "Rotação média da carteira", + "Índice de Movimentação da Carteira de Investimento" + ] + }, + "ogc": { + "english": [ + "Synthetic Ongoing Charges excluding", + "On-going Charge", + "Ongoing Charge", + "ongoing charges", + "On-going Fee", + "Ongoing fee", + "OGC", + "OGF", + "Operation Charge", + "On Going Charges", + "OC", + "Ongoing Charge Figure OCF", + "Ongoing Fund Charge", + "Operating Charge", + "Operating Charges", + "Operating expenses", + "Operating, Administrative and Servicing Expenses" + ], + "spanish": [ + "Gastos Corrientes", + "Gastos Recurrentes" + ], + "german": [ + "Laufende Kosten", + "OGC", + "Ongoing Charge", + "laufende kosten in prozent", + "laufenden Kosten", + "Betriebskosten", + "Betriebsgebühren" + ], + "dutch": [ + "Lopende kosten", + "Lopende kosten factor", + "LKF", + "Ongoing Charge", + "OCF" + ], + "french": [ + "Frais courants", + "Commission de frais opérationels" + ], + "italian": [ + "Spese Correnti" + ], + "portuguese": [ + "Encargos Correntes", + "Custos correntes" + ] + }, + "performance_fee": { + "english": [ + "Performance Fee", + "Performance Fees", + "performance-based fee", + "performance-related fee" + ], + "spanish": [ + "Comisión de Gestión sobre Resultados", + "Comisión sobre Resultados", + "Comisión de Rentabilidad", + "Comisiones de éxito", + "Comisión de Éxito", + "Comisión por resultados", + "comisión de rentabilidad" + ], + "german": [ + "Erfolgsabhängige Vergütung", + "Erfolgsbezogene Vergütung", + "Performancegebühren", + "An die Wertentwicklung des Fonds gebundene Gebühren", + "Performancegebühr", + "Performance-gebühr", + "Erfolgshonorare", + 
"Erfolgsabhän-giger Vergütung", + "Erfolgshonorar", + "Performance-Fee", + "Erfolgsgebühr", + "perfolgsabhängige Verwaltungsvergütung", + "performanceabhängige Vergütung", + "Performance- gebühren" + ], + "dutch": [ + "Prestatievergoeding", + "Performance Fee" + ], + "french": [ + "Les commissions de surperformance", + "Commission de performance", + "Commissions de surperformance", + "frais de performance" + ], + "italian": [ + "Commissioni di performance", + "Commissioni legate al rendimento", + "Commissioni d’incentivo" + ], + "portuguese": [ + "Comissão de desempenho", + "Custos de performance", + "Comissão de Gestão Variável" + ], + "estonian": [ + " Edukustasud aasta lõikes" + ], + "latvian": [ + "Gada prēmijas par sasniegtajiem rezultātiem" + ], + "Lithuanian": [ + "Metinis mokestis už veiklos rezultatu" + ] + }, + "trading expense ratio": { + "english": [ + "Trading expense ratio", + "Trading Expense Ratio10" + ] + }, + "mer": { + "english": [ + "Management expense ratio", + "Management expense ratio after taxes", + "Expense ratio" + ] + }, + "MgtFee": { + "english": [ + "Management Fee as % of AAuM", + "Management Fee including GST as % of AAuM", + "Management Fees", + "Management fee inclusive of service tax GST at annualised average rate", + "Management and Trusteeship Fees", + "Investment Management and Trusteeship fees", + "Investment management fees " + ] + }, + "max_management_fee": { + "english": [ + "management fee", + "Periodic Charge", + "Advisory", + "max_management_fee" + ] + }, + "max_front_load": { + "english": [ + "Sales charge", + "subscription fee", + "subscription charge", + "subscription commission", + "sales fee", + "entry fee", + "initial charge", + "preliminary charge", + "preliminary fee", + "Entry Charge", + "Initial Sales Charge", + "max_front_load" + ] + }, + "min_initial_purchase": { + "english": [ + "Minimum Initial Subscription", + "Minimum Subscription", + "Minimum Subscription Amount", + "Minimum initial investment", + "min_initial_purchase" + ] + }, + "min_subsequent_purchase": { + "english": [ + "Minimum Additional", + "Minimum Additional Subscription Amount", + "Minimum initial and subsequence subscription", + "Minimum Additional Subscription", + "Minimum Subsequent Investment", + "Minimum Subsequent Purchase", + "additional", + "min_subsequent_purchase" + ] + } +} \ No newline at end of file diff --git a/configuration/domicile_datapoints.json b/configuration/domicile_datapoints.json new file mode 100644 index 0000000..14796ae --- /dev/null +++ b/configuration/domicile_datapoints.json @@ -0,0 +1,30 @@ +{ + "CAN": { + "ar": [ + "mer", + "tor", + "trading expense ratio" + ] + }, + "IND": { + "ar": [ + "ter", + "MgtFee", + "tor" + ] + }, + "default": { + "ar": [ + "tor", + "ter", + "ogc", + "performance_fee" + ], + "prospectus": [ + "max_management_fee", + "max_front_load", + "min_initial_purchase", + "min_subsequent_purchase" + ] + } +} \ No newline at end of file diff --git a/configuration/language.json b/configuration/language.json new file mode 100644 index 0000000..230ef90 --- /dev/null +++ b/configuration/language.json @@ -0,0 +1,22 @@ +{ + "0L00000122": "english", + "0LMIX00001": "english", + "0LMIX00002": "english", + "0L00000482": "english", + "0LMIX00003": "german", + "0L00000152": "german", + "0L00000114": "dutch", + "0L00000138": "french", + "0L00000203": "italian", + "0L00000408": "spanish", + "0L00000348": "portuguese", + "0L00000135": "Finnish", + "0L00000415": "Swedish", + "0L00000104": "Danish", + "0L00000320": "Norwegian", + 
"0L00000254": "Lithuanian", + "0L00000347": "Polish", + "0L00000250": "Latvian", + "0L00000127": "Estonian", + "0L00000273": "Malay" +} \ No newline at end of file diff --git a/core/page_filter.py b/core/page_filter.py new file mode 100644 index 0000000..7a0774a --- /dev/null +++ b/core/page_filter.py @@ -0,0 +1,129 @@ +import os +import json +import re +import fitz +import pandas as pd +from utils.pdf_util import PDFUtil +from utils.sql_query_util import query_document_fund_mapping +from utils.logger import logger +from utils.biz_utils import add_slash_to_text_as_regex + + +class FilterPages: + def __init__( + self, doc_id: str, pdf_file: str, document_mapping_info_df: pd.DataFrame + ) -> None: + self.doc_id = doc_id + self.pdf_file = pdf_file + self.page_text_dict = self.get_pdf_page_text_dict() + if document_mapping_info_df is None or len(document_mapping_info_df) == 0: + self.document_mapping_info_df = query_document_fund_mapping(doc_id) + else: + self.document_mapping_info_df = document_mapping_info_df + self.get_configuration_from_file() + self.doc_info = self.get_doc_info() + self.datapoint_config = self.get_datapoint_config() + + def get_pdf_page_text_dict(self) -> dict: + pdf_util = PDFUtil(self.pdf_file) + success, text, page_text_dict = pdf_util.extract_text() + return page_text_dict + + def get_configuration_from_file(self) -> dict: + language_config_file = r"./configuration/language.json" + domicile_datapoint_config_file = r"./configuration/domicile_datapoints.json" + datapoint_keywords_config_file = r"./configuration/datapoint_keyword.json" + with open(language_config_file, "r", encoding="utf-8") as file: + self.language_config = json.load(file) + with open(domicile_datapoint_config_file, "r", encoding="utf-8") as file: + self.domicile_datapoint_config = json.load(file) + with open(datapoint_keywords_config_file, "r", encoding="utf-8") as file: + self.datapoint_keywords_config = json.load(file) + + def get_doc_info(self) -> dict: + if len(self.document_mapping_info_df) == 0: + return { + "effective_date": None, + "document_type": "ar", + "language_id": "0L00000122", + "language": "english", + "domicile": "LUX", + } + effective_date = self.document_mapping_info_df["EffectiveDate"].iloc[0] + document_type = self.document_mapping_info_df["DocumentType"].iloc[0] + if document_type in [4, 5]: + document_type = "ar" + elif document_type == 1: + document_type = "prospectus" + language_id = self.document_mapping_info_df["Language"].iloc[0] + language = self.language_config.get(language_id, None) + domicile = self.document_mapping_info_df["Domicile"].iloc[0] + return { + "effective_date": effective_date, + "document_type": document_type, + "language_id": language_id, + "language": language, + "domicile": domicile, + } + + def get_datapoint_config(self) -> dict: + domicile = self.doc_info.get("domicile", None) + document_type = self.doc_info.get("document_type", None) + language = self.doc_info.get("language", None) + if language is None: + language = "english" + if self.domicile_datapoint_config.get(domicile, None) is None: + domicile = "default" + if self.domicile_datapoint_config[domicile].get(document_type, None) is None: + document_type = "ar" + datapoint_list = self.domicile_datapoint_config[domicile][document_type] + datapoint_keywords = {} + for datapoint in datapoint_list: + keywords = self.datapoint_keywords_config.get(datapoint, {}).get(language, []) + if len(keywords) > 0: + keywords = self.optimize_keywords_regex(keywords) + datapoint_keywords[datapoint] = keywords + if 
language != "english": + english_keywords = self.datapoint_keywords_config.get(datapoint, {}).get("english", []) + if len(english_keywords) > 0: + english_keywords = self.optimize_keywords_regex(english_keywords) + datapoint_keywords[datapoint] += english_keywords + return datapoint_keywords + + def optimize_keywords_regex(self, keywords: list) -> list: + new_keywords = [] + for keyword in keywords: + new_keyword = add_slash_to_text_as_regex(keyword) + new_keywords.append(new_keyword) + return new_keywords + + def clean_text(self, text: str) -> str: + text = text.lower() + text = re.sub(r"\s+", ' ', text.strip()) + return text + + def start_job(self) -> dict: + logger.info(f"Start extracting datapoints from {self.pdf_file}") + """ + 1. Ierate document pages + 2. Filter by data point keywords + 3. Result should be like this: + { + "doc_id": "445256897", + "ter": [5, 6, 10, 15], + "tor": [6, 8, 10] + } + """ + result = {"doc_id": self.doc_id} + for datapoint in self.datapoint_config.keys(): + result[datapoint] = [] + for page_num, page_text in self.page_text_dict.items(): + text = self.clean_text(page_text) + for datapoint, keywords in self.datapoint_config.items(): + # idx = idx & np.array([re.findall(r'\b' + word + r'\d*\b', page) != [] for page in self.pages_clean]) + for keyword in keywords: + search_regex = r"\b{0}\d*\b\s*".format(keyword) + if re.search(search_regex, text, re.IGNORECASE): + result[datapoint].append(page_num) + break + return result diff --git a/main.py b/main.py index 5df5440..48f640f 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,65 @@ -def main(): - print("Hello World!") - +import os +import json +import pandas as pd +from glob import glob +from tqdm import tqdm +import time +from utils.logger import logger +from utils.pdf_download import download_pdf_from_documents_warehouse +from utils.sql_query_util import query_document_fund_mapping +from core.page_filter import FilterPages + + +class EMEA_AR_Parsing: + def __init__(self, doc_id: str, pdf_folder: str = r"/data/emea_ar/pdf/") -> None: + self.doc_id = doc_id + self.pdf_folder = pdf_folder + os.makedirs(self.pdf_folder, exist_ok=True) + self.pdf_file = self.download_pdf() + self.document_mapping_info_df = query_document_fund_mapping(doc_id) + self.datapoint_page_info = self.get_datapoint_page_info() + + def download_pdf(self) -> str: + pdf_file = download_pdf_from_documents_warehouse(self.pdf_folder, self.doc_id) + return pdf_file + + def get_datapoint_page_info(self) -> dict: + filter_pages = FilterPages( + self.doc_id, self.pdf_file, self.document_mapping_info_df + ) + datapoint_page_info = filter_pages.start_job() + return datapoint_page_info + + +def filter_pages(doc_id: str, pdf_folder: str) -> None: + logger.info(f"Parsing EMEA AR for doc_id: {doc_id}") + emea_ar_parsing = EMEA_AR_Parsing(doc_id, pdf_folder) + return emea_ar_parsing.datapoint_page_info + + +def batch_filter_pdf_files(pdf_folder: str, output_folder: str) -> None: + pdf_files = glob(pdf_folder + "*.pdf") + result_list = [] + for pdf_file in tqdm(pdf_files): + pdf_base_name = os.path.basename(pdf_file) + doc_id = pdf_base_name.split(".")[0] + datapoint_page_info = filter_pages(doc_id=doc_id, pdf_folder=pdf_folder) + result_list.append(datapoint_page_info) + result_df = pd.DataFrame(result_list) + result_df.reset_index(drop=True, inplace=True) + + logger.info(f"Saving the result to {output_folder}") + os.makedirs(output_folder, exist_ok=True) + time_stamp = time.strftime("%Y%m%d%H%M%S", time.localtime()) + output_file = os.path.join( + 
+        output_folder,
+        f"datapoint_page_info_{len(result_df)}_documents_{time_stamp}.xlsx",
+    )
+    with pd.ExcelWriter(output_file) as writer:
+        result_df.to_excel(writer, index=False)
+
+
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    pdf_folder = r"/data/emea_ar/small_pdf/"
+    output_folder = r"/data/emea_ar/output/filter_pages/"
+    batch_filter_pdf_files(pdf_folder, output_folder)
diff --git a/prepare_data.py b/prepare_data.py
index 409e406..14d8c5e 100644
--- a/prepare_data.py
+++ b/prepare_data.py
@@ -48,7 +48,7 @@ def download_pdf(doc_provider_file_path: str, sheet_name: str, pdf_path: str):
     # download pdfs
     logger.info(f"Start downloading {len(doc_id_list)} pdfs")
     os.makedirs(pdf_path, exist_ok=True)
-    for doc_id in tqdm.tqdm(doc_id_list):
+    for doc_id in tqdm(doc_id_list):
         logger.info(f"Downloading pdf for docid: {doc_id}")
         download_pdf_from_documents_warehouse(pdf_directory=pdf_path, doc_id=doc_id)
         time.sleep(1)
@@ -565,17 +565,53 @@ def statistics_document_fund_share_count(provider_mapping_data_file: str):
     describe_stat_df_list = []
     # statistics document mapping information
     doc_mapping_data = pd.read_excel(provider_mapping_data_file, sheet_name="all_data")
+    # set noTor column value to 0 if column tor value is not nan, set 1 otherwise
+    doc_mapping_data["noTor"] = doc_mapping_data["tor"].apply(
+        lambda x: 0 if pd.notna(x) else 1
+    )
+    # set share_noTer column value to 0 if column share_ter value is not nan, set 1 otherwise
+    doc_mapping_data["share_noTer"] = doc_mapping_data["share_ter"].apply(
+        lambda x: 0 if pd.notna(x) else 1
+    )
+    # set share_noOgc column value to 0 if column share_ogc value is not nan, set 1 otherwise
+    doc_mapping_data["share_noOgc"] = doc_mapping_data["share_ogc"].apply(
+        lambda x: 0 if pd.notna(x) else 1
+    )
+    # set share_noPerfFee column value to 0 if column share_perfFee value is not nan, set 1 otherwise
+    doc_mapping_data["share_noPerfFee"] = doc_mapping_data["share_perfFee"].apply(
+        lambda x: 0 if pd.notna(x) else 1
+    )
     # statistics doc_mapping_data for counting FundId count based on DocumentId
     logger.info(
         "statistics doc_mapping_data for counting FundId count based on DocumentId"
     )
-    doc_fund_id_df = doc_mapping_data[["DocumentId", "CompanyId", "CompanyName", "FundId"]].drop_duplicates()
+    doc_fund_id_df = doc_mapping_data[["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundId"]].drop_duplicates()
     doc_fund_count = (
-        doc_fund_id_df.groupby(["DocumentId", "CompanyId", "CompanyName"]).size().reset_index(name="fund_count")
+        doc_fund_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"]).size().reset_index(name="fund_count")
     )
     # order by fund_count in descending order
     doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=True)
+
+    # set with_ar_data to True if noTor == 0 or share_noTer == 0 or share_noOgc == 0 or share_noPerfFee == 0
+    doc_fund_count["with_ar_data"] = False
+    for index, row in doc_fund_count.iterrows():
+        document_id = row["DocumentId"]
+        ar_data = doc_mapping_data[
+            (doc_mapping_data["DocumentId"] == document_id)
+            & (
+                (
+                    (doc_mapping_data["noTor"] == 0)
+                    | (doc_mapping_data["share_noTer"] == 0)
+                    | (doc_mapping_data["share_noOgc"] == 0)
+                    | (doc_mapping_data["share_noPerfFee"] == 0)
+                )
+            )
+        ]
+        if len(ar_data) > 0:
+            doc_fund_count.loc[index, "with_ar_data"] = True
+
+
     # statistics fund_count in doc_fund_count by describe and transform to DataFrame
     doc_fund_count_stat_df = get_describe_stat(
         doc_fund_count, "fund_count", "doc_fund_count"
     )
@@ -587,10 +623,10 @@ def statistics_document_fund_share_count(provider_mapping_data_file: str):
         "statistics doc_mapping_data for counting FundClassId count based on DocumentId"
     )
     doc_share_class_id_df = doc_mapping_data[
-        ["DocumentId", "CompanyId", "CompanyName", "FundClassId"]
+        ["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundClassId"]
     ].drop_duplicates()
     doc_share_class_count = (
-        doc_share_class_id_df.groupby(["DocumentId", "CompanyId", "CompanyName"])
+        doc_share_class_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"])
         .size()
         .reset_index(name="share_class_count")
     )
@@ -598,6 +634,24 @@ def statistics_document_fund_share_count(provider_mapping_data_file: str):
     doc_share_class_count = doc_share_class_count.sort_values(
         by="share_class_count", ascending=True
     )
+    # set with_ar_data to True if noTor == 0 or share_noTer == 0 or share_noOgc == 0 or share_noPerfFee == 0
+    doc_share_class_count["with_ar_data"] = False
+    for index, row in doc_share_class_count.iterrows():
+        document_id = row["DocumentId"]
+        ar_data = doc_mapping_data[
+            (doc_mapping_data["DocumentId"] == document_id)
+            & (
+                (
+                    (doc_mapping_data["noTor"] == 0)
+                    | (doc_mapping_data["share_noTer"] == 0)
+                    | (doc_mapping_data["share_noOgc"] == 0)
+                    | (doc_mapping_data["share_noPerfFee"] == 0)
+                )
+            )
+        ]
+        if len(ar_data) > 0:
+            doc_share_class_count.loc[index, "with_ar_data"] = True
+
     # statistics share_class_count in doc_share_class_count by describe and transform to DataFrame
     doc_share_class_count_stat_df = get_describe_stat(
         doc_share_class_count, "share_class_count", "doc_share_class_count"
     )
@@ -648,6 +702,116 @@ def get_describe_stat(df: pd.DataFrame, column_name: str, stat_type_name: str):
     return stat_df
+
+
+def pickup_document_from_top_100_providers():
+    """
+    Pick up 100 documents from the top 100 providers.
+    The documents have 10 or fewer share classes.
+    The purpose is to analyze the document structure and content of small documents.
+ """ + provider_mapping_data_file = ( + r"/data/emea_ar/basic_information/English/provider_mapping_data_statistics.xlsx" + ) + top_100_provider_document_file = ( + r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx" + ) + provider_share_count = pd.read_excel( + provider_mapping_data_file, sheet_name="provider_share_count" + ) + # add a new column with name share_count_rank to provider_share_count + provider_share_count["share_count_rank"] = provider_share_count[ + "share_class_count" + ].rank(method="min", ascending=False) + + top_100_provider_document_all_data = pd.read_excel( + top_100_provider_document_file, sheet_name="all_data" + ) + + top_100_provider_document_share_count = pd.read_excel( + top_100_provider_document_file, sheet_name="doc_share_class_count" + ) + top_100_provider_document_share_count = \ + top_100_provider_document_share_count[top_100_provider_document_share_count["with_ar_data"] == True] + top_100_provider_document_share_count.reset_index(drop=True, inplace=True) + + # add a new column with name share_count_rank to top_100_provider_document_share_count by merge with provider_share_count + top_100_provider_document_share_count = pd.merge( + top_100_provider_document_share_count, + provider_share_count, + on=["CompanyId"], + how="left", + ) + # Keep columns: DocumentId, CompanyId, CompanyName, share_class_count_x, share_count_rank + top_100_provider_document_share_count = top_100_provider_document_share_count[ + ["DocumentId", "CompanyId", "CompanyName_x", "share_class_count_x", "share_count_rank"] + ] + # rename column share_class_count_x to share_class_count + top_100_provider_document_share_count.rename( + columns={"share_class_count_x": "share_class_count", + "CompanyName_x": "Company_Name", + "share_count_rank": "provider_share_count_rank"}, inplace=True + ) + top_100_provider_document_share_count = top_100_provider_document_share_count.sort_values( + by=["provider_share_count_rank", "share_class_count"], ascending=True + ) + + # According to share_count_rank, from 1 to 10, + # random pickup one documents with 1 to 10 share classes for each rank + data_filter = top_100_provider_document_share_count[ + (top_100_provider_document_share_count["share_class_count"] <= 10) + & (top_100_provider_document_share_count["share_class_count"] >= 1) + ] + data_filter = data_filter.sort_values( + by=["provider_share_count_rank", "share_class_count"], ascending=[True, True] + ) + unique_rank_list = top_100_provider_document_share_count["provider_share_count_rank"].unique().tolist() + random_pickup_document_data_list = [] + for rank in unique_rank_list: + data_filter_rank = data_filter[data_filter["provider_share_count_rank"] == rank] + if len(data_filter_rank) == 0: + # get the first document with rank from top_100_provider_document_share_count + data_filter_rank = top_100_provider_document_share_count[ + top_100_provider_document_share_count["provider_share_count_rank"] == rank + ].head(1) + data_filter_rank = data_filter_rank.sample(n=1, random_state=88) + random_pickup_document_data_list.append(data_filter_rank) + random_pickup_document_data = pd.concat(random_pickup_document_data_list) + # sort by share_count_rank in ascending order + random_pickup_document_data = random_pickup_document_data.sort_values( + by="provider_share_count_rank", ascending=True + ) + random_pickup_document_data.reset_index(drop=True, inplace=True) + + random_pickup_document_mini_data = random_pickup_document_data[ + ["DocumentId", 
"provider_share_count_rank"] + ] + # get all data from top_100_provider_document_all_data by merge with random_pickup_document_mini_data + random_pickup_document_all_data = pd.merge( + random_pickup_document_mini_data, + top_100_provider_document_all_data, + on=["DocumentId"], + how="left", + ) + # sort random_pickup_document_all_data by provider_share_count_rank, FundLegalName, FundClassLegalName in ascending order + random_pickup_document_all_data = random_pickup_document_all_data.sort_values( + by=["provider_share_count_rank", "FundLegalName", "FundClassLegalName"], ascending=True + ) + + random_small_document_data_file = ( + r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx" + ) + with pd.ExcelWriter(random_small_document_data_file) as writer: + top_100_provider_document_share_count.to_excel( + writer, sheet_name="all_doc_with_ar_data", index=False + ) + random_pickup_document_data.to_excel( + writer, sheet_name="random_small_document", index=False + ) + + random_pickup_document_all_data.to_excel( + writer, sheet_name="random_small_document_all_data", index=False + ) + + if __name__ == "__main__": doc_provider_file_path = ( r"/data/emea_ar/basic_information/English/latest_provider_ar_document.xlsx" @@ -664,7 +828,14 @@ if __name__ == "__main__": output_folder = r"/data/emea_ar/output/" # get_unique_docids_from_doc_provider_data(doc_provider_file_path) # download_pdf(doc_provider_file_path, 'doc_provider_count', pdf_folder) - # output_pdf_page_text(pdf_folder, output_folder) + pdf_folder = r"/data/emea_ar/small_pdf/" + output_folder = r"/data/emea_ar/small_pdf_txt/" + random_small_document_data_file = ( + r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx" + ) + download_pdf(random_small_document_data_file, 'random_small_document', pdf_folder) + output_pdf_page_text(pdf_folder, output_folder) + # extract_pdf_table(pdf_folder, output_folder) # analyze_json_error() @@ -674,4 +845,5 @@ if __name__ == "__main__": # provider_mapping_data_file=provider_mapping_data_file, # output_folder=basic_info_folder, # ) - statistics_document_fund_share_count(doc_mapping_from_top_100_provider_file) + # statistics_document_fund_share_count(doc_mapping_from_top_100_provider_file) + # pickup_document_from_top_100_providers() diff --git a/utils/biz_utils.py b/utils/biz_utils.py new file mode 100644 index 0000000..72346a7 --- /dev/null +++ b/utils/biz_utils.py @@ -0,0 +1,14 @@ +import re + +def add_slash_to_text_as_regex(text: str): + if text is None or len(text) == 0: + return text + special_char_iter = re.finditer("\W", text) + for special_iter in special_char_iter: + if len(special_iter.group().strip()) == 0: + continue + replace = r"\{0}".format(special_iter.group()) + if replace not in text: + text = re.sub(replace, replace, text) + text = re.sub(r"\s+", r"\\s+", text) + return text \ No newline at end of file diff --git a/utils/sql_query_util.py b/utils/sql_query_util.py new file mode 100644 index 0000000..0e8cddf --- /dev/null +++ b/utils/sql_query_util.py @@ -0,0 +1,80 @@ +import json +import time +from urllib import request +import pandas as pd +import os +import dotenv +# loads .env file with your OPENAI_API_KEY +dotenv.load_dotenv() + + +def query_document_fund_mapping(doc_id): + count = 1 + while True: + try: + document_mapping_info_df = query_data_by_biz_type( + biztype="getFundInfoByDocId", para=doc_id, return_df=True + ).drop_duplicates() + if len(document_mapping_info_df) == 0: + return 
+            document_mapping_info_df = document_mapping_info_df.sort_values(
+                by=["FundName", "ShareClassName"]
+            ).reset_index(drop=True)
+            return document_mapping_info_df
+        except Exception as e:
+            print(e)
+            time.sleep(3)
+            if count == 5:
+                break
+            count += 1
+
+
+def query_investment_by_provider(company_id: str):
+    count = 1
+    while True:
+        try:
+            investment_by_provider_df = query_data_by_biz_type(biztype='getInvestmentByProvider',
+                                                               para=company_id,
+                                                               return_df=True).drop_duplicates()
+            investment_by_provider_df = investment_by_provider_df \
+                .sort_values(by=['FundName', 'ShareClassName']) \
+                .reset_index(drop=True)
+            return investment_by_provider_df
+        except Exception as e:
+            print(e)
+            time.sleep(3)
+            if count == 5:
+                break
+            count += 1
+
+
+def query_data_by_biz_type(biztype: str, para, return_df: bool):
+    sqlpass_url = "https://api.morningstar.com/sqlpassapi/v1/sql"
+    url = sqlpass_url + "?sqlName={0}&params={1}".format(biztype, str(para))
+    headers = {"ApiKey": os.getenv("SQL_PASS_KEY")}
+    if return_df:
+        return pd.DataFrame(query_data_by_url(url, headers))
+    else:
+        return query_data_by_url(url, headers)
+
+
+def query_data_by_url(url, headers):
+    res = None
+    count = 1
+    while True:
+        try:
+            req = request.Request(url=url, headers=headers)
+            res = request.urlopen(req)
+            res = res.read().decode(encoding="utf-8", errors="ignore")
+            break
+        except Exception as e:
+            print(e)
+            time.sleep(3)
+            if count == 5:
+                break
+            count += 1
+    if res is not None:
+        dic = json.loads(res)
+        return dic["result"]
+    else:
+        return None
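
Keyword selection in core/page_filter.py is driven by the three JSON files added above: language.json maps a language id to a language name, domicile_datapoints.json decides which data points apply per domicile and document type (with a "default" fallback), and datapoint_keyword.json lists search keywords per data point and language. A minimal standalone sketch of that fallback order, assuming trimmed excerpts of the shipped configuration (the inline dicts below are illustrative, not the full files) and leaving out the keyword-to-regex escaping step:

# Sketch of the domicile -> document type -> language resolution in
# FilterPages.get_datapoint_config(); config dicts are trimmed samples.
domicile_datapoint_config = {
    "IND": {"ar": ["ter", "MgtFee", "tor"]},
    "default": {"ar": ["tor", "ter", "ogc", "performance_fee"]},
}
datapoint_keywords_config = {
    "ter": {"english": ["TER", "Total Expense Ratio"], "german": ["Gesamtkostenquote", "TER"]},
    "tor": {"english": ["Turnover Ratio", "PTR"], "german": ["Umschlagshäufigkeit", "PTR"]},
}


def resolve_keywords(domicile: str, document_type: str, language: str) -> dict:
    # Fall back to "default" when the domicile is not configured, and to "ar"
    # when the document type is not configured for that domicile.
    if domicile not in domicile_datapoint_config:
        domicile = "default"
    if document_type not in domicile_datapoint_config[domicile]:
        document_type = "ar"
    keywords = {}
    for datapoint in domicile_datapoint_config[domicile][document_type]:
        per_language = datapoint_keywords_config.get(datapoint, {})
        keywords[datapoint] = list(per_language.get(language, []))
        if language != "english":
            # English keywords are always added on top of the local-language ones.
            keywords[datapoint] += per_language.get("english", [])
    return keywords


print(resolve_keywords("LUX", "ar", "german"))
# tor/ter get German + English keywords; ogc and performance_fee come back empty
# because they are not present in this trimmed keyword sample.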
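
FilterPages.start_job() decides whether a page is relevant by turning each configured keyword into a regular expression (utils/biz_utils.add_slash_to_text_as_regex escapes metacharacters and lets whitespace runs match \s+) and searching the lower-cased, whitespace-normalised page text. A rough standalone equivalent of that matching step; the helper below uses re.escape rather than the shipped function, and the sample page texts are made up:

import re


def keyword_to_pattern(keyword: str) -> str:
    # Rough stand-in for add_slash_to_text_as_regex: escape regex
    # metacharacters and let any whitespace run match flexibly.
    return r"\s+".join(re.escape(part) for part in keyword.split())


def pages_matching(keyword: str, page_text_dict: dict) -> list:
    # Mirrors the search in start_job(): case-insensitive, word-bounded, and
    # tolerant of digits glued to the keyword (e.g. footnote markers).
    pattern = r"\b{0}\d*\b".format(keyword_to_pattern(keyword))
    hits = []
    for page_num, page_text in page_text_dict.items():
        text = re.sub(r"\s+", " ", page_text.strip().lower())
        if re.search(pattern, text, re.IGNORECASE):
            hits.append(page_num)
    return hits


# Hypothetical page texts for illustration only.
sample_pages = {
    3: "Fund overview and investment objective",
    7: "Total Expense Ratio1 1.25%  Portfolio Turnover Ratio 45%",
}
print(pages_matching("Total Expense Ratio", sample_pages))  # -> [7]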
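
Each document yields one flat dict of page hits (the shape shown in the start_job docstring), and batch_filter_pdf_files simply stacks those dicts into a pandas DataFrame and writes a timestamped Excel file. A small usage sketch with made-up results: the second doc_id and the output filename below are placeholders, and writing .xlsx assumes an Excel engine such as openpyxl is installed:

import pandas as pd

# Per-document results in the shape produced by FilterPages.start_job();
# the doc_ids and page numbers are illustrative only.
results = [
    {"doc_id": "445256897", "ter": [5, 6, 10, 15], "tor": [6, 8, 10]},
    {"doc_id": "500000001", "ter": [], "tor": [12], "ogc": [12, 13]},
]

result_df = pd.DataFrame(results)
# Columns missing for a document (e.g. "ogc" for the first one) become NaN and
# show up as empty cells in the exported spreadsheet.
with pd.ExcelWriter("datapoint_page_info_sample.xlsx") as writer:
    result_df.to_excel(writer, index=False)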