crypto_quant/core/utils.py

83 lines
2.9 KiB
Python
Raw Permalink Normal View History

2025-07-24 10:23:00 +00:00
from datetime import datetime, timezone, timedelta
2025-07-25 08:12:52 +00:00
from decimal import Decimal
import re
import core.logger as logging
logger = logging.logger
2025-07-24 10:23:00 +00:00
2025-09-04 10:15:30 +00:00
def datetime_to_timestamp(date_str: str, is_utc: bool = False) -> int:
2025-07-24 10:23:00 +00:00
"""
将日期字符串 '2023-01-01 12:00:00'直接转换为毫秒级时间戳
:param date_str: 形如 '2023-01-01 12:00:00' 的日期时间字符串
2025-09-04 10:15:30 +00:00
:param is_utc: 是否为 UTC 时间False 时按北京时间UTC+8解析
2025-07-24 10:23:00 +00:00
:return: 毫秒级时间戳
"""
2025-09-04 10:15:30 +00:00
if is_utc:
dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
else:
# 默认按北京时间UTC+8解析
dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone(timedelta(hours=8)))
2025-07-24 10:23:00 +00:00
return int(dt.timestamp() * 1000)
def timestamp_to_datetime(timestamp: int) -> str:
"""
将时间戳转换为日期字符串
请考虑日期字符串位于北京时区
:param timestamp: 以毫秒为单位的时间戳
:return: 形如 '2023-01-01 12:00:00' 的日期时间字符串
"""
dt = datetime.fromtimestamp(timestamp / 1000, timezone(timedelta(hours=8)))
2025-07-25 08:12:52 +00:00
return dt.strftime('%Y-%m-%d %H:%M:%S')
def transform_data_type(data: dict):
"""
遍历字典将所有Decimal类型的值转换为float类型
"""
for key, value in data.items():
if isinstance(value, Decimal):
data[key] = float(value)
return data
def check_date_time_format(date_time: str) -> str | None:
"""
检查日期时间格式是否正确
"""
if re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$', date_time):
return date_time
elif re.match(r'^\d{4}-\d{2}-\d{2}$', date_time):
return date_time + " 00:00:00"
elif re.match(r'^\d{4}\d{2}\d{2}$', date_time):
return f"{date_time[0:4]}-{date_time[4:6]}-{date_time[6:8]} 00:00:00"
else:
return None
def transform_date_time_to_timestamp(date_time: int | str):
"""
将日期时间转换为毫秒级timestamp
"""
try:
# 判断是否就是timestamp整型数据
if isinstance(date_time, int):
date_time = date_time
# 判断是否为纯数字UTC毫秒级timestamp
elif date_time.isdigit():
date_time = int(date_time)
else:
date_time = check_date_time_format(date_time)
if date_time is None:
logger.error(f"日期时间格式错误: {date_time}")
return None
# 按北京时间字符串处理转换为毫秒级timestamp
date_time = datetime_to_timestamp(date_time)
return date_time
except Exception as e:
logger.error(f"start参数解析失败: {e}")
2025-09-16 06:31:15 +00:00
return None
2025-09-16 09:09:00 +00:00
def get_current_date_time(format: str = "%Y-%m-%d %H:%M:%S") -> str:
2025-09-16 06:31:15 +00:00
"""
获取当前日期时间
"""
2025-09-16 09:09:00 +00:00
return datetime.now(timezone(timedelta(hours=8))).strftime(format)