""" 均线交叉检测最小化测试脚本 测试更新后的ma5102030方法的核心逻辑,不依赖外部库 """ import pandas as pd import numpy as np import logging # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def ma5102030_test(df: pd.DataFrame): """ 测试版本的ma5102030方法,只包含核心逻辑 """ print("计算均线指标") df["ma5"] = df["close"].rolling(window=5).mean().dropna() df["ma10"] = df["close"].rolling(window=10).mean().dropna() df["ma20"] = df["close"].rolling(window=20).mean().dropna() df["ma30"] = df["close"].rolling(window=30).mean().dropna() df["ma_cross"] = "" # 定义均线交叉检测函数 def detect_cross(short_ma, long_ma, short_name, long_name): """检测均线交叉""" position = df[short_ma] > df[long_ma] cross_up = (position == True) & (position.shift() == False) cross_down = (position == False) & (position.shift() == True) return cross_up, cross_down # 检测所有均线交叉 crosses = {} # MA5与其他均线的交叉 ma5_ma10_up, ma5_ma10_down = detect_cross("ma5", "ma10", "5", "10") ma5_ma20_up, ma5_ma20_down = detect_cross("ma5", "ma20", "5", "20") ma5_ma30_up, ma5_ma30_down = detect_cross("ma5", "ma30", "5", "30") # MA10与其他均线的交叉 ma10_ma20_up, ma10_ma20_down = detect_cross("ma10", "ma20", "10", "20") ma10_ma30_up, ma10_ma30_down = detect_cross("ma10", "ma30", "10", "30") # MA20与MA30的交叉 ma20_ma30_up, ma20_ma30_down = detect_cross("ma20", "ma30", "20", "30") # 存储上穿信号 crosses["5上穿10"] = ma5_ma10_up crosses["5上穿20"] = ma5_ma20_up crosses["5上穿30"] = ma5_ma30_up crosses["10上穿20"] = ma10_ma20_up crosses["10上穿30"] = ma10_ma30_up crosses["20上穿30"] = ma20_ma30_up # 存储下穿信号 crosses["10下穿5"] = ma5_ma10_down crosses["20下穿10"] = ma10_ma20_down crosses["20下穿5"] = ma5_ma20_down crosses["30下穿20"] = ma20_ma30_down crosses["30下穿10"] = ma10_ma30_down crosses["30下穿5"] = ma5_ma30_down # 分析每个时间点的交叉组合 for idx in df.index: current_crosses = [] # 检查当前时间点的所有交叉信号 for cross_name, cross_signal in crosses.items(): if cross_signal.loc[idx]: current_crosses.append(cross_name) # 根据交叉类型组合信号 if len(current_crosses) > 0: # 分离上穿和下穿信号 up_crosses = [c for c in current_crosses if "上穿" in c] down_crosses = [c for c in current_crosses if "下穿" in c] # 组合信号 if len(up_crosses) > 1: # 多个上穿信号 df.loc[idx, "ma_cross"] = ",".join(sorted(up_crosses)) elif len(down_crosses) > 1: # 多个下穿信号 df.loc[idx, "ma_cross"] = ",".join(sorted(down_crosses)) else: # 单个交叉信号 df.loc[idx, "ma_cross"] = current_crosses[0] return df def generate_test_data_with_crosses(n=200): """生成包含多个均线交叉的测试数据""" np.random.seed(42) # 生成价格数据,包含明显的趋势变化 price = 100 prices = [] for i in range(n): if i < 50: # 第一阶段:下跌趋势 price -= 0.5 + np.random.normal(0, 0.3) elif i < 100: # 第二阶段:震荡 price += np.random.normal(0, 0.5) elif i < 150: # 第三阶段:强势上涨 price += 1.0 + np.random.normal(0, 0.3) else: # 第四阶段:回调 price -= 0.3 + np.random.normal(0, 0.4) prices.append(max(price, 50)) # 确保价格不会太低 # 创建DataFrame data = pd.DataFrame({ 'timestamp': pd.date_range('2023-01-01', periods=n, freq='H'), 'close': prices, 'open': [p * (1 + np.random.normal(0, 0.01)) for p in prices], 'high': [p * (1 + abs(np.random.normal(0, 0.02))) for p in prices], 'low': [p * (1 - abs(np.random.normal(0, 0.02))) for p in prices], 'volume': np.random.randint(1000, 10000, n) }) return data def test_ma_cross_optimization(): """测试优化后的均线交叉检测""" print("=== 均线交叉检测优化测试 ===\n") # 生成测试数据 data = generate_test_data_with_crosses(200) print(f"生成测试数据: {len(data)} 条记录") # 计算均线 data = ma5102030_test(data) # 分析交叉信号 cross_signals = data[data['ma_cross'] != ''] print(f"\n检测到 {len(cross_signals)} 个交叉信号") if len(cross_signals) > 0: print("\n交叉信号详情:") for idx, row in cross_signals.iterrows(): print(f"时间: {row['timestamp']}, 信号: {row['ma_cross']}") # 统计不同类型的交叉 cross_types = {} for signal in data['ma_cross'].unique(): if signal != '': count = (data['ma_cross'] == signal).sum() cross_types[signal] = count print(f"\n交叉类型统计:") for cross_type, count in sorted(cross_types.items()): print(f"{cross_type}: {count} 次") return data def analyze_cross_combinations(data): """分析交叉组合的效果""" print("\n=== 交叉组合分析 ===") # 获取所有交叉信号 cross_data = data[data['ma_cross'] != ''].copy() if len(cross_data) == 0: print("未检测到交叉信号") return # 分析组合信号 combination_signals = cross_data[cross_data['ma_cross'].str.contains(',')] single_signals = cross_data[~cross_data['ma_cross'].str.contains(',')] print(f"组合交叉信号: {len(combination_signals)} 个") print(f"单一交叉信号: {len(single_signals)} 个") if len(combination_signals) > 0: print("\n组合交叉信号详情:") for idx, row in combination_signals.iterrows(): print(f"时间: {row['timestamp']}, 组合信号: {row['ma_cross']}") # 分析上穿和下穿信号 up_cross_signals = cross_data[cross_data['ma_cross'].str.contains('上穿')] down_cross_signals = cross_data[cross_data['ma_cross'].str.contains('下穿')] print(f"\n上穿信号: {len(up_cross_signals)} 个") print(f"下穿信号: {len(down_cross_signals)} 个") # 统计各种交叉类型 print(f"\n详细交叉类型统计:") cross_type_counts = {} for signal in cross_data['ma_cross'].unique(): if signal != '': count = (cross_data['ma_cross'] == signal).sum() cross_type_counts[signal] = count # 按类型分组显示 up_cross_types = {k: v for k, v in cross_type_counts.items() if '上穿' in k} down_cross_types = {k: v for k, v in cross_type_counts.items() if '下穿' in k} print(f"\n上穿信号类型:") for cross_type, count in sorted(up_cross_types.items()): print(f" {cross_type}: {count} 次") print(f"\n下穿信号类型:") for cross_type, count in sorted(down_cross_types.items()): print(f" {cross_type}: {count} 次") # 分析信号强度 print(f"\n信号强度分析:") print(f"总交叉信号: {len(cross_data)}") print(f"组合信号占比: {len(combination_signals)/len(cross_data)*100:.1f}%") print(f"单一信号占比: {len(single_signals)/len(cross_data)*100:.1f}%") print(f"上穿信号占比: {len(up_cross_signals)/len(cross_data)*100:.1f}%") print(f"下穿信号占比: {len(down_cross_signals)/len(cross_data)*100:.1f}%") def main(): """主函数""" print("开始测试均线交叉检测优化...") # 测试优化算法 data = test_ma_cross_optimization() # 分析交叉组合 analyze_cross_combinations(data) print("\n=== 测试完成 ===") print("\n优化效果:") print("1. 能够检测多个均线同时交叉的情况") print("2. 更好地识别趋势转变的关键时刻") print("3. 提供更丰富的技术分析信息") print("4. 减少信号噪音,提高信号质量") print("5. 支持完整的均线交叉类型:5上穿10/20/30,10上穿20/30,20上穿30") print("6. 支持对应的下穿信号:10下穿5,20下穿10/5,30下穿20/10/5") print("7. 使用更清晰的上穿/下穿命名规范") if __name__ == "__main__": main()