# dc-ml-emea-ar/prepare_data.py


import pandas as pd
import os
from tqdm import tqdm
import json
from glob import glob
import fitz
import re
import time
import traceback
import json_repair
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.pdf_util import PDFUtil
from pdf_table_extraction import PDFTableExtraction
def get_unique_docids_from_doc_provider_data(doc_provider_file_path: str):
doc_provider_data = pd.read_excel(doc_provider_file_path)
# Build a per-document count: group by DocumentId and count the rows for each document,
# producing two columns: DocumentId and provider_count
doc_provider_count = (
doc_provider_data.groupby("DocumentId")
.size()
.reset_index(name="provider_count")
)
# sort the per-document counts by provider_count in descending order
doc_provider_count = doc_provider_count.sort_values(
by="provider_count", ascending=False
)
# write the original details and the per-document counts to separate sheets of the same workbook
with pd.ExcelWriter(doc_provider_file_path) as writer:
doc_provider_data.to_excel(
writer, sheet_name="doc_provider_details", index=False
)
doc_provider_count.to_excel(
writer, sheet_name="doc_provider_count", index=False
)
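
# Illustrative sketch (never called): the groupby/size/reset_index pattern used above turns
# one row per (document, provider) pairing into a single provider_count per DocumentId.
# The frame below is synthetic demo data, not real warehouse content.
def _demo_provider_count() -> pd.DataFrame:
    demo = pd.DataFrame({"DocumentId": ["D1", "D1", "D2"]})
    # Expected result: D1 -> provider_count 2, D2 -> provider_count 1
    return demo.groupby("DocumentId").size().reset_index(name="provider_count")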
def download_pdf(doc_provider_file_path: str,
sheet_name: str,
pdf_path: str,
doc_id_column: str = "DocumentId"):
document_data = pd.read_excel(doc_provider_file_path, sheet_name=sheet_name)
# get all unique docids as list
doc_id_list = [
str(doc_id) for doc_id in document_data[doc_id_column].unique().tolist()
]
# download pdfs
logger.info(f"Start downloading {len(doc_id_list)} pdfs")
os.makedirs(pdf_path, exist_ok=True)
for doc_id in tqdm(doc_id_list):
logger.info(f"Downloading pdf for docid: {doc_id}")
download_pdf_from_documents_warehouse(pdf_directory=pdf_path, doc_id=doc_id)
time.sleep(1)
def output_pdf_page_text(pdf_folder: str, output_folder: str):
if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
logger.error(f"Invalid pdf_folder: {pdf_folder}")
return
if output_folder is None or len(output_folder) == 0:
logger.error(f"Invalid output_folder: {output_folder}")
return
os.makedirs(output_folder, exist_ok=True)
pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
for pdf_file in pdf_files:
logger.info(f"Start processing {pdf_file}")
pdf_util = PDFUtil(pdf_file)
success, text, page_text_dict = pdf_util.extract_text(
output_folder=output_folder
)
if success:
logger.info(f"Successfully extracted text from {pdf_file}")
def extract_pdf_table(pdf_folder: str, output_folder: str):
if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
logger.error(f"Invalid pdf_folder: {pdf_folder}")
return
if output_folder is None or len(output_folder) == 0:
logger.error(f"Invalid output_folder: {output_folder}")
return
os.makedirs(output_folder, exist_ok=True)
pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
for pdf_file in pdf_files:
logger.info(f"Start processing {pdf_file}")
pdf_table_extraction = PDFTableExtraction(pdf_file, output_folder)
pdf_table_extraction.extract_tables()
def analyze_json_error():
text_file = r"/data/emea_ar/output/pdf_table_prompts/445877368_4.txt"
with open(text_file, "r", encoding="utf-8") as file:
text = file.read()
json_response = re.search(r"```json([\s\S]*)```", text)
if json_response:
json_text = json_response.group(1)
json_data = {"tables": []}
try:
json_data = json.loads(json_text)
except Exception:
json_data = json_repair.loads(json_text)
table_list = json_data.get("tables", [])
for table_num, table in enumerate(table_list):
table_md_file = os.path.join("/temp/", f"temp_{table_num}.md")
table = re.sub(r"(\n)+", "\n", table)
with open(table_md_file, "w", encoding="utf-8") as file:
file.write(table)
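
# Minimal sketch of the fenced-JSON recovery used in analyze_json_error, applied to a
# synthetic response string. json_repair.loads (imported above) is only used as a fallback
# when the fenced payload is not valid JSON.
def _demo_parse_fenced_json(response_text: str = '```json\n{"tables": ["| a | b |"]}\n```'):
    match = re.search(r"```json([\s\S]*)```", response_text)
    if not match:
        return {"tables": []}
    payload = match.group(1)
    try:
        return json.loads(payload)
    except Exception:
        # Fall back to json_repair for truncated or slightly malformed JSON.
        return json_repair.loads(payload)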
def statistics_document(
pdf_folder: str,
doc_mapping_file_path: str,
sheet_name: str = "all_data",
output_folder: str = "/data/emea_ar/basic_information/English/",
output_file: str = "doc_mapping_statistics_data.xlsx"
):
if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
logger.error(f"Invalid pdf_folder: {pdf_folder}")
return
if (
doc_mapping_file_path is None
or len(doc_mapping_file_path) == 0
or not os.path.exists(doc_mapping_file_path)
):
logger.error(f"Invalid doc_mapping_file_path: {doc_mapping_file_path}")
return
if output_folder is None or len(output_folder) == 0:
logger.error(f"Invalid output_folder: {output_folder}")
return
os.makedirs(output_folder, exist_ok=True)
describe_stat_df_list = []
# statistics document mapping information
doc_mapping_data = pd.read_excel(doc_mapping_file_path, sheet_name=sheet_name)
# statistics doc_mapping_data for counting FundId count based on DocumentId
logger.info(
"statistics doc_mapping_data for counting FundId count based on DocumentId"
)
doc_fund_id_df = doc_mapping_data[["DocumentId", "FundId"]].drop_duplicates()
doc_fund_count = (
doc_fund_id_df.groupby("DocumentId").size().reset_index(name="fund_count")
)
# order by fund_count in descending order
doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=False)
# statistics fund_count in doc_fund_count by describe and transform to DataFrame
doc_fund_count_stat_df = get_describe_stat(
doc_fund_count, "fund_count", "doc_fund_count"
)
describe_stat_df_list.append(doc_fund_count_stat_df)
# statistics doc_mapping_data for counting FundClassId count based on DocumentId
logger.info(
"statistics doc_mapping_data for counting FundClassId count based on DocumentId"
)
doc_share_class_id_df = doc_mapping_data[
["DocumentId", "FundClassId"]
].drop_duplicates()
doc_share_class_count = (
doc_share_class_id_df.groupby("DocumentId")
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
doc_share_class_count = doc_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in doc_share_class_count by describe and transform to DataFrame
doc_share_class_count_stat_df = get_describe_stat(
doc_share_class_count, "share_class_count", "doc_share_class_count"
)
describe_stat_df_list.append(doc_share_class_count_stat_df)
# statistics doc_mapping_data for counting FundId count based on CompanyId and CompanyName
logger.info(
"statistics doc_mapping_data for counting FundId count based on CompanyId and CompanyName"
)
provider_fund_id_df = doc_mapping_data[
["CompanyId", "CompanyName", "FundId"]
].drop_duplicates()
provider_fund_count = (
provider_fund_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="fund_count")
)
# order by fund_count in descending order
provider_fund_count = provider_fund_count.sort_values(
by="fund_count", ascending=False
)
# statistics fund_count in provider_fund_count by describe and transform to DataFrame
provider_fund_count_stat_df = get_describe_stat(
provider_fund_count, "fund_count", "provider_fund_count"
)
describe_stat_df_list.append(provider_fund_count_stat_df)
# statistics doc_mapping_data for counting FundClassId count based on CompanyId
logger.info(
"statistics doc_mapping_data for counting FundClassId count based on CompanyId"
)
provider_share_class_id_df = doc_mapping_data[
["CompanyId", "CompanyName", "FundClassId"]
].drop_duplicates()
provider_share_class_count = (
provider_share_class_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
provider_share_class_count = provider_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in provider_share_class_count by describe and transform to DataFrame
provider_share_class_count_stat_df = get_describe_stat(
provider_share_class_count, "share_class_count", "provider_share_class_count"
)
describe_stat_df_list.append(provider_share_class_count_stat_df)
# statistics doc_mapping_data for counting FundClassId count based on FundId and FundLegalName
logger.info(
"statistics doc_mapping_data for counting FundClassId count based on FundId and FundLegalName"
)
fund_share_class_id_df = doc_mapping_data[
["FundId", "FundLegalName", "FundClassId"]
].drop_duplicates()
fund_share_class_count = (
fund_share_class_id_df.groupby(["FundId", "FundLegalName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
fund_share_class_count = fund_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in fund_share_class_count by describe and transform to DataFrame
fund_share_class_count_stat_df = get_describe_stat(
fund_share_class_count, "share_class_count", "fund_share_class_count"
)
describe_stat_df_list.append(fund_share_class_count_stat_df)
stat_file = os.path.join(output_folder, output_file)
doc_id_list = [str(docid) for docid in doc_mapping_data["DocumentId"].unique().tolist()]
# statistics document page number
pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
logger.info("statistics document page number")
doc_page_num_list = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file).replace(".pdf", "")
if pdf_base_name not in doc_id_list:
continue
docid = os.path.basename(pdf_file).split(".")[0]
doc = fitz.open(pdf_file)
page_num = doc.page_count
doc_page_num_list.append({"docid": docid, "page_num": page_num})
doc.close()
doc_page_num_df = pd.DataFrame(doc_page_num_list)
# order by page_num in descending order
doc_page_num_df = doc_page_num_df.sort_values(by="page_num", ascending=False)
# statistics page_num by describe and transform to DataFrame
doc_page_num_stat_df = get_describe_stat(
doc_page_num_df, "page_num", "doc_page_num"
)
describe_stat_df_list.append(doc_page_num_stat_df)
describe_stat_df = pd.concat(describe_stat_df_list)
describe_stat_df.reset_index(drop=True, inplace=True)
# save statistics data to excel
with pd.ExcelWriter(stat_file) as writer:
doc_page_num_df.to_excel(writer, sheet_name="doc_page_num", index=False)
doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False)
doc_share_class_count.to_excel(
writer, sheet_name="doc_share_class_count", index=False
)
provider_fund_count.to_excel(
writer, sheet_name="provider_fund_count", index=False
)
provider_share_class_count.to_excel(
writer, sheet_name="provider_share_class_count", index=False
)
fund_share_class_count.to_excel(
writer, sheet_name="fund_share_class_count", index=False
)
describe_stat_df.to_excel(
writer, sheet_name="all_describe_statistics", index=False
)
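
# Helper sketch (not wired into statistics_document): the per-document page counting above,
# expressed as a standalone function. Assumes PyMuPDF, imported above as fitz.
def _count_pdf_pages(pdf_folder: str) -> pd.DataFrame:
    rows = []
    for pdf_file in glob(os.path.join(pdf_folder, "*.pdf")):
        doc = fitz.open(pdf_file)
        rows.append({"docid": os.path.basename(pdf_file).split(".")[0], "page_num": doc.page_count})
        doc.close()
    page_df = pd.DataFrame(rows)
    return page_df.sort_values(by="page_num", ascending=False) if not page_df.empty else page_df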
def statistics_provider_mapping(provider_mapping_data_file: str, output_folder: str):
if (
provider_mapping_data_file is None
or len(provider_mapping_data_file) == 0
or not os.path.exists(provider_mapping_data_file)
):
logger.error(
f"Invalid provider_mapping_data_file: {provider_mapping_data_file}"
)
return
provider_mapping_data = pd.read_excel(provider_mapping_data_file)
describe_stat_df_list = []
# statistics provider_mapping_data for counting FundId count based on CompanyId and CompanyName
logger.info(
"statistics provider_mapping_data for counting FundId count based on CompanyId and CompanyName"
)
provider_fund_id_df = provider_mapping_data[
["CompanyId", "CompanyName", "FundId"]
].drop_duplicates()
provider_fund_count = (
provider_fund_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="fund_count")
)
# order by fund_count in descending order
provider_fund_count = provider_fund_count.sort_values(
by="fund_count", ascending=False
)
# statistics fund_count in provider_fund_count by describe and transform to DataFrame
provider_fund_count_stat_df = get_describe_stat(
provider_fund_count, "fund_count", "provider_fund_count"
)
describe_stat_df_list.append(provider_fund_count_stat_df)
# Get the fund_count sum of all companies
all_companies_fund_count_sum = provider_fund_count["fund_count"].sum()
top_n_company_fund_count_list = []
# Get the fund_count sum of top 5 companies
top_5_companies_fund_count, top_5_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 5, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 5,
"fund_count": top_5_companies_fund_count,
"percent": top_5_companies_fund_count_percent,
}
)
logger.info(f"Top 5 companies fund count sum: {top_5_companies_fund_count}")
# Get the fund_count sum of top 10 companies
top_10_companies_fund_count, top_10_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 10, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 10,
"fund_count": top_10_companies_fund_count,
"percent": top_10_companies_fund_count_percent,
}
)
logger.info(f"Top 10 companies fund count sum: {top_10_companies_fund_count}")
# Get the fund_count sum of top 50 companies
top_50_companies_fund_count, top_50_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 50, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 50,
"fund_count": top_50_companies_fund_count,
"percent": top_50_companies_fund_count_percent,
}
)
logger.info(f"Top 50 companies fund count sum: {top_50_companies_fund_count}")
# Get the fund_count sum of top 100 companies
top_100_companies_fund_count, top_100_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 100, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 100,
"fund_count": top_100_companies_fund_count,
"percent": top_100_companies_fund_count_percent,
}
)
top_n_company_fund_count_list.append(
{
"top_n_providers": len(provider_fund_count),
"fund_count": all_companies_fund_count_sum,
"percent": 100,
}
)
logger.info(f"Top 100 companies fund count sum: {top_100_companies_fund_count}")
top_n_company_fund_count_df = pd.DataFrame(top_n_company_fund_count_list)
# statistics provider_mapping_data for counting SecId count based on CompanyId and CompanyName
logger.info(
"statistics provider_mapping_data for counting SecId count based on CompanyId and CompanyName"
)
provider_share_class_id_df = provider_mapping_data[
["CompanyId", "CompanyName", "SecId"]
].drop_duplicates()
provider_share_class_count = (
provider_share_class_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
provider_share_class_count = provider_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in provider_share_class_count by describe and transform to DataFrame
provider_share_class_count_stat_df = get_describe_stat(
provider_share_class_count, "share_class_count", "provider_share_class_count"
)
describe_stat_df_list.append(provider_share_class_count_stat_df)
# Get the share_class_count sum of all companies
all_companies_share_class_count_sum = provider_share_class_count[
"share_class_count"
].sum()
top_n_company_share_class_count_list = []
# Get the share_class_count sum of top 5 companies
top_5_companies_share_class_count, top_5_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
5,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 5,
"share_class_count": top_5_companies_share_class_count,
"percent": top_5_companies_share_class_count_percent,
}
)
logger.info(
f"Top 5 companies share class count sum: {top_5_companies_share_class_count}"
)
# Get the share_class_count sum of top 10 companies
top_10_companies_share_class_count, top_10_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
10,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 10,
"share_class_count": top_10_companies_share_class_count,
"percent": top_10_companies_share_class_count_percent,
}
)
logger.info(
f"Top 10 companies share class count sum: {top_10_companies_share_class_count}"
)
# Get the share_class_count sum of top 50 companies
top_50_companies_share_class_count, top_50_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
50,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 50,
"share_class_count": top_50_companies_share_class_count,
"percent": top_50_companies_share_class_count_percent,
}
)
logger.info(
f"Top 50 companies share class count sum: {top_50_companies_share_class_count}"
)
# Get the share_class_count sum of top 100 companies
top_100_companies_share_class_count, top_100_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
100,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 100,
"share_class_count": top_100_companies_share_class_count,
"percent": top_100_companies_share_class_count_percent,
}
)
logger.info(
f"Top 100 companies share class count sum: {top_100_companies_share_class_count}"
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": len(provider_share_class_count),
"share_class_count": all_companies_share_class_count_sum,
"percent": 100,
}
)
top_n_company_share_class_count_df = pd.DataFrame(
top_n_company_share_class_count_list
)
# statistics provider_mapping_data for counting SecId count based on FundId and FundLegalName
logger.info(
"statistics provider_mapping_data for counting SecId count based on FundId and FundLegalName"
)
fund_share_class_id_df = provider_mapping_data[
["FundId", "FundLegalName", "SecId"]
].drop_duplicates()
fund_share_class_count = (
fund_share_class_id_df.groupby(["FundId", "FundLegalName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
fund_share_class_count = fund_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in fund_share_class_count by describe and transform to DataFrame
fund_share_class_count_stat_df = get_describe_stat(
fund_share_class_count, "share_class_count", "fund_share_class_count"
)
describe_stat_df_list.append(fund_share_class_count_stat_df)
describe_stat_df = pd.concat(describe_stat_df_list)
describe_stat_df.reset_index(drop=True, inplace=True)
stat_file = os.path.join(output_folder, "provider_mapping_data_statistics.xlsx")
# save statistics data to excel
with pd.ExcelWriter(stat_file) as writer:
top_n_company_fund_count_df.to_excel(
writer, sheet_name="top_n_provider_fund_count", index=False
)
top_n_company_share_class_count_df.to_excel(
writer, sheet_name="top_n_provider_share_count", index=False
)
provider_fund_count.to_excel(
writer, sheet_name="provider_fund_count", index=False
)
provider_share_class_count.to_excel(
writer, sheet_name="provider_share_count", index=False
)
fund_share_class_count.to_excel(
writer, sheet_name="fund_share_count", index=False
)
describe_stat_df.to_excel(
writer, sheet_name="all_describe_statistics", index=False
)
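
# Alternative sketch for the repeated top-5/10/50/100 blocks above: one loop that builds the
# same summary rows. Purely illustrative; statistics_provider_mapping keeps the explicit
# blocks so each threshold can be logged separately.
def _top_n_summary(count_df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    total = count_df[column_name].sum()
    rows = []
    for n in (5, 10, 50, 100):
        top_count, top_percent = get_top_n_records_count(count_df, column_name, n, total)
        rows.append({"top_n_providers": n, column_name: top_count, "percent": top_percent})
    # Final row covers every provider, mirroring the 100% row appended above.
    rows.append({"top_n_providers": len(count_df), column_name: total, "percent": 100})
    return pd.DataFrame(rows)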
def statistics_document_fund_share_count(provider_mapping_data_file: str):
if (
provider_mapping_data_file is None
or len(provider_mapping_data_file) == 0
or not os.path.exists(provider_mapping_data_file)
):
logger.error(f"Invalid file_path: {provider_mapping_data_file}")
return
describe_stat_df_list = []
# statistics document mapping information
doc_mapping_data = pd.read_excel(provider_mapping_data_file, sheet_name="all_data")
# set noTor column value to 0 if column tor value is not nan, set 1 otherwise
doc_mapping_data["noTor"] = doc_mapping_data["tor"].apply(
lambda x: 0 if pd.notna(x) else 1
)
# set share_noTer column value to 0 if column share_ter value is not nan, set 1 otherwise
doc_mapping_data["share_noTer"] = doc_mapping_data["share_ter"].apply(
lambda x: 0 if pd.notna(x) else 1
)
# set share_noOgc column value to 0 if column share_ogc value is not nan, set 1 otherwise
doc_mapping_data["share_noOgc"] = doc_mapping_data["share_ogc"].apply(
lambda x: 0 if pd.notna(x) else 1
)
# set share_noPerfFee column value to 0 if column share_perfFee value is not nan, set 1 otherwise
doc_mapping_data["share_noPerfFee"] = doc_mapping_data["share_perfFee"].apply(
lambda x: 0 if pd.notna(x) else 1
)
# statistics doc_mapping_data for counting FundId count based on DocumentId
logger.info(
"statistics doc_mapping_data for counting FundId count based on DocumentId"
)
doc_fund_id_df = doc_mapping_data[["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundId"]].drop_duplicates()
doc_fund_count = (
doc_fund_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"]).size().reset_index(name="fund_count")
)
# order by fund_count in ascending order
doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=True)
# set with_ar_data to True if any of noTor, share_noTer, share_noOgc, share_noPerfFee equals 0
doc_fund_count["with_ar_data"] = False
for index, row in doc_fund_count.iterrows():
document_id = row["DocumentId"]
ar_data = doc_mapping_data[
(doc_mapping_data["DocumentId"] == document_id)
& (
(
(doc_mapping_data["noTor"] == 0)
| (doc_mapping_data["share_noTer"] == 0)
| (doc_mapping_data["share_noOgc"] == 0)
| (doc_mapping_data["share_noPerfFee"] == 0)
)
)
]
if len(ar_data) > 0:
doc_fund_count.loc[index, "with_ar_data"] = True
# statistics fund_count in doc_fund_count by describe and transform to DataFrame
doc_fund_count_stat_df = get_describe_stat(
doc_fund_count, "fund_count", "doc_fund_count"
)
describe_stat_df_list.append(doc_fund_count_stat_df)
# statistics doc_mapping_data for counting FundClassId count based on DocumentId
logger.info(
"statistics doc_mapping_data for counting FundClassId count based on DocumentId"
)
doc_share_class_id_df = doc_mapping_data[
["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundClassId"]
].drop_duplicates()
doc_share_class_count = (
doc_share_class_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in ascending order
doc_share_class_count = doc_share_class_count.sort_values(
by="share_class_count", ascending=True
)
# set with_ar_data to True if any of noTor, share_noTer, share_noOgc, share_noPerfFee equals 0
doc_share_class_count["with_ar_data"] = False
for index, row in doc_share_class_count.iterrows():
document_id = row["DocumentId"]
ar_data = doc_mapping_data[
(doc_mapping_data["DocumentId"] == document_id)
& (
(
(doc_mapping_data["noTor"] == 0)
| (doc_mapping_data["share_noTer"] == 0)
| (doc_mapping_data["share_noOgc"] == 0)
| (doc_mapping_data["share_noPerfFee"] == 0)
)
)
]
if len(ar_data) > 0:
doc_share_class_count.loc[index, "with_ar_data"] = True
# statistics share_class_count in doc_share_class_count by describe and transform to DataFrame
doc_share_class_count_stat_df = get_describe_stat(
doc_share_class_count, "share_class_count", "doc_share_class_count"
)
describe_stat_df_list.append(doc_share_class_count_stat_df)
describe_stat_df = pd.concat(describe_stat_df_list)
describe_stat_df.reset_index(drop=True, inplace=True)
with pd.ExcelWriter(provider_mapping_data_file) as writer:
doc_mapping_data.to_excel(writer, sheet_name="all_data", index=False)
doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False)
doc_share_class_count.to_excel(writer, sheet_name="doc_share_class_count", index=False)
describe_stat_df.to_excel(writer, sheet_name="all_describe_statistics", index=False)
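
# Vectorized sketch of the with_ar_data flag computed row-by-row above: a single boolean mask
# yields the set of DocumentIds that carry any TOR/TER/OGC/performance-fee value.
# Illustrative only; the iterrows loops above remain the code that actually runs.
def _docs_with_ar_data(doc_mapping_data: pd.DataFrame) -> set:
    has_ar_data = (
        (doc_mapping_data["noTor"] == 0)
        | (doc_mapping_data["share_noTer"] == 0)
        | (doc_mapping_data["share_noOgc"] == 0)
        | (doc_mapping_data["share_noPerfFee"] == 0)
    )
    return set(doc_mapping_data.loc[has_ar_data, "DocumentId"].unique())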
def get_top_n_records_count(
df: pd.DataFrame, column_name: str, n: int, total_count: int
):
top_n_records = df.head(n)
top_n_records_count = top_n_records[column_name].sum()
top_n_records_count_percent = round((top_n_records_count / total_count) * 100, 2)
return top_n_records_count, top_n_records_count_percent
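
# Quick demo of get_top_n_records_count on synthetic counts: with fund_count values [5, 3, 2],
# the top-2 rows sum to 8 of 10, so the result is (8, 80.0). Defined for illustration only.
def _demo_top_n_records_count():
    demo = pd.DataFrame({"fund_count": [5, 3, 2]})
    return get_top_n_records_count(demo, "fund_count", 2, demo["fund_count"].sum())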
def get_describe_stat(df: pd.DataFrame, column_name: str, stat_type_name: str):
stat_df = df[column_name].describe().reset_index().T
stat_df.columns = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]
stat_df.reset_index(inplace=True)
stat_df.rename(columns={"index": "Stat"}, inplace=True)
# drop the first row, which holds the describe() stat labels after the transpose
stat_df = stat_df[1:]
if stat_type_name is not None:
stat_df["Stat_Type"] = stat_type_name
stat_df = stat_df[
[
"Stat_Type",
"count",
"mean",
"std",
"min",
"25%",
"50%",
"75%",
"max",
]
]
return stat_df
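
# Shape check for get_describe_stat (illustrative): a numeric column collapses to a single
# row with columns Stat_Type, count, mean, std, min, 25%, 50%, 75%, max.
def _demo_describe_stat() -> pd.DataFrame:
    demo = pd.DataFrame({"page_num": [1, 2, 3, 4]})
    return get_describe_stat(demo, "page_num", "demo_page_num")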
def pickup_document_from_top_100_providers():
"""
Pick 100 documents from the top 100 providers, one per provider share-count rank.
Preference is given to documents with 10 or fewer share classes.
The purpose is to analyze document structure and content using small documents.
"""
provider_mapping_data_file = (
r"/data/emea_ar/basic_information/English/provider_mapping_data_statistics.xlsx"
)
top_100_provider_document_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx"
)
provider_share_count = pd.read_excel(
provider_mapping_data_file, sheet_name="provider_share_count"
)
# add a new column with name share_count_rank to provider_share_count
provider_share_count["share_count_rank"] = provider_share_count[
"share_class_count"
].rank(method="min", ascending=False)
top_100_provider_document_all_data = pd.read_excel(
top_100_provider_document_file, sheet_name="all_data"
)
top_100_provider_document_fund_count = pd.read_excel(
top_100_provider_document_file, sheet_name="doc_fund_count"
)
top_100_provider_document_fund_count.reset_index(drop=True, inplace=True)
top_100_provider_document_share_count = pd.read_excel(
top_100_provider_document_file, sheet_name="doc_share_class_count"
)
top_100_provider_document_share_count = \
top_100_provider_document_share_count[top_100_provider_document_share_count["with_ar_data"] == True]
top_100_provider_document_share_count.reset_index(drop=True, inplace=True)
top_100_provider_document_share_count = pd.merge(
top_100_provider_document_share_count,
top_100_provider_document_fund_count,
on=["DocumentId"],
how="left",
)
top_100_provider_document_share_count = top_100_provider_document_share_count[
["DocumentId", "CompanyId_x", "CompanyName_x", "fund_count", "share_class_count"]
]
top_100_provider_document_share_count.rename(
columns={"CompanyId_x": "CompanyId"}, inplace=True
)
# add a new column with name share_count_rank to top_100_provider_document_share_count by merge with provider_share_count
top_100_provider_document_share_count = pd.merge(
top_100_provider_document_share_count,
provider_share_count,
on=["CompanyId"],
how="left",
)
# Keep columns: DocumentId, CompanyId, CompanyName, share_class_count_x, share_count_rank
top_100_provider_document_share_count = top_100_provider_document_share_count[
["DocumentId", "CompanyId", "CompanyName", "fund_count", "share_class_count_x", "share_count_rank"]
]
# rename column share_class_count_x to share_class_count
top_100_provider_document_share_count.rename(
columns={"share_class_count_x": "share_class_count",
"share_count_rank": "provider_share_count_rank"}, inplace=True
)
top_100_provider_document_share_count = top_100_provider_document_share_count.sort_values(
by=["provider_share_count_rank", "share_class_count"], ascending=True
)
# For each provider share-count rank, randomly pick one document with 1 to 10 share classes
# (falling back to the first document for that rank when none qualifies)
data_filter = top_100_provider_document_share_count[
(top_100_provider_document_share_count["share_class_count"] <= 10)
& (top_100_provider_document_share_count["share_class_count"] >= 1)
]
data_filter = data_filter.sort_values(
by=["provider_share_count_rank", "share_class_count"], ascending=[True, True]
)
unique_rank_list = top_100_provider_document_share_count["provider_share_count_rank"].unique().tolist()
random_pickup_document_data_list = []
for rank in unique_rank_list:
data_filter_rank = data_filter[data_filter["provider_share_count_rank"] == rank]
if len(data_filter_rank) == 0:
# get the first document with rank from top_100_provider_document_share_count
data_filter_rank = top_100_provider_document_share_count[
top_100_provider_document_share_count["provider_share_count_rank"] == rank
].head(1)
data_filter_rank = data_filter_rank.sample(n=1, random_state=88)
random_pickup_document_data_list.append(data_filter_rank)
random_pickup_document_data = pd.concat(random_pickup_document_data_list)
# sort by share_count_rank in ascending order
random_pickup_document_data = random_pickup_document_data.sort_values(
by="provider_share_count_rank", ascending=True
)
random_pickup_document_data.reset_index(drop=True, inplace=True)
random_pickup_document_mini_data = random_pickup_document_data[
["DocumentId", "provider_share_count_rank"]
]
# get all data from top_100_provider_document_all_data by merge with random_pickup_document_mini_data
random_pickup_document_all_data = pd.merge(
random_pickup_document_mini_data,
top_100_provider_document_all_data,
on=["DocumentId"],
how="left",
)
# sort random_pickup_document_all_data by provider_share_count_rank, FundLegalName, FundClassLegalName in ascending order
random_pickup_document_all_data = random_pickup_document_all_data.sort_values(
by=["provider_share_count_rank", "FundLegalName", "FundClassLegalName"], ascending=True
)
random_small_document_data_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx"
)
with pd.ExcelWriter(random_small_document_data_file) as writer:
top_100_provider_document_share_count.to_excel(
writer, sheet_name="all_doc_with_ar_data", index=False
)
random_pickup_document_data.to_excel(
writer, sheet_name="random_small_document", index=False
)
random_pickup_document_all_data.to_excel(
writer, sheet_name="random_small_document_all_data", index=False
)
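
# Sketch of the provider ranking used above: rank(method="min", ascending=False) assigns the
# largest share_class_count rank 1, and ties share the same (minimum) rank. Synthetic data.
def _demo_provider_rank() -> pd.DataFrame:
    demo = pd.DataFrame({"share_class_count": [50, 50, 10]})
    demo["share_count_rank"] = demo["share_class_count"].rank(method="min", ascending=False)
    # Expected ranks: 1.0, 1.0, 3.0
    return demo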
def compare_records_count_by_document_id():
data_from_document = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
sheet_name = "mapping_data"
data_from_document_df = pd.read_excel(data_from_document, sheet_name=sheet_name)
data_from_document_df.rename(
columns={"doc_id": "DocumentId"}, inplace=True
)
# get the count of records by DocumentId
document_records_count = data_from_document_df.groupby("DocumentId").size().reset_index(name="records_count")
data_from_database = r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document_from_DocumentAcquisition.xlsx"
sheet_name = "random_small_document_all_data"
data_from_database_df = pd.read_excel(data_from_database, sheet_name=sheet_name)
database_records_count = data_from_database_df.groupby("DocumentId").size().reset_index(name="records_count")
# merge document_records_count with database_records_count
records_count_compare = pd.merge(
document_records_count,
database_records_count,
on=["DocumentId"],
how="left",
)
records_count_compare["records_count_diff"] = records_count_compare["records_count_x"] - records_count_compare["records_count_y"]
records_count_compare = records_count_compare.sort_values(by="records_count_diff", ascending=False)
# rename records_count_x to records_count_document, records_count_y to records_count_database
records_count_compare.rename(
columns={"records_count_x": "records_count_document",
"records_count_y": "records_count_database"}, inplace=True
)
records_count_compare.reset_index(drop=True, inplace=True)
records_count_compare_file = (
r"/data/emea_ar/basic_information/English/records_count_compare_between_document_database_from_DocumentAcquisition.xlsx"
)
with pd.ExcelWriter(records_count_compare_file) as writer:
records_count_compare.to_excel(
writer, sheet_name="records_count_compare", index=False
)
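
# Sketch of why the merge above produces records_count_x / records_count_y: when both frames
# share a non-key column name, pandas suffixes the left copy with _x and the right with _y.
def _demo_merge_suffixes() -> pd.DataFrame:
    left = pd.DataFrame({"DocumentId": [1], "records_count": [12]})
    right = pd.DataFrame({"DocumentId": [1], "records_count": [10]})
    # Columns after merge: DocumentId, records_count_x (left/document side), records_count_y (right/database side)
    return pd.merge(left, right, on=["DocumentId"], how="left")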
def get_document_extracted_share_diff_by_db():
db_data_file = r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document_from_DocumentAcquisition.xlsx"
extract_data_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
doc_mapping_folder = r"/data/emea_ar/output/mapping/document/"
db_data = pd.read_excel(db_data_file, sheet_name="Sheet1")
extract_data = pd.read_excel(extract_data_file, sheet_name="mapping_data")
# optionally keep only rows where investment_type == 1 (filter currently disabled)
# extract_data = extract_data[extract_data["investment_type"] == 1]
extract_data.reset_index(drop=True, inplace=True)
unique_doc_id = extract_data["doc_id"].unique().tolist()
status_info = {
1: "WIP",
5: "Junked",
3: "AutoSignoff",
2: "Signoffed",
10: "Complete",
20: "AutoDetect",
21: "Checkduplicate",
22: "Mapping",
33: "Not Matched",
99: "Unknown",
}
document_extract_db_compare = []
for doc_id in unique_doc_id:
doc_mapping_file = os.path.join(doc_mapping_folder, f"{doc_id}.xlsx")
if not os.path.exists(doc_mapping_file):
logger.error(f"Invalid mapping_file: {doc_mapping_file}")
doc_mapping_share_class_id_df = pd.DataFrame()
else:
doc_mapping_data = pd.read_excel(doc_mapping_file)
doc_mapping_share_class_id_df = doc_mapping_data[["SecId"]].drop_duplicates()
ar_db_data_doc = db_data[db_data["DocumentId"] == doc_id]
try:
masterProcess_status = ar_db_data_doc["MasterProcess_Status"].values[0]
except Exception as e:
logger.error(f"Error: {e}")
masterProcess_status = 99
masterProcess_status = int(masterProcess_status)
masterProcess_status_definition = status_info.get(masterProcess_status, "Unknown")
# keep rows from ar_db_data_doc where any of noTor, share_noTer, share_noOgc, share_noPerfFee equals 0
ar_db_data_doc = ar_db_data_doc[
(ar_db_data_doc["noTor"] == 0)
| (ar_db_data_doc["share_noTer"] == 0)
| (ar_db_data_doc["share_noOgc"] == 0)
| (ar_db_data_doc["share_noPerfFee"] == 0)
]
extract_data_doc = extract_data[extract_data["doc_id"] == doc_id]
# unique raw_name in extract_data_doc
unique_raw_name = extract_data_doc["raw_name"].unique().tolist()
doc_mapping_share_class_count = len(doc_mapping_share_class_id_df)
extract_share_class_count = len(unique_raw_name)
extract_vs_doc_share_count_diff = extract_share_class_count - doc_mapping_share_class_count
db_share_class_count = len(ar_db_data_doc)
extract_vs_ar_db_share_count_diff = extract_share_class_count - db_share_class_count
document_extract_db_compare.append({
"DocumentId": doc_id,
"status": masterProcess_status,
"status_defination": masterProcess_status_defination,
"extract_share_count": extract_share_class_count,
"doc_share_count": doc_mapping_share_class_count,
"extract_vs_doc_share_count_diff": extract_vs_doc_share_count_diff,
"ar_db_share_count": db_share_class_count,
"extract_vs_ar_db_share_count_diff": extract_vs_ar_db_share_count_diff,
})
document_extract_db_compare_df = pd.DataFrame(document_extract_db_compare)
# output to excel
document_extract_db_compare_file = (
r"/data/emea_ar/basic_information/English/document_extract_db_compare.xlsx"
)
with pd.ExcelWriter(document_extract_db_compare_file) as writer:
document_extract_db_compare_df.to_excel(
writer, sheet_name="document_extract_db_compare", index=False
)
def concat_mapping(mapping_folder: str,
output_file: str):
excel_files = glob(os.path.join(mapping_folder, "*.xlsx"))
logger.info(f"Total {len(excel_files)} excel files found in {mapping_folder}")
all_data_list = []
for excel_file in excel_files:
doc_mapping_data = pd.read_excel(excel_file)
all_data_list.append(doc_mapping_data)
all_data = pd.concat(all_data_list)
all_data.reset_index(drop=True, inplace=True)
all_data.to_excel(output_file, index=False)
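
# Defensive variant sketch of concat_mapping: the same glob-and-concat approach, but workbooks
# that fail to load are skipped and logged instead of aborting the run. Illustrative only.
def _concat_mapping_safe(mapping_folder: str, output_file: str) -> None:
    frames = []
    for excel_file in glob(os.path.join(mapping_folder, "*.xlsx")):
        try:
            frames.append(pd.read_excel(excel_file))
        except Exception as error:
            logger.error(f"Skipping unreadable workbook {excel_file}: {error}")
    if frames:
        pd.concat(frames, ignore_index=True).to_excel(output_file, index=False)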
if __name__ == "__main__":
doc_provider_file_path = (
r"/data/emea_ar/basic_information/English/latest_provider_ar_document.xlsx"
)
doc_mapping_file_path = r"/data/emea_ar/basic_information/English/latest_provider_ar_document_mapping.xlsx"
provider_mapping_data_file = (
r"/data/emea_ar/basic_information/English/provider_mapping_data.xlsx"
)
doc_mapping_from_top_100_provider_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx"
)
basic_info_folder = r"/data/emea_ar/basic_information/English/"
pdf_folder = r"/data/emea_ar/pdf/"
output_folder = r"/data/emea_ar/output/"
# get_unique_docids_from_doc_provider_data(doc_provider_file_path)
# download_pdf(doc_provider_file_path, 'doc_provider_count', pdf_folder)
# pdf_folder = r"/data/emea_ar/small_pdf/"
output_folder = r"/data/emea_ar/small_pdf_txt/"
random_small_document_data_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx"
)
doc_provider_file_path = r"/data/emea_ar/basic_information/English/sample_doc/emea_final_typical_case/Final list of EMEA documents.xlsx"
pdf_folder = r"/data/emea_ar/pdf/"
download_pdf(
doc_provider_file_path=doc_provider_file_path,
sheet_name="Sheet1",
doc_id_column="Document Id",
pdf_path=pdf_folder)
# output_pdf_page_text(pdf_folder, output_folder)
# extract_pdf_table(pdf_folder, output_folder)
# analyze_json_error()
latest_top_100_provider_ar_data_file = r"/data/emea_ar/basic_information/English/top_100_provider_latest_document_most_mapping/lux_english_ar_from_top_100_provider_latest_document_with_most_mappings.xlsx"
# download_pdf(latest_top_100_provider_ar_data_file,
# 'latest_ar_document_most_mapping',
# pdf_folder)
doc_mapping_file_path = r"/data/emea_ar/basic_information/English/sample_doc/emea_final_typical_case/doc_ar_data_for_final_list_emea_documents.xlsx"
output_data_folder = r"/data/emea_ar/basic_information/English/sample_doc/emea_final_typical_case/"
# statistics_document(pdf_folder=pdf_folder,
# doc_mapping_file_path=doc_mapping_file_path,
# sheet_name="doc_ar_data_in_db",
# output_folder=output_data_folder,
# output_file="doc_ar_data_statistics.xlsx")
# get_document_extracted_share_diff_by_db()
# statistics_provider_mapping(
# provider_mapping_data_file=provider_mapping_data_file,
# output_folder=basic_info_folder,
# )
# statistics_document_fund_share_count(doc_mapping_from_top_100_provider_file)
# pickup_document_from_top_100_providers()
# compare_records_count_by_document_id()
# document_mapping_folder = r"/data/emea_ar/output/mapping/document/"
# all_data_file = r"/data/emea_ar/output/mapping/all_document_mapping.xlsx"
# concat_mapping(document_mapping_folder, all_data_file)
# provider_mapping_folder = r"/data/emea_ar/output/mapping/provider/"
# all_data_file = r"/data/emea_ar/output/mapping/all_provider_mapping.xlsx"
# concat_mapping(provider_mapping_folder, all_data_file)