Compare commits

..

10 Commits

Author SHA1 Message Date
blade 18168010ce QUANT optimize 2025-10-09 11:29:00 +08:00
blade 11c6e25490 support quant by A market 2025-09-25 12:28:43 +08:00
blade f3b98bcc22 fix issue for 1m notification 2025-09-18 14:44:18 +08:00
blade adba888bb2 1. optimize binance ma quant strategy
2. support monitor 1m volume
2025-09-17 19:33:37 +08:00
blade 23e533ddf3 update code for date time format issue 2025-09-16 17:09:00 +08:00
blade f9f4874b7e update mysql configuration to server 2025-09-16 15:37:22 +08:00
blade 1963d2720b updae for datetime 2025-09-16 14:31:15 +08:00
blade e29b1cef51 add mysql install progressing 2025-09-15 17:42:49 +08:00
blade 7b132cddb2 support send btc-usdt isolately 2025-09-15 15:02:49 +08:00
blade ff2c35e1b3 optimize ma_break quant logic 2025-09-15 14:12:47 +08:00
47 changed files with 2605 additions and 638 deletions

View File

@ -1,25 +1,32 @@
import schedule
import time
import datetime
from core.utils import get_current_date_time
import core.logger as logging
import subprocess
import os
import sys
logger = logging.logger
# 定义要执行的任务
def run_script():
start_time = time.time()
logger.info(f"Executing script at: {datetime.datetime.now()}")
logger.info(f"Executing script at: {get_current_date_time()}")
output_file = r'./output/auto_schedule.txt'
with open(output_file, 'a') as f:
f.write(f"Task ran at {datetime.datetime.now()}\n")
python_path = r"D:\miniconda3\envs\okx\python.exe"
script_path = r"D:\python_projects\crypto_quant\monitor_schedule.py"
f.write(f"Task ran at {get_current_date_time()}\n")
current_dir = os.getcwd()
python_path = sys.executable
if current_dir.endswith('crypto_quant'):
script_path = r'./monitor_schedule.py'
elif current_dir.endswith(r'python_projects'):
script_path = f'{current_dir}/crypto_quant/monitor_schedule.py'
else:
script_path = f'{current_dir}/monitor_schedule.py'
subprocess.run([python_path, script_path])
end_time = time.time()
logger.info(f"Script execution time: {end_time - start_time} seconds")
# 设置每20秒运行一次
schedule.every(20).seconds.do(run_script)
schedule.every(10).seconds.do(run_script)
# 保持程序运行并检查调度
logger.info("Scheduler started. Press Ctrl+C to stop.")

View File

@ -1,20 +1,27 @@
import schedule
import time
import datetime
from core.utils import get_current_date_time
import core.logger as logging
import subprocess
import os
import sys
logger = logging.logger
# 定义要执行的任务
def run_script():
start_time = time.time()
logger.info(f"Executing script at: {datetime.datetime.now()}")
logger.info(f"Executing script at: {get_current_date_time()}")
output_file = r'./output/auto_schedule.txt'
with open(output_file, 'a') as f:
f.write(f"Task ran at {datetime.datetime.now()}\n")
python_path = r"D:\miniconda3\envs\okx\python.exe"
script_path = r"D:\python_projects\crypto_quant\huge_volume_main.py"
f.write(f"Task ran at {get_current_date_time()}\n")
python_path = sys.executable
current_dir = os.getcwd()
if current_dir.endswith('crypto_quant'):
script_path = r'./huge_volume_main.py'
elif current_dir.endswith(r'python_projects'):
script_path = f'{current_dir}/crypto_quant/huge_volume_main.py'
else:
script_path = f'{current_dir}/huge_volume_main.py'
subprocess.run([python_path, script_path])
end_time = time.time()
logger.info(f"Script execution time: {end_time - start_time} seconds")

View File

@ -12,6 +12,11 @@ SECRET_KEY = "F7AD69272FBF7C44E69CC110D2EDDB7A"
PASSPHRASE = "Bengbu!2001"
SANDBOX = False
# API_KEY = "12d911b6-f28a-4595-b177-70cb8fbdc369"
# SECRET_KEY = "C32DF88A975EE08631D0BE6B6B5A33E7"
# PASSPHRASE = "Bengbu@2001"
# SANDBOX = False
# 模拟盘API密钥配置
# API_KEY = "f309e789-3497-4ed3-896f-d18bdc4d9817"
# SECRET_KEY = "9152809391B110E2E647FDE12A37E96D"
@ -66,6 +71,28 @@ OKX_MONITOR_CONFIG = {
},
}
OKX_REALTIME_MONITOR_CONFIG = {
"volume_monitor": {
"symbols": [
"XCH-USDT",
"BTC-USDT",
"SOL-USDT",
"ETH-USDT",
"DOGE-USDT",
],
"bars": ["1m", "5m", "15m", "30m", "1H"],
"initial_date": "2025-05-15 00:00:00",
},
"price_monitor": {
"symbols": ["XCH-USDT"],
"bats": [
{"bar": "5m", "threshold": 0.025},
{"bar": "15m", "threshold": 0.5},
{"bar": "1H", "threshold": 0.1},
],
},
}
BINANCE_MONITOR_CONFIG = {
"volume_monitor": {
"symbols": [
@ -107,9 +134,47 @@ US_STOCK_MONITOR_CONFIG = {
}
}
A_STOCK_MONITOR_CONFIG = {
"volume_monitor": {
"symbols": [
"600276.SH",
"002714.SZ",
"600111.SH",
"603019.SH",
"600036.SH",
"300474.SZ",
"600519.SH",
"300750.SZ",
"000858.SZ",
"000651.SZ",
"000333.SZ",
"002230.SZ",
"300308.SZ",
"002475.SZ"
],
"bars": ["1D", "1W", "1M"],
"initial_date": "2015-01-01 00:00:00",
},
}
A_INDEX_MONITOR_CONFIG = {
"volume_monitor": {
"symbols": [
"000001.SH",
"399006.SZ",
"000300.SH",
"399001.SZ",
"000852.SH",
],
"bars": ["1D", "1W", "1M"],
"initial_date": "2015-01-01 00:00:00",
},
}
WINDOW_SIZE = {"window_sizes": [50, 80, 100, 120]}
BAR_THRESHOLD = {
"1m": 1000 * 60,
"5m": 1000 * 60 * 5,
"15m": 1000 * 60 * 15,
"30m": 1000 * 60 * 30,
@ -118,14 +183,33 @@ BAR_THRESHOLD = {
"1D": 1000 * 60 * 60 * 24,
}
MYSQL_CONFIG = {
"host": "localhost",
"port": 3306,
# COIN_MYSQL_CONFIG = {
# "host": "localhost",
# "port": 3306,
# "user": "xch",
# "password": "xch_okx_2025",
# "database": "okx",
# }
COIN_MYSQL_CONFIG = {
"host": "218.17.89.43",
"port": 11013,
"user": "xch",
"password": "xch_okx_2025",
"database": "okx",
}
WECHAT_CONFIG = {"key": "11e6f7ac-efa9-418a-904c-9325a9f5d324"}
A_MYSQL_CONFIG = {
"host": "43.139.95.249",
"port": 3306,
"user": "root",
"password": "bengbu_200!",
"database": "astock",
}
WECHAT_CONFIG = {
"general_key": "11e6f7ac-efa9-418a-904c-9325a9f5d324",
"btc_key": "529e135d-843b-43dc-8aca-677a860f4b4b",
}
ITICK_API_KEY = "dfd4bc0caed148d6bc03b960224754ffb5356349e389431f828702b3a27e8a2b"

View File

@ -7,7 +7,8 @@ from openpyxl import Workbook
from openpyxl.drawing.image import Image
from PIL import Image as PILImage
import core.logger as logging
from datetime import datetime
from datetime import datetime, timezone, timedelta
from core.utils import get_current_date_time
import pandas as pd
import os
import re
@ -403,7 +404,7 @@ class HugeVolumeChart:
}
"""
logger.info(f"输出Excel文件包含所有{chart_type}图表")
file_name = f"huge_volume_{chart_type}_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
file_name = f"huge_volume_{chart_type}_{get_current_date_time(format="%Y%m%d%H%M%S")}.xlsx"
file_path = os.path.join(self.output_folder, file_name)
# Create Excel file and worksheet

View File

@ -1,12 +1,17 @@
import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from core.utils import get_current_date_time
from typing import Optional
import pandas as pd
import requests
import json
import okx.MarketData as Market
import okx.TradingData as TradingData
from core.utils import transform_date_time_to_timestamp, timestamp_to_datetime
from core.biz.market_data_from_itick import MarketDataFromItick
import core.logger as logging
import sys
logger = logging.logger
@ -23,9 +28,18 @@ class MarketData:
self.passphrase = passphrase
self.market_api = Market.MarketAPI(
api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
flag=self.flag
flag=self.flag,
)
self.is_us_stock = is_us_stock
# 当前操作系统是windows还是linux
self.is_windows = sys.platform == "win32"
if not self.is_us_stock:
# 如果当前操作系统是windows则is_binance为False否则为True
# 因为LINUX服务器目前访问不了欧易的API
if self.is_windows:
self.is_binance = False
else:
self.is_binance = True
# self.trade_api = TradingData.TradingDataAPI(
# api_key=api_key, api_secret_key=secret_key, passphrase=passphrase,
@ -48,11 +62,21 @@ class MarketData:
if end_time is None:
logger.error(f"end_time参数解析失败: {end_time}")
return None
response = self.get_realtime_candlesticks_from_api(symbol, bar, end_time, limit)
if self.is_binance:
if symbol == "XCH-USDT":
return None
response = self.get_realtime_candlesticks_from_binance(symbol, bar, end_time, limit)
else:
response = self.get_realtime_candlesticks_from_api(symbol, bar, end_time, limit)
if response:
candles = response["data"]
from_time = int(candles[-1][0])
to_time = int(candles[0][0])
if self.is_binance:
from_time = int(candles[0][0])
to_time = int(candles[-1][0])
else:
from_time = int(candles[-1][0])
to_time = int(candles[0][0])
from_time_str = pd.to_datetime(from_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
to_time_str = pd.to_datetime(to_time, unit='ms', utc=True).tz_convert('Asia/Shanghai')
logger.info(f"已获取{symbol}, 周期:{bar} {len(candles)} 条数据,从: {from_time_str} 到: {to_time_str}")
@ -68,7 +92,7 @@ class MarketData:
candles_pd['symbol'] = symbol
# 添加bar列内容为bar
candles_pd['bar'] = bar
candles_pd['create_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
candles_pd['create_time'] = get_current_date_time()
candles_pd = candles_pd[['symbol', 'bar', 'timestamp', 'date_time', 'open', 'high', 'low', 'close', 'volume', 'volCcy', 'volCCyQuote', 'create_time']]
candles_pd.sort_values('timestamp', inplace=True)
candles_pd.reset_index(drop=True, inplace=True)
@ -197,7 +221,7 @@ class MarketData:
df['symbol'] = symbol
# 添加bar列内容为bar
df['bar'] = bar
df['create_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
df['create_time'] = get_current_date_time()
if self.is_us_stock:
# 如果是美股数据则仅保留date_time_us字中在开盘时间内的数据即开盘时间为美国时间9:30到16:00
@ -280,6 +304,54 @@ class MarketData:
time.sleep(10)
return response
def get_realtime_candlesticks_from_binance(self, symbol, bar, end_time, limit):
# API 端点:币安现货 K 线数据
base_url = "https://api.binance.com"
endpoint = "/api/v3/klines"
params = {
"symbol": symbol.replace("-", ""),
"interval": bar,
"limit": limit
}
response = None
count = 0
data = None
while True:
try:
# 发送 GET 请求
response = requests.get(base_url + endpoint, params=params)
if response.status_code == 200:
data = response.json()
break
except Exception as e:
logger.error(f"请求出错: {e}")
count += 1
if count > 3:
break
time.sleep(10)
if data:
# 每条数据格式:[开盘时间, 开盘价, 最高价, 最低价, 收盘价, 成交量, 收盘时间, 报价成交量, 成交笔数, 主动买入成交量, 主动买入报价成交量, 忽略]
columns = ["timestamp", "open", "high", "low", "close", "volume", "volCcy", "volCCyQuote", "confirm"]
data_list = []
# [开盘时间, 开盘价, 最高价, 最低价, 收盘价, 成交量, 报价成交量, 是否收盘]
for record in data:
record = [
int(record[6]),
float(record[1]),
float(record[2]),
float(record[3]),
float(record[4]),
float(record[5]),
float(record[7]),
float(record[7]),
"1"
]
data_list.append(record)
return {"data": data_list}
else:
logger.warning(f"未获取到{symbol}, {bar} 最新数据,请稍后再试")
return None
def get_realtime_candlesticks_from_api(self, symbol, bar, end_time, limit):
response = None
count = 0

View File

@ -63,7 +63,7 @@ class MarketDataFromItick:
# 设置默认时间范围
if end_time is None:
end_time = int(datetime.now().strftime('%Y-%m-%d %H:%M:%S').timestamp())
end_time = int(datetime.now().timestamp())
if isinstance(end_time, str):
end_time = int(datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S').timestamp())
self.end_time = end_time

View File

@ -16,6 +16,7 @@ def create_metrics_report(
only_output_huge_volume: bool = False,
only_output_rise: bool = False,
now_datetime_str: str = None,
is_binance: bool = False,
):
"""
创建指标报告
@ -61,6 +62,8 @@ def create_metrics_report(
brief = f"{symbol} {bar} 量率: {volume_ratio} {change}: {pct_chg}% 异动 价: {close}"
else:
brief = f"{symbol} {bar} 量率: {volume_ratio} {change}: {pct_chg}% 价: {close}"
if is_binance:
brief += " 币安"
if huge_volume == 1:
contents.append(f"## {brief} 交易巨量报告")

View File

@ -49,6 +49,7 @@ import core.logger as logging
import pandas as pd
import numpy as np
import talib as tb
from typing import List, Tuple
from talib import MA_Type
logger = logging.logger
@ -306,6 +307,40 @@ class MetricsCalculation:
return data
def calculate_percentile_indicators(
self,
data: pd.DataFrame,
window_size: int = 50,
price_column: str = "close",
percentiles: List[Tuple[float, str]] = [(0.8, "80"), (0.2, "20"), (0.9, "90"), (0.1, "10")]
) -> pd.DataFrame:
"""
计算分位数指标
:param data: 数据DataFrame
:param window_size: 窗口大小
:param percentiles: 分位数配置列表格式为[(分位数, 名称后缀)]
:return: 包含分位数指标的DataFrame
"""
for percentile, suffix in percentiles:
# 计算分位数
data[f"{price_column}_{suffix}_percentile"] = (
data[price_column].rolling(window=window_size, min_periods=1).quantile(percentile)
)
# 判断价格是否达到分位数
if suffix in ["80", "90"]:
# 高点分位数
data[f"{price_column}_{suffix}_high"] = (
data[price_column] >= data[f"{price_column}_{suffix}_percentile"]
).astype(int)
else:
# 低点分位数
data[f"{price_column}_{suffix}_low"] = (
data[price_column] <= data[f"{price_column}_{suffix}_percentile"]
).astype(int)
return data
def update_macd_divergence_column(self, df: pd.DataFrame):
"""
更新整个DataFrame的macd_divergence列
@ -1217,3 +1252,159 @@ class MetricsCalculation:
avg_spacing = (spacing_5_10 + spacing_10_20 + spacing_20_30) / 3
return avg_spacing
def get_peaks_valleys_mean(self, data: pd.DataFrame):
"""计算上涨波峰和下跌波谷的均值与中位数"""
# 确保输入数据包含必要的列
if not all(col in data.columns for col in ["open", "high", "low", "close"]):
raise ValueError(
"DataFrame must contain 'open', 'high', 'low', 'close' columns"
)
if len(data) < 100:
return None, None
window = 5
# 初始化结果列表
peaks_valleys = []
# 检测波峰基于high价格
highs = data["high"]
for i in range(window, len(data) - window):
if i + window >= len(data):
break
# 当前K线的high价格
current_high = highs.iloc[i]
# 窗口内的前后K线的high价格
window_highs = highs.iloc[i - window : i + window + 1]
# 如果当前high是窗口内的最大值标记为波峰
if (
current_high == window_highs.max()
and current_high > highs.iloc[i - 1]
and current_high > highs.iloc[i + 1]
):
peaks_valleys.append(
{
"symbol": data.iloc[i]["symbol"],
"bar": data.iloc[i]["bar"],
"timestamp": data.iloc[i]["timestamp"],
"date_time": data.iloc[i]["date_time"],
"price": current_high,
"type": "peak",
}
)
# 检测波谷基于low价格
lows = data["low"]
for i in range(window, len(data) - window):
if i + window >= len(data):
break
# 当前K线的low价格
current_low = lows.iloc[i]
# 窗口内的前后K线的low价格
window_lows = lows.iloc[i - window : i + window + 1]
# 如果当前low是窗口内的最小值标记为波谷
if (
current_low == window_lows.min()
and current_low < lows.iloc[i - 1]
and current_low < lows.iloc[i + 1]
):
peaks_valleys.append(
{
"symbol": data.iloc[i]["symbol"],
"bar": data.iloc[i]["bar"],
"timestamp": data.iloc[i]["timestamp"],
"date_time": data.iloc[i]["date_time"],
"price": current_low,
"type": "valley",
}
)
# 转换为DataFrame并按时间排序
result_df = pd.DataFrame(peaks_valleys)
if not result_df.empty:
result_df = result_df.sort_values(by="timestamp").reset_index(drop=True)
else:
result_df = pd.DataFrame(
columns=["symbol", "timestamp", "date_time", "bar", "price", "type"]
)
# 检查result_df如果type为peak时下一条数据type依然为peak则删除当前数据
if not result_df.empty:
# 使用布尔索引来标记要删除的行
to_drop_peaks = []
handled_indexes = []
for i in range(len(result_df) - 1):
if i in handled_indexes:
continue
if result_df.iloc[i]["type"] == "peak":
current_peak_value = result_df.iloc[i]["price"]
current_peak_index = i
# 如type连续为peak只应该保留price最大的行删除其他行
# 如type连续为peak且存在price为8 7 10 9 8 11 10的情况只应该保留price为11的行
for j in range(i + 1, len(result_df)):
if result_df.iloc[j]["type"] == "peak":
next_peak_value = result_df.iloc[j]["price"]
if current_peak_value > next_peak_value:
to_drop_peaks.append(j)
else:
to_drop_peaks.append(current_peak_index)
current_peak_value = next_peak_value
current_peak_index = j
handled_indexes.append(j)
else:
break
# 删除标记的行
result_df = result_df.drop(to_drop_peaks).reset_index(drop=True)
# 如type连续为valley只应该保留price最小的行删除其他行
# 如type连续为valley且存在price为8 7 10 9 8的情况只应该保留price为7的行
to_drop_valleys = []
handled_indexes = []
for i in range(len(result_df) - 1):
if i in handled_indexes:
continue
if result_df.iloc[i]["type"] == "valley":
current_valley_value = result_df.iloc[i]["price"]
current_valley_index = i
for j in range(i + 1, len(result_df)):
if result_df.iloc[j]["type"] == "valley":
next_valley_value = result_df.iloc[j]["price"]
if current_valley_value < next_valley_value:
to_drop_valleys.append(j)
else:
to_drop_valleys.append(current_valley_index)
current_valley_value = next_valley_value
current_valley_index = j
handled_indexes.append(j)
else:
break
# 删除标记的行
result_df = result_df.drop(to_drop_valleys).reset_index(drop=True)
# 初始化价格变化列
result_df["price_change"] = 0.0
result_df["price_change_ratio"] = 0.0
# 计算下一条数据与当前数据之间的价格差,并计算价格差与当前数据价格的比率
peaks_mean = None
valleys_mean = None
if len(result_df) > 1:
for i in range(len(result_df) - 1):
result_df.iloc[i + 1, result_df.columns.get_loc("price_change")] = (
result_df.iloc[i + 1]["price"] - result_df.iloc[i]["price"]
)
result_df.iloc[
i + 1, result_df.columns.get_loc("price_change_ratio")
] = (
result_df.iloc[i + 1]["price_change"] / result_df.iloc[i]["price"]
) * 100
# peaks mean为result_df中price_change_ratio > 0的price_change_ratio的均值与中位数
peaks_mean = abs(float(result_df[result_df["price_change_ratio"] > 0]["price_change_ratio"].mean()))
peaks_median = abs(float(result_df[result_df["price_change_ratio"] > 0]["price_change_ratio"].median()))
# valleys mean为result_df中price_change_ratio < 0的price_change_ratio的均值与中位数
valleys_mean = abs(float(result_df[result_df["price_change_ratio"] < 0]["price_change_ratio"].mean()))
valleys_median = abs(float(result_df[result_df["price_change_ratio"] < 0]["price_change_ratio"].median()))
result = {"peaks_valleys_data": result_df, "peaks_mean": peaks_mean, "peaks_median": peaks_median, "valleys_mean": valleys_mean, "valleys_median": valleys_median}
return result

View File

@ -1,5 +1,6 @@
import time
from datetime import datetime
from datetime import datetime, timezone, timedelta
from core.utils import get_current_date_time
import core.logger as logging
from typing import Optional
import pandas as pd
@ -177,7 +178,7 @@ class QuantStrategy:
logger.info(f"开始运行{strategy}策略,间隔{interval}")
while True:
try:
logger.info(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(get_current_date_time())
try:
self.quant_trader.get_account_balance()
except Exception as e:

View File

@ -1,5 +1,6 @@
import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from core.utils import get_current_date_time
from typing import Optional
import pandas as pd
import okx.MarketData as Market
@ -87,7 +88,7 @@ class TradeData:
df.rename(columns={"instId": "symbol"}, inplace=True)
df["date_time"] = df["ts"].apply(lambda x: timestamp_to_datetime(x))
df["tradeId"] = df["tradeId"].astype(str)
df["create_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
df["create_time"] = get_current_date_time()
df = df[["symbol", "ts", "date_time", "tradeId", "side", "sz", "px", "create_time"]]
self.db_trade_data.insert_data_to_mysql(df)

125
core/db/db_astock.py Normal file
View File

@ -0,0 +1,125 @@
import pandas as pd
from sqlalchemy import create_engine, exc, text
import re
from core.utils import get_current_date_time
import core.logger as logging
from core.utils import transform_data_type
logger = logging.logger
class DBAStockData:
def __init__(
self,
db_url: str,
):
self.db_url = db_url
self.db_engine = create_engine(
self.db_url,
pool_size=25, # 连接池大小
max_overflow=10, # 允许的最大溢出连接
pool_timeout=30, # 连接超时时间(秒)
pool_recycle=60, # 连接回收时间(秒),避免长时间闲置
)
def query_data(self, sql: str, condition_dict: dict, return_multi: bool = True):
"""
查询数据
:param sql: 查询SQL
:param db_url: 数据库连接URL
"""
try:
with self.db_engine.connect() as conn:
result = conn.execute(text(sql), condition_dict)
if return_multi:
result = result.fetchall()
if result:
result_list = [
transform_data_type(dict(row._mapping)) for row in result
]
return result_list
else:
return None
else:
result = result.fetchone()
if result:
result_dict = transform_data_type(dict(result._mapping))
return result_dict
else:
return None
except Exception as e:
logger.error(f"查询数据出错: {e}")
return None
def query_market_data_by_symbol_bar(
self,
symbol: str,
bar: str,
fields: list = None,
start: str = None,
end: str = None,
table_name: str = "index_daily_price_from_2021",
):
"""
根据交易对和K线周期查询数据
:param symbol: 交易对
:param bar: K线周期
:param fields: 字段列表
:param start: 开始时间
:param end: 结束时间
"""
if fields is None:
fields = ["*"]
fields_str = ", ".join(fields)
if table_name is None:
table_name = "index_daily_price_from_2021"
join_table = "all_index"
if table_name.startswith("index"):
join_table = "all_index"
else:
join_table = "all_stock"
if start is None and end is None:
sql = f"""
SELECT {fields_str} FROM {table_name} a
INNER JOIN {join_table} b ON a.ts_code = b.ts_code
WHERE a.ts_code = :symbol
ORDER BY a.trade_date ASC
"""
condition_dict = {"symbol": symbol}
else:
if start is not None and end is not None:
start = start.replace("-", "")
end = end.replace("-", "")
if start > end:
start, end = end, start
sql = f"""
SELECT {fields_str} FROM {table_name} a
INNER JOIN {join_table} b ON a.ts_code = b.ts_code
WHERE a.ts_code = :symbol AND a.trade_date BETWEEN :start AND :end
ORDER BY a.trade_date ASC
"""
condition_dict = {
"symbol": symbol,
"start": start,
"end": end,
}
elif start is not None:
start = start.replace("-", "")
sql = f"""
SELECT {fields_str} FROM {table_name} a
INNER JOIN {join_table} b ON a.ts_code = b.ts_code
WHERE a.ts_code = :symbol AND a.trade_date >= :start
ORDER BY a.trade_date ASC
"""
condition_dict = {"symbol": symbol, "start": start}
elif end is not None:
end = end.replace("-", "")
sql = f"""
SELECT {fields_str} FROM {table_name} a
INNER JOIN {join_table} b ON a.ts_code = b.ts_code
WHERE a.ts_code = :symbol AND a.trade_date <= :end
ORDER BY a.trade_date ASC
"""
condition_dict = {"symbol": symbol, "end": end}
return self.query_data(sql, condition_dict, return_multi=True)

View File

@ -480,6 +480,7 @@ class DBBinanceData:
fields: list = None,
start: str = None,
end: str = None,
table_name: str = "crypto_binance_data",
):
"""
根据交易对和K线周期查询数据
@ -494,7 +495,7 @@ class DBBinanceData:
fields_str = ", ".join(fields)
if start is None and end is None:
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar
ORDER BY timestamp ASC
"""
@ -514,7 +515,7 @@ class DBBinanceData:
if start > end:
start, end = end, start
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end
ORDER BY timestamp ASC
"""
@ -526,14 +527,14 @@ class DBBinanceData:
}
elif start is not None:
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start}
elif end is not None:
sql = f"""
SELECT {fields_str} FROM crypto_binance_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end
ORDER BY timestamp ASC
"""

View File

@ -1,6 +1,7 @@
import pandas as pd
from sqlalchemy import create_engine, exc, text
import re, datetime
import re
from core.utils import get_current_date_time
import core.logger as logging
from core.utils import transform_data_type
@ -12,7 +13,7 @@ class DBData:
):
self.table_name = table_name
self.temp_table_name = (
f"temp_{table_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
f"temp_{table_name}_{get_current_date_time(format="%Y%m%d_%H%M%S")}"
)
self.columns = columns
if self.columns is None:

View File

@ -7,68 +7,65 @@ logger = logging.logger
class DBMarketData:
def __init__(
self,
db_url: str
):
def __init__(self, db_url: str):
self.db_url = db_url
self.table_name = "crypto_market_data"
self.columns = [
"symbol",
"bar",
"timestamp",
"date_time",
"date_time_us",
"open",
"high",
"low",
"close",
"pre_close",
"close_change",
"pct_chg",
"volume",
"volCcy",
"volCCyQuote",
"buy_sz",
"sell_sz",
# 技术指标字段
"ma1",
"ma2",
"dif",
"dea",
"macd",
"macd_signal",
"macd_divergence",
"kdj_k",
"kdj_d",
"kdj_j",
"kdj_signal",
"kdj_pattern",
"sar",
"sar_signal",
"ma5",
"ma10",
"ma20",
"ma30",
"ma_cross",
"ma5_close_diff",
"ma10_close_diff",
"ma20_close_diff",
"ma30_close_diff",
"ma_close_avg",
"ma_long_short",
"ma_divergence",
"rsi_14",
"rsi_signal",
"boll_upper",
"boll_middle",
"boll_lower",
"boll_signal",
"boll_pattern",
"k_length",
"k_shape",
"k_up_down",
"create_time",
"symbol",
"bar",
"timestamp",
"date_time",
"date_time_us",
"open",
"high",
"low",
"close",
"pre_close",
"close_change",
"pct_chg",
"volume",
"volCcy",
"volCCyQuote",
"buy_sz",
"sell_sz",
# 技术指标字段
"ma1",
"ma2",
"dif",
"dea",
"macd",
"macd_signal",
"macd_divergence",
"kdj_k",
"kdj_d",
"kdj_j",
"kdj_signal",
"kdj_pattern",
"sar",
"sar_signal",
"ma5",
"ma10",
"ma20",
"ma30",
"ma_cross",
"ma5_close_diff",
"ma10_close_diff",
"ma20_close_diff",
"ma30_close_diff",
"ma_close_avg",
"ma_long_short",
"ma_divergence",
"rsi_14",
"rsi_signal",
"boll_upper",
"boll_middle",
"boll_lower",
"boll_signal",
"boll_pattern",
"k_length",
"k_shape",
"k_up_down",
"create_time",
]
self.db_manager = DBData(db_url, self.table_name, self.columns)
@ -143,7 +140,9 @@ class DBMarketData:
condition_dict = {"symbol": symbol, "bar": bar}
return self.db_manager.query_data(sql, condition_dict, return_multi=False)
def query_data_before_timestamp(self, symbol: str, bar: str, timestamp: int, limit: int = 100):
def query_data_before_timestamp(
self, symbol: str, bar: str, timestamp: int, limit: int = 100
):
"""
根据时间戳查询之前的数据
:param symbol: 交易对
@ -157,7 +156,12 @@ class DBMarketData:
ORDER BY timestamp DESC
LIMIT :limit
"""
condition_dict = {"symbol": symbol, "bar": bar, "timestamp": timestamp, "limit": limit}
condition_dict = {
"symbol": symbol,
"bar": bar,
"timestamp": timestamp,
"limit": limit,
}
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def query_data_by_technical_indicators(
@ -170,7 +174,7 @@ class DBMarketData:
kdj_signal: str = None,
rsi_signal: str = None,
boll_signal: str = None,
ma_cross: str = None
ma_cross: str = None,
):
"""
根据技术指标查询数据
@ -230,7 +234,7 @@ class DBMarketData:
bar: str,
signal: str = None,
start: str = None,
end: str = None
end: str = None,
):
"""
查询MACD信号数据
@ -275,7 +279,7 @@ class DBMarketData:
signal: str = None,
pattern: str = None,
start: str = None,
end: str = None
end: str = None,
):
"""
查询KDJ信号数据
@ -325,7 +329,7 @@ class DBMarketData:
long_short: str = None,
divergence: str = None,
start: str = None,
end: str = None
end: str = None,
):
"""
查询均线信号数据
@ -378,7 +382,7 @@ class DBMarketData:
signal: str = None,
pattern: str = None,
start: str = None,
end: str = None
end: str = None,
):
"""
查询布林带信号数据
@ -421,11 +425,7 @@ class DBMarketData:
return self.db_manager.query_data(sql, condition_dict, return_multi=True)
def get_technical_statistics(
self,
symbol: str,
bar: str,
start: str = None,
end: str = None
self, symbol: str, bar: str, start: str = None, end: str = None
):
"""
获取技术指标统计信息
@ -473,7 +473,15 @@ class DBMarketData:
return self.db_manager.query_data(sql, condition_dict, return_multi=False)
def query_market_data_by_symbol_bar(self, symbol: str, bar: str, fields: list = None, start: str = None, end: str = None):
def query_market_data_by_symbol_bar(
self,
symbol: str,
bar: str,
fields: list = None,
start: str = None,
end: str = None,
table_name: str = "crypto_market_data",
):
"""
根据交易对和K线周期查询数据
:param symbol: 交易对
@ -487,7 +495,7 @@ class DBMarketData:
fields_str = ", ".join(fields)
if start is None and end is None:
sql = f"""
SELECT {fields_str} FROM crypto_market_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar
ORDER BY timestamp ASC
"""
@ -507,21 +515,26 @@ class DBMarketData:
if start > end:
start, end = end, start
sql = f"""
SELECT {fields_str} FROM crypto_market_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar AND timestamp BETWEEN :start AND :end
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start, "end": end}
condition_dict = {
"symbol": symbol,
"bar": bar,
"start": start,
"end": end,
}
elif start is not None:
sql = f"""
SELECT {fields_str} FROM crypto_market_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar AND timestamp >= :start
ORDER BY timestamp ASC
"""
condition_dict = {"symbol": symbol, "bar": bar, "start": start}
elif end is not None:
sql = f"""
SELECT {fields_str} FROM crypto_market_data
SELECT {fields_str} FROM {table_name}
WHERE symbol = :symbol AND bar = :bar AND timestamp <= :end
ORDER BY timestamp ASC
"""

View File

@ -12,11 +12,11 @@ class Logger:
# log文件存储路径
current_dir = os.getcwd()
if current_dir.endswith('crypto_quant'):
output_folder = r'./output/log/'
elif current_dir.startswith(r'/root/'):
output_folder = r'/root/crypto_quant/output/log/'
output_folder = f'{current_dir}/output/log/'
elif current_dir.endswith(r'python_projects'):
output_folder = f'{current_dir}/crypto_quant/output/log/'
else:
output_folder = r'./output/log/'
output_folder = f'{current_dir}/output/log/'
os.makedirs(output_folder, exist_ok=True)
# add self._log_filename to be adata_yyyyMMddHHmm.log
self._log_filename = os.path.join(output_folder, 'crypto_monitor_{}.log'.format(time.strftime("%Y%m%d%H%M%S", time.localtime())))

View File

@ -4,14 +4,15 @@ import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from datetime import datetime, timezone, timedelta
from core.utils import get_current_date_time
import re
from openpyxl import Workbook
from openpyxl.drawing.image import Image
import openpyxl
from openpyxl.styles import Font
from PIL import Image as PILImage
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from config import OKX_MONITOR_CONFIG, COIN_MYSQL_CONFIG, WINDOW_SIZE
from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
@ -24,13 +25,13 @@ logger = logging.logger
class PriceVolumeStats:
def __init__(self):
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_market_data = DBMarketData(self.db_url)
@ -41,7 +42,7 @@ class PriceVolumeStats:
self.initial_date = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-05-15 00:00:00"
)
self.end_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.end_date = get_current_date_time()
self.window_size = 100
self.stats_output_dir = "./output/statistics/excel/"
os.makedirs(self.stats_output_dir, exist_ok=True)

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@ -11,7 +11,7 @@ from openpyxl.drawing.image import Image
import openpyxl
from openpyxl.styles import Font
from PIL import Image as PILImage
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from config import OKX_MONITOR_CONFIG, COIN_MYSQL_CONFIG, WINDOW_SIZE
import core.logger as logging
from core.db.db_merge_market_huge_volume import DBMergeMarketHugeVolume
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
@ -24,13 +24,13 @@ logger = logging.logger
class MeanReversionSandbox:
def __init__(self, solution: str):
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_merge_market_huge_volume = DBMergeMarketHugeVolume(self.db_url)

View File

@ -12,7 +12,7 @@ from PIL import Image as PILImage
from datetime import datetime, timedelta
import core.logger as logging
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from config import OKX_MONITOR_CONFIG, COIN_MYSQL_CONFIG, WINDOW_SIZE
from core.db.db_market_data import DBMarketData
from core.db.db_binance_data import DBBinanceData
from core.db.db_huge_volume_data import DBHugeVolumeData
@ -92,13 +92,13 @@ class ORBStrategy:
self.data = None # 存储K线数据
self.trades = [] # 存储交易记录
self.equity_curve = None # 存储账户净值曲线
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.is_us_stock = is_us_stock
self.is_binance = is_binance

View File

@ -74,3 +74,10 @@ def transform_date_time_to_timestamp(date_time: int | str):
except Exception as e:
logger.error(f"start参数解析失败: {e}")
return None
def get_current_date_time(format: str = "%Y-%m-%d %H:%M:%S") -> str:
"""
获取当前日期时间
"""
return datetime.now(timezone(timedelta(hours=8))).strftime(format)

View File

@ -6,15 +6,14 @@
"""
import core.logger as logging
import requests
from config import WECHAT_CONFIG
logger = logging.logger
class Wechat:
def __init__(self):
def __init__(self, key: str):
# 虽然config在根目录但是取决于调用代码在哪
# 只要启动代码文件在根目录config就能找到
self.key = WECHAT_CONFIG["key"]
self.key = key
self.url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.key}"
def send_text(self, text: str):

View File

@ -0,0 +1,62 @@
- core/biz/huge_volume.py
- 作用: 放量(巨量)检测与后续涨跌统计的核心逻辑。
- 要点:
- 基于滑窗计算 volume_ma、volume_std、volume_threshold=均值+N倍标准差生成 huge_volume、volume_ratio、spike_intensity。
- 可选价格分位检查:对 close/high/low 计算 80/20、90/10 分位并标注高低点命中位。
- next_periods_rise_or_fall: 以未来 N 周期涨跌结果做分组统计,输出明细与汇总。
- core/biz/huge_volume_chart.py
- 作用: 将“巨量后走势统计”数据绘图(热力图/折线图)。
- 要点:
- 输入统计 DataFrame支持是否包含热力图/折线图。
- 被 `huge_volume_main.plot_huge_volume_data` 调用,输出到 `./output/huge_volume_statistics/`
- core/biz/market_data.py
- 作用: 行情获取统一封装OKX 为主Linux 环境下支持 Binance且支持美股模式
- 要点:
- get_realtime_kline_data / get_historical_kline_data统一返回 DataFrame`timestamp/date_time/date_time_us/symbol/bar/...`,自动数值化与排序。
- 历史数据分页向后抓取,含时间边界、去重与交易时段过滤(美股)。
- 提供 `get_realtime_candlesticks_from_binance/okx` 与基本 trade 聚合辅助buy_sz/sell_sz 预留)。
- core/biz/market_data_from_itick.py
- 作用: 从 iTick 等源拉取美股K线`market_data_from_itick_main.py` 使用)。
- 要点:
- 封装美股数据下载,适配 `MarketData` 历史数据流程,统一列结构。
- core/biz/market_monitor.py
- 作用: 实时监控报表生成面向企业微信推送的Markdown文案
- 要点:
- create_metrics_report: 基于一根最新K线及全量数据汇总价量、分位、MACD/KDJ/RSI/BOLL、均线多空/发散等信号,生成可读文本。
- get_last_huge_volume_record: 最近一次巨量回溯与十周期内巨量次数。
- get_long_short_over_buy_sell: 跨周期或对标BTC的多空/超买超卖对比说明。
- 依赖 `METRICS_CONFIG` 的权重与阈值映射。
- core/biz/metrics_calculation.py
- 作用: 技术指标与形态计算的总入口。
- 要点:
- 指标: pre_close/pct_chg、MACD(含金叉死叉)、KDJ(K/D/J+信号)、RSI、BOLL(上下轨与形态)、SAR(多/空/观望)。
- 均线: ma5/10/20/30、交叉组合信号、价格-均线相对百分比、`ma_long_short`(多/空/震荡)与 `ma_divergence`(发散/粘合等提供多策略判定weighted_voting/trend_strength/ma_alignment/statistical/hybrid。
- K线形态: k_length短/中/长/超长、k_shape吊锤线、倒T、十字星、超大实体、光头光脚等基于统计分布和Z-score自适应阈值。
- MACD 背离: 标准版与滑窗版两套检测。
- core/biz/quant_trader.py
- 作用: OKX 交易封装(账户、下单、行情、公共数据)。
- 要点:
- 余额查询USDT/现货币/合约、当前价格、K线拉取。
- 现货市价单买卖;合约侧设置杠杆、开空(卖出)与平空(买入)流程。
- 计算合约所需保证金与推荐保证金(含缓冲比例)。
- core/biz/strategy.py
- 作用: 策略抽象/占位(被 `trade_main.py` 中的 `QuantStrategy` 引用)。
- 要点:
- 用于承载策略接口或具体策略实现(与 `quant_trader` 协作下单)。
- core/biz/trade_data.py
- 作用: 交易明细获取与存储封装(供 `TradeDataMain` 使用)。
- 要点:
- 负责对接交易API、落库与查询支持按时间段增量补齐。
- core/biz/market_data_from_itick.py若存在
- 作用: iTick 美股数据源适配器。
- 要点:
- 输出结构与 OKX/Binance 对齐,便于统一后续指标/巨量检测流程。

View File

@ -0,0 +1,78 @@
- market_monitor_main.py
- 功能: 实时监控市场K线优先OKXLinux下可切换Binance计算技术指标与巨量信号生成监控报告并推送企业微信。
- 要点:
- 直接API拉取最近K线不访问DB以保证速度。
- 固定滑窗window_size=100实时判定huge_volume与价格分位异常。
- 通过最新时间戳与本地记录去重,避免重复推送。
- 支持过滤条件:仅巨量、仅超过均量、仅上涨。
- 依赖 Wechat 推送、DBMarketMonitor 记录、OKX_REALTIME_MONITOR_CONFIG 配置。
- trade_ma_strategy_main.py
- 功能: 批量运行“均线突破/相关”策略的统计回测主要是MACD命名策略
- 要点:
- 入口类 TradeMaStrategyMain -> MaBreakStatistics 批量统计。
- 聚合多个策略结果输出合并的资金曲线/收益数据。
- 可配置是否美股/是否Binance、佣金参数等。
- trade_sandbox_main.py
- 功能: 均值回归策略沙盒回测批量跑不同方案并输出Excel和图表。
- 要点:
- 批量维度symbols × bars × solutions。
- 统计指标:止盈/止损次数与占比、收益分布、均值等,按 `symbol, bar` 分组。
- 自动绘制2×2面板图不同bar嵌入Excel文件。
- 可仅跑5m也可多周期。
- market_data_from_itick_main.py
- 功能: 按时间段分片下载美股/ETF数据示例使用AlphaVantage类命名处理并保存CSV展示统计。
- 要点:
- 配置symbol/interval/分段天数,降低单次请求压力。
- 下载→处理→保存→打印统计的串行流程。
- 作为离线数据拉取脚本模版使用。
- auto_schedule.py
- 功能: 简易定时调度器,周期性运行 `huge_volume_main.py`
- 要点:
- 使用 schedule 每小时执行一次;记录执行时间与耗时。
- 兼容不同当前工作目录定位脚本路径。
- 适合本地常驻调度。
- auto_update_market_data.py
- 功能: 同上,定时运行 `huge_volume_main.py`(与 auto_schedule.py 功能基本一致)。
- 要点:
- 同样是每小时执行,日志与输出一致。
- 可按需要二选一保留,避免重复。
- update_data_main.py
- 功能: 批量更新数据库中行情数据的技术指标与美东时间字段。
- 要点:
- 从DB读取全量数据→按timestamp排序→更新 `date_time_us``SAR` 指标→回写DB。
- 支持美股/加密两套symbols与bars配置。
- 严格校验MySQL配置是否存在密码。
- trade_data_main.py
- 功能: 交易明细拉取与补齐API与DB结合返回时间段内整理过的交易数据。
- 要点:
- 依据DB现有最早/最新时间决定是否调用API补齐前段或后段。
- 默认时间范围从配置初始时间到当前最终结果从DB聚合、排序、去重。
- 依赖 TradeData 实现API交互与DB写入。
- statistics_main.py
- 功能: 批量价格/成交量统计。
- 要点:
- 调用 PriceVolumeStats 批处理,返回价格统计、成交量统计与联动统计结果。
- 用作一次性统计入口,便于离线分析。
- trade_main.py
- 功能: 交易流程演示脚本(三段式示例:开空→现货卖出→平空)。
- 要点:
- 封装 QuantTrader对接实盘/模拟由配置SANDBOX决定
- 展示下单参数:逐仓/全仓、张数、杠杆、缓冲比例;以及余额检查、下单、平仓流程。
- 以日志形式串联完整交易生命周期适合作为交易API联通性验证与流程Demo。
- huge_volume_main.py
- 功能: 巨量成交检测与统计分析的核心入口OKX与Binance均支持
- 要点:
- 从行情表读取K线计算滑窗放量、价格分位等支持初始化、按窗口增量更新。
- Binance 支持CSV历史导入导入后联动更新巨量表。
- 提供后续N周期涨跌统计、Excel导出与可视化以及企业微信推送过滤如volume_ratio>10且极值价位
- 多窗口50/80/100/120与多周期1m~1D批量处理能力MySQL落库去重。

View File

@ -0,0 +1,23 @@
- ma_break_statistics.py
- 功能: 统计“均线突破”后的收益表现,批量跑不同标的/周期,生成统计与图表/Excel。
- 要点:
- 数据源可切换OKX/Binance/美股;时间范围从配置读取。
- 关注 MA5/10/20/30 多组“上穿/下穿”组合,度量突破后的区间收益。
- 可配置手续费率,输出多策略合并结果,落地到指定 output 目录。
- 与数据库类 `DBMarketData/DBBinanceData/DBHugeVolumeData` 协同读取K线、过滤区间。
- mean_reversion_sandbox.py
- 功能: 均值回归策略沙盒(回测器),按多种“买入/止损/止盈方案”跑批评估,生成统计+图表+Excel。
- 要点:
- 条件以“价格分位+巨量”触发为主:如 close_10_low=1 或 close_80/90_high=1 且近2根K线任一巨量。
- 多个方案solution_1/2/3策略化定义止盈可用波段中位数、分位、高位形态等
- 读自合并视图 `DBMergeMarketHugeVolume`(含价量与巨量事件),统一回测窗口与分组汇总。
- 自动出图seaborn/matplotlib并将图贴入 Excel结果分 symbol×bar 汇总。
- orb_trade.py
- 功能: ORBOpening Range Breakout日内策略回测与可视化。
- 要点:
- 以开盘第一根5分钟K线的高低High1/Low1作为区间第二根K线产生多空信号入场价=第二根开盘价,止损价=第一根极值;盈亏基于 $Rentry-stop
- 支持参数:账户初始资金、最大杠杆、单笔风险比例、佣金、盈利目标倍数、仅做多/仅做空/双向、是否参考 SAR、是否参考 1H 形态等。
- 数据获取两路优先本地DBOKX/Binance也提供 yfinance 拉取美股数据的流程;自动调整初始资金规模以适配价格量级。
- 回测输出交易清单、资金曲线生成图表与Excel摘要到 output 目录。

View File

@ -4,18 +4,18 @@ from core.db.db_market_data import DBMarketData
from core.db.db_huge_volume_data import DBHugeVolumeData
from core.db.db_binance_data import DBBinanceData
from core.db.db_binance_huge_volume_data import DBBinanceHugeVolumeData
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp, get_current_date_time
from market_data_main import MarketDataMain
from core.wechat import Wechat
import core.logger as logging
from config import (
OKX_MONITOR_CONFIG,
US_STOCK_MONITOR_CONFIG,
MYSQL_CONFIG,
COIN_MYSQL_CONFIG,
WINDOW_SIZE,
BINANCE_MONITOR_CONFIG,
)
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import pandas as pd
import os
import re
@ -30,13 +30,13 @@ class HugeVolumeMain:
is_us_stock: bool = False,
is_binance: bool = False,
):
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.huge_volume = HugeVolume()
@ -108,12 +108,12 @@ class HugeVolumeMain:
)
if end is None:
end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end = get_current_date_time()
logger.info(
f"开始处理巨量交易数据: {symbol} {bar} 窗口大小: {window_size}{start}{end}"
)
data = self.db_market_data.query_market_data_by_symbol_bar(
symbol, bar, start, end
symbol=symbol, bar=bar, start=start, end=end
)
if data is None:
logger.warning(
@ -344,7 +344,7 @@ class HugeVolumeMain:
is_update=True,
)
logger.info(
f"更新巨量交易数据: {symbol} {bar} 窗口大小: {window_size}{earliest_date_time}{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
f"更新巨量交易数据: {symbol} {bar} 窗口大小: {window_size}{earliest_date_time}{get_current_date_time()}"
)
if data is not None and len(data) > 0:
logger.info(f"此次更新巨量交易数据: {len(data)}")
@ -352,7 +352,7 @@ class HugeVolumeMain:
logger.info(f"此次更新巨量交易数据为空")
except Exception as e:
logger.error(
f"更新巨量交易数据失败: {symbol} {bar} 窗口大小: {window_size}{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {e}"
f"更新巨量交易数据失败: {symbol} {bar} 窗口大小: {window_size}{get_current_date_time()}: {e}"
)
def get_seconds_by_bar(self, bar: str):
@ -413,7 +413,7 @@ class HugeVolumeMain:
"initial_date", "2025-05-01 00:00:00"
)
if end is None:
end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end = get_current_date_time()
periods_text = ", ".join([str(period) for period in periods])
logger.info(
f"开始计算巨量出现后,之后{periods_text}个周期,上涨或下跌的比例: {symbol} {bar} 窗口大小: {window_size}{start}{end}"
@ -473,7 +473,7 @@ class HugeVolumeMain:
"initial_date", "2025-05-01 00:00:00"
)
if end is None:
end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end = get_current_date_time()
start_timestamp = transform_date_time_to_timestamp(start)
end_timestamp = transform_date_time_to_timestamp(end)
@ -582,7 +582,7 @@ class HugeVolumeMain:
"initial_date", "2025-05-01 00:00:00"
)
if end is None:
end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end = get_current_date_time()
huge_volume_data_list = []
result_data_list = []
window_size_list = WINDOW_SIZE.get("window_sizes", None)
@ -606,7 +606,7 @@ class HugeVolumeMain:
if output_excel:
total_huge_volume_data = total_huge_volume_data.reset_index(drop=True)
total_result_data = total_result_data.reset_index(drop=True)
current_date = datetime.now().strftime("%Y%m%d%H%M%S")
current_date = get_current_date_time()
file_name = f"next_periods_rise_or_fall_{current_date}.xlsx"
try:
with pd.ExcelWriter(
@ -687,18 +687,18 @@ def test_import_binance_data_by_csv():
def test_send_huge_volume_data_to_wechat():
huge_volume_main = HugeVolumeMain(threshold=2.0)
# 获得昨天日期
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
yesterday = (datetime.now(timezone(timedelta(hours=8))) - timedelta(days=1)).strftime("%Y-%m-%d")
logger.info(f"昨天日期: {yesterday}")
# 获得今天日期
today = datetime.now().strftime("%Y-%m-%d")
today = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d")
logger.info(f"今天日期: {today}")
huge_volume_main.send_huge_volume_data_to_wechat(start=yesterday, end=today)
if __name__ == "__main__":
# test_import_binance_data_by_csv()
batch_import_binance_data_by_csv()
# batch_update_volume_spike(threshold=2.0, is_us_stock=True)
# batch_import_binance_data_by_csv()
batch_update_volume_spike(threshold=2.0, is_us_stock=False)
# test_send_huge_volume_data_to_wechat()
# batch_initial_detect_volume_spike(threshold=2.0)

View File

@ -2,8 +2,7 @@ import requests
import core.logger as logging
from datetime import datetime, timedelta
from futu import KLType
from core.utils import get_current_date_time
logger = logging.logger
@ -41,7 +40,7 @@ def main():
logger.info(f"成功下载 {len(processed_data)} 条数据")
# 保存数据
filename = f"{symbol}_{interval}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
filename = f"{symbol}_{interval}_{get_current_date_time(format="%Y%m%d%H%M%S")}.csv"
market_data_from_futu.save_to_csv(filename)
# 显示数据统计

View File

@ -1,5 +1,6 @@
import core.logger as logging
from datetime import datetime, timedelta, timezone
from core.utils import get_current_date_time
from time import sleep
import pandas as pd
from core.biz.market_data import MarketData
@ -20,7 +21,7 @@ from config import (
OKX_MONITOR_CONFIG,
BINANCE_MONITOR_CONFIG,
US_STOCK_MONITOR_CONFIG,
MYSQL_CONFIG,
COIN_MYSQL_CONFIG,
BAR_THRESHOLD,
)
@ -66,13 +67,13 @@ class MarketDataMain:
self.initial_date = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"initial_date", "2025-07-01 00:00:00"
)
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
if is_binance:
@ -107,7 +108,7 @@ class MarketDataMain:
"""
获取保存数据
"""
end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end_time = get_current_date_time()
end_time_ts = transform_date_time_to_timestamp(end_time)
if end_time_ts is None:
logger.error(f"结束时间格式错误: {end_time}")
@ -225,7 +226,7 @@ class MarketDataMain:
data['date_time'] = dt_series.dt.strftime('%Y-%m-%d %H:%M:%S')
dt_us_series = pd.to_datetime(data['timestamp'].astype(int), unit='ms', utc=True, errors='coerce').dt.tz_convert('America/New_York')
data['date_time_us'] = dt_us_series.dt.strftime('%Y-%m-%d %H:%M:%S')
data['create_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
data['create_time'] = get_current_date_time()
data["date_time"] = data["date_time"].astype(str)
data["date_time_us"] = data["date_time_us"].astype(str)

View File

@ -4,13 +4,14 @@ from huge_volume_main import HugeVolumeMain
from core.biz.market_monitor import create_metrics_report
from core.db.db_market_monitor import DBMarketMonitor
from core.wechat import Wechat
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG
from config import OKX_MONITOR_CONFIG, OKX_REALTIME_MONITOR_CONFIG, COIN_MYSQL_CONFIG, WECHAT_CONFIG
from core.utils import timestamp_to_datetime, transform_date_time_to_timestamp
import core.logger as logging
import os
import sys
import pandas as pd
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import json
import re
@ -20,7 +21,6 @@ class MarketMonitorMain:
def __init__(self):
self.market_data_main = MarketDataMain()
self.huge_volume_main = HugeVolumeMain()
self.wechat = Wechat()
self.monitor_config = OKX_MONITOR_CONFIG
self.window_size = 100
self.start_date = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
@ -31,18 +31,35 @@ class MarketMonitorMain:
self.output_folder = "./output/report/market_monitor/"
os.makedirs(self.output_folder, exist_ok=True)
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_market_monitor = DBMarketMonitor(self.db_url)
# 当前操作系统是windows还是linux
self.is_windows = sys.platform == "win32"
# 如果当前操作系统是windows则is_binance为False否则为True
# 因为LINUX服务器目前访问不了欧易的API
if self.is_windows:
self.is_binance = False
else:
self.is_binance = True
self.symbols = OKX_REALTIME_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["XCH-USDT"]
)
self.bars = OKX_REALTIME_MONITOR_CONFIG.get("volume_monitor", {}).get(
"bars", ["1m", "5m", "15m", "30m", "1H"]
)
def get_latest_record(self):
"""
获取最新记录
@ -72,8 +89,8 @@ class MarketMonitorMain:
监控最新市场数据
考虑到速度暂不与数据库交互直接从api获取数据
"""
# 获得当前时间字符串
now_datetime = datetime.now()
# 获得当前时间字符串,转为北京时间
now_datetime = datetime.now(timezone(timedelta(hours=8)))
now_datetime_str = now_datetime.strftime("%Y-%m-%d %H:%M:%S")
end_time = transform_date_time_to_timestamp(now_datetime_str)
real_time_data = self.market_data_main.market_data.get_realtime_kline_data(
@ -175,11 +192,16 @@ class MarketMonitorMain:
only_output_huge_volume,
only_output_rise,
now_datetime_str,
self.is_binance,
)
text_length = len(report.encode("utf-8"))
logger.info(f"发送报告到企业微信,字节数: {text_length}")
self.wechat.send_markdown(report)
if symbol == "BTC-USDT":
wechat = Wechat(WECHAT_CONFIG["btc_key"])
else:
wechat = Wechat(WECHAT_CONFIG["general_key"])
wechat.send_markdown(report)
# remove punction in latest_reatime_datetime
file_datetime = re.sub(r"[\:\-\s]", "", latest_reatime_datetime)
@ -238,13 +260,13 @@ class MarketMonitorMain:
获取下一个长周期实时数据
"""
if next:
# 获得bar在self.market_data_main.bars中的索引
bar_index = self.market_data_main.bars.index(bar)
if bar_index == len(self.market_data_main.bars) - 1:
# 获得bar在self.bars中的索引
bar_index = self.bars.index(bar)
if bar_index == len(self.bars) - 1:
logger.error(f"已经是最后一个bar: {bar}")
return None
# 获得下一个bar
bar = self.market_data_main.bars[bar_index + 1]
bar = self.bars[bar_index + 1]
# 获得下一个bar的实时数据
data = self.market_data_main.market_data.get_realtime_kline_data(
symbol=symbol, bar=bar, end_time=end_time, limit=100
@ -264,8 +286,11 @@ class MarketMonitorMain:
only_output_over_mean_volume: bool = True,
only_output_rise: bool = True,
):
for symbol in self.market_data_main.symbols:
for bar in self.market_data_main.bars:
for symbol in self.symbols:
if self.is_binance and symbol == "XCH-USDT":
logger.info(f"币安交易所无: {symbol}")
continue
for bar in self.bars:
logger.info(
f"开始监控: {symbol} {bar} 窗口大小: {self.window_size} 行情数据"
)
@ -287,8 +312,8 @@ class MarketMonitorMain:
if __name__ == "__main__":
market_monitor_main = MarketMonitorMain()
market_monitor_main.monitor_realtime_market(
symbol="PUMP-USDT",
bar="5m",
symbol="SOL-USDT",
bar="1m",
only_output_huge_volume=False,
only_output_rise=False,
)

View File

@ -1,7 +1,8 @@
from core.trade.orb_trade import ORBStrategy
from config import US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG, BINANCE_MONITOR_CONFIG
import core.logger as logging
from datetime import datetime
from datetime import datetime, timezone, timedelta
from core.utils import get_current_date_time
from openpyxl import Workbook
from openpyxl.drawing.image import Image
import openpyxl
@ -27,12 +28,21 @@ def main():
start_date = start_date[:10]
else:
start_date = "2024-01-01"
end_date = datetime.now().strftime("%Y-%m-%d")
end_date = get_current_date_time()
# 原值 盈利目标倍数默认10倍$R即10R
profit_target_multiple = 10
# 新值 盈利目标倍数默认20倍$R即10R -- 20250909
# profit_target_multiple = 20
initial_capital = 25000
max_leverage = 4
risk_per_trade = 0.01
commission_per_share = 0.0005
# if is_us_stock:
# commission_per_share = 0.0005
# else:
# commission_per_share = 0
# commission_per_share = 0
trades_df_list = []
trades_summary_df_list = []
@ -47,6 +57,7 @@ def main():
symbols = US_STOCK_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["QQQ"]
)
commission_per_share = 0.0005
else:
if is_binance:
symbols = BINANCE_MONITOR_CONFIG.get("volume_monitor", {}).get(
@ -56,6 +67,7 @@ def main():
symbols = OKX_MONITOR_CONFIG.get("volume_monitor", {}).get(
"symbols", ["BTC-USDT"]
)
commission_per_share = 0
for symbol in symbols:
logger.info(
f"开始回测 {symbol}, 交易周期:{bar}, 开始日期:{start_date}, 结束日期:{end_date}, 是否是美股:{is_us_stock}, 交易方向:{direction}, 是否使用SAR:{by_sar}, 是否使用R为entry减stop:{price_range_mean_as_R}, 是否使用K线实体过50%:{by_big_k}"
@ -109,7 +121,7 @@ def main():
statitics_dict = statistics_summary(total_trades_summary_df)
output_excel_folder = r"./output/trade_sandbox/orb_strategy/excel/summary/"
os.makedirs(output_excel_folder, exist_ok=True)
now_str = datetime.now().strftime("%Y%m%d%H%M%S")
now_str = get_current_date_time(format="%Y%m%d%H%M%S")
excel_file_name = f"orb_strategy_summary_{now_str}.xlsx"
output_file_path = os.path.join(output_excel_folder, excel_file_name)
with pd.ExcelWriter(output_file_path) as writer:

12
play.py
View File

@ -2,7 +2,7 @@ import logging
from core.biz.quant_trader import QuantTrader
from core.biz.strategy import QuantStrategy
from config import MYSQL_CONFIG
from config import COIN_MYSQL_CONFIG
from sqlalchemy import create_engine, exc, text
import pandas as pd
@ -100,13 +100,13 @@ def main() -> None:
def test_query():
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
db_engine = create_engine(
db_url,

View File

@ -10,3 +10,5 @@ schedule >= 1.2.2
xlsxwriter >= 3.2.5
openpyxl >= 3.1.5
cryptography >= 3.4.8
mplfinance
schedule

4
sql/backup_restore.txt Normal file
View File

@ -0,0 +1,4 @@
mysqldump -u xch -p --single-transaction --quick --routines --triggers --events --default-character-set=utf8mb4 okx | gzip > ./okx_backup_20250915.sql.gz
mysql -u xch -p -e "CREATE DATABASE okx;"
gunzip < ./okx_backup_20250915.sql.gz | mysql -u xch -p okx

236
sql/mysql_install.md Normal file
View File

@ -0,0 +1,236 @@
# MySql安装
以下是针对 Ubuntu 系统,在 `/mnt/0/mysql/` 目录下正确安装 MySQL 并解决权限和路径问题的完整步骤:
### 步骤 1准备目录与依赖
```bash
# 创建安装主目录及子目录保持MySQL原生结构
sudo mkdir -p /mnt/0/mysql/{data,log,conf}
sudo chown -R $USER:$USER /mnt/0/mysql/
sudo chmod -R 755 /mnt/0/mysql/
# 安装Ubuntu必需依赖
sudo apt update
sudo apt install -y libaio1 libncurses6
```
### 步骤 2下载并正确放置MySQL二进制包
```bash
# 下载MySQL 8.0二进制包以8.0.36为例,可替换最新版本)
wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.36-linux-glibc2.28-x86_64.tar.xz
# 解压到当前目录
tar -xvf mysql-8.0.36-linux-glibc2.28-x86_64.tar.xz
# 关键步骤:将解压后的所有文件直接移动到/mnt/0/mysql/(保持原生目录结构)
# 注意解压目录中的bin、share等文件夹将直接成为/mnt/0/mysql/的子目录
mv mysql-8.0.36-linux-glibc2.28-x86_64/* /mnt/0/mysql/
```
此时目录结构应为(正确结构):
```
/mnt/0/mysql/
├── bin/ # MySQL可执行文件mysqld、mysql等
├── share/ # 错误消息、字符集文件包含errmsg.sys
├── lib/ # 依赖库
├── data/ # 数据存储目录(后续使用)
├── conf/ # 配置文件目录
└── log/ # 日志目录
```
### 步骤 3创建MySQL配置文件
```bash
nano /mnt/0/mysql/conf/my.cnf
```
添加以下配置(路径需与实际目录结构匹配):
```ini
[mysqld]
# 安装主目录(直接指向/mnt/0/mysql
basedir = /mnt/0/mysql
# 数据目录
datadir = /mnt/0/mysql/data
# socket文件路径
socket = /mnt/0/mysql/mysql.sock
# 错误日志
log-error = /mnt/0/mysql/log/error.log
# PID文件
pid-file = /mnt/0/mysql/mysql.pid
port = 3306
user = mysql
# 允许远程连接(可选,生产环境谨慎开启)
bind-address = 0.0.0.0
```
保存退出(`Ctrl+O` → 回车 → `Ctrl+X`)。
### 步骤 4创建MySQL系统用户并修复权限链
```bash
# 创建mysql用户组和用户禁止登录shell
sudo groupadd mysql
sudo useradd -r -g mysql -s /bin/false mysql
# 关键:修复权限链(包括父目录)
sudo chmod 755 /mnt/0 # 确保父目录可访问
sudo chown -R mysql:mysql /mnt/0/mysql/ # 递归设置所有权
sudo chmod 700 /mnt/0/mysql/data/ # 数据目录权限(仅所有者可访问)
```
### 步骤 5初始化MySQL数据库解决路径和权限问题
```bash
# 清除可能的残留文件(若之前初始化失败)
sudo rm -rf /mnt/0/mysql/data/*
# 执行初始化(路径参数必须正确)
sudo /mnt/0/mysql/bin/mysqld --initialize --user=mysql \
--basedir=/mnt/0/mysql \
--datadir=/mnt/0/mysql/data
```
初始化成功后记录临时root密码
```bash
cat /mnt/0/mysql/log/error.log | grep 'temporary password'
```
### 步骤 6配置环境变量可选
```bash
echo 'export PATH=$PATH:/mnt/0/mysql/bin' >> ~/.bashrc
source ~/.bashrc
```
### 步骤 7注册为systemd服务
```bash
sudo nano /etc/systemd/system/mysql.service
```
添加以下内容:
```ini
[Unit]
Description=MySQL Server
After=network.target
[Service]
User=mysql
Group=mysql
ExecStart=/mnt/0/mysql/bin/mysqld --defaults-file=/mnt/0/mysql/conf/my.cnf
Restart=always
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
```
刷新配置并启动:
```bash
sudo systemctl daemon-reload
sudo systemctl start mysql
sudo systemctl enable mysql # 开机自启
```
### 步骤 8验证安装
```bash
# 检查服务状态
sudo systemctl status mysql # 应显示active (running)
# 登录MySQL验证
mysql -u root -p -S /mnt/0/mysql/mysql.sock
# 输入临时密码后修改密码
ALTER USER 'root'@'localhost' IDENTIFIED BY 'NewPassword@123';
FLUSH PRIVILEGES;
```
```bash
ln -s /mnt/0/mysql/mysql.sock /tmp/mysql.sock
chown mysql:mysql /tmp/mysql.sock
```
### 关键修复点说明
1. **目录结构修正**将MySQL解压文件直接放在 `/mnt/0/mysql/` 下,避免多嵌套一层 `bin` 目录,确保 `share/errmsg.sys` 等核心文件能被正确找到。
2. **权限链修复**:不仅设置 `/mnt/0/mysql/` 的权限,还确保父目录 `/mnt/0` 有执行权限(`755`),解决 "Permission denied" 问题。
3. **路径参数修正**`--basedir` 指向 `/mnt/0/mysql`主目录而非子目录与MySQL原生目录结构匹配。
按照以上步骤操作可彻底解决之前出现的路径错误和权限问题确保MySQL正常安装和运行。
# 创建账号与数据库
要创建 MySQL 管理员账号 `xch` 并创建名为 `okx` 的数据库schema可以按照以下步骤操作
### 步骤 1登录 MySQL 服务器
首先使用 root 账号登录 MySQL
```bash
# 如果配置了环境变量
mysql -u root -p
# 如果未配置环境变量,使用完整路径和 socket
/mnt/0/mysql/bin/bin/mysql -u root -p -S /mnt/0/mysql/mysql.sock
```
输入 root 密码后进入 MySQL 命令行界面。
### 步骤 2创建管理员账号 `xch`
执行以下 SQL 命令创建具有全部权限的管理员账号 `xch`
```sql
-- 创建账号并允许本地连接(仅本机可登录)
-- CREATE USER 'xch'@'localhost' IDENTIFIED BY '你的密码';
-- 授予管理员权限(所有数据库的所有操作权限)
-- GRANT ALL PRIVILEGES ON *.* TO 'xch'@'localhost' WITH GRANT OPTION;
-- 如果需要允许远程连接谨慎使用生产环境建议限制IP
CREATE USER 'xch'@'%' IDENTIFIED BY '你的密码';
GRANT ALL PRIVILEGES ON *.* TO 'xch'@'%' WITH GRANT OPTION;
-- 刷新权限使配置生效
FLUSH PRIVILEGES;
```
说明:
- `'xch'@'localhost'` 表示仅允许从本机登录
- `'xch'@'%'` 表示允许从任何 IP 登录(不推荐直接使用,可替换为具体 IP 如 `'192.168.1.%'`
- `WITH GRANT OPTION` 允许该账号授予权限给其他用户
### 步骤 3创建 `okx` 数据库schema
执行以下命令创建名为 `okx` 的数据库:
```sql
-- 创建数据库,指定字符集为 utf8mb4支持 emoji 和所有 Unicode 字符)
CREATE DATABASE IF NOT EXISTS okx
CHARACTER SET utf8mb4
COLLATE utf8mb4_unicode_ci;
```
### 步骤 4验证配置
```sql
-- 查看所有数据库(确认 okx 已创建)
SHOW DATABASES;
-- 查看用户列表(确认 xch 已创建)
SELECT user, host FROM mysql.user;
```
### 步骤 5退出 MySQL
```sql
exit;
```
### 测试新账号登录
使用新创建的 `xch` 账号登录验证:
```bash
# 本地登录
mysql -u xch -p
# 登录后查看是否有权限访问 okx 数据库
USE okx;
```
这样就完成了管理员账号 `xch` 的创建和 `okx` 数据库的创建。`xch` 账号拥有与 root 类似的全部权限,可以管理所有数据库。

39
test_binance.py Normal file
View File

@ -0,0 +1,39 @@
import requests
import json
from datetime import datetime
# API 端点:币安现货 K 线数据
base_url = "https://api.binance.com"
endpoint = "/api/v3/klines"
# 参数设置
symbol = "ETHUSDT" # 交易对ETH-USDT
interval = "5m" # 时间间隔5 分钟
limit = 100 # 获取最近 100 条 K 线(可根据需要调整,最大 1000
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
# 发送 GET 请求
response = requests.get(base_url + endpoint, params=params)
# 检查响应状态
if response.status_code == 200:
data = response.json()
# 输出最近 5 条 K 线数据作为示例(每条数据格式:[开盘时间, 开盘价, 最高价, 最低价, 收盘价, 成交量, 收盘时间, 报价成交量, 成交笔数, 主动买入成交量, 主动买入报价成交量, 忽略]
print("最近 5 条 5 分钟 K 线数据 (ETH-USDT):")
print(json.dumps(data[-5:], indent=4, ensure_ascii=False))
# 可选:转换为更易读的格式
print("\n转换为易读格式(时间为 UTC")
for kline in data[-5:]:
open_time = datetime.fromtimestamp(kline[0] / 1000).strftime('%Y-%m-%d %H:%M:%S')
close_time = datetime.fromtimestamp(kline[6] / 1000).strftime('%Y-%m-%d %H:%M:%S')
print(f"时间: {open_time} - {close_time}, "
f"开: {kline[1]}, 高: {kline[2]}, 低: {kline[3]}, 收: {kline[4]}, 量: {kline[5]}")
else:
print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")

View File

@ -10,7 +10,7 @@ import matplotlib.pyplot as plt
from core.db.db_market_data import DBMarketData
from core.biz.metrics_calculation import MetricsCalculation
import logging
from config import OKX_MONITOR_CONFIG, MYSQL_CONFIG, WINDOW_SIZE
from config import OKX_MONITOR_CONFIG, COIN_MYSQL_CONFIG, WINDOW_SIZE
# plt支持中文
plt.rcParams['font.family'] = ['SimHei']
@ -18,13 +18,13 @@ plt.rcParams['font.family'] = ['SimHei']
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_real_data(symbol, bar, start, end):
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
db_market_data = DBMarketData(db_url)

View File

@ -10,7 +10,7 @@ from config import (
PASSPHRASE,
SANDBOX,
OKX_MONITOR_CONFIG,
MYSQL_CONFIG,
COIN_MYSQL_CONFIG,
)
logger = logging.logger
@ -18,13 +18,13 @@ logger = logging.logger
class TradeDataMain:
def __init__(self):
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.trade_data = TradeData(

View File

@ -2,6 +2,7 @@ import core.logger as logging
from datetime import datetime
from time import sleep
import pandas as pd
import os
from core.biz.market_data import MarketData
from core.trade.ma_break_statistics import MaBreakStatistics
from core.db.db_market_data import DBMarketData
@ -18,15 +19,39 @@ from config import (
PASSPHRASE,
SANDBOX,
OKX_MONITOR_CONFIG,
MYSQL_CONFIG,
BAR_THRESHOLD,
)
logger = logging.logger
class TradeMaStrategyMain:
def __init__(self, is_us_stock: bool = False):
self.ma_break_statistics = MaBreakStatistics(is_us_stock=is_us_stock)
def __init__(
self,
is_us_stock: bool = False,
is_astock: bool = False,
is_aindex: bool = True,
is_binance: bool = False,
commission_per_share: float = 0,
buy_by_long_period: dict = {"by_week": False, "by_month": False},
long_period_condition: dict = {
"ma5>ma10": False,
"ma10>ma20": False,
"macd_diff>0": False,
"macd>0": False,
},
cut_loss_by_valleys_median: bool = False,
):
self.ma_break_statistics = MaBreakStatistics(
is_us_stock=is_us_stock,
is_astock=is_astock,
is_aindex=is_aindex,
is_binance=is_binance,
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
cut_loss_by_valleys_median=cut_loss_by_valleys_median,
)
def batch_ma_break_statistics(self):
"""
@ -34,30 +59,225 @@ class TradeMaStrategyMain:
"""
logger.info("开始批量计算MA突破统计")
strategy_dict = self.ma_break_statistics.main_strategy
pct_chg_df_list = []
account_value_chg_df_list = []
for strategy_name, strategy_info in strategy_dict.items():
pct_chg_df = self.ma_break_statistics.batch_statistics(strategy_name=strategy_name)
pct_chg_df_list.append(pct_chg_df)
if "macd" in strategy_name:
# 只计算macd策略
account_value_chg_df = self.ma_break_statistics.batch_statistics(
strategy_name=strategy_name
)
account_value_chg_df_list.append(account_value_chg_df)
pct_chg_df = pd.concat(pct_chg_df_list)
total_account_value_chg_df = pd.concat(account_value_chg_df_list)
return total_account_value_chg_df
def statistics_pct_chg(self, pct_chg_df: pd.DataFrame):
"""
1. 将各个symbol, 各个bar, 各个策略的pct_chg_total构建为新的数据结构,
symbol, bar, stratege_name_1, stratege_name_2, stratege_name_3, ...
stratege_name_1的值, 为该策略的pct_chg_total的值
2. 构建新的数据结构: symbol, bar, max_pct_chg_total_strategy_name, min_pct_chg_total_strategy_name
: BCT-USDT, 15m, 均线macd结合策略2, 全均线策略
3. 构建新的数据结构, bar, max_pct_chg_total_strategy_name, min_pct_chg_total_strategy_name
: 15m, 均线macd结合策略2, 全均线策略
4. 构建新的数据结构, symbol, max_pct_chg_total_strategy_name, min_pct_chg_total_strategy_name
: BCT-USDT, 均线macd结合策略2, 全均线策略
"""
logger.info("开始统计pct_chg")
def statistics_account_value_chg(self, account_value_chg_df: pd.DataFrame):
logger.info("开始统计account_value_chg")
def test_single_symbol():
ma_break_statistics = MaBreakStatistics(
is_us_stock=False,
is_astock=True,
is_aindex=False,
is_binance=False,
commission_per_share=0,
)
symbol = "600111.SH"
bar = "1D"
ma_break_statistics.trade_simulate(
symbol=symbol, bar=bar, strategy_name="均线macd结合策略2"
)
def batch_run_strategy():
commission_per_share_list = [0, 0.0008]
# cut_loss_by_valleys_median_list = [True, False]
cut_loss_by_valleys_median_list = [False]
buy_by_long_period_list = [
# {"by_week": True, "by_month": True, "buy_by_10_percentile": True},
{"by_week": False, "by_month": True, "buy_by_10_percentile": True},
{"by_week": True, "by_month": False, "buy_by_10_percentile": True},
{"by_week": False, "by_month": False, "buy_by_10_percentile": False},
]
# buy_by_long_period_list = [{"by_week": False, "by_month": False}]
long_period_condition_list = [
{"ma5>ma10": True, "ma10>ma20": True, "macd_diff>0": True, "macd>0": True},
{"ma5>ma10": True, "ma10>ma20": False, "macd_diff>0": True, "macd>0": True},
{"ma5>ma10": False, "ma10>ma20": True, "macd_diff>0": True, "macd>0": True},
]
for commission_per_share in commission_per_share_list:
for cut_loss_by_valleys_median in cut_loss_by_valleys_median_list:
for buy_by_long_period in buy_by_long_period_list:
for long_period_condition in long_period_condition_list:
logger.info(
f"开始计算, 主要参数commission_per_share: {commission_per_share}, buy_by_long_period: {buy_by_long_period}, long_period_condition: {long_period_condition}"
)
trade_ma_strategy_main = TradeMaStrategyMain(
is_us_stock=False,
is_astock=False,
is_aindex=True,
is_binance=False,
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
cut_loss_by_valleys_median=cut_loss_by_valleys_median,
)
trade_ma_strategy_main.batch_ma_break_statistics()
trade_ma_strategy_main = TradeMaStrategyMain(
is_us_stock=False,
is_astock=True,
is_aindex=False,
is_binance=False,
commission_per_share=commission_per_share,
buy_by_long_period=buy_by_long_period,
long_period_condition=long_period_condition,
cut_loss_by_valleys_median=cut_loss_by_valleys_median,
)
trade_ma_strategy_main.batch_ma_break_statistics()
def pickup_data_from_excel():
main_path = r"./output/trade_sandbox/ma_strategy"
sub_main_paths = ["aindex", "astock"]
fix_sub_path = "no_cut_loss_by_valleys_median"
sub_folder = r"excel/均线macd结合策略2/"
file_feature_name = "with_commission"
original_df_columns = [
"strategy_name",
"symbol",
"symbol_name",
"bar",
"total_buy_commission",
"total_sell_commission",
"total_commission",
"initial_account_value",
"final_account_value",
"account_value_chg",
"market_pct_chg",
]
original_df_list = []
for sub_main_path in sub_main_paths:
logger.info(f"开始读取{sub_main_path}数据")
full_sub_main_path = os.path.join(main_path, sub_main_path, fix_sub_path)
# 读取sub_main_path下的所有文件夹
folder_list = os.listdir(full_sub_main_path)
for folder in folder_list:
logger.info(f"开始读取{folder}数据")
folder_path = os.path.join(full_sub_main_path, folder)
properties = get_properties_by_folder_name(folder, sub_main_path)
logger.info(f"开始读取{folder}数据")
# 读取folder_path的sub_folder下的所有文件
sub_folder_path = os.path.join(folder_path, sub_folder)
file_list = os.listdir(sub_folder_path)
for file in file_list:
logger.info(f"开始读取{file}数据")
if file_feature_name in file:
file_path = os.path.join(sub_folder_path, file)
df = pd.read_excel(file_path, sheet_name="资产价值变化")
logger.info(f"开始读取{file}数据")
# 向df添加properties
df = df.assign(**properties)
df = df[list(properties.keys()) + original_df_columns]
# 将df添加到original_df_list
original_df_list.append(df)
final_df = pd.concat(original_df_list)
excel_folder_path = os.path.join(main_path, "aindex_astock_均线macd结合策略2")
os.makedirs(excel_folder_path, exist_ok=True)
excel_file_path = os.path.join(
excel_folder_path, "all_strategy_with_commission.xlsx"
)
with pd.ExcelWriter(excel_file_path) as writer:
final_df.to_excel(
writer, sheet_name="all_strategy_with_commission", index=False
)
def get_properties_by_folder_name(folder_name: str, symbol_type: str):
properties = {}
sub_properties = folder_name.split("_")
properties["symbol_type"] = symbol_type
properties["buy_by_long_period"] = "no_long_period"
properties["long_period_ma5gtma10"] = False
properties["long_period_ma10gtma20"] = False
properties["long_period_macd_diffgt0"] = False
properties["long_period_macdgt0"] = False
properties["buy_by_long_period_10_percentile"] = False
if "1M" in sub_properties:
properties["buy_by_long_period"] = "1M"
if "1W" in sub_properties:
properties["buy_by_long_period"] = "1W"
if "1W1M" in sub_properties:
properties["buy_by_long_period"] = "1W1M"
if "ma5gtma10" in sub_properties:
properties["long_period_ma5gtma10"] = True
if "ma10gtma20" in sub_properties:
properties["long_period_ma10gtma20"] = True
if "macd_diffgt0" in sub_properties:
properties["long_period_macd_diffgt0"] = True
if "macdgt0" in sub_properties:
properties["long_period_macdgt0"] = True
if "10percentile" in sub_properties:
properties["buy_by_long_period_10_percentile"] = True
return properties
def profit_loss_ratio():
"""
计算利润损失比
公式盈利/盈利交易次数 : 亏损/亏损交易次数
"""
folder = r"./output/trade_sandbox/ma_strategy/binance/excel/均线macd结合策略2/"
prefix = ["无交易费用", "有交易费用"]
for prefix in prefix:
excel_file_path = os.path.join(
folder, f"{prefix}_趋势投资_from_201708181600_to_202509020600.xlsx"
)
df = pd.read_excel(excel_file_path, sheet_name="买卖记录明细")
symbol_list = list(df["symbol"].unique())
bar_list = list(df["bar"].unique())
data_list = []
for symbol in symbol_list:
for bar in bar_list:
df_symbol_bar = df[df["symbol"] == symbol][df["bar"] == bar]
start_date = df_symbol_bar["begin_date_time"].min()
end_date = df_symbol_bar["end_date_time"].max()
profit_df = df_symbol_bar[df_symbol_bar["profit_loss"] > 0]
loss_df = df_symbol_bar[df_symbol_bar["profit_loss"] < 0]
profit_amount = sum(profit_df["profit_loss"])
loss_amount = abs(sum(loss_df["profit_loss"]))
profit_count = len(profit_df)
loss_count = len(loss_df)
if profit_count == 0 or loss_count == 0:
continue
profit_loss_ratio = round(
(profit_amount / profit_count) / (loss_amount / loss_count) * 100, 4
)
data_list.append(
{
"币种": symbol,
"交易周期": bar,
"开始时间": start_date,
"结束时间": end_date,
"盈利金额": profit_amount,
"盈利次数": profit_count,
"亏损金额": loss_amount,
"亏损次数": loss_count,
"盈亏比": profit_loss_ratio,
"盈亏比公式": f"盈利金额/盈利次数 : 亏损金额/亏损次数",
}
)
final_df = pd.DataFrame(data_list)
final_df.to_excel(os.path.join(folder, f"{prefix}时虚拟货币利润损失比.xlsx"), index=False)
if __name__ == "__main__":
trade_ma_strategy_main = TradeMaStrategyMain(is_us_stock=True)
trade_ma_strategy_main.batch_ma_break_statistics()
# batch_run_strategy()
profit_loss_ratio()

View File

@ -5,7 +5,8 @@ import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.ticker import PercentFormatter
from datetime import datetime
from datetime import datetime, timezone, timedelta
from core.utils import get_current_date_time
import re
from openpyxl import Workbook
from openpyxl.drawing.image import Image
@ -68,7 +69,7 @@ class MeanReversionSandboxMain:
stat_data = self.statistic_data(total_data)
excel_save_path = os.path.join(self.save_path, solution, "excel")
os.makedirs(excel_save_path, exist_ok=True)
date_time_str = datetime.now().strftime("%Y%m%d%H%M%S")
date_time_str = get_current_date_time(format="%Y%m%d%H%M%S")
excel_file_path = os.path.join(
excel_save_path, f"{solution}_{date_time_str}.xlsx"
)
@ -301,7 +302,7 @@ class MeanReversionSandboxMain:
if __name__ == "__main__":
start_date = "2025-05-15 00:00:00"
end_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
end_date = get_current_date_time()
solution_list = ["solution_3"]
mean_reversion_sandbox_main = MeanReversionSandboxMain(
start_date=start_date, end_date=end_date, window_size=100, only_5m=True, solution_list=solution_list

View File

@ -2,20 +2,20 @@ import pandas as pd
from core.db.db_market_data import DBMarketData
from core.biz.metrics_calculation import MetricsCalculation
from config import MYSQL_CONFIG, US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG
from config import COIN_MYSQL_CONFIG, US_STOCK_MONITOR_CONFIG, OKX_MONITOR_CONFIG
import core.logger as logging
logger = logging.logger
class UpdateDataMain:
def __init__(self):
mysql_user = MYSQL_CONFIG.get("user", "xch")
mysql_password = MYSQL_CONFIG.get("password", "")
mysql_user = COIN_MYSQL_CONFIG.get("user", "xch")
mysql_password = COIN_MYSQL_CONFIG.get("password", "")
if not mysql_password:
raise ValueError("MySQL password is not set")
mysql_host = MYSQL_CONFIG.get("host", "localhost")
mysql_port = MYSQL_CONFIG.get("port", 3306)
mysql_database = MYSQL_CONFIG.get("database", "okx")
mysql_host = COIN_MYSQL_CONFIG.get("host", "localhost")
mysql_port = COIN_MYSQL_CONFIG.get("port", 3306)
mysql_database = COIN_MYSQL_CONFIG.get("database", "okx")
self.db_url = f"mysql+pymysql://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_database}"
self.db_market_data = DBMarketData(self.db_url)