83 lines
2.9 KiB
Python
83 lines
2.9 KiB
Python
from datetime import datetime, timezone, timedelta
|
||
from decimal import Decimal
|
||
import re
|
||
import core.logger as logging
|
||
|
||
logger = logging.logger
|
||
|
||
def datetime_to_timestamp(date_str: str, is_utc: bool = False) -> int:
|
||
"""
|
||
将日期字符串(如 '2023-01-01 12:00:00')直接转换为毫秒级时间戳
|
||
:param date_str: 形如 '2023-01-01 12:00:00' 的日期时间字符串
|
||
:param is_utc: 是否为 UTC 时间;False 时按北京时间(UTC+8)解析
|
||
:return: 毫秒级时间戳
|
||
"""
|
||
if is_utc:
|
||
dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
|
||
else:
|
||
# 默认按北京时间(UTC+8)解析
|
||
dt = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone(timedelta(hours=8)))
|
||
return int(dt.timestamp() * 1000)
|
||
|
||
def timestamp_to_datetime(timestamp: int) -> str:
|
||
"""
|
||
将时间戳转换为日期字符串
|
||
请考虑日期字符串位于北京时区
|
||
:param timestamp: 以毫秒为单位的时间戳
|
||
:return: 形如 '2023-01-01 12:00:00' 的日期时间字符串
|
||
"""
|
||
dt = datetime.fromtimestamp(timestamp / 1000, timezone(timedelta(hours=8)))
|
||
return dt.strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
def transform_data_type(data: dict):
|
||
"""
|
||
遍历字典,将所有Decimal类型的值转换为float类型
|
||
"""
|
||
for key, value in data.items():
|
||
if isinstance(value, Decimal):
|
||
data[key] = float(value)
|
||
return data
|
||
|
||
|
||
def check_date_time_format(date_time: str) -> str | None:
|
||
"""
|
||
检查日期时间格式是否正确
|
||
"""
|
||
if re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$', date_time):
|
||
return date_time
|
||
elif re.match(r'^\d{4}-\d{2}-\d{2}$', date_time):
|
||
return date_time + " 00:00:00"
|
||
elif re.match(r'^\d{4}\d{2}\d{2}$', date_time):
|
||
return f"{date_time[0:4]}-{date_time[4:6]}-{date_time[6:8]} 00:00:00"
|
||
else:
|
||
return None
|
||
|
||
def transform_date_time_to_timestamp(date_time: int | str):
|
||
"""
|
||
将日期时间转换为毫秒级timestamp
|
||
"""
|
||
try:
|
||
# 判断是否就是timestamp整型数据
|
||
if isinstance(date_time, int):
|
||
date_time = date_time
|
||
# 判断是否为纯数字(UTC毫秒级timestamp)
|
||
elif date_time.isdigit():
|
||
date_time = int(date_time)
|
||
else:
|
||
date_time = check_date_time_format(date_time)
|
||
if date_time is None:
|
||
logger.error(f"日期时间格式错误: {date_time}")
|
||
return None
|
||
# 按北京时间字符串处理,转换为毫秒级timestamp
|
||
date_time = datetime_to_timestamp(date_time)
|
||
return date_time
|
||
except Exception as e:
|
||
logger.error(f"start参数解析失败: {e}")
|
||
return None
|
||
|
||
|
||
def get_current_date_time(format: str = "%Y-%m-%d %H:%M:%S") -> str:
|
||
"""
|
||
获取当前日期时间
|
||
"""
|
||
return datetime.now(timezone(timedelta(hours=8))).strftime(format) |