# dc-ml-emea-ar/prepare_data.py
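"""Data-preparation and statistics utilities for EMEA annual-report (AR) documents.

The helpers below download AR PDFs from the documents warehouse, extract page
text and tables, compute document / provider / fund mapping statistics, and
calculate accuracy metrics for the extraction results. Paths and sheet names
are environment-specific and are configured in the __main__ block at the end
of the file.
"""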
import pandas as pd
import os
from tqdm import tqdm
import json
from glob import glob
import fitz  # PyMuPDF
import re
import time
import traceback
import json_repair
from utils.logger import logger
from utils.pdf_download import download_pdf_from_documents_warehouse
from utils.pdf_util import PDFUtil
from pdf_table_extraction import PDFTableExtraction
def get_unique_docids_from_doc_provider_data(doc_provider_file_path: str):
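    """Count provider rows per DocumentId and write the counts back to the same
    workbook as an additional sheet (doc_provider_count)."""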
doc_provider_data = pd.read_excel(doc_provider_file_path)
    # Group by DocumentId and count the rows for each document,
    # producing two columns: DocumentId and provider_count.
doc_provider_count = (
doc_provider_data.groupby("DocumentId")
.size()
.reset_index(name="provider_count")
)
    # sort doc_provider_count by provider_count in descending order
doc_provider_count = doc_provider_count.sort_values(
by="provider_count", ascending=False
)
    # Write both the original details and the per-document counts to the workbook.
with pd.ExcelWriter(doc_provider_file_path) as writer:
doc_provider_data.to_excel(
writer, sheet_name="doc_provider_details", index=False
)
doc_provider_count.to_excel(
writer, sheet_name="doc_provider_count", index=False
)
def download_pdf(doc_provider_file_path: str,
sheet_name: str,
pdf_path: str,
doc_id_column: str = "DocumentId"):
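    """Download the PDF for every unique document id in `doc_id_column` of the
    given sheet into pdf_path, pausing one second between downloads."""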
document_data = pd.read_excel(doc_provider_file_path, sheet_name=sheet_name)
# get all unique docids as list
doc_id_list = [
str(doc_id) for doc_id in document_data[doc_id_column].unique().tolist()
]
# download pdfs
logger.info(f"Start downloading {len(doc_id_list)} pdfs")
os.makedirs(pdf_path, exist_ok=True)
for doc_id in tqdm(doc_id_list):
logger.info(f"Downloading pdf for docid: {doc_id}")
download_pdf_from_documents_warehouse(pdf_directory=pdf_path, doc_id=doc_id)
time.sleep(1)
def output_pdf_page_text(pdf_folder: str, output_folder: str):
if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
logger.error(f"Invalid pdf_folder: {pdf_folder}")
return
if output_folder is None or len(output_folder) == 0:
logger.error(f"Invalid output_folder: {output_folder}")
return
os.makedirs(output_folder, exist_ok=True)
pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
for pdf_file in pdf_files:
logger.info(f"Start processing {pdf_file}")
pdf_util = PDFUtil(pdf_file)
success, text, page_text_dict = pdf_util.extract_text(
output_folder=output_folder
)
if success:
logger.info(f"Successfully extracted text from {pdf_file}")
def extract_pdf_table(pdf_folder: str, output_folder: str):
if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
logger.error(f"Invalid pdf_folder: {pdf_folder}")
return
if output_folder is None or len(output_folder) == 0:
logger.error(f"Invalid output_folder: {output_folder}")
return
os.makedirs(output_folder, exist_ok=True)
pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
for pdf_file in pdf_files:
logger.info(f"Start processing {pdf_file}")
pdf_table_extraction = PDFTableExtraction(pdf_file, output_folder)
pdf_table_extraction.extract_tables()
def analyze_json_error():
text_file = r"/data/emea_ar/output/pdf_table_prompts/445877368_4.txt"
with open(text_file, "r", encoding="utf-8") as file:
text = file.read()
    json_response = re.search(r"```json([\s\S]*)```", text)
if json_response:
json_text = json_response.group(1)
json_data = {"tables": []}
try:
json_data = json.loads(json_text)
        except json.JSONDecodeError:
            # Fall back to json_repair when strict JSON parsing fails.
            json_data = json_repair.loads(json_text)
table_list = json_data.get("tables", [])
for table_num, table in enumerate(table_list):
table_md_file = os.path.join("/temp/", f"temp_{table_num}.md")
table = re.sub(r"(\n)+", "\n", table)
with open(table_md_file, "w", encoding="utf-8") as file:
file.write(table)
def statistics_document(
pdf_folder: str,
doc_mapping_file_path: str,
doc_ar_data_file_path: str,
mapping_sheet_name: str = "Sheet1",
ar_data_sheet_name: str = "doc_ar_data_in_db",
output_folder: str = "/data/emea_ar/basic_information/English/",
output_file: str = "doc_mapping_statistics_data.xlsx"
):
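    """Build per-document statistics (page count, fund count, share-class count,
    and optional AR data-point availability flags) plus provider/fund describe
    statistics, and write them to an Excel workbook in output_folder."""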
if pdf_folder is None or len(pdf_folder) == 0 or not os.path.exists(pdf_folder):
logger.error(f"Invalid pdf_folder: {pdf_folder}")
return
if (
doc_mapping_file_path is None
or len(doc_mapping_file_path) == 0
or not os.path.exists(doc_mapping_file_path)
):
logger.error(f"Invalid doc_mapping_file_path: {doc_mapping_file_path}")
return
if output_folder is None or len(output_folder) == 0:
logger.error(f"Invalid output_folder: {output_folder}")
return
os.makedirs(output_folder, exist_ok=True)
describe_stat_df_list = []
    # Statistics on the document mapping information.
doc_mapping_data = pd.read_excel(doc_mapping_file_path, sheet_name=mapping_sheet_name)
    # Count unique FundId values per DocumentId.
    logger.info(
        "Counting unique FundId values per DocumentId in doc_mapping_data"
    )
doc_fund_id_df = doc_mapping_data[["DocumentId", "FundId"]].drop_duplicates()
doc_fund_count = (
doc_fund_id_df.groupby("DocumentId").size().reset_index(name="fund_count")
)
# order by fund_count in descending order
doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=False)
# statistics fund_count in doc_fund_count by describe and transform to DataFrame
doc_fund_count_stat_df = get_describe_stat(
doc_fund_count, "fund_count", "doc_fund_count"
)
describe_stat_df_list.append(doc_fund_count_stat_df)
    # Count unique FundClassId values per DocumentId.
    logger.info(
        "Counting unique FundClassId values per DocumentId in doc_mapping_data"
    )
doc_share_class_id_df = doc_mapping_data[
["DocumentId", "FundClassId"]
].drop_duplicates()
doc_share_class_count = (
doc_share_class_id_df.groupby("DocumentId")
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
doc_share_class_count = doc_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in doc_share_class_count by describe and transform to DataFrame
doc_share_class_count_stat_df = get_describe_stat(
doc_share_class_count, "share_class_count", "doc_share_class_count"
)
describe_stat_df_list.append(doc_share_class_count_stat_df)
    # Count unique FundId values per CompanyId and CompanyName.
    logger.info(
        "Counting unique FundId values per CompanyId and CompanyName in doc_mapping_data"
    )
provider_fund_id_df = doc_mapping_data[
["CompanyId", "CompanyName", "FundId"]
].drop_duplicates()
provider_fund_count = (
provider_fund_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="fund_count")
)
# order by fund_count in descending order
provider_fund_count = provider_fund_count.sort_values(
by="fund_count", ascending=False
)
# statistics fund_count in provider_fund_count by describe and transform to DataFrame
provider_fund_count_stat_df = get_describe_stat(
provider_fund_count, "fund_count", "provider_fund_count"
)
describe_stat_df_list.append(provider_fund_count_stat_df)
    # Count unique FundClassId values per CompanyId and CompanyName.
    logger.info(
        "Counting unique FundClassId values per CompanyId and CompanyName in doc_mapping_data"
    )
provider_share_class_id_df = doc_mapping_data[
["CompanyId", "CompanyName", "FundClassId"]
].drop_duplicates()
provider_share_class_count = (
provider_share_class_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
provider_share_class_count = provider_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in provider_share_class_count by describe and transform to DataFrame
provider_share_class_count_stat_df = get_describe_stat(
provider_share_class_count, "share_class_count", "provider_share_class_count"
)
describe_stat_df_list.append(provider_share_class_count_stat_df)
    # Count unique FundClassId values per FundId and FundLegalName.
    logger.info(
        "Counting unique FundClassId values per FundId and FundLegalName in doc_mapping_data"
    )
fund_share_class_id_df = doc_mapping_data[
["FundId", "FundLegalName", "FundClassId"]
].drop_duplicates()
fund_share_class_count = (
fund_share_class_id_df.groupby(["FundId", "FundLegalName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in fund_share_class_count
fund_share_class_count = fund_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in fund_share_class_count by describe and transform to DataFrame
fund_share_class_count_stat_df = get_describe_stat(
fund_share_class_count, "share_class_count", "fund_share_class_count"
)
describe_stat_df_list.append(fund_share_class_count_stat_df)
stat_file = os.path.join(output_folder, output_file)
doc_id_list = [str(docid) for docid in doc_mapping_data["DocumentId"].unique().tolist()]
# statistics document page number
pdf_files = glob(os.path.join(pdf_folder, "*.pdf"))
logger.info(f"Total {len(pdf_files)} pdf files found in {pdf_folder}")
logger.info("statistics document page number")
doc_page_num_list = []
for pdf_file in tqdm(pdf_files):
pdf_base_name = os.path.basename(pdf_file).replace(".pdf", "")
if pdf_base_name not in doc_id_list:
continue
docid = os.path.basename(pdf_file).split(".")[0]
doc = fitz.open(pdf_file)
page_num = doc.page_count
doc_page_num_list.append({"DocumentId": docid, "page_num": page_num})
doc.close()
doc_page_num_df = pd.DataFrame(doc_page_num_list)
# order by page_num in descending order
doc_page_num_df = doc_page_num_df.sort_values(by="page_num", ascending=False)
# statistics page_num by describe and transform to DataFrame
doc_page_num_stat_df = get_describe_stat(
doc_page_num_df, "page_num", "doc_page_num"
)
describe_stat_df_list.append(doc_page_num_stat_df)
describe_stat_df = pd.concat(describe_stat_df_list)
describe_stat_df.reset_index(drop=True, inplace=True)
doc_dp_data_df = None
if doc_ar_data_file_path is not None and os.path.exists(doc_ar_data_file_path):
doc_ar_data = pd.read_excel(doc_ar_data_file_path, sheet_name=ar_data_sheet_name)
doc_dp_result = get_document_with_all_4_data_points(None, None, doc_ar_data)
doc_dp_data_list = []
for doc_id in doc_id_list:
doc_id = int(doc_id)
doc_dp_data = {"DocumentId": doc_id, "tor": 0, "ter": 0, "ogc": 0, "perf_fee": 0}
if doc_id in doc_dp_result["tor"]:
doc_dp_data["tor"] = 1
if doc_id in doc_dp_result["ter"]:
doc_dp_data["ter"] = 1
if doc_id in doc_dp_result["ogc"]:
doc_dp_data["ogc"] = 1
if doc_id in doc_dp_result["perf_fee"]:
doc_dp_data["perf_fee"] = 1
doc_dp_data_list.append(doc_dp_data)
doc_dp_data_df = pd.DataFrame(doc_dp_data_list)
doc_dp_data_df = doc_dp_data_df.sort_values(by="DocumentId", ascending=True)
doc_dp_data_df.reset_index(drop=True, inplace=True)
# set all of DocumentId in DataFrame objects to be string type
doc_page_num_df["DocumentId"] = doc_page_num_df["DocumentId"].astype(str)
doc_fund_count["DocumentId"] = doc_fund_count["DocumentId"].astype(str)
doc_share_class_count["DocumentId"] = doc_share_class_count["DocumentId"].astype(str)
if doc_dp_data_df is not None:
doc_dp_data_df["DocumentId"] = doc_dp_data_df["DocumentId"].astype(str)
# merge statistics data for doc_page_num_df, doc_dp_data_df, doc_fund_count, doc_share_class_count based on DocumentId
doc_page_num_df = doc_page_num_df.merge(doc_fund_count, on="DocumentId", how="left")
doc_page_num_df = doc_page_num_df.merge(doc_share_class_count, on="DocumentId", how="left")
if doc_dp_data_df is not None:
doc_page_num_df = doc_page_num_df.merge(doc_dp_data_df, on="DocumentId", how="left")
# save statistics data to excel
with pd.ExcelWriter(stat_file) as writer:
doc_page_num_df.to_excel(writer, sheet_name="doc_level_stats", index=False)
# doc_dp_data_df.to_excel(writer, sheet_name="doc_dp_data", index=False)
# doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False)
# doc_share_class_count.to_excel(
# writer, sheet_name="doc_share_class_count", index=False
# )
provider_fund_count.to_excel(
writer, sheet_name="provider_fund_count", index=False
)
provider_share_class_count.to_excel(
writer, sheet_name="provider_share_class_count", index=False
)
fund_share_class_count.to_excel(
writer, sheet_name="fund_share_class_count", index=False
)
describe_stat_df.to_excel(
writer, sheet_name="all_describe_statistics", index=False
)
def get_document_with_all_4_data_points(folder: str, file_name: str, data: pd.DataFrame):
if data is None:
file_path = os.path.join(folder, file_name)
if os.path.exists(file_path):
data = pd.read_excel(file_path, sheet_name="doc_ar_data_in_db")
else:
logger.error(f"Invalid file path: {file_path}")
return
# get document id list which noTor is 0
noTor_0_doc_id_list = data[data["noTor"] == 0]["DocumentId"].unique().tolist()
# get document id list which share_noTer is 0
share_noTer_0_doc_id_list = data[data["share_noTer"] == 0]["DocumentId"].unique().tolist()
# get document id list which share_noOgc is 0
share_noOgc_0_doc_id_list = data[data["share_noOgc"] == 0]["DocumentId"].unique().tolist()
# get document id list which share_noPerfFee is 0
share_noPerfFee_0_doc_id_list = data[data["share_noPerfFee"] == 0]["DocumentId"].unique().tolist()
logger.info(f"noTor_0_doc_id_list: {len(noTor_0_doc_id_list)}")
logger.info(f"share_noTer_0_doc_id_list: {len(share_noTer_0_doc_id_list)}")
logger.info(f"share_noOgc_0_doc_id_list: {len(share_noOgc_0_doc_id_list)}")
logger.info(f"share_noPerfFee_0_doc_id_list: {len(share_noPerfFee_0_doc_id_list)}")
all_4_data_points_doc_id_list = list(set(noTor_0_doc_id_list) & set(share_noTer_0_doc_id_list) & set(share_noOgc_0_doc_id_list) & set(share_noPerfFee_0_doc_id_list))
logger.info(f"all_4_data_points_doc_id_list: {len(all_4_data_points_doc_id_list)}")
result = {"tor": noTor_0_doc_id_list,
"ter": share_noTer_0_doc_id_list,
"ogc": share_noOgc_0_doc_id_list,
"perf_fee": share_noPerfFee_0_doc_id_list}
return result
def statistics_provider_mapping(provider_mapping_data_file: str, output_folder: str):
if (
provider_mapping_data_file is None
or len(provider_mapping_data_file) == 0
or not os.path.exists(provider_mapping_data_file)
):
logger.error(
f"Invalid provider_mapping_data_file: {provider_mapping_data_file}"
)
return
provider_mapping_data = pd.read_excel(provider_mapping_data_file)
describe_stat_df_list = []
    # Count unique FundId values per CompanyId and CompanyName.
    logger.info(
        "Counting unique FundId values per CompanyId and CompanyName in provider_mapping_data"
    )
provider_fund_id_df = provider_mapping_data[
["CompanyId", "CompanyName", "FundId"]
].drop_duplicates()
provider_fund_count = (
provider_fund_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="fund_count")
)
# order by fund_count in descending order
provider_fund_count = provider_fund_count.sort_values(
by="fund_count", ascending=False
)
# statistics fund_count in provider_fund_count by describe and transform to DataFrame
provider_fund_count_stat_df = get_describe_stat(
provider_fund_count, "fund_count", "provider_fund_count"
)
describe_stat_df_list.append(provider_fund_count_stat_df)
# Get the fund_count sum of all companies
all_companies_fund_count_sum = provider_fund_count["fund_count"].sum()
top_n_company_fund_count_list = []
# Get the fund_count sum of top 5 companies
top_5_companies_fund_count, top_5_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 5, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 5,
"fund_count": top_5_companies_fund_count,
"percent": top_5_companies_fund_count_percent,
}
)
logger.info(f"Top 5 companies fund count sum: {top_5_companies_fund_count}")
# Get the fund_count sum of top 10 companies
top_10_companies_fund_count, top_10_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 10, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 10,
"fund_count": top_10_companies_fund_count,
"percent": top_10_companies_fund_count_percent,
}
)
logger.info(f"Top 10 companies fund count sum: {top_10_companies_fund_count}")
# Get the fund_count sum of top 50 companies
top_50_companies_fund_count, top_50_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 50, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 50,
"fund_count": top_50_companies_fund_count,
"percent": top_50_companies_fund_count_percent,
}
)
logger.info(f"Top 50 companies fund count sum: {top_50_companies_fund_count}")
# Get the fund_count sum of top 100 companies
top_100_companies_fund_count, top_100_companies_fund_count_percent = (
get_top_n_records_count(
provider_fund_count, "fund_count", 100, all_companies_fund_count_sum
)
)
top_n_company_fund_count_list.append(
{
"top_n_providers": 100,
"fund_count": top_100_companies_fund_count,
"percent": top_100_companies_fund_count_percent,
}
)
top_n_company_fund_count_list.append(
{
"top_n_providers": len(provider_fund_count),
"fund_count": all_companies_fund_count_sum,
"percent": 100,
}
)
logger.info(f"Top 100 companies fund count sum: {top_100_companies_fund_count}")
top_n_company_fund_count_df = pd.DataFrame(top_n_company_fund_count_list)
    # Count unique SecId values per CompanyId and CompanyName.
    logger.info(
        "Counting unique SecId values per CompanyId and CompanyName in provider_mapping_data"
    )
provider_share_class_id_df = provider_mapping_data[
["CompanyId", "CompanyName", "SecId"]
].drop_duplicates()
provider_share_class_count = (
provider_share_class_id_df.groupby(["CompanyId", "CompanyName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
provider_share_class_count = provider_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in provider_share_class_count by describe and transform to DataFrame
provider_share_class_count_stat_df = get_describe_stat(
provider_share_class_count, "share_class_count", "provider_share_class_count"
)
describe_stat_df_list.append(provider_share_class_count_stat_df)
    # Get the share_class_count sum of all companies
all_companies_share_class_count_sum = provider_share_class_count[
"share_class_count"
].sum()
top_n_company_share_class_count_list = []
    # Get the share_class_count sum of top 5 companies
top_5_companies_share_class_count, top_5_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
5,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 5,
"share_class_count": top_5_companies_share_class_count,
"percent": top_5_companies_share_class_count_percent,
}
)
logger.info(
f"Top 5 companies share class count sum: {top_5_companies_share_class_count}"
)
    # Get the share_class_count sum of top 10 companies
top_10_companies_share_class_count, top_10_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
10,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 10,
"share_class_count": top_10_companies_share_class_count,
"percent": top_10_companies_share_class_count_percent,
}
)
logger.info(
f"Top 10 companies share class count sum: {top_10_companies_share_class_count}"
)
    # Get the share_class_count sum of top 50 companies
top_50_companies_share_class_count, top_50_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
50,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 50,
"share_class_count": top_50_companies_share_class_count,
"percent": top_50_companies_share_class_count_percent,
}
)
logger.info(
f"Top 50 companies share class count sum: {top_50_companies_share_class_count}"
)
    # Get the share_class_count sum of top 100 companies
top_100_companies_share_class_count, top_100_companies_share_class_count_percent = (
get_top_n_records_count(
provider_share_class_count,
"share_class_count",
100,
all_companies_share_class_count_sum,
)
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": 100,
"share_class_count": top_100_companies_share_class_count,
"percent": top_100_companies_share_class_count_percent,
}
)
logger.info(
f"Top 100 companies share class count sum: {top_100_companies_share_class_count}"
)
top_n_company_share_class_count_list.append(
{
"top_n_providers": len(provider_share_class_count),
"share_class_count": all_companies_share_class_count_sum,
"percent": 100,
}
)
top_n_company_share_class_count_df = pd.DataFrame(
top_n_company_share_class_count_list
)
    # Count unique SecId values per FundId and FundLegalName.
    logger.info(
        "Counting unique SecId values per FundId and FundLegalName in provider_mapping_data"
    )
fund_share_class_id_df = provider_mapping_data[
["FundId", "FundLegalName", "SecId"]
].drop_duplicates()
fund_share_class_count = (
fund_share_class_id_df.groupby(["FundId", "FundLegalName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in fund_share_class_count
fund_share_class_count = fund_share_class_count.sort_values(
by="share_class_count", ascending=False
)
# statistics share_class_count in fund_share_class_count by describe and transform to DataFrame
fund_share_class_count_stat_df = get_describe_stat(
fund_share_class_count, "share_class_count", "fund_share_class_count"
)
describe_stat_df_list.append(fund_share_class_count_stat_df)
describe_stat_df = pd.concat(describe_stat_df_list)
describe_stat_df.reset_index(drop=True, inplace=True)
stat_file = os.path.join(output_folder, "provider_mapping_data_statistics.xlsx")
# save statistics data to excel
with pd.ExcelWriter(stat_file) as writer:
top_n_company_fund_count_df.to_excel(
writer, sheet_name="top_n_provider_fund_count", index=False
)
top_n_company_share_class_count_df.to_excel(
writer, sheet_name="top_n_provider_share_count", index=False
)
provider_fund_count.to_excel(
writer, sheet_name="provider_fund_count", index=False
)
provider_share_class_count.to_excel(
writer, sheet_name="provider_share_count", index=False
)
fund_share_class_count.to_excel(
writer, sheet_name="fund_share_count", index=False
)
describe_stat_df.to_excel(
writer, sheet_name="all_describe_statistics", index=False
)
def statistics_document_fund_share_count(provider_mapping_data_file: str):
if (
provider_mapping_data_file is None
or len(provider_mapping_data_file) == 0
or not os.path.exists(provider_mapping_data_file)
):
logger.error(f"Invalid file_path: {provider_mapping_data_file}")
return
describe_stat_df_list = []
    # Statistics on the document mapping information.
doc_mapping_data = pd.read_excel(provider_mapping_data_file, sheet_name="all_data")
# set noTor column value to 0 if column tor value is not nan, set 1 otherwise
doc_mapping_data["noTor"] = doc_mapping_data["tor"].apply(
lambda x: 0 if pd.notna(x) else 1
)
# set share_noTer column value to 0 if column share_ter value is not nan, set 1 otherwise
doc_mapping_data["share_noTer"] = doc_mapping_data["share_ter"].apply(
lambda x: 0 if pd.notna(x) else 1
)
    # set share_noOgc column value to 0 if column share_ogc value is not nan, set 1 otherwise
doc_mapping_data["share_noOgc"] = doc_mapping_data["share_ogc"].apply(
lambda x: 0 if pd.notna(x) else 1
)
    # set share_noPerfFee column value to 0 if column share_perfFee value is not nan, set 1 otherwise
doc_mapping_data["share_noPerfFee"] = doc_mapping_data["share_perfFee"].apply(
lambda x: 0 if pd.notna(x) else 1
)
    # Count unique FundId values per DocumentId.
    logger.info(
        "Counting unique FundId values per DocumentId in doc_mapping_data"
    )
doc_fund_id_df = doc_mapping_data[["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundId"]].drop_duplicates()
doc_fund_count = (
doc_fund_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"]).size().reset_index(name="fund_count")
)
# order by fund_count in descending order
doc_fund_count = doc_fund_count.sort_values(by="fund_count", ascending=True)
    # set with_ar_data to True if any of noTor, share_noTer, share_noOgc, share_noPerfFee is 0
doc_fund_count["with_ar_data"] = False
for index, row in doc_fund_count.iterrows():
document_id = row["DocumentId"]
ar_data = doc_mapping_data[
(doc_mapping_data["DocumentId"] == document_id)
& (
(
(doc_mapping_data["noTor"] == 0)
| (doc_mapping_data["share_noTer"] == 0)
| (doc_mapping_data["share_noOgc"] == 0)
| (doc_mapping_data["share_noPerfFee"] == 0)
)
)
]
if len(ar_data) > 0:
doc_fund_count.loc[index, "with_ar_data"] = True
# statistics fund_count in doc_fund_count by describe and transform to DataFrame
doc_fund_count_stat_df = get_describe_stat(
doc_fund_count, "fund_count", "doc_fund_count"
)
describe_stat_df_list.append(doc_fund_count_stat_df)
    # Count unique FundClassId values per DocumentId.
    logger.info(
        "Counting unique FundClassId values per DocumentId in doc_mapping_data"
    )
doc_share_class_id_df = doc_mapping_data[
["DocumentId", "EffectiveDate", "CompanyId", "CompanyName", "FundClassId"]
].drop_duplicates()
doc_share_class_count = (
doc_share_class_id_df.groupby(["DocumentId", "EffectiveDate", "CompanyId", "CompanyName"])
.size()
.reset_index(name="share_class_count")
)
# order by share_class_count in descending order
doc_share_class_count = doc_share_class_count.sort_values(
by="share_class_count", ascending=True
)
    # set with_ar_data to True if any of noTor, share_noTer, share_noOgc, share_noPerfFee is 0
doc_share_class_count["with_ar_data"] = False
for index, row in doc_share_class_count.iterrows():
document_id = row["DocumentId"]
ar_data = doc_mapping_data[
(doc_mapping_data["DocumentId"] == document_id)
& (
(
(doc_mapping_data["noTor"] == 0)
| (doc_mapping_data["share_noTer"] == 0)
| (doc_mapping_data["share_noOgc"] == 0)
| (doc_mapping_data["share_noPerfFee"] == 0)
)
)
]
if len(ar_data) > 0:
doc_share_class_count.loc[index, "with_ar_data"] = True
# statistics share_class_count in doc_share_class_count by describe and transform to DataFrame
doc_share_class_count_stat_df = get_describe_stat(
doc_share_class_count, "share_class_count", "doc_share_class_count"
)
describe_stat_df_list.append(doc_share_class_count_stat_df)
describe_stat_df = pd.concat(describe_stat_df_list)
describe_stat_df.reset_index(drop=True, inplace=True)
with pd.ExcelWriter(provider_mapping_data_file) as writer:
doc_mapping_data.to_excel(writer, sheet_name="all_data", index=False)
doc_fund_count.to_excel(writer, sheet_name="doc_fund_count", index=False)
doc_share_class_count.to_excel(writer, sheet_name="doc_share_class_count", index=False)
describe_stat_df.to_excel(writer, sheet_name="all_describe_statistics", index=False)
def get_top_n_records_count(
df: pd.DataFrame, column_name: str, n: int, total_count: int
):
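    """Sum `column_name` over the first n rows (the DataFrame is expected to be
    sorted in descending order by that column) and return the sum together with
    its percentage of total_count.

    Illustrative example with made-up numbers: top-2 counts of [30, 20] with
    total_count=100 give (50, 50.0).
    """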
top_n_records = df.head(n)
top_n_records_count = top_n_records[column_name].sum()
top_n_records_count_percent = round((top_n_records_count / total_count) * 100, 2)
return top_n_records_count, top_n_records_count_percent
def get_describe_stat(df: pd.DataFrame, column_name: str, stat_type_name: str):
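    """Run pandas describe() on df[column_name] and return it as a one-row
    DataFrame carrying Stat_Type plus the describe columns
    (count, mean, std, min, 25%, 50%, 75%, max)."""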
stat_df = df[column_name].describe().reset_index().T
stat_df.columns = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]
stat_df.reset_index(inplace=True)
stat_df.rename(columns={"index": "Stat"}, inplace=True)
# remove the first row
stat_df = stat_df[1:]
if stat_type_name is not None:
stat_df["Stat_Type"] = stat_type_name
stat_df = stat_df[
[
"Stat_Type",
"count",
"mean",
"std",
"min",
"25%",
"50%",
"75%",
"max",
]
]
return stat_df
def pickup_document_from_top_100_providers():
"""
Pickup 100 documents from top 100 providers.
The documents are with less 10 share classes.
The purpose is to analyze the document structure and content from small documents.
"""
provider_mapping_data_file = (
r"/data/emea_ar/basic_information/English/provider_mapping_data_statistics.xlsx"
)
top_100_provider_document_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx"
)
provider_share_count = pd.read_excel(
provider_mapping_data_file, sheet_name="provider_share_count"
)
# add a new column with name share_count_rank to provider_share_count
provider_share_count["share_count_rank"] = provider_share_count[
"share_class_count"
].rank(method="min", ascending=False)
top_100_provider_document_all_data = pd.read_excel(
top_100_provider_document_file, sheet_name="all_data"
)
top_100_provider_document_fund_count = pd.read_excel(
top_100_provider_document_file, sheet_name="doc_fund_count"
)
top_100_provider_document_fund_count.reset_index(drop=True, inplace=True)
top_100_provider_document_share_count = pd.read_excel(
top_100_provider_document_file, sheet_name="doc_share_class_count"
)
top_100_provider_document_share_count = \
top_100_provider_document_share_count[top_100_provider_document_share_count["with_ar_data"] == True]
top_100_provider_document_share_count.reset_index(drop=True, inplace=True)
top_100_provider_document_share_count = pd.merge(
top_100_provider_document_share_count,
top_100_provider_document_fund_count,
on=["DocumentId"],
how="left",
)
top_100_provider_document_share_count = top_100_provider_document_share_count[
["DocumentId", "CompanyId_x", "CompanyName_x", "fund_count", "share_class_count"]
]
top_100_provider_document_share_count.rename(
columns={"CompanyId_x": "CompanyId"}, inplace=True
)
# add a new column with name share_count_rank to top_100_provider_document_share_count by merge with provider_share_count
top_100_provider_document_share_count = pd.merge(
top_100_provider_document_share_count,
provider_share_count,
on=["CompanyId"],
how="left",
)
# Keep columns: DocumentId, CompanyId, CompanyName, share_class_count_x, share_count_rank
top_100_provider_document_share_count = top_100_provider_document_share_count[
["DocumentId", "CompanyId", "CompanyName", "fund_count", "share_class_count_x", "share_count_rank"]
]
# rename column share_class_count_x to share_class_count
top_100_provider_document_share_count.rename(
columns={"share_class_count_x": "share_class_count",
"share_count_rank": "provider_share_count_rank"}, inplace=True
)
top_100_provider_document_share_count = top_100_provider_document_share_count.sort_values(
by=["provider_share_count_rank", "share_class_count"], ascending=True
)
    # For each provider rank, randomly pick one document with 1 to 10 share
    # classes; if no such document exists for a rank, fall back to the first
    # document for that rank.
data_filter = top_100_provider_document_share_count[
(top_100_provider_document_share_count["share_class_count"] <= 10)
& (top_100_provider_document_share_count["share_class_count"] >= 1)
]
data_filter = data_filter.sort_values(
by=["provider_share_count_rank", "share_class_count"], ascending=[True, True]
)
unique_rank_list = top_100_provider_document_share_count["provider_share_count_rank"].unique().tolist()
random_pickup_document_data_list = []
for rank in unique_rank_list:
data_filter_rank = data_filter[data_filter["provider_share_count_rank"] == rank]
if len(data_filter_rank) == 0:
# get the first document with rank from top_100_provider_document_share_count
data_filter_rank = top_100_provider_document_share_count[
top_100_provider_document_share_count["provider_share_count_rank"] == rank
].head(1)
data_filter_rank = data_filter_rank.sample(n=1, random_state=88)
random_pickup_document_data_list.append(data_filter_rank)
random_pickup_document_data = pd.concat(random_pickup_document_data_list)
# sort by share_count_rank in ascending order
random_pickup_document_data = random_pickup_document_data.sort_values(
by="provider_share_count_rank", ascending=True
)
random_pickup_document_data.reset_index(drop=True, inplace=True)
random_pickup_document_mini_data = random_pickup_document_data[
["DocumentId", "provider_share_count_rank"]
]
# get all data from top_100_provider_document_all_data by merge with random_pickup_document_mini_data
random_pickup_document_all_data = pd.merge(
random_pickup_document_mini_data,
top_100_provider_document_all_data,
on=["DocumentId"],
how="left",
)
# sort random_pickup_document_all_data by provider_share_count_rank, FundLegalName, FundClassLegalName in ascending order
random_pickup_document_all_data = random_pickup_document_all_data.sort_values(
by=["provider_share_count_rank", "FundLegalName", "FundClassLegalName"], ascending=True
)
random_small_document_data_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx"
)
with pd.ExcelWriter(random_small_document_data_file) as writer:
top_100_provider_document_share_count.to_excel(
writer, sheet_name="all_doc_with_ar_data", index=False
)
random_pickup_document_data.to_excel(
writer, sheet_name="random_small_document", index=False
)
random_pickup_document_all_data.to_excel(
writer, sheet_name="random_small_document_all_data", index=False
)
def compare_records_count_by_document_id():
data_from_document = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
sheet_name = "mapping_data"
data_from_document_df = pd.read_excel(data_from_document, sheet_name=sheet_name)
data_from_document_df.rename(
columns={"doc_id": "DocumentId"}, inplace=True
)
# get the count of records by DocumentId
document_records_count = data_from_document_df.groupby("DocumentId").size().reset_index(name="records_count")
data_from_database = r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document_from_DocumentAcquisition.xlsx"
sheet_name = "random_small_document_all_data"
data_from_database_df = pd.read_excel(data_from_database, sheet_name=sheet_name)
database_records_count = data_from_database_df.groupby("DocumentId").size().reset_index(name="records_count")
# merge document_records_count with database_records_count
records_count_compare = pd.merge(
document_records_count,
database_records_count,
on=["DocumentId"],
how="left",
)
records_count_compare["records_count_diff"] = records_count_compare["records_count_x"] - records_count_compare["records_count_y"]
records_count_compare = records_count_compare.sort_values(by="records_count_diff", ascending=False)
# rename records_count_x to records_count_document, records_count_y to records_count_database
records_count_compare.rename(
columns={"records_count_x": "records_count_document",
"records_count_y": "records_count_database"}, inplace=True
)
records_count_compare.reset_index(drop=True, inplace=True)
records_count_compare_file = (
r"/data/emea_ar/basic_information/English/records_count_compare_between_document_database_from_DocumentAcquisition.xlsx"
)
with pd.ExcelWriter(records_count_compare_file) as writer:
records_count_compare.to_excel(
writer, sheet_name="records_count_compare", index=False
)
def get_document_extracted_share_diff_by_db():
db_data_file = r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document_from_DocumentAcquisition.xlsx"
extract_data_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_73_documents.xlsx"
doc_mapping_folder = r"/data/emea_ar/output/mapping/document/"
db_data = pd.read_excel(db_data_file, sheet_name="Sheet1")
extract_data = pd.read_excel(extract_data_file, sheet_name="mapping_data")
# only get data which investment_type is 1
# extract_data = extract_data[extract_data["investment_type"] == 1]
extract_data.reset_index(drop=True, inplace=True)
unique_doc_id = extract_data["doc_id"].unique().tolist()
status_info = {
1: "WIP",
5: "Junked",
3: "AutoSignoff",
2: "Signoffed",
10: "Complete",
20: "AutoDetect",
21: "Checkduplicate",
22: "Mapping",
33: "Not Matched",
99: "Unknown",
}
document_extract_db_compare = []
for doc_id in unique_doc_id:
doc_mapping_file = os.path.join(doc_mapping_folder, f"{doc_id}.xlsx")
if not os.path.exists(doc_mapping_file):
logger.error(f"Invalid mapping_file: {doc_mapping_file}")
doc_mapping_share_class_id_df = pd.DataFrame()
else:
doc_mapping_data = pd.read_excel(doc_mapping_file)
doc_mapping_share_class_id_df = doc_mapping_data[["SecId"]].drop_duplicates()
ar_db_data_doc = db_data[db_data["DocumentId"] == doc_id]
try:
masterProcess_status = ar_db_data_doc["MasterProcess_Status"].values[0]
except Exception as e:
logger.error(f"Error: {e}")
masterProcess_status = 99
masterProcess_status = int(masterProcess_status)
        masterProcess_status_definition = status_info.get(masterProcess_status, "Unknown")
        # get rows from ar_db_data_doc where any of noTor, share_noTer, share_noOgc, share_noPerfFee is 0
ar_db_data_doc = ar_db_data_doc[
(ar_db_data_doc["noTor"] == 0)
| (ar_db_data_doc["share_noTer"] == 0)
| (ar_db_data_doc["share_noOgc"] == 0)
| (ar_db_data_doc["share_noPerfFee"] == 0)
]
extract_data_doc = extract_data[extract_data["doc_id"] == doc_id]
# unique raw_name in extract_data_doc
unique_raw_name = extract_data_doc["raw_name"].unique().tolist()
doc_mapping_share_class_count = len(doc_mapping_share_class_id_df)
extract_share_class_count = len(unique_raw_name)
extract_vs_doc_share_count_diff = extract_share_class_count - doc_mapping_share_class_count
db_share_class_count = len(ar_db_data_doc)
extract_vs_ar_db_share_count_diff = extract_share_class_count - db_share_class_count
document_extract_db_compare.append({
"DocumentId": doc_id,
"status": masterProcess_status,
"status_defination": masterProcess_status_defination,
"extract_share_count": extract_share_class_count,
"doc_share_count": doc_mapping_share_class_count,
"extract_vs_doc_share_count_diff": extract_vs_doc_share_count_diff,
"ar_db_share_count": db_share_class_count,
"extract_vs_ar_db_share_count_diff": extract_vs_ar_db_share_count_diff,
})
document_extract_db_compare_df = pd.DataFrame(document_extract_db_compare)
# output to excel
document_extract_db_compare_file = (
r"/data/emea_ar/basic_information/English/document_extract_db_compare.xlsx"
)
with pd.ExcelWriter(document_extract_db_compare_file) as writer:
document_extract_db_compare_df.to_excel(
writer, sheet_name="document_extract_db_compare", index=False
)
def concat_mapping(mapping_folder: str,
output_file: str):
excel_files = glob(os.path.join(mapping_folder, "*.xlsx"))
logger.info(f"Total {len(excel_files)} excel files found in {mapping_folder}")
all_data_list = []
for excel_file in excel_files:
doc_mapping_data = pd.read_excel(excel_file)
all_data_list.append(doc_mapping_data)
all_data = pd.concat(all_data_list)
all_data.reset_index(drop=True, inplace=True)
    all_data.to_excel(output_file, index=False)
def calc_typical_doc_metrics_v2():
"""
Statistics metrics for typical document.
1. Fund level datapoint: TOR
2. Share level datapoint: OGC, TER, Performance fees
3. Only statistics the record which with document investment mapping
"""
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
result_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_20_new_emea_documents_sample_Accuracy.xlsx"
sheet_name = "record_level_Results"
data = pd.read_excel(result_file, sheet_name=sheet_name)
data.fillna("", inplace=True)
# filter data which valid is 1
data = data[data["valid"] == 1]
fund_raw_data_gt = []
fund_raw_data_pred = []
fund_mapping_data_gt = []
fund_mapping_data_pred = []
share_raw_data_gt = []
share_raw_data_pred = []
share_mapping_data_gt = []
share_mapping_data_pred = []
for idx, row in data.iterrows():
raw_data_gt_count = row["Raw data in Doc"]
raw_data_infer_count = row["Raw data in Inference"]
if len(str(raw_data_gt_count)) > 0:
raw_data_gt_count = int(raw_data_gt_count)
raw_data_infer_count = int(raw_data_infer_count)
raw_gt_list = [1 for i in range(raw_data_gt_count)]
raw_pred_list = []
if raw_data_infer_count > 0:
raw_pred_list = [1 for i in range(raw_data_infer_count)]
if len(raw_pred_list) < len(raw_gt_list):
raw_pred_list.extend([0 for i in range(len(raw_gt_list) - len(raw_pred_list))])
mapping_data_gt_count = row["data in DB"]
mapping_data_infer_count = row["data in Inferencce"]
if len(str(mapping_data_gt_count)) > 0:
mapping_data_gt_count = int(mapping_data_gt_count)
mapping_data_infer_count = int(mapping_data_infer_count)
mapping_gt_list = [1 for i in range(mapping_data_gt_count)]
mapping_pred_list = []
if mapping_data_infer_count > 0:
mapping_pred_list = [1 for i in range(mapping_data_infer_count)]
if len(mapping_pred_list) < len(mapping_gt_list):
mapping_pred_list.extend([0 for i in range(len(mapping_gt_list) - len(mapping_pred_list))])
data_level = row["data_level"]
if data_level == "fund":
fund_raw_data_gt.extend(raw_gt_list)
fund_raw_data_pred.extend(raw_pred_list)
fund_mapping_data_gt.extend(mapping_gt_list)
fund_mapping_data_pred.extend(mapping_pred_list)
else:
share_raw_data_gt.extend(raw_gt_list)
share_raw_data_pred.extend(raw_pred_list)
share_mapping_data_gt.extend(mapping_gt_list)
share_mapping_data_pred.extend(mapping_pred_list)
share_raw_data_gt.extend([0, 0, 0, 0, 0, 0])
share_raw_data_pred.extend([1, 1, 1, 1, 1, 1])
share_mapping_data_gt.extend([0, 0, 0, 0, 0, 0])
share_mapping_data_pred.extend([1, 1, 1, 1, 1, 1])
fund_raw_data_accuracy = accuracy_score(fund_raw_data_gt, fund_raw_data_pred)
fund_raw_data_precision = precision_score(fund_raw_data_gt, fund_raw_data_pred)
fund_raw_data_recall = recall_score(fund_raw_data_gt, fund_raw_data_pred)
fund_raw_data_f1 = f1_score(fund_raw_data_gt, fund_raw_data_pred)
fund_mapping_data_accuracy = accuracy_score(fund_mapping_data_gt, fund_mapping_data_pred)
fund_mapping_data_precision = precision_score(fund_mapping_data_gt, fund_mapping_data_pred)
fund_mapping_data_recall = recall_score(fund_mapping_data_gt, fund_mapping_data_pred)
fund_mapping_data_f1 = f1_score(fund_mapping_data_gt, fund_mapping_data_pred)
share_raw_data_accuracy = accuracy_score(share_raw_data_gt, share_raw_data_pred)
share_raw_data_precision = precision_score(share_raw_data_gt, share_raw_data_pred)
share_raw_data_recall = recall_score(share_raw_data_gt, share_raw_data_pred)
share_raw_data_f1 = f1_score(share_raw_data_gt, share_raw_data_pred)
share_mapping_data_accuracy = accuracy_score(share_mapping_data_gt, share_mapping_data_pred)
share_mapping_data_precision = precision_score(share_mapping_data_gt, share_mapping_data_pred)
share_mapping_data_recall = recall_score(share_mapping_data_gt, share_mapping_data_pred)
share_mapping_data_f1 = f1_score(share_mapping_data_gt, share_mapping_data_pred)
final_data = []
fund_raw_data_metrics = {"title": "Fund_Datapoint_Raw_Data",
"accuracy": fund_raw_data_accuracy,
"precision": fund_raw_data_precision,
"recall": fund_raw_data_recall,
"f1": fund_raw_data_f1,
"support": len(fund_raw_data_gt)}
final_data.append(fund_raw_data_metrics)
logger.info(f"fund_raw_data_accuracy: {fund_raw_data_accuracy}")
logger.info(f"fund_raw_data_precision: {fund_raw_data_precision}")
logger.info(f"fund_raw_data_recall: {fund_raw_data_recall}")
logger.info(f"fund_raw_data_f1: {fund_raw_data_f1}")
logger.info(f"fund_raw_data_support: {len(fund_raw_data_gt)}")
fund_mapping_data_metrics = {"title": "Fund_Datapoint_Mapping_Data",
"accuracy": fund_mapping_data_accuracy,
"precision": fund_mapping_data_precision,
"recall": fund_mapping_data_recall,
"f1": fund_mapping_data_f1,
"support": len(fund_mapping_data_gt)}
final_data.append(fund_mapping_data_metrics)
logger.info(f"fund_mapping_data_accuracy: {fund_mapping_data_accuracy}")
logger.info(f"fund_mapping_data_precision: {fund_mapping_data_precision}")
logger.info(f"fund_mapping_data_recall: {fund_mapping_data_recall}")
logger.info(f"fund_mapping_data_f1: {fund_mapping_data_f1}")
logger.info(f"fund_mapping_data_support: {len(fund_mapping_data_gt)}")
share_raw_data_metrics = {"title": "Share_Datapoint_Raw_Data",
"accuracy": share_raw_data_accuracy,
"precision": share_raw_data_precision,
"recall": share_raw_data_recall,
"f1": share_raw_data_f1,
"support": len(share_raw_data_gt)}
final_data.append(share_raw_data_metrics)
logger.info(f"share_raw_data_accuracy: {share_raw_data_accuracy}")
logger.info(f"share_raw_data_precision: {share_raw_data_precision}")
logger.info(f"share_raw_data_recall: {share_raw_data_recall}")
logger.info(f"share_raw_data_f1: {share_raw_data_f1}")
logger.info(f"share_raw_data_support: {len(share_raw_data_gt)}")
share_mapping_data_metrics = {"title": "Share_Datapoint_Mapping_Data",
"accuracy": share_mapping_data_accuracy,
"precision": share_mapping_data_precision,
"recall": share_mapping_data_recall,
"f1": share_mapping_data_f1,
"support": len(share_mapping_data_gt)}
final_data.append(share_mapping_data_metrics)
logger.info(f"share_mapping_data_accuracy: {share_mapping_data_accuracy}")
logger.info(f"share_mapping_data_precision: {share_mapping_data_precision}")
logger.info(f"share_mapping_data_recall: {share_mapping_data_recall}")
logger.info(f"share_mapping_data_f1: {share_mapping_data_f1}")
logger.info(f"share_mapping_data_support: {len(share_mapping_data_gt)}")
final_data_df = pd.DataFrame(final_data)
# set column order as title, accuracy, f1, precision, recall
final_data_df = final_data_df[["title", "accuracy", "f1", "precision", "recall", "support"]]
# output to excel
final_data_file = (
r"/data/emea_ar/output/metrics/mapping_data_info_20_new_emea_documents_sample_Accuracy_metrics_v2.xlsx"
)
with pd.ExcelWriter(final_data_file) as writer:
final_data_df.to_excel(
writer, sheet_name="metrics", index=False
)
def calc_typical_doc_metrics_v1():
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
result_file = r"/data/emea_ar/ground_truth/data_extraction/mapping_data_info_20_new_emea_documents_sample_Accuracy.xlsx"
sheet_name = "record_level_Results"
data = pd.read_excel(result_file, sheet_name=sheet_name)
data.fillna("", inplace=True)
fund_raw_data_list = data["Raw Mapping"].tolist()
fund_raw_data_gt = []
fund_raw_data_pred = []
for fund_raw_data in fund_raw_data_list:
if fund_raw_data == "Correct Raw mapping":
fund_raw_data_gt.append(1)
fund_raw_data_pred.append(1)
elif fund_raw_data == "Incorrect Raw mapping":
fund_raw_data_gt.append(1)
fund_raw_data_pred.append(0)
else:
pass
fund_raw_data_accuracy = accuracy_score(fund_raw_data_gt, fund_raw_data_pred)
fund_raw_data_precision = precision_score(fund_raw_data_gt, fund_raw_data_pred)
fund_raw_data_recall = recall_score(fund_raw_data_gt, fund_raw_data_pred)
fund_raw_data_f1 = f1_score(fund_raw_data_gt, fund_raw_data_pred)
fund_mapping_data_list = data["Share Mapping"].tolist()
fund_mapping_data_gt = []
fund_mapping_data_pred = []
for fund_mapping_data in fund_mapping_data_list:
if fund_mapping_data == "Correct share mapping":
fund_mapping_data_gt.append(1)
fund_mapping_data_pred.append(1)
elif fund_mapping_data == "Incorrect share mapping":
fund_mapping_data_gt.append(1)
fund_mapping_data_pred.append(0)
else:
pass
fund_mapping_data_accuracy = accuracy_score(fund_mapping_data_gt, fund_mapping_data_pred)
fund_mapping_data_precision = precision_score(fund_mapping_data_gt, fund_mapping_data_pred)
fund_mapping_data_recall = recall_score(fund_mapping_data_gt, fund_mapping_data_pred)
fund_mapping_data_f1 = f1_score(fund_mapping_data_gt, fund_mapping_data_pred)
share_raw_data_gt = []
share_raw_data_pred = []
share_mapping_data_gt = []
share_mapping_data_pred = []
for idx, row in data.iterrows():
share_raw_data_infer_count = row["Raw Share in Inference"]
share_raw_data_gt_count = row["Raw Share in Doc"]
if share_raw_data_gt_count is not None and \
len(str(share_raw_data_gt_count)) > 0:
share_raw_data_gt_count = int(share_raw_data_gt_count)
share_raw_data_infer_count = int(share_raw_data_infer_count)
gt_list = [1 for i in range(share_raw_data_gt_count)]
if share_raw_data_infer_count > 0:
pred_list = [1 for i in range(share_raw_data_infer_count)]
else:
pred_list = [1, 1]
gt_list = [0, 0]
if len(pred_list) < len(gt_list):
pred_list.extend([0 for i in range(len(gt_list) - len(pred_list))])
share_raw_data_gt.extend(gt_list)
share_raw_data_pred.extend(pred_list)
share_mapping_data_infer_count = row["share in Inferencce"]
share_mapping_data_gt_count = row["share in DB"]
if share_mapping_data_gt_count is not None and \
len(str(share_mapping_data_gt_count)) > 0:
share_mapping_data_gt_count = int(share_mapping_data_gt_count)
share_mapping_data_infer_count = int(share_mapping_data_infer_count)
gt_list = [1 for i in range(share_mapping_data_gt_count)]
if share_mapping_data_infer_count > 0:
pred_list = [1 for i in range(share_mapping_data_infer_count)]
else:
pred_list = [1, 1]
gt_list = [0, 0]
if len(pred_list) < len(gt_list):
pred_list.extend([0 for i in range(len(gt_list) - len(pred_list))])
share_mapping_data_gt.extend(gt_list)
share_mapping_data_pred.extend(pred_list)
share_raw_data_accuracy = accuracy_score(share_raw_data_gt, share_raw_data_pred)
share_raw_data_precision = precision_score(share_raw_data_gt, share_raw_data_pred)
share_raw_data_recall = recall_score(share_raw_data_gt, share_raw_data_pred)
share_raw_data_f1 = f1_score(share_raw_data_gt, share_raw_data_pred)
share_mapping_data_accuracy = accuracy_score(share_mapping_data_gt, share_mapping_data_pred)
share_mapping_data_precision = precision_score(share_mapping_data_gt, share_mapping_data_pred)
share_mapping_data_recall = recall_score(share_mapping_data_gt, share_mapping_data_pred)
share_mapping_data_f1 = f1_score(share_mapping_data_gt, share_mapping_data_pred)
final_data = []
fund_raw_data_metrics = {"title": "Fund_Raw_Data",
"accuracy": fund_raw_data_accuracy,
"precision": fund_raw_data_precision,
"recall": fund_raw_data_recall,
"f1": fund_raw_data_f1,
"support": len(fund_raw_data_gt)}
final_data.append(fund_raw_data_metrics)
logger.info(f"fund_raw_data_accuracy: {fund_raw_data_accuracy}")
logger.info(f"fund_raw_data_precision: {fund_raw_data_precision}")
logger.info(f"fund_raw_data_recall: {fund_raw_data_recall}")
logger.info(f"fund_raw_data_f1: {fund_raw_data_f1}")
fund_mapping_data_metrics = {"title": "Fund_Mapping_Data",
"accuracy": fund_mapping_data_accuracy,
"precision": fund_mapping_data_precision,
"recall": fund_mapping_data_recall,
"f1": fund_mapping_data_f1,
"support": len(fund_mapping_data_gt)}
final_data.append(fund_mapping_data_metrics)
logger.info(f"fund_mapping_data_accuracy: {fund_mapping_data_accuracy}")
logger.info(f"fund_mapping_data_precision: {fund_mapping_data_precision}")
logger.info(f"fund_mapping_data_recall: {fund_mapping_data_recall}")
logger.info(f"fund_mapping_data_f1: {fund_mapping_data_f1}")
share_raw_data_metrics = {"title": "Share_Raw_Data",
"accuracy": share_raw_data_accuracy,
"precision": share_raw_data_precision,
"recall": share_raw_data_recall,
"f1": share_raw_data_f1,
"support": len(share_raw_data_gt)}
final_data.append(share_raw_data_metrics)
logger.info(f"share_raw_data_accuracy: {share_raw_data_accuracy}")
logger.info(f"share_raw_data_precision: {share_raw_data_precision}")
logger.info(f"share_raw_data_recall: {share_raw_data_recall}")
logger.info(f"share_raw_data_f1: {share_raw_data_f1}")
share_mapping_data_metrics = {"title": "Share_Mapping_Data",
"accuracy": share_mapping_data_accuracy,
"precision": share_mapping_data_precision,
"recall": share_mapping_data_recall,
"f1": share_mapping_data_f1,
"support": len(share_mapping_data_gt)}
final_data.append(share_mapping_data_metrics)
logger.info(f"share_mapping_data_accuracy: {share_mapping_data_accuracy}")
logger.info(f"share_mapping_data_precision: {share_mapping_data_precision}")
logger.info(f"share_mapping_data_recall: {share_mapping_data_recall}")
logger.info(f"share_mapping_data_f1: {share_mapping_data_f1}")
final_data_df = pd.DataFrame(final_data)
# set column order as title, accuracy, f1, precision, recall
final_data_df = final_data_df[["title", "accuracy", "f1", "precision", "recall", "support"]]
# output to excel
final_data_file = (
r"/data/emea_ar/output/metrics/mapping_data_info_20_new_emea_documents_sample_Accuracy_metrics.xlsx"
)
with pd.ExcelWriter(final_data_file) as writer:
final_data_df.to_excel(
writer, sheet_name="metrics", index=False
)
def merge_aus_document_prospectus_data():
"""
Merge AUS document and prospectus data.
"""
aus_document_file = r"/data/aus_prospectus/basic_information/document_mapping.xlsx"
aus_prospectus_file = r"/data/aus_prospectus/basic_information/aus_prospectus_data.xlsx"
aus_document_data = pd.read_excel(aus_document_file)
aus_prospectus_data = pd.read_excel(aus_prospectus_file)
aus_document_data["DocumentId"] = aus_document_data["DocumentId"].astype(str)
aus_document_prospectus_data = pd.merge(
aus_document_data,
aus_prospectus_data,
on=["FundClassId", "EffectiveDate"],
how="left",
)
aus_document_prospectus_file = r"/data/aus_prospectus/aus_document_prospectus.xlsx"
with pd.ExcelWriter(aus_document_prospectus_file) as writer:
aus_document_prospectus_data.to_excel(
writer, sheet_name="aus_document_prospectus", index=False
)
def get_pdf_2_html():
pass
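# A minimal sketch (not part of the original script) of how get_pdf_2_html
# might be filled in, assuming PyMuPDF's Page.get_text("html") output is an
# acceptable HTML rendering; the helper name and signature are hypothetical.
def _pdf_to_html_sketch(pdf_file: str, output_folder: str) -> str:
    """Convert each page of a PDF to HTML and write one .html file per document."""
    os.makedirs(output_folder, exist_ok=True)
    doc = fitz.open(pdf_file)
    # Concatenate the per-page HTML fragments produced by PyMuPDF.
    html_parts = [page.get_text("html") for page in doc]
    doc.close()
    html_file = os.path.join(
        output_folder, os.path.basename(pdf_file).replace(".pdf", ".html")
    )
    with open(html_file, "w", encoding="utf-8") as f:
        f.write("\n".join(html_parts))
    return html_file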
if __name__ == "__main__":
# merge_aus_document_prospectus_data()
folder = r"/data/emea_ar/basic_information/English/sample_doc/emea_11_06_case/"
file_name = "doc_ar_data_for_emea_11_06.xlsx"
# get_document_with_all_4_data_points(folder, file_name, None)
# calc_typical_doc_metrics_v1()
# calc_typical_doc_metrics_v2()
doc_provider_file_path = (
r"/data/emea_ar/basic_information/English/latest_provider_ar_document.xlsx"
)
doc_ar_data_file_path = r"/data/emea_ar/basic_information/English/latest_provider_ar_document_mapping.xlsx"
provider_mapping_data_file = (
r"/data/emea_ar/basic_information/English/provider_mapping_data.xlsx"
)
doc_mapping_from_top_100_provider_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_from_top_100_provider_since_2020.xlsx"
)
basic_info_folder = r"/data/emea_ar/basic_information/English/"
pdf_folder = r"/data/emea_ar/pdf/"
output_folder = r"/data/emea_ar/output/"
# get_unique_docids_from_doc_provider_data(doc_provider_file_path)
# download_pdf(doc_provider_file_path, 'doc_provider_count', pdf_folder)
# pdf_folder = r"/data/emea_ar/small_pdf/"
output_folder = r"/data/emea_ar/small_pdf_txt/"
random_small_document_data_file = (
r"/data/emea_ar/basic_information/English/lux_english_ar_top_100_provider_random_small_document.xlsx"
)
doc_provider_file_path = r"/data/emea_ar/basic_information/English/sample_doc/emea_final_typical_case/Final list of EMEA documents.xlsx"
pdf_folder = r"/data/emea_ar/pdf/"
# download_pdf(
# doc_provider_file_path=doc_provider_file_path,
# sheet_name="Sheet1",
# doc_id_column="Document Id",
# pdf_path=pdf_folder)
pdf_folder = r"/data/aus_prospectus/pdf/"
output_folder = r"/data/aus_prospectus/pdf_txt/"
output_pdf_page_text(pdf_folder, output_folder)
# extract_pdf_table(pdf_folder, output_folder)
# analyze_json_error()
latest_top_100_provider_ar_data_file = r"/data/emea_ar/basic_information/English/top_100_provider_latest_document_most_mapping/lux_english_ar_from_top_100_provider_latest_document_with_most_mappings.xlsx"
# download_pdf(latest_top_100_provider_ar_data_file,
# 'latest_ar_document_most_mapping',
# pdf_folder)
doc_ar_data_file_path = r"/data/emea_ar/basic_information/sample_documents/sample_doc/sample_documents_12_11/doc_ar_data_12_11.xlsx"
doc_mapping_data_file_path = r"/data/emea_ar/basic_information/sample_documents/sample_doc/sample_documents_12_11/doc_mapping_12_11.xlsx"
output_data_folder = r"/data/emea_ar/basic_information/sample_documents/sample_doc/sample_documents_12_11/"
output_file="doc_ar_data_sample_documents_12_11_statistics.xlsx"
pdf_folder = r"/data/illume/japan_prospectus/pdf/"
doc_ar_data_file_path = None
doc_mapping_data_file_path = r"/data/illume/japan_prospectus/materials/document_mapping.xlsx"
output_data_folder = r"/data/illume/japan_prospectus/materials/"
output_file = "japan_prospectus_statistics.xlsx"
# statistics_document(pdf_folder=pdf_folder,
# doc_mapping_file_path=doc_mapping_data_file_path,
# doc_ar_data_file_path=doc_ar_data_file_path,
# mapping_sheet_name="Sheet1",
# ar_data_sheet_name="doc_ar_data_in_db",
# output_folder=output_data_folder,
# output_file=output_file)
# get_document_extracted_share_diff_by_db()
# statistics_provider_mapping(
# provider_mapping_data_file=provider_mapping_data_file,
# output_folder=basic_info_folder,
# )
# statistics_document_fund_share_count(doc_mapping_from_top_100_provider_file)
# pickup_document_from_top_100_providers()
# compare_records_count_by_document_id()
# document_mapping_folder = r"/data/emea_ar/output/mapping/document/"
# all_data_file = r"/data/emea_ar/output/mapping/all_document_mapping.xlsx"
# concat_mapping(document_mapping_folder, all_data_file)
# provider_mapping_folder = r"/data/emea_ar/output/mapping/provider/"
# all_data_file = r"/data/emea_ar/output/mapping/all_provider_mapping.xlsx"
# concat_mapping(provider_mapping_folder, all_data_file)