diff --git a/core/biz/__pycache__/market_monitor.cpython-312.pyc b/core/biz/__pycache__/market_monitor.cpython-312.pyc new file mode 100644 index 0000000..5e3939c Binary files /dev/null and b/core/biz/__pycache__/market_monitor.cpython-312.pyc differ diff --git a/core/biz/market_monitor.py b/core/biz/market_monitor.py index c25f22c..8ebaace 100644 --- a/core/biz/market_monitor.py +++ b/core/biz/market_monitor.py @@ -8,7 +8,13 @@ import logging logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") -def create_metrics_report(row: pd.Series, only_output_rise: bool = False): +def create_metrics_report( + row: pd.Series, + next_bar_row: pd.Series, + btc_bar_row: pd.Series, + only_output_huge_volume: bool = False, + only_output_rise: bool = False +): """ 创建指标报告 """ @@ -18,15 +24,20 @@ def create_metrics_report(row: pd.Series, only_output_rise: bool = False): bar = row["bar"] window_size = row["window_size"] date_time = row["date_time"] - if huge_volume == 1: - logging.info( - f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 巨量" - ) + if only_output_huge_volume: + if huge_volume == 1: + logging.info( + f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 巨量" + ) + else: + logging.info( + f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 非巨量,此次不发送相关数据" + ) + return else: logging.info( - f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 非巨量,此次不发送相关数据" - ) - return + f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time}" + ) # fill -1 to nan row = row.fillna(1) @@ -41,8 +52,10 @@ def create_metrics_report(row: pd.Series, only_output_rise: bool = False): f"symbol: {symbol} {bar} window_size: {window_size} date_time: {date_time} 下跌,不发送相关数据" ) return - - contents.append(f"# 交易巨量报告") + if huge_volume == 1: + contents.append(f"## 交易巨量报告") + else: + contents.append(f"## 交易量报告") contents.append(f"## {symbol} {bar} 滑动窗口: {window_size} 时间: {date_time}") contents.append(f"### 价格信息") contents.append(f"当前价格: {close}, 开盘价: {open}, 最高价: {high}, 最低价: {low}") @@ -221,29 +234,84 @@ def create_metrics_report(row: pd.Series, only_output_rise: bool = False): long_short_info["多"].append(f"K线形态: {k_shape}") if k_shape_value < 1: long_short_info["空"].append(f"K线形态: {k_shape}") - - + if k_up_down == "阳线": if is_long and not is_over_buy: long_short_info["多"].append(f"量价关系: 非超买且放量上涨") if is_short and is_over_sell: - long_short_info["多"].append(f"量价关系: 空头态势且超卖,但出现放量上涨,可能反转") + long_short_info["多"].append( + f"量价关系: 空头态势且超卖,但出现放量上涨,可能反转" + ) if k_up_down == "阴线": if is_long and is_over_buy: if close_80_high or close_90_high or high_80_high or high_90_high: - long_short_info["空"].append(f"量价关系: 多头态势且超买, 目前是价位高点,但出现放量下跌,可能反转") + long_short_info["空"].append( + f"量价关系: 多头态势且超买, 目前是价位高点,但出现放量下跌,可能反转" + ) if is_short and not is_over_sell: long_short_info["空"].append(f"量价关系: 空头态势且非超卖,出现放量下跌") contents.append(f"### 技术指标信息") + if ma_long_short_value == 1: + contents.append(f"均线势头: 震荡") long_info_list = long_short_info["多"] short_info_list = long_short_info["空"] + contents.append(f"#### 多头指标信号") if len(long_info_list) > 0: - contents.append(f"#### 多头指标信号") contents.append(f"{"\n".join(long_info_list)}") - if len(short_info_list) > 0: - contents.append(f"#### 空头指标信号") - contents.append(f"{"\n".join(short_info_list)}") + else: + contents.append(f"无多头指标信号") + contents.append(f"#### 空头指标信号") + if len(short_info_list) > 0: + contents.append(f"{"\n".join(short_info_list)}") + else: + contents.append(f"无空头指标信号") + + if next_bar_row is not None: + contents.append(f"## {symbol} 与更长周期技术形态对比") + contents.extend(get_long_short_over_buy_sell(next_bar_row)) + if btc_bar_row is not None: + contents.append(f"## {symbol} 与BTC相同周期技术形态对比") + contents.extend(get_long_short_over_buy_sell(btc_bar_row)) + mark_down_text = "\n\n".join(contents) return mark_down_text + + +def get_long_short_over_buy_sell( + row: pd.Series, +): + result = {} + symbol = row["symbol"] + bar = row["bar"] + ma_long_short = str(row["ma_long_short"]) + macd_signal = str(row["macd_signal"]) + macd_divergence = str(row["macd_divergence"]) + kdj_signal = str(row["kdj_signal"]) + kdj_pattern = str(row["kdj_pattern"]) + rsi_signal = str(row["rsi_signal"]) + boll_signal = str(row["boll_signal"]) + boll_pattern = str(row["boll_pattern"]) + contents = [] + contents.append(f"### {symbol} {bar} 对比形态") + if ma_long_short in ["多", "空"]: + contents.append(f"均线形态: {ma_long_short}") + else: + contents.append(f"均线形态: 震荡") + if macd_signal in ["金叉", "死叉"]: + contents.append(f"MACD信号: {macd_signal}") + if macd_divergence in ["顶背离", "底背离"]: + contents.append(f"MACD背离: {macd_divergence}") + if kdj_signal in ["金叉", "死叉"]: + contents.append(f"KDJ信号: {kdj_signal}") + if kdj_pattern in ["超超买", "超买", "超超卖", "超卖"]: + contents.append(f"KDJ形态: {kdj_pattern}") + if rsi_signal in ["超超买", "超买", "超超卖", "超卖"]: + contents.append(f"RSI形态: {rsi_signal}") + if boll_signal in ["突破下轨", "击穿上轨"]: + contents.append(f"BOLL信号: {boll_signal}") + if boll_pattern in ["超超买", "超买", "超超卖", "超卖"]: + contents.append(f"BOLL形态: {boll_pattern}") + return contents + diff --git a/core/db/__pycache__/db_market_monitor.cpython-312.pyc b/core/db/__pycache__/db_market_monitor.cpython-312.pyc new file mode 100644 index 0000000..9642ab7 Binary files /dev/null and b/core/db/__pycache__/db_market_monitor.cpython-312.pyc differ diff --git a/market_monitor_main.py b/market_monitor_main.py index 3904b14..9da3420 100644 --- a/market_monitor_main.py +++ b/market_monitor_main.py @@ -27,7 +27,7 @@ class MarketMonitorMain: self.start_date = MONITOR_CONFIG.get("volume_monitor", {}).get( "initial_date", "2025-05-01 00:00:00" ) - self.latest_record_file_path = "./output/latest_record.json" + self.latest_record_file_path = "./output/record/latest_record.json" self.latest_record = self.get_latest_record() self.output_folder = "./output/report/market_monitor/" os.makedirs(self.output_folder, exist_ok=True) @@ -48,12 +48,17 @@ class MarketMonitorMain: """ 获取最新记录 """ - if os.path.exists(self.latest_record_file_path): - with open(self.latest_record_file_path, "r", encoding="utf-8") as f: - return json.load(f) - else: - with open(self.latest_record_file_path, "w", encoding="utf-8") as f: - json.dump({}, f, ensure_ascii=False, indent=4) + os.makedirs(os.path.dirname(self.latest_record_file_path), exist_ok=True) + try: + if os.path.exists(self.latest_record_file_path): + with open(self.latest_record_file_path, "r", encoding="utf-8") as f: + return json.load(f) + else: + with open(self.latest_record_file_path, "w", encoding="utf-8") as f: + json.dump({}, f, ensure_ascii=False, indent=4) + return {} + except Exception as e: + logging.error(f"获取最后一次报表生成记录失败: {e}") return {} def monitor_realtime_market( @@ -67,10 +72,14 @@ class MarketMonitorMain: 监控最新市场数据 考虑到速度,暂不与数据库交互,直接从api获取数据 """ + # 获得当前时间字符串 + now_datetime = datetime.now() + now_datetime_str = now_datetime.strftime("%Y-%m-%d %H:%M:%S") + end_time = transform_date_time_to_timestamp(now_datetime_str) real_time_data = self.market_data_main.market_data.get_realtime_kline_data( symbol=symbol, bar=bar, - end_time=None, + end_time=end_time, limit=50, ) @@ -79,28 +88,28 @@ class MarketMonitorMain: return latest_realtime_timestamp = real_time_data["timestamp"].iloc[-1] + latest_realtime_timestamp = int(latest_realtime_timestamp) latest_record_timestamp = ( - self.latest_record.get(symbol, {}).get(bar, {}).get("timestamp", 0) + self.latest_record.get(symbol, {}).get(bar, {}).get("timestamp", None) ) latest_reatime_datetime = timestamp_to_datetime(latest_realtime_timestamp) - latest_record_datetime = timestamp_to_datetime(latest_record_timestamp) - if ( - latest_record_timestamp is not None - and latest_realtime_timestamp <= latest_record_timestamp - ): + if latest_record_timestamp is not None: + latest_record_timestamp = int(latest_record_timestamp) + + latest_record_datetime = timestamp_to_datetime(latest_record_timestamp) + if ( + latest_record_timestamp is not None + and latest_realtime_timestamp <= latest_record_timestamp + ): + logging.info( + f"最新市场数据时间戳 {latest_reatime_datetime} 小于等于最新记录时间戳 {latest_record_datetime}, 不进行监控" + ) + return logging.info( - f"最新市场数据时间戳 {latest_reatime_datetime} 小于等于最新记录时间戳 {latest_record_datetime}, 不进行监控" - ) - return + f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间 {latest_record_datetime}") else: - self.latest_record[symbol] = {bar: {"timestamp": latest_realtime_timestamp}} - with open(self.latest_record_file_path, "w", encoding="utf-8") as f: - json.dump(self.latest_record, f, ensure_ascii=False, indent=4) - - - logging.info( - f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间 {latest_record_datetime}" - ) + logging.info( + f"最新市场数据时间 {latest_reatime_datetime}, 上一次记录时间为空") real_time_data = self.market_data_main.add_new_columns(real_time_data) logging.info(f"开始计算技术指标: {symbol} {bar}") @@ -111,7 +120,7 @@ class MarketMonitorMain: window_size=self.window_size, threshold=self.huge_volume_main.threshold, check_price=True, - only_output_huge_volume=only_output_huge_volume, + only_output_huge_volume=False, output_excel=False, ) if real_time_data is None or len(real_time_data) == 0: @@ -119,20 +128,42 @@ class MarketMonitorMain: f"计算大成交量失败: {symbol} {bar} 窗口大小: {self.window_size}" ) return + realtime_row = real_time_data.iloc[-1] - report = create_metrics_report(real_time_data, only_output_rise) + if only_output_huge_volume: + if realtime_row["huge_volume"] == 1: + logging.info(f"监控到巨量: {symbol} {bar} 窗口大小: {self.window_size}") + else: + logging.info(f"监控到非巨量: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控") + return + if only_output_rise: + if realtime_row["pct_change"] > 0: + logging.info(f"监控到上涨: {symbol} {bar} 窗口大小: {self.window_size}") + else: + logging.info(f"监控到下跌: {symbol} {bar} 窗口大小: {self.window_size},退出本次监控") + return + + next_bar_row = self.get_other_realtime_data(symbol, bar, end_time, next=True) + if 'BTC-USDT' in symbol: + btc_bar_row = None + else: + btc_bar_row = self.get_other_realtime_data('BTC-USDT', bar, end_time, next=False) + + + report = create_metrics_report(realtime_row, next_bar_row, btc_bar_row, only_output_huge_volume, only_output_rise) text_length = len(report.encode("utf-8")) + logging.info(f"发送报告到企业微信,字节数: {text_length}") self.wechat.send_markdown(report) - self.latest_record[symbol][bar]["timestamp"] = latest_realtime_timestamp - with open(self.latest_record_file_path, "w", encoding="utf-8") as f: - json.dump(self.latest_record, f, ensure_ascii=False, indent=4) + # remove punction in latest_reatime_datetime - latest_reatime_datetime = re.sub(r"[\:\-\s]", "", latest_reatime_datetime) - report_file_name = f"{symbol}_{bar}_{self.window_size}_{latest_reatime_datetime}.md" + file_datetime = re.sub(r"[\:\-\s]", "", latest_reatime_datetime) + report_file_name = ( + f"{symbol}_{bar}_{self.window_size}_{file_datetime}.md" + ) report_file_path = os.path.join(self.output_folder, report_file_name) with open(report_file_path, "w", encoding="utf-8") as f: - f.write(report.replace(":", "_")) + f.write(report) report_file_byte_size = os.path.getsize(report_file_path) report_data = { "symbol": symbol, @@ -143,22 +174,68 @@ class MarketMonitorMain: "report": report, "report_file_path": report_file_path, "report_file_name": report_file_name, - "report_file_byte_size": report_file_byte_size + "report_file_byte_size": report_file_byte_size, } report_data = pd.DataFrame([report_data]) logging.info(f"插入数据到数据库") self.db_market_monitor.insert_data_to_mysql(report_data) + if self.latest_record.get(symbol, None) is None: + self.latest_record[symbol] = {bar: {"timestamp": latest_realtime_timestamp}} + else: + self.latest_record[symbol][bar]["timestamp"] = latest_realtime_timestamp + with open(self.latest_record_file_path, "w", encoding="utf-8") as f: + json.dump(self.latest_record, f, ensure_ascii=False, indent=4) + + def get_other_realtime_data(self, symbol: str, bar: str, end_time: int, next: bool = True): + """ + 获取下一个长周期实时数据 + """ + if next: + # 获得bar在self.market_data_main.bars中的索引 + bar_index = self.market_data_main.bars.index(bar) + if bar_index == len(self.market_data_main.bars) - 1: + logging.error(f"已经是最后一个bar: {bar}") + return None + # 获得下一个bar + bar = self.market_data_main.bars[bar_index + 1] + # 获得下一个bar的实时数据 + data = self.market_data_main.market_data.get_realtime_kline_data( + symbol=symbol, bar=bar, end_time=end_time, limit=50 + ) + if data is None or len(data) == 0: + logging.error(f"获取实时数据失败: {symbol}, {bar}") + return None + data = self.market_data_main.add_new_columns(data) + logging.info(f"开始计算技术指标: {symbol} {bar}") + data = self.market_data_main.calculate_metrics(data) + row = data.iloc[-1] + return row def batch_monitor_realtime_market( self, only_output_huge_volume: bool = True, - only_output_rise: bool = False, + only_output_rise: bool = True, ): for symbol in self.market_data_main.symbols: for bar in self.market_data_main.bars: - self.monitor_realtime_market( - symbol, - bar, - only_output_huge_volume, - only_output_rise, - ) + logging.info(f"开始监控: {symbol} {bar} 窗口大小: {self.window_size} 行情数据") + try: + self.monitor_realtime_market( + symbol, + bar, + only_output_huge_volume, + only_output_rise, + ) + except Exception as e: + logging.error(f"监控失败: {symbol} {bar} 窗口大小: {self.window_size} 行情数据: {e}") + continue + + +if __name__ == "__main__": + market_monitor_main = MarketMonitorMain() + market_monitor_main.monitor_realtime_market( + symbol="ETH-USDT", + bar="5m", + only_output_huge_volume=True, + only_output_rise=False, + ) diff --git a/monitor_schedule.py b/monitor_schedule.py new file mode 100644 index 0000000..67c5fb5 --- /dev/null +++ b/monitor_schedule.py @@ -0,0 +1,20 @@ +from market_monitor_main import MarketMonitorMain +import logging +import time + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") + + +def monitor_schedule(): + market_monitor_main = MarketMonitorMain() + logging.info("开始监控") + while True: # 每分钟监控一次 + market_monitor_main.batch_monitor_realtime_market( + only_output_huge_volume=True, + only_output_rise=False, + ) + logging.info("本次循环监控结束,等待1分钟") + time.sleep(60) + +if __name__ == "__main__": + monitor_schedule() \ No newline at end of file diff --git a/sql/query/sql_playground.sql b/sql/query/sql_playground.sql index 45f9454..d07bb41 100644 --- a/sql/query/sql_playground.sql +++ b/sql/query/sql_playground.sql @@ -1,3 +1,5 @@ +select * from crypto_market_monitor; +delete from crypto_market_monitor where timestamp=1754382900000; select date_time, open, high, low, close, k_shape from crypto_market_data WHERE symbol='XCH-USDT' and bar='5m' and date_time > '2025-08-04 15:00:00' order by timestamp ; @@ -6,6 +8,10 @@ select * from crypto_market_data WHERE symbol='XCH-USDT' and bar='5m' and date_time > '2025-08-04 15:00:00' order by timestamp asc; +select * from crypto_huge_volume +WHERE symbol='XCH-USDT' and bar='5m' #and date_time > '2025-08-04 15:00:00' +order by timestamp asc; + delete FROM crypto_market_data where symbol != 'XCH-USDT';