crypto_quant/core/utils.py

71 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timezone, timedelta
from decimal import Decimal
import re
import core.logger as logging
logger = logging.logger
def datetime_to_timestamp(date_str: str) -> int:
"""
将日期字符串(如 '2023-01-01 12:00:00')直接转换为毫秒级时间戳
:param date_str: 形如 '2023-01-01 12:00:00' 的日期时间字符串
:return: 毫秒级时间戳
"""
dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
return int(dt.timestamp() * 1000)
def timestamp_to_datetime(timestamp: int) -> str:
"""
将时间戳转换为日期字符串
请考虑日期字符串位于北京时区
:param timestamp: 以毫秒为单位的时间戳
:return: 形如 '2023-01-01 12:00:00' 的日期时间字符串
"""
dt = datetime.fromtimestamp(timestamp / 1000, timezone(timedelta(hours=8)))
return dt.strftime('%Y-%m-%d %H:%M:%S')
def transform_data_type(data: dict):
"""
遍历字典将所有Decimal类型的值转换为float类型
"""
for key, value in data.items():
if isinstance(value, Decimal):
data[key] = float(value)
return data
def check_date_time_format(date_time: str) -> str | None:
"""
检查日期时间格式是否正确
"""
if re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$', date_time):
return date_time
elif re.match(r'^\d{4}-\d{2}-\d{2}$', date_time):
return date_time + " 00:00:00"
elif re.match(r'^\d{4}\d{2}\d{2}$', date_time):
return f"{date_time[0:4]}-{date_time[4:6]}-{date_time[6:8]} 00:00:00"
else:
return None
def transform_date_time_to_timestamp(date_time: int | str):
"""
将日期时间转换为毫秒级timestamp
"""
try:
# 判断是否就是timestamp整型数据
if isinstance(date_time, int):
date_time = date_time
# 判断是否为纯数字UTC毫秒级timestamp
elif date_time.isdigit():
date_time = int(date_time)
else:
date_time = check_date_time_format(date_time)
if date_time is None:
logger.error(f"日期时间格式错误: {date_time}")
return None
# 按北京时间字符串处理转换为毫秒级timestamp
date_time = datetime_to_timestamp(date_time)
return date_time
except Exception as e:
logger.error(f"start参数解析失败: {e}")
return None