QuantConnect Lean 针对A股的量化数据库搭建【量化数据篇】

本文基于 SQLite 轻量级数据库与 QuantConnect Lean 框架，构建了一套支持高频数据回测、策略无缝实盘部署的本地化解决方案。

0x01 获取所有股票

import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import datetime

def save_to_sqlite(df, db_path, table_name):
    """Write *df* to *table_name* in the SQLite database at *db_path*.

    An existing table with the same name is replaced (``if_exists='replace'``)
    and the DataFrame index is not stored. This is best-effort: any failure is
    printed instead of raised, so callers only learn about it from the console.
    """
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        print(f"数据成功保存至 {db_path} 的 {table_name} 表")
    except Exception as e:
        print(f"数据库操作失败:{str(e)}")
    finally:
        # conn is still None when sqlite3.connect() itself failed.
        if conn is not None:
            conn.close()

def validate_stock_code(code):
    """Normalize a stock code to ``"<exchange>.<6 digits>"``, e.g. ``"sh.600000"``.

    Accepts either an exchange-prefixed code (``"SH.600000"``, ``"sz.1"``) or a
    bare numeric code (``"600000"``, ``1``). For bare codes the exchange is
    inferred from the leading digits:

    * ``4``, ``8``, ``92``          -> Beijing  (``bj``)
    * ``5``, ``6``, ``7``, ``9``    -> Shanghai (``sh``)
    * ``0``, ``1``, ``2``, ``3``    -> Shenzhen (``sz``)

    Raises:
        ValueError: if the input is neither form, or its numeric part is not
            made of 1 to 6 ASCII digits.
    """
    code = str(code).strip().lower().replace(" ", "")

    def _is_short_number(text):
        # ASCII digits only (str.isdigit alone also accepts e.g. superscripts),
        # and never more than 6 of them: zfill would not truncate longer input.
        return text.isascii() and text.isdigit() and len(text) <= 6

    if code.startswith(("sh.", "sz.", "bj.")):
        prefix, _, num = code.partition(".")
        if _is_short_number(num):
            return f"{prefix}.{num.zfill(6)}"
        raise ValueError(f"无法识别的股票代码格式: {code}")

    if _is_short_number(code):
        code = code.zfill(6)
        # Check "92" before the generic "9" (Shanghai B shares are 900xxx).
        if code.startswith(("4", "8", "92")):
            return f"bj.{code}"
        if code[0] in "5679":
            return f"sh.{code}"
        return f"sz.{code}"  # remaining leading digits: 0-3

    raise ValueError(f"无法识别的股票代码格式: {code}")

def get_index_components(index_func, max_retries=3):
    """Return an index's constituents as a set of normalized codes.

    Args:
        index_func: zero-argument baostock query such as
            ``bs.query_hs300_stocks``; the code is read from column 1 of
            every returned row and normalized with validate_stock_code().
        max_retries: number of attempts before giving up; at least one
            attempt is always made.

    Returns:
        set of codes like ``"sh.600000"``; an empty set if every attempt fails.
        Failures are printed, never raised.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            rs = index_func()
            if rs.error_code == '0':
                codes = set()
                # Iterate with next()/get_row_data() so every result page is read.
                while rs.next():
                    codes.add(validate_stock_code(rs.get_row_data()[1]))
                return codes
            print(f"指数查询第{attempt+1}次失败: {rs.error_msg}")
        except Exception as e:
            print(f"指数查询第{attempt+1}次失败: {str(e)}")
    return set()

def _drain_result(rs):
    """Collect every row of a baostock result set via next()/get_row_data()."""
    rows = []
    while rs.next():
        rows.append(rs.get_row_data())
    return rows


def _query_latest_stock_list(max_lookback_days=15):
    """Return ``(DataFrame, day)`` for the most recent day that has a stock list.

    Starts at yesterday and steps back one calendar day at a time, because
    the stock list can come back empty for a non-trading day.

    Raises:
        ValueError: on a baostock error, or if no day in the window has data.
    """
    today = datetime.date.today()
    for offset in range(1, max_lookback_days + 1):
        day = (today - datetime.timedelta(days=offset)).strftime("%Y-%m-%d")
        rs = bs.query_all_stock(day=day)
        if rs.error_code != '0':
            raise ValueError(f"股票查询失败:{rs.error_msg}")
        rows = _drain_result(rs)
        if rows:
            return pd.DataFrame(rows, columns=rs.fields), day
    raise ValueError(f"最近{max_lookback_days}天内没有可用的股票列表")


def _query_industry_table():
    """Return baostock industry data with columns valid_code, industry, industry_type.

    The result has one row per code. On a query failure an empty frame with
    the same columns is returned, so the caller's merge still works.
    """
    columns = ['valid_code', 'industry', 'industry_type']
    industry_rs = bs.query_stock_industry()
    if industry_rs.error_code != '0':
        print(f"行业数据查询失败:{industry_rs.error_msg}")
        return pd.DataFrame(columns=columns)

    records = []
    for row in _drain_result(industry_rs):
        try:
            records.append((validate_stock_code(row[1]), row[3], row[4]))
        except (ValueError, IndexError) as e:
            shown = row[1] if len(row) > 1 else row
            print(f"跳过无效行业数据: {shown}, 原因: {str(e)}")
    industry_df = pd.DataFrame(records, columns=columns)
    # A code listed twice would duplicate that stock in the left merge.
    return industry_df.drop_duplicates(subset='valid_code')


def get_all_stocks_with_industry(exclude_indexes=True):
    """Build the stock universe with industry and index-membership flags.

    Args:
        exclude_indexes: drop index codes (``sh.000xxx`` / ``sz.399xxx``) that
            appear in baostock's all-stock list.

    Returns:
        DataFrame with columns code (bare 6 digits), name, industry,
        industry_type, issz50, ishs300, iszz500 (0/1 flags) and exchange
        (``sh``/``sz``/``bj``), or None if anything failed (the error is printed).
    """
    try:
        if bs.login().error_code != '0':
            raise ConnectionError("Baostock登录失败")

        # Fetch the three index constituent sets once up front.
        sz50_set = get_index_components(bs.query_sz50_stocks)
        hs300_set = get_index_components(bs.query_hs300_stocks)
        zz500_set = get_index_components(bs.query_zz500_stocks)
        print(f"指数成分股数量:SZ50={len(sz50_set)}, HS300={len(hs300_set)}, ZZ500={len(zz500_set)}")

        raw_df, query_date = _query_latest_stock_list()
        # .copy(): columns are added below, which must not target a view of raw_df.
        valid_df = raw_df[raw_df['tradeStatus'] == '1'].copy()
        valid_df['valid_code'] = valid_df['code'].apply(validate_stock_code)

        if exclude_indexes:
            # The K-line tables built from this list key rows by the bare
            # 6-digit code, so an index such as sh.000001 would collide with
            # the stock sz.000001 there.
            is_index = valid_df['valid_code'].str.match(r'(sh\.000|sz\.399)')
            valid_df = valid_df[~is_index]
        print(f"股票列表日期:{query_date},初始数据量:{len(raw_df)},有效交易股票:{len(valid_df)}")

        merged_df = pd.merge(
            valid_df[['valid_code', 'code_name']].rename(columns={'valid_code': 'code', 'code_name': 'name'}),
            _query_industry_table(),
            left_on='code',
            right_on='valid_code',
            how='left',
        ).drop(columns=['valid_code'])

        merged_df['issz50'] = merged_df['code'].isin(sz50_set).astype(int)
        merged_df['ishs300'] = merged_df['code'].isin(hs300_set).astype(int)
        merged_df['iszz500'] = merged_df['code'].isin(zz500_set).astype(int)

        # "sh.600000" -> exchange "sh", code "600000"
        merged_df['exchange'] = merged_df['code'].str.split('.').str[0]
        merged_df['code'] = merged_df['code'].str.split('.').str[1]

        merged_df.fillna({
            'industry': '未知',
            'industry_type': '未分类'
        }, inplace=True)

        return merged_df[[
            'code', 'name', 'industry', 'industry_type',
            'issz50', 'ishs300', 'iszz500', 'exchange'
        ]]

    except Exception as e:
        print(f"处理失败:{str(e)}")
        return None
    finally:
        bs.logout()


if __name__ == "__main__":
    df = get_all_stocks_with_industry()
    # save_to_sqlite uses if_exists='replace', so an empty result must not
    # reach it or the existing all_stocks table would be wiped.
    if df is not None and not df.empty:
        print("\n数据样例:")
        print(df[df['industry'] != '未知'].head(5))
        save_to_sqlite(df, r"../project/data/AAshares/code.db", "all_stocks")
    else:
        print("未获取到股票数据,保留现有 all_stocks 表")

0x02 获取日K线数据

import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import time
from datetime import datetime

def get_stock_codes(conn):
    """List every row of ``all_stocks`` as a baostock-style ``"<exchange>.<code>"``.

    The exchange is lower-cased and stripped of surrounding whitespace; the
    code is converted to text and left-padded with zeros to 6 characters.
    """
    table = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    codes = []
    for exchange, number in zip(table['exchange'], table['code']):
        codes.append(f"{exchange.lower().strip()}.{str(number).zfill(6)}")
    return codes

def download_daily_data(code, start_date, end_date, max_retries=3):
    """Query forward-adjusted daily K-line data for one code from baostock.

    Args:
        code: baostock code such as ``"sh.600000"``.
        start_date, end_date: ``YYYY-MM-DD`` strings.
        max_retries: attempts before giving up; at least one is always made.

    Returns:
        The last baostock result set; check ``rs.error_code == '0'`` for success.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="d",   # daily bars
            adjustflag="2"   # forward-adjusted (前复权)
        )
        if rs.error_code == '0':
            break
        # Back off between attempts only; sleeping after the last one is wasted time.
        if attempt < attempts - 1:
            time.sleep(1)
    return rs

def process_daily_data(rs):
    """Convert a baostock daily K-line result set into a cleaned DataFrame.

    ``code`` loses its exchange prefix (``"sh.600000"`` -> ``"600000"``), the
    price/volume/amount columns are coerced to numbers (unparsable -> NaN),
    every row containing a NaN is dropped and the index is renumbered from 0.
    """
    def _pull_rows():
        while rs.next():
            yield rs.get_row_data()

    frame = pd.DataFrame(list(_pull_rows()), columns=rs.fields)
    frame['code'] = frame['code'].str.split('.').str[1]

    value_columns = ['open', 'high', 'low', 'close', 'volume', 'amount']
    frame[value_columns] = frame[value_columns].apply(pd.to_numeric, errors='coerce')

    cleaned = frame.dropna()
    return cleaned.reset_index(drop=True)

def save_to_db(df, conn):
    """Insert daily bars from *df* into ``stock_day_k``.

    Rows whose ``(date, code)`` primary key already exists are skipped by
    ``INSERT OR IGNORE``, so no per-row exception handling is needed. The batch
    is written in one transaction: committed on success, rolled back (and the
    sqlite3 error re-raised) if any row fails.
    """
    if df.empty:
        return
    columns = ['code', 'date', 'open', 'close', 'high', 'low', 'volume', 'amount']
    # itertuples yields native Python scalars, which sqlite3 can bind directly.
    rows = df[columns].itertuples(index=False, name=None)
    with conn:
        conn.executemany(
            """
            INSERT OR IGNORE INTO stock_day_k
            (code, date, open, close, high, low, volume, amount)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

def main(db_path=r"../project/data/AAshares/code.db",
         start_date='2025-01-01', end_date='2025-04-01'):
    """Download daily bars for every code in ``all_stocks`` into ``stock_day_k``.

    Args:
        db_path: SQLite file that holds ``all_stocks``; ``stock_day_k`` and its
            ``idx_day`` index are created there if missing.
        start_date, end_date: ``YYYY-MM-DD`` range passed to baostock.

    A failure for one code is printed and the loop moves on. The database
    connection is always closed, including when the baostock login fails.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.executescript('''
            CREATE TABLE IF NOT EXISTS stock_day_k (
                date TEXT, code TEXT, open REAL, high REAL,
                low REAL, close REAL, volume INTEGER, amount REAL,
                PRIMARY KEY (date, code)
            );
            CREATE INDEX IF NOT EXISTS idx_day ON stock_day_k(code);
        ''')

        if bs.login().error_code != '0':
            print("登录失败")
            return

        try:
            for code in tqdm(get_stock_codes(conn), desc="下载进度"):
                try:
                    rs = download_daily_data(code, start_date, end_date)
                    if rs.error_code != '0':
                        print(f"{code} 下载失败: {rs.error_msg}")
                        continue

                    df = process_daily_data(rs)
                    if not df.empty:
                        save_to_db(df, conn)

                    # Throttle requests to the baostock server.
                    time.sleep(0.5)
                except Exception as e:
                    print(f"{code} 下载失败: {str(e)}")
        finally:
            bs.logout()
    finally:
        conn.close()


if __name__ == "__main__":
    main()

0x03 获取5分钟K线数据

import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import time
from datetime import datetime

def get_stock_codes(conn):
    """Return the ``all_stocks`` universe formatted as baostock codes.

    Each entry is ``"<exchange>.<code>"`` with the exchange lower-cased and
    stripped, and the code zero-padded on the left to 6 characters.
    """
    frame = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    pairs = frame[['exchange', 'code']].itertuples(index=False, name=None)
    return [
        "{}.{}".format(exchange.lower().strip(), str(number).zfill(6))
        for exchange, number in pairs
    ]

def download_5min_data(code, start_date, end_date, max_retries=3):
    """Query forward-adjusted 5-minute K-line data for one code from baostock.

    Args:
        code: baostock code such as ``"sh.600000"``.
        start_date, end_date: ``YYYY-MM-DD`` strings.
        max_retries: attempts before giving up; at least one is always made.

    Returns:
        The last baostock result set; check ``rs.error_code == '0'`` for success.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,time,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="5",
            adjustflag="2"  # forward-adjusted (前复权)
        )
        if rs.error_code == '0':
            break
        # Back off between attempts only; sleeping after the last one is wasted time.
        if attempt < attempts - 1:
            time.sleep(1)
    return rs

def process_5min_data(rs):
    """Convert a baostock 5-minute K-line result set into a cleaned DataFrame.

    baostock's ``time`` field looks like ``"20250421093500000"``. Its first 12
    characters are reformatted as ``"2025-04-21 09:35:00"`` (seconds are always
    ``00``) and stored in a new trailing ``date`` column. That column replaces
    the original ``date``/``time`` pair. ``code`` loses its exchange prefix,
    the numeric columns are coerced (unparsable -> NaN), rows with any NaN are
    dropped and the index is renumbered.
    """
    def _pull_rows():
        while rs.next():
            yield rs.get_row_data()

    frame = pd.DataFrame(list(_pull_rows()), columns=rs.fields)

    stamp = frame['time'].str
    bar_time = (stamp[0:4] + '-' + stamp[4:6] + '-' + stamp[6:8]
                + ' ' + stamp[8:10] + ':' + stamp[10:12] + ':00')
    frame = frame.drop(columns=['date', 'time']).assign(date=bar_time)

    frame['code'] = frame['code'].str.split('.').str[1]
    value_columns = ['open', 'high', 'low', 'close', 'volume', 'amount']
    frame[value_columns] = frame[value_columns].apply(pd.to_numeric, errors='coerce')
    return frame.dropna().reset_index(drop=True)

def save_to_db(df, conn):
    """Insert 5-minute bars from *df* into ``stock_5min_k``.

    Rows whose ``(date, code)`` primary key already exists are skipped by
    ``INSERT OR IGNORE``, so no per-row exception handling is needed. The batch
    is written in one transaction: committed on success, rolled back (and the
    sqlite3 error re-raised) if any row fails.
    """
    if df.empty:
        return
    columns = ['code', 'date', 'open', 'close', 'high', 'low', 'volume', 'amount']
    # itertuples yields native Python scalars, which sqlite3 can bind directly.
    rows = df[columns].itertuples(index=False, name=None)
    with conn:
        conn.executemany(
            """
            INSERT OR IGNORE INTO stock_5min_k
            (code, date, open, close, high, low, volume, amount)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

def main(db_path=r"../project/data/AAshares/code.db",
         start_date='2025-04-19', end_date=None):
    """Download 5-minute bars for every code in ``all_stocks`` into ``stock_5min_k``.

    Args:
        db_path: SQLite file that holds ``all_stocks``; ``stock_5min_k`` and
            its ``idx_5min`` index are created there if missing.
        start_date: ``YYYY-MM-DD`` start of the range passed to baostock.
        end_date: ``YYYY-MM-DD`` end of the range; defaults to today,
            evaluated when main() is called.

    A failure for one code is printed and the loop moves on. The database
    connection is always closed, including when the baostock login fails.
    """
    if end_date is None:
        end_date = datetime.now().strftime('%Y-%m-%d')

    conn = sqlite3.connect(db_path)
    try:
        conn.executescript('''
            CREATE TABLE IF NOT EXISTS stock_5min_k (
                date TEXT, code TEXT, open REAL, high REAL,
                low REAL, close REAL, volume INTEGER,amount REAL,
                PRIMARY KEY (date, code)
            );
            CREATE INDEX IF NOT EXISTS idx_5min ON stock_5min_k(code);
        ''')

        if bs.login().error_code != '0':
            print("登录失败")
            return

        try:
            for code in tqdm(get_stock_codes(conn), desc="下载进度"):
                try:
                    rs = download_5min_data(code, start_date, end_date)
                    if rs.error_code != '0':
                        print(f"{code} 下载失败: {rs.error_msg}")
                        continue

                    df = process_5min_data(rs)
                    if not df.empty:
                        save_to_db(df, conn)

                    # Throttle requests to the baostock server.
                    time.sleep(0.5)
                except Exception as e:
                    print(f"{code} 下载失败: {str(e)}")
        finally:
            bs.logout()
    finally:
        conn.close()


if __name__ == "__main__":
    main()

0x04 通过Web接口读取已获取的数据

import csv
from io import StringIO
from flask import Flask, jsonify, request, Response
import sqlite3
from datetime import datetime, timedelta, timezone
import pandas as pd
import requests

# Flask application that serves the bar data stored in SQLite over HTTP
# (this web service is meant to be called by QuantConnect Lean).
app = Flask(__name__)

def get_db_connection(db_path=r"../project/data/AAshares/code.db"):
    """Open the bar database with rows returned as ``sqlite3.Row``.

    Args:
        db_path: SQLite file to open. The default is a relative path, so it is
            resolved against the server process's current working directory.

    Returns:
        A new connection; the caller is responsible for closing it.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn

def getdatasqlite(code, table_name, conn=None):
    """Return every bar stored for *code* in *table_name*, oldest first.

    Stored timestamps are exchange-local A-share times, i.e. China Standard
    Time (UTC+8, no DST). They are converted to UTC explicitly instead of via
    the server's local time zone. ``stock_5min_k`` rows carry
    ``"%Y-%m-%d %H:%M:%S"`` dates; every other table is parsed as ``"%Y-%m-%d"``.

    Args:
        code: stock code exactly as stored in the ``code`` column.
        table_name: table to read. It is interpolated into the SQL text, so it
            must be a plain identifier.
        conn: optional open connection. When omitted, one is opened with
            get_db_connection() and closed before returning.

    Returns:
        list of dicts with keys Date (UTC ``"%Y-%m-%d %H:%M:%S"``), Open, Close,
        High, Low, Volume, Amount (all floats).

    Raises:
        ValueError: if *table_name* is not a plain identifier.
    """
    if not table_name.isidentifier():
        raise ValueError(f"非法表名: {table_name!r}")

    china_tz = timezone(timedelta(hours=8))
    date_format = "%Y-%m-%d %H:%M:%S" if table_name == "stock_5min_k" else "%Y-%m-%d"

    owns_conn = conn is None
    if owns_conn:
        conn = get_db_connection()
    try:
        rows = conn.execute(f"""
            SELECT date, open, close, high, low, volume, amount
            FROM "{table_name}"
            WHERE code = ?
            ORDER BY date
        """, (code,)).fetchall()
    finally:
        # Only close connections this function opened itself.
        if owns_conn:
            conn.close()

    converted_data = []
    for row in rows:
        bar_time = datetime.strptime(row[0], date_format).replace(tzinfo=china_tz)
        converted_data.append({
            "Date": bar_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            "Open": float(row[1]),
            "Close": float(row[2]),
            "High": float(row[3]),
            "Low": float(row[4]),
            "Volume": float(row[5]),
            "Amount": float(row[6]),
        })
    return converted_data


def getdatahttp(code, table_name):
    """Fetch up to 1488 recent forward-adjusted bars for *code* from Eastmoney.

    Args:
        code: bare 6-digit stock code.
        table_name: ``"stock_5min_k"`` requests 5-minute bars (klt=5); any
            other value requests daily bars (klt=101).

    Returns:
        On success, a list of dicts shaped like the SQLite reader's output:
        Date is converted from China Standard Time (UTC+8) to UTC
        ``"%Y-%m-%d %H:%M:%S"``. On a transport or decoding failure, a
        ``({"error": ...}, 500)`` tuple.
    """
    # Eastmoney market id: 1 = Shanghai (5xxxxx funds, 6xxxxx A shares,
    # 900xxx B shares), 0 = other markets.
    market = "1" if code.startswith(("5", "6", "900")) else "0"
    is_5min = table_name == "stock_5min_k"
    params = {
        "secid": f"{market}.{code}",
        "fields1": "f1,f2,f3,f4,f5,f6",
        "fields2": "f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61",
        "klt": "5" if is_5min else "101",
        "fqt": "1",
        "end": datetime.now().strftime('%Y%m%d'),
        "lmt": "1488",
    }

    try:
        response = requests.get(
            "https://push2his.eastmoney.com/api/qt/stock/kline/get",
            params=params,
            timeout=10,
        )
        response.raise_for_status()
        raw_data = response.json()
    except requests.exceptions.RequestException as e:
        return {"error": f"API请求失败: {str(e)}"}, 500
    except ValueError as e:
        # Body was not valid JSON (older requests versions raise a plain ValueError).
        return {"error": f"API响应解析失败: {str(e)}"}, 500

    # "data" may be null in the response, so guard each level before reading klines.
    payload = raw_data.get("data") if isinstance(raw_data, dict) else None
    klines = (payload or {}).get("klines") or []

    china_tz = timezone(timedelta(hours=8))
    converted_data = []
    for item in klines:
        parts = item.split(',')
        if len(parts) < 11:
            continue  # malformed line: fewer than the 11 fields requested in fields2

        # parts[0] is "YYYY-MM-DD HH:MM" for 5-minute bars, "YYYY-MM-DD" for daily.
        if is_5min:
            local_time = datetime.strptime(parts[0] + ":00", "%Y-%m-%d %H:%M:%S")
        else:
            local_time = datetime.strptime(parts[0], "%Y-%m-%d")
        utc_time = local_time.replace(tzinfo=china_tz).astimezone(timezone.utc)

        converted_data.append({
            "Date": utc_time.strftime("%Y-%m-%d %H:%M:%S"),
            "Open": float(parts[1]),
            "Close": float(parts[2]),
            "High": float(parts[3]),
            "Low": float(parts[4]),
            # NOTE(review): Eastmoney volume may be quoted in lots (100 shares)
            # rather than shares; confirm before mixing with baostock data.
            "Volume": float(parts[5]),
            "Amount": float(parts[6]),
        })
    return converted_data

def handle_request(code, table_name, format_type='json'):
    """Build the HTTP response for *code*'s bars stored in *table_name*.

    Returns a 404 error payload when no bars exist. With ``format_type ==
    'csv'`` the whole history is returned as CSV text (header row from the
    bar dict keys). Any other format returns only the most recent bar as JSON.
    """
    bars = getdatasqlite(code, table_name)
    if not bars:
        return {"error": "没有有效数据"}, 404

    if format_type != 'csv':
        return jsonify(bars[-1]), 200

    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames=bars[0].keys())
    writer.writeheader()
    writer.writerows(bars)
    return buffer.getvalue(), 200, {'Content-Type': 'text/csv'}

@app.route('/dayapi', methods=['GET'])
def get_daystock_data():
    """Daily-bar endpoint: ``GET /dayapi?code=<code>[&format=csv]``.

    Responds 400 when ``code`` is missing or empty.
    """
    params = request.args
    code = params.get('code')
    if not code:
        return jsonify({"error": "缺少参数: code"}), 400
    wanted_format = params.get('format', default='json')
    return handle_request(code, 'stock_day_k', wanted_format)

@app.route('/api', methods=['GET'])
def get_stock_data():
    """5-minute-bar endpoint: ``GET /api?code=<code>[&format=csv]``.

    Responds 400 when ``code`` is missing or empty.
    """
    params = request.args
    code = params.get('code')
    if not code:
        return jsonify({"error": "缺少参数: code"}), 400
    wanted_format = params.get('format', default='json')
    return handle_request(code, 'stock_5min_k', wanted_format)

if __name__ == '__main__':
    import os

    # debug=True combined with host='0.0.0.0' would expose the Werkzeug
    # interactive debugger (arbitrary code execution) to the whole network,
    # so debug mode is opt-in via FLASK_DEBUG=1. PORT overrides the default 80.
    app.run(
        host='0.0.0.0',
        port=int(os.environ.get('PORT', '80')),
        debug=os.environ.get('FLASK_DEBUG') == '1',
    )

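访问 http://ip/dayapi?code={Value} 获取最新的一根日K线（JSON）；
访问 http://ip/dayapi?code={Value}&format=csv 获取历史日K线（CSV）。
5分钟K线对应的接口是 http://ip/api?code={Value}，用法相同。其中 {Value} 为不带交易所前缀的6位股票代码（如 600000）。
如果以上接口都能正常返回数据，基础数据的搭建就完成了。这个Web服务是供 QuantConnect Lean 调用的。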