dc-ml-emea-ar/utils/sql_query_util.py

111 lines
4.5 KiB
Python
Raw Normal View History

import json
import time
from urllib import request
import pandas as pd
import os
import dotenv
# Loads the .env file into the environment (this module reads SQL_PASS_KEY; the file may also hold OPENAI_API_KEY).
dotenv.load_dotenv()
def query_document_fund_mapping(doc_id, rerun=True, output_folder=r"./data/emea_ar/output/db_mapping/document/"):
    """Return the fund / share-class mapping rows for one document.

    Rows come from the ``getFundInfoByDocId`` query, are de-duplicated and
    sorted by ``FundName`` then ``ShareClassName``. When ``output_folder`` is
    set the sorted result is also saved to ``<output_folder>/<doc_id>.xlsx``.

    Args:
        doc_id: Document identifier; passed to the query and used as the
            cache file name.
        rerun: When False, a previously saved ``<doc_id>.xlsx`` is loaded
            instead of querying, provided it exists and is not empty.
            When True the query is always executed.
        output_folder: Folder holding the Excel cache. ``None`` or an empty
            value disables both reading and writing the cache.

    Returns:
        The sorted DataFrame; an empty DataFrame when the query yields no
        rows; ``None`` when every one of the 5 attempts raised.
    """
    max_attempts = 5
    retry_delay_seconds = 3
    use_cache = bool(output_folder)

    for attempt in range(1, max_attempts + 1):
        try:
            document_mapping_info_df = pd.DataFrame()
            if rerun is False and use_cache and os.path.exists(output_folder):
                cached_file = os.path.join(output_folder, f"{doc_id}.xlsx")
                if os.path.exists(cached_file):
                    document_mapping_info_df = pd.read_excel(cached_file)

            # No usable cache (or rerun requested): ask the query service.
            if len(document_mapping_info_df) == 0:
                document_mapping_info_df = query_data_by_biz_type(
                    biztype="getFundInfoByDocId", para=doc_id, return_df=True
                ).drop_duplicates()
                if len(document_mapping_info_df) == 0:
                    return document_mapping_info_df

            document_mapping_info_df = document_mapping_info_df.sort_values(
                by=["FundName", "ShareClassName"]
            ).reset_index(drop=True)

            if use_cache:
                output_file = os.path.join(output_folder, f"{doc_id}.xlsx")
                try:
                    os.makedirs(output_folder, exist_ok=True)
                    with pd.ExcelWriter(output_file) as writer:
                        document_mapping_info_df.to_excel(writer, index=False)
                except Exception as e:
                    # Saving the cache is best-effort: report the failure but
                    # still hand the data back to the caller.
                    print(f"Failed to save {output_file}: {e}")
            return document_mapping_info_df
        except Exception as e:
            print(e)
            # Only wait when another attempt will actually follow.
            if attempt < max_attempts:
                time.sleep(retry_delay_seconds)
    return None
def query_investment_by_provider(company_id: str, rerun=True, output_folder=r"./data/emea_ar/output/db_mapping/provider/"):
    """Return the rows of the ``getInvestmentByProvider`` query for one id.

    Rows are de-duplicated and sorted by ``FundName`` then
    ``ShareClassName``. When ``output_folder`` is set the sorted result is
    also saved to ``<output_folder>/<company_id>.xlsx``.

    Args:
        company_id: Identifier passed to the query and used as the cache
            file name.
        rerun: When False, a previously saved ``<company_id>.xlsx`` is loaded
            instead of querying, provided it exists and is not empty.
            When True the query is always executed.
        output_folder: Folder holding the Excel cache. ``None`` or an empty
            value disables both reading and writing the cache.

    Returns:
        The sorted DataFrame; an empty DataFrame when the query yields no
        rows; ``None`` when every one of the 5 attempts raised.
    """
    max_attempts = 5
    retry_delay_seconds = 3
    use_cache = bool(output_folder)

    for attempt in range(1, max_attempts + 1):
        try:
            investment_by_provider_df = pd.DataFrame()
            if rerun is False and use_cache and os.path.exists(output_folder):
                cached_file = os.path.join(output_folder, f"{company_id}.xlsx")
                if os.path.exists(cached_file):
                    investment_by_provider_df = pd.read_excel(cached_file)

            # No usable cache (or rerun requested): ask the query service.
            if len(investment_by_provider_df) == 0:
                investment_by_provider_df = query_data_by_biz_type(
                    biztype='getInvestmentByProvider',
                    para=company_id,
                    return_df=True,
                ).drop_duplicates()
                if len(investment_by_provider_df) == 0:
                    return investment_by_provider_df

            investment_by_provider_df = investment_by_provider_df.sort_values(
                by=['FundName', 'ShareClassName']
            ).reset_index(drop=True)

            if use_cache:
                output_file = os.path.join(output_folder, f"{company_id}.xlsx")
                try:
                    os.makedirs(output_folder, exist_ok=True)
                    with pd.ExcelWriter(output_file) as writer:
                        investment_by_provider_df.to_excel(writer, index=False)
                except Exception as e:
                    # Saving the cache is best-effort: report the failure but
                    # still hand the data back to the caller.
                    print(f"Failed to save {output_file}: {e}")
            return investment_by_provider_df
        except Exception as e:
            print(e)
            # Only wait when another attempt will actually follow.
            if attempt < max_attempts:
                time.sleep(retry_delay_seconds)
    return None
def query_data_by_biz_type(biztype: str, para, return_df: bool):
    """Run a named query on the SQL-pass API and return its result.

    Builds ``<base>?sqlName=<biztype>&params=<para>``, authenticates with the
    ``SQL_PASS_KEY`` environment variable (sent as the ``ApiKey`` header) and
    delegates the HTTP call to ``query_data_by_url``.

    Args:
        biztype: Name of the query, sent as ``sqlName``.
        para: Query argument, converted with ``str()`` and sent as ``params``.
        return_df: When True the result is wrapped in a ``pandas.DataFrame``
            (an empty one if the request produced nothing); when False the
            raw value from ``query_data_by_url`` is returned, which may be
            ``None``.

    Returns:
        A DataFrame or the raw result, depending on ``return_df``.
    """
    # Local import: only this function needs it.
    from urllib import parse

    sqlpass_url = "https://api.morningstar.com/sqlpassapi/v1/sql"
    # Percent-encode both values so characters such as spaces, '&', '#' or '='
    # cannot break or alter the query string. ',' is left as-is so
    # comma-separated arguments produce the same URL as before.
    # NOTE(review): a value that is already percent-encoded would be encoded
    # twice — confirm no caller pre-encodes.
    sql_name = parse.quote(str(biztype), safe=",")
    params = parse.quote(str(para), safe=",")
    url = f"{sqlpass_url}?sqlName={sql_name}&params={params}"
    headers = {"ApiKey": os.getenv("SQL_PASS_KEY")}

    result = query_data_by_url(url, headers)
    return pd.DataFrame(result) if return_df else result
def query_data_by_url(url, headers, max_retries=5, retry_delay=3):
    """Fetch ``url`` and return the ``"result"`` field of its JSON body.

    The request is retried when it raises; each failure is printed.

    Args:
        url: Full URL to request.
        headers: Mapping of HTTP headers sent with the request.
        max_retries: Maximum number of attempts (default 5).
        retry_delay: Seconds to wait between two attempts (default 3).

    Returns:
        The value stored under ``"result"`` in the decoded JSON body, or
        ``None`` when every attempt failed.

    Raises:
        json.JSONDecodeError: The body was fetched but is not valid JSON.
        KeyError: The JSON body has no ``"result"`` key.
    """
    body = None
    for attempt in range(1, max_retries + 1):
        try:
            req = request.Request(url=url, headers=headers)
            # The context manager closes the response even if read() fails.
            with request.urlopen(req) as response:
                # A separate name keeps `body` None unless the text was fully
                # read and decoded, so a failed read never reaches json.loads.
                body = response.read().decode(encoding="utf-8", errors="ignore")
            break
        except Exception as e:
            print(e)
            # Only wait when another attempt will actually follow.
            if attempt < max_retries:
                time.sleep(retry_delay)

    if body is None:
        return None
    return json.loads(body)["result"]