diff --git a/core/db/db_huge_volume_data.py b/core/db/db_huge_volume_data.py index 0479544..2a4407e 100644 --- a/core/db/db_huge_volume_data.py +++ b/core/db/db_huge_volume_data.py @@ -24,9 +24,6 @@ class DBHugeVolumeData: "high", "low", "close", - "pre_close", - "close_change", - "pct_chg", "volume", "volCcy", "volCCyQuote", diff --git a/huge_volume_main.py b/huge_volume_main.py index 548d8d5..04557c2 100644 --- a/huge_volume_main.py +++ b/huge_volume_main.py @@ -138,7 +138,12 @@ class HugeVolumeMain: symbol, bar, window_size ) if latest_huge_volume_data is None or len(latest_huge_volume_data) == 0: - self.detect_volume_spike(symbol, bar, only_output_huge_volume=False) + self.detect_volume_spike( + symbol=symbol, + bar=bar, + window_size=window_size, + only_output_huge_volume=False, + ) return else: earliest_date_time = latest_huge_volume_data["date_time"] @@ -280,7 +285,7 @@ class HugeVolumeMain: data=volume_statistics_data, window_size=window_size, periods=periods ) return huge_volume_data, result_data - + def send_huge_volume_data_to_wechat(self, start: str = None, end: str = None): if start is None: start = MONITOR_CONFIG.get("volume_monitor", {}).get( @@ -288,16 +293,15 @@ class HugeVolumeMain: ) if end is None: end = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - + start_timestamp = transform_date_time_to_timestamp(start) end_timestamp = transform_date_time_to_timestamp(end) - + start_date_time = timestamp_to_datetime(start_timestamp) end_date_time = timestamp_to_datetime(end_timestamp) logging.info(f"开始获取巨量交易数据: {start} 到 {end}") huge_volume_data = self.db_huge_volume_data.query_huge_volume_records( - start=start_timestamp, - end=end_timestamp + start=start_timestamp, end=end_timestamp ) if huge_volume_data is None or len(huge_volume_data) == 0: logging.warning(f"获取巨量交易数据为空: {start} 到 {end}") @@ -311,13 +315,15 @@ class HugeVolumeMain: huge_volume_data = huge_volume_data[huge_volume_data["huge_volume"] == 1] # 过滤huge_volume_data,要求(price_80_high == 1 or price_90_high == 1 or price_20_low == 1 or price_10_low == 1) huge_volume_data = huge_volume_data[ - (huge_volume_data["price_90_high"] == 1) | - (huge_volume_data["price_10_low"] == 1) + (huge_volume_data["price_90_high"] == 1) + | (huge_volume_data["price_10_low"] == 1) ] # 过滤huge_volume_data,要求volume_ratio > 10 huge_volume_data = huge_volume_data[huge_volume_data["volume_ratio"] > 10] # 根据symbol, bar, window_size, timestamp排序 - huge_volume_data = huge_volume_data.sort_values(by=["symbol", "bar", "window_size", "timestamp"], ascending=True) + huge_volume_data = huge_volume_data.sort_values( + by=["symbol", "bar", "window_size", "timestamp"], ascending=True + ) huge_volume_data = huge_volume_data.reset_index(drop=True) logging.info(f"获取巨量交易数据: {len(huge_volume_data)}条") contents = [] @@ -330,39 +336,52 @@ class HugeVolumeMain: contents.append(f"# 放量交易数据: {start_date_time} 到 {end_date_time}") contents.append(f"## 币种: {symbol}") symbol_data = huge_volume_data[huge_volume_data["symbol"] == symbol] - symbol_data = symbol_data.sort_values(by=["bar", "window_size", "timestamp"], ascending=True) + symbol_data = symbol_data.sort_values( + by=["bar", "window_size", "timestamp"], ascending=True + ) symbol_data = symbol_data.reset_index(drop=True) for index, row in symbol_data.iterrows(): - if row['huge_volume'] == 1 and (row['price_80_high'] == 1 or row['price_90_high'] == 1 or row['price_20_low'] == 1 or row['price_10_low'] == 1): - if row['price_90_high'] == 1: + if row["huge_volume"] == 1 and ( + row["price_80_high"] == 1 + or row["price_90_high"] == 1 + or row["price_20_low"] == 1 + or row["price_10_low"] == 1 + ): + if row["price_90_high"] == 1: price_position_text = "90%分位数高点" - elif row['price_80_high'] == 1: + elif row["price_80_high"] == 1: price_position_text = "80%分位数高点" else: price_position_text = "" if price_position_text == "": - if row['price_10_low'] == 1: + if row["price_10_low"] == 1: price_position_text = "10%分位数低点" - elif row['price_20_low'] == 1: + elif row["price_20_low"] == 1: price_position_text = "20%分位数低点" else: price_position_text = "" - open_price = str(round(row['open'], 6)) - high = str(round(row['high'], 6)) - low = str(round(row['low'], 6)) - close = str(round(row['close'], 6)) - volume = str(round(row['volume'], 6)) - volCCyQuote = str(round(row['volCCyQuote'], 6)) - volume_ratio = str(round(row['volume_ratio'], 6)) - contents.append(f"交易周期: {row['bar']}, 滑动窗口: {row['window_size']} , 发生时间: {row['date_time']}") - contents.append(f"开盘价: {open_price}, 最高价: {high}, 最低价: {low}, 收盘价: {close}") - contents.append(f"成交量: {volume}, 成交量USDT: {volCCyQuote}, 交易量比率: {volume_ratio}") + open_price = str(round(row["open"], 6)) + high = str(round(row["high"], 6)) + low = str(round(row["low"], 6)) + close = str(round(row["close"], 6)) + volume = str(round(row["volume"], 6)) + volCCyQuote = str(round(row["volCCyQuote"], 6)) + volume_ratio = str(round(row["volume_ratio"], 6)) + contents.append( + f"交易周期: {row['bar']}, 滑动窗口: {row['window_size']} , 发生时间: {row['date_time']}" + ) + contents.append( + f"开盘价: {open_price}, 最高价: {high}, 最低价: {low}, 收盘价: {close}" + ) + contents.append( + f"成交量: {volume}, 成交量USDT: {volCCyQuote}, 交易量比率: {volume_ratio}" + ) contents.append(f"价格分位: {price_position_text}") contents.append(f"--------------------------------") text = "\n\n".join(contents) # 获得text的字节数 - text_length = len(text.encode('utf-8')) + text_length = len(text.encode("utf-8")) logging.info(f"发送巨量交易数据到微信,字节数: {text_length}") # with open(os.path.join(self.output_folder, "huge_volume_data.md"), "w", encoding="utf-8") as f: @@ -447,10 +466,13 @@ def batch_initial_detect_volume_spike(threshold: float = 2.0): ): window_sizes = [50, 80, 100, 120] huge_volume_main = HugeVolumeMain(threshold) + start_date = MONITOR_CONFIG.get("volume_monitor", {}).get( + "initial_date", "2025-05-01 00:00:00" + ) for window_size in window_sizes: huge_volume_main.batch_initial_detect_volume_spike( window_size=window_size, - start="2025-05-01 00:00:00", + start=start_date, ) @@ -479,8 +501,8 @@ def test_send_huge_volume_data_to_wechat(): if __name__ == "__main__": - test_send_huge_volume_data_to_wechat() - # batch_initial_detect_volume_spike(threshold=2.0) + # test_send_huge_volume_data_to_wechat() + batch_initial_detect_volume_spike(threshold=2.0) # batch_update_volume_spike(threshold=2.0) # huge_volume_main = HugeVolumeMain(threshold=2.0) # huge_volume_main.batch_next_periods_rise_or_fall(output_excel=True) diff --git a/market_data_main.py b/market_data_main.py index f1e294f..3fd520b 100644 --- a/market_data_main.py +++ b/market_data_main.py @@ -224,14 +224,16 @@ class MarketDataMain: before_data = self.db_market_data.query_data_before_timestamp( symbol, bar, min_start_time_ts, 30 ) + latest_before_timestamp = None if before_data is not None and len(before_data) > 0: earliest_timestamp = before_data[-1]["timestamp"] + latest_before_timestamp = before_data[0]["timestamp"] else: earliest_timestamp = min_start_time_ts handle_data = self.db_market_data.query_market_data_by_symbol_bar( symbol=symbol, bar=bar, start=earliest_timestamp, end=None ) - if handle_data is not None and len(handle_data) > 0: + if handle_data is not None and len(handle_data) > len(before_data): if isinstance(handle_data, list): handle_data = pd.DataFrame(handle_data) elif isinstance(handle_data, dict): @@ -243,6 +245,9 @@ class MarketDataMain: return None handle_data = self.calculate_metrics(handle_data) + if latest_before_timestamp is not None: + handle_data = handle_data[handle_data["timestamp"] > latest_before_timestamp] + handle_data.reset_index(drop=True, inplace=True) logging.info(f"开始保存技术指标数据: {symbol} {bar}") self.db_market_data.insert_data_to_mysql(handle_data) return data diff --git a/sql/query/sql_playground.sql b/sql/query/sql_playground.sql index edf506b..af4b3f0 100644 --- a/sql/query/sql_playground.sql +++ b/sql/query/sql_playground.sql @@ -2,6 +2,11 @@ select date_time, open, high, low, close, k_shape from crypto_market_data WHERE symbol='XCH-USDT' and bar='5m' and date_time > '2025-08-04 15:00:00' order by timestamp ; +select * from crypto_market_data +WHERE symbol='XCH-USDT' and bar='5m' and date_time > '2025-08-04 15:00:00' +order by timestamp desc; + + delete FROM crypto_market_data where symbol != 'XCH-USDT'; select * from crypto_trade_data diff --git a/sql/table/crypto_huge_volume.sql b/sql/table/crypto_huge_volume.sql index ff477e5..c98b4e7 100644 --- a/sql/table/crypto_huge_volume.sql +++ b/sql/table/crypto_huge_volume.sql @@ -9,9 +9,6 @@ CREATE TABLE IF NOT EXISTS crypto_huge_volume ( high DECIMAL(20,10) NOT NULL COMMENT '最高价', low DECIMAL(20,10) NOT NULL COMMENT '最低价', close DECIMAL(20,10) NOT NULL COMMENT '收盘价', - pre_close DECIMAL(20,10) NOT NULL COMMENT '前收盘价', - close_change DECIMAL(20,10) NOT NULL COMMENT '涨跌额', - pct_chg DECIMAL(20,10) NOT NULL COMMENT '涨跌幅', volume DECIMAL(30,10) NOT NULL COMMENT '交易量', volCcy DECIMAL(30,10) NOT NULL COMMENT '交易量(基础货币)', volCCyQuote DECIMAL(30,10) NOT NULL COMMENT '交易量(计价货币)',