# crypto_quant/market_data_from_itick_main.py
# (repository web-viewer residue: 60 lines, 1.7 KiB, Python;
#  history entry 2025-08-31 03:20:59 +00:00)
import requests
import core.logger as logging
from datetime import datetime, timedelta
# (history entry 2025-09-16 06:31:15 +00:00)
from core.utils import get_current_date_time
# (history entry 2025-08-31 03:20:59 +00:00)
# Module-level logger used by this script. Note that `logging` here is the
# alias for the project's `core.logger` module, not the standard-library
# `logging` package.
logger = logging.logger
def main():
    """Download, process and save intraday market data for one symbol.

    Builds a ``MarketDataFromAlphaVantage`` downloader for a fixed symbol,
    date range and bar interval, downloads the data, processes it and, when
    the processed result is non-empty, saves it to a timestamped CSV file and
    prints a preview and statistics.

    Exceptions raised during the run are logged and printed instead of
    propagating to the caller.

    NOTE(review): ``MarketDataFromAlphaVantage`` is not imported in the part
    of the file visible here; confirm the import exists (the file name
    mentions "itick", so the intended class may differ).
    """
    # Configuration parameters.
    symbol = "QQQ"
    start_date = "2025-08-01 00:00:00"
    end_date = "2025-08-20 00:00:00"
    interval = "5m"
    # Fewer days per segment keeps each individual request small.
    segment_days = 5

    print(f"开始下载 {symbol} 数据")
    # A separator between the two dates keeps the printed range readable.
    print(f"时间范围: {start_date} 至 {end_date}")
    print(f"数据间隔: {interval}")
    print(f"分段天数: {segment_days}")
    print("-" * 50)

    try:
        downloader = MarketDataFromAlphaVantage(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            interval=interval,
            segment_days=segment_days,
        )

        # Download the data.
        downloader.download_all()

        # Process the data; the result exposes `.empty` and `len()`
        # (presumably a pandas DataFrame -- confirm against process_data).
        processed_data = downloader.process_data()

        if processed_data.empty:
            logger.warning("未获取到任何数据")
            return

        logger.info(f"成功下载 {len(processed_data)} 条数据")

        # Save the data. The timestamp is computed outside the f-string:
        # reusing double quotes inside a double-quoted f-string is a
        # SyntaxError on Python versions before 3.12.
        timestamp = get_current_date_time(format="%Y%m%d%H%M%S")
        filename = f"{symbol}_{interval}_{timestamp}.csv"
        downloader.save_to_csv(filename)

        # Show a preview and summary statistics.
        downloader.show_head()
        downloader.show_stats()
    except Exception as e:
        # Top-level boundary of the script: report the failure in the log
        # and on stdout rather than letting it propagate.
        logger.error(f"下载过程中发生错误: {e}")
        print(f"错误详情: {e}")
# Run the download only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()