本文基于 SQLite 轻量级数据库与 QuantConnect Lean 框架,构建了一套支持高频数据回测、策略无缝实盘部署的本地化解决方案。
0x01 获取所有股票
import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import datetime
def save_to_sqlite(df, db_path, table_name):
    """Write a DataFrame to a SQLite table, replacing any existing table.

    This is a best-effort save: any failure is printed instead of raised.

    Args:
        df: DataFrame to persist; its index is not written.
        db_path: Path of the SQLite database file.
        table_name: Destination table; replaced if it already exists.
    """
    conn = None  # stays None when connect() itself fails
    try:
        conn = sqlite3.connect(db_path)
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        print(f"数据成功保存至 {db_path} 的 {table_name} 表")
    except Exception as e:
        print(f"数据库操作失败:{str(e)}")
    finally:
        if conn is not None:
            conn.close()
def validate_stock_code(code):
    """Normalize a stock code to ``<exchange>.<6 digits>`` form.

    Accepts either an already-prefixed code (``sh.``, ``sz.``, ``bj.``) or a
    bare numeric code, whose exchange is inferred from its first digit after
    zero-padding to six digits.

    Args:
        code: Stock code as a string, or any value convertible with ``str``.

    Returns:
        The lower-case, zero-padded code, e.g. ``"sh.600000"``.

    Raises:
        ValueError: If the numeric part is not all digits or the exchange
            cannot be inferred from the first digit.
    """
    if not isinstance(code, str):
        code = str(code)
    code = code.strip().lower().replace(" ", "")

    prefix, sep, num = code.partition(".")
    if sep and prefix in ("sh", "sz", "bj"):
        # Only a purely numeric remainder is valid; this also rejects
        # inputs with extra dots such as "sh.600.000".
        if num.isdigit():
            return f"{prefix}.{num.zfill(6)}"
    elif code.isdigit():
        code = code.zfill(6)
        first_char = code[0]
        if first_char in "5679":
            return f"sh.{code}"
        if first_char in "0123":
            return f"sz.{code}"
        if first_char == "8":
            return f"bj.{code}"
        # NOTE(review): codes starting with 4 are rejected and 92xxxx is
        # routed to sh; confirm whether those should map to bj.
    raise ValueError(f"无法识别的股票代码格式: {code}")
def get_index_components(index_func):
    """Query an index's constituents and return them as normalized codes.

    The query is attempted up to three times. An attempt counts as failed
    when it raises (the error is printed) or returns a non-'0' error code.

    Args:
        index_func: Zero-argument callable returning a result object with
            ``error_code`` and ``data`` attributes.

    Returns:
        A set of codes produced by ``validate_stock_code`` from column 1 of
        each data row, or an empty set when every attempt fails.
    """
    max_retries = 3
    attempt = 0
    while attempt < max_retries:
        try:
            result = index_func()
            if result.error_code == '0':
                return {validate_stock_code(row[1]) for row in result.data}
        except Exception as e:
            print(f"指数查询第{attempt+1}次失败: {str(e)}")
        attempt += 1
    return set()
def get_all_stocks_with_industry():
    """Build the table of traded stocks with industry and index flags.

    Logs in to Baostock, fetches the SZ50/HS300/ZZ500 constituent sets, the
    full stock list and the industry classification, then merges them.
    The Baostock session is always logged out on exit.

    Returns:
        A DataFrame with columns ``code`` (without exchange prefix), ``name``,
        ``industry``, ``industry_type``, ``issz50``, ``ishs300``, ``iszz500``
        (0/1 flags) and ``exchange``; ``None`` if any step fails, in which
        case the error is printed.
    """
    try:
        if bs.login().error_code != '0':
            raise ConnectionError("Baostock登录失败")

        # Fetch the three constituent sets once, up front.
        sz50_set = get_index_components(bs.query_sz50_stocks)
        hs300_set = get_index_components(bs.query_hs300_stocks)
        zz500_set = get_index_components(bs.query_zz500_stocks)
        print(f"指数成分股数量:SZ50={len(sz50_set)}, HS300={len(hs300_set)}, ZZ500={len(zz500_set)}")

        # Start from yesterday and step back while the result is empty.
        # NOTE(review): presumably a non-trading day (weekend/holiday) yields
        # no rows; verify against the Baostock documentation.
        query_day = datetime.date.today() - datetime.timedelta(days=1)
        for _ in range(10):
            rs = bs.query_all_stock(day=query_day.strftime("%Y-%m-%d"))
            if rs.error_code != '0':
                raise ValueError(f"股票查询失败:{rs.error_msg}")
            if rs.data:
                break
            query_day -= datetime.timedelta(days=1)

        raw_df = pd.DataFrame(rs.data, columns=rs.fields)
        # .copy() so the column added below lands on an independent frame,
        # not on a slice of raw_df.
        valid_df = raw_df[raw_df['tradeStatus'] == '1'].copy()
        print(f"初始数据量:{len(raw_df)},有效交易股票:{len(valid_df)}")

        valid_df['valid_code'] = valid_df['code'].apply(validate_stock_code)

        # Collect industry rows, skipping any whose code cannot be normalized.
        industry_rs = bs.query_stock_industry()
        industry_data = []
        while industry_rs.next():
            row = industry_rs.get_row_data()
            try:
                industry_data.append({
                    'valid_code': validate_stock_code(row[1]),
                    'industry': row[3],
                    'industry_type': row[4]
                })
            except Exception as e:
                print(f"跳过无效行业数据: {row[1]}, 原因: {str(e)}")
        # Explicit columns keep the merge key present even with zero rows.
        industry_df = pd.DataFrame(
            industry_data,
            columns=['valid_code', 'industry', 'industry_type'],
        )

        merged_df = pd.merge(
            valid_df[['valid_code', 'code_name']].rename(
                columns={'valid_code': 'code', 'code_name': 'name'}),
            industry_df,
            left_on='code',
            right_on='valid_code',
            how='left'
        ).drop(columns=['valid_code'])

        # Flag index membership while 'code' still carries the exchange prefix.
        merged_df['issz50'] = merged_df['code'].isin(sz50_set).astype(int)
        merged_df['ishs300'] = merged_df['code'].isin(hs300_set).astype(int)
        merged_df['iszz500'] = merged_df['code'].isin(zz500_set).astype(int)

        # Split "sh.600000" into exchange "sh" and code "600000".
        code_parts = merged_df['code'].str.split('.')
        merged_df['exchange'] = code_parts.str[0]
        merged_df['code'] = code_parts.str[1]

        merged_df = merged_df.fillna({
            'industry': '未知',
            'industry_type': '未分类'
        })
        return merged_df[[
            'code', 'name', 'industry', 'industry_type',
            'issz50', 'ishs300', 'iszz500', 'exchange'
        ]]
    except Exception as e:
        print(f"处理失败:{str(e)}")
        return None
    finally:
        bs.logout()
if __name__ == "__main__":
    # Script entry: build the stock table, preview it, then persist it.
    stock_table = get_all_stocks_with_industry()
    if stock_table is not None:
        print("\n数据样例:")
        has_industry = stock_table['industry'] != '未知'
        print(stock_table[has_industry].head(5))
        save_to_sqlite(stock_table, r"../project/data/AAshares/code.db", "all_stocks")
0x02 获取日K线数据
import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import time
from datetime import datetime
def get_stock_codes(conn):
    """Return every row of ``all_stocks`` as an ``exchange.code`` string.

    Args:
        conn: Open SQLite connection containing the ``all_stocks`` table.

    Returns:
        A list such as ``["sh.600000", ...]``; the exchange is lower-cased
        and stripped, and the code is zero-padded to six characters.
    """
    stocks = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    codes = []
    for exchange, number in zip(stocks['exchange'], stocks['code']):
        codes.append(f"{exchange.lower().strip()}.{str(number).zfill(6)}")
    return codes
def download_daily_data(code, start_date, end_date, max_retries=3):
    """Request daily K-line data for one stock, retrying on error codes.

    Args:
        code: Stock code passed straight to Baostock.
        start_date: First date of the range, as passed to Baostock.
        end_date: Last date of the range, as passed to Baostock.
        max_retries: Maximum number of attempts; values below 1 still make
            one attempt so a result object is always returned.

    Returns:
        The Baostock result of the first attempt whose ``error_code`` is
        ``'0'``, otherwise the result of the last attempt. Callers must
        check ``error_code`` themselves.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        if attempt:
            time.sleep(1)  # pause before a retry, never after the last try
        rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="d",   # daily bars
            adjustflag="2"   # forward-adjusted prices, per the original note
        )
        if rs.error_code == '0':
            break
    return rs
def process_daily_data(rs):
    """Turn a daily K-line result set into a cleaned DataFrame.

    Args:
        rs: Result object exposing ``next()``, ``get_row_data()`` and
            ``fields``.

    Returns:
        A DataFrame with the exchange prefix removed from ``code``, the price
        and volume columns converted to numbers, rows containing any missing
        value dropped, and a fresh 0-based index.
    """
    rows = []
    while rs.next():
        rows.append(rs.get_row_data())
    frame = pd.DataFrame(rows, columns=rs.fields)

    # "sh.600000" -> "600000"
    frame['code'] = frame['code'].str.split('.').str[1]

    # Unparseable values become NaN and are removed by dropna() below.
    for column in ('open', 'high', 'low', 'close', 'volume', 'amount'):
        frame[column] = pd.to_numeric(frame[column], errors='coerce')

    cleaned = frame.dropna()
    return cleaned.reset_index(drop=True)
def save_to_db(df, conn):
    """Insert daily bars into ``stock_day_k``, ignoring existing rows.

    Args:
        df: DataFrame with columns code, date, open, close, high, low,
            volume and amount.
        conn: Open SQLite connection; committed once after all rows.
    """
    insert_sql = """
        INSERT OR IGNORE INTO stock_day_k
        (code, date, open, close, high, low, volume, amount)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """
    for record in df.to_dict('records'):
        try:
            conn.execute(
                insert_sql,
                (record['code'], record['date'],
                 record['open'], record['close'],
                 record['high'], record['low'],
                 record['volume'], record['amount'])
            )
        # Qualified name: a bare IntegrityError is not defined in this
        # script, so any database error used to surface as NameError.
        except sqlite3.IntegrityError:
            print(f"主键冲突已跳过:{record['code']} - {record['date']}")
            continue
    conn.commit()
def main():
    """Download daily K-lines for every stock in ``all_stocks`` into SQLite.

    Creates the ``stock_day_k`` table if needed, logs in to Baostock, then
    downloads, cleans and stores each stock's bars for a fixed date range.
    A failure on one stock is printed and does not stop the run. The
    database connection is closed on every exit path, including a failed
    login.
    """
    conn = sqlite3.connect(r"../project/data/AAshares/code.db")
    try:
        conn.executescript('''
            CREATE TABLE IF NOT EXISTS stock_day_k (
                date TEXT, code TEXT, open REAL, high REAL,
                low REAL, close REAL, volume INTEGER, amount REAL,
                PRIMARY KEY (date, code)
            );
            CREATE INDEX IF NOT EXISTS idx_day ON stock_day_k(code);
        ''')

        if bs.login().error_code != '0':
            print("登录失败")
            return

        try:
            codes = get_stock_codes(conn)
            start_date = '2025-01-01'
            end_date = '2025-04-01'
            for code in tqdm(codes, desc="下载进度"):
                try:
                    rs = download_daily_data(code, start_date, end_date)
                    if rs.error_code != '0':
                        continue
                    df = process_daily_data(rs)
                    if not df.empty:
                        save_to_db(df, conn)
                    time.sleep(0.5)  # throttle between requests
                except Exception as e:
                    print(f"{code} 下载失败: {str(e)}")
        finally:
            # Only reached after a successful login.
            bs.logout()
    finally:
        conn.close()
# Run the daily K-line download only when executed as a script.
if __name__ == "__main__":
    main()
0x03 获取5分钟K线数据
import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import time
from datetime import datetime
def get_stock_codes(conn):
    """List the stocks stored in ``all_stocks`` as ``exchange.code`` strings.

    Args:
        conn: Open SQLite connection containing the ``all_stocks`` table.

    Returns:
        One string per table row: the lower-cased, stripped exchange, a dot,
        and the code left-padded with zeros to six characters.
    """
    listing = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    pairs = listing.itertuples(index=False, name=None)
    return [
        f"{exchange.lower().strip()}.{str(number).zfill(6)}"
        for exchange, number in pairs
    ]
def download_5min_data(code, start_date, end_date, max_retries=3):
    """Request 5-minute K-line data for one stock, retrying on error codes.

    Args:
        code: Stock code passed straight to Baostock.
        start_date: First date of the range, as passed to Baostock.
        end_date: Last date of the range, as passed to Baostock.
        max_retries: Maximum number of attempts; values below 1 still make
            one attempt so a result object is always returned.

    Returns:
        The Baostock result of the first attempt whose ``error_code`` is
        ``'0'``, otherwise the result of the last attempt. Callers must
        check ``error_code`` themselves.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        if attempt:
            time.sleep(1)  # pause before a retry, never after the last try
        rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,time,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="5",   # 5-minute bars
            adjustflag="2"   # forward-adjusted prices, per the original note
        )
        if rs.error_code == '0':
            break
    return rs
def process_5min_data(rs):
    """Turn a 5-minute K-line result set into a cleaned DataFrame.

    The ``time`` field is reformatted to ``YYYY-MM-DD HH:MM:00`` from its
    first twelve characters and stored as ``date``, replacing the original
    ``date`` and ``time`` columns. The new ``date`` is the last column.

    Args:
        rs: Result object exposing ``next()``, ``get_row_data()`` and
            ``fields``.

    Returns:
        A DataFrame with the exchange prefix removed from ``code``, numeric
        price and volume columns, rows containing any missing value dropped,
        and a fresh 0-based index.
    """
    def to_timestamp(raw):
        # Only characters 0-11 (date, hour, minute) are used.
        return f"{raw[:4]}-{raw[4:6]}-{raw[6:8]} {raw[8:10]}:{raw[10:12]}:00"

    rows = []
    while rs.next():
        rows.append(rs.get_row_data())
    frame = pd.DataFrame(rows, columns=rs.fields)

    frame['datetime'] = frame['time'].apply(to_timestamp)
    frame = frame.drop(columns=['date', 'time']).rename(columns={'datetime': 'date'})

    # "sz.000001" -> "000001"
    frame['code'] = frame['code'].str.split('.').str[1]
    for column in ('open', 'high', 'low', 'close', 'volume', 'amount'):
        frame[column] = pd.to_numeric(frame[column], errors='coerce')

    cleaned = frame.dropna()
    return cleaned.reset_index(drop=True)
def save_to_db(df, conn):
    """Insert 5-minute bars into ``stock_5min_k``, ignoring existing rows.

    Args:
        df: DataFrame with columns code, date, open, close, high, low,
            volume and amount.
        conn: Open SQLite connection; committed once after all rows.
    """
    insert_sql = """
        INSERT OR IGNORE INTO stock_5min_k
        (code, date, open, close, high, low, volume, amount)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """
    for record in df.to_dict('records'):
        try:
            conn.execute(
                insert_sql,
                (record['code'], record['date'],
                 record['open'], record['close'],
                 record['high'], record['low'],
                 record['volume'], record['amount'])
            )
        # Qualified name: a bare IntegrityError is not defined in this
        # script, so any database error used to surface as NameError.
        except sqlite3.IntegrityError:
            print(f"主键冲突已跳过:{record['code']} - {record['date']}")
            continue
    conn.commit()
def main():
    """Download 5-minute K-lines for every stock in ``all_stocks`` into SQLite.

    Creates the ``stock_5min_k`` table if needed, logs in to Baostock, then
    downloads, cleans and stores each stock's bars from a fixed start date
    up to today. A failure on one stock is printed and does not stop the
    run. The database connection is closed on every exit path, including a
    failed login.
    """
    conn = sqlite3.connect(r"../project/data/AAshares/code.db")
    try:
        conn.executescript('''
            CREATE TABLE IF NOT EXISTS stock_5min_k (
                date TEXT, code TEXT, open REAL, high REAL,
                low REAL, close REAL, volume INTEGER,amount REAL,
                PRIMARY KEY (date, code)
            );
            CREATE INDEX IF NOT EXISTS idx_5min ON stock_5min_k(code);
        ''')

        if bs.login().error_code != '0':
            print("登录失败")
            return

        try:
            codes = get_stock_codes(conn)
            start_date = '2025-04-19'
            end_date = datetime.now().strftime('%Y-%m-%d')  # today, 'YYYY-MM-DD'
            for code in tqdm(codes, desc="下载进度"):
                try:
                    rs = download_5min_data(code, start_date, end_date)
                    if rs.error_code != '0':
                        continue
                    df = process_5min_data(rs)
                    if not df.empty:
                        save_to_db(df, conn)
                    time.sleep(0.5)  # throttle between requests
                except Exception as e:
                    print(f"{code} 下载失败: {str(e)}")
        finally:
            # Only reached after a successful login.
            bs.logout()
    finally:
        conn.close()
# Run the 5-minute K-line download only when executed as a script.
if __name__ == "__main__":
    main()
0x04 通过web将获取的数据读出来
import csv
from io import StringIO
from flask import Flask, jsonify, request, Response
import sqlite3
from datetime import datetime, timedelta, timezone
import pandas as pd
import requests
# Flask application object; the @app.route handlers in this script register on it.
app = Flask(__name__)
def get_db_connection():
    """Open the K-line database with rows addressable by column name.

    Returns:
        A new ``sqlite3.Connection`` whose ``row_factory`` is
        ``sqlite3.Row``. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(r"../project/data/AAshares/code.db")
    connection.row_factory = sqlite3.Row
    return connection
def getdatasqlite(code, table_name):
    """Load all bars for one stock from a K-line table, oldest first.

    Args:
        code: Stock code matched against the table's ``code`` column.
        table_name: Table to read. Rows of ``"stock_5min_k"`` are parsed as
            ``YYYY-MM-DD HH:MM:SS``; any other table as ``YYYY-MM-DD``.

    Returns:
        A list of dicts with keys Date, Open, Close, High, Low, Volume and
        Amount. Date is a ``YYYY-MM-DD HH:MM:SS`` string; the rest are floats.

    Raises:
        ValueError: If ``table_name`` is not a plain identifier.
    """
    # The table name cannot be bound as a SQL parameter, so reject anything
    # that is not a bare identifier before interpolating it.
    if not table_name.isidentifier():
        raise ValueError(f"invalid table name: {table_name!r}")

    date_format = "%Y-%m-%d %H:%M:%S" if table_name == "stock_5min_k" else "%Y-%m-%d"

    conn = get_db_connection()
    try:
        data = conn.execute(f"""
            SELECT
                date AS Date,
                open AS Open,
                close AS Close,
                high AS High,
                low AS Low,
                volume AS Volume,
                amount AS Amount
            FROM {table_name}
            WHERE code = ?
            ORDER BY date
        """, (code,)).fetchall()
    finally:
        # Previously one connection leaked per request.
        conn.close()

    converted_data = []
    for row in data:
        local_time = datetime.strptime(row[0], date_format)
        # astimezone() on a naive datetime treats it as the machine's local
        # time before converting to UTC.
        # NOTE(review): the result therefore depends on the server timezone;
        # confirm this matches what the consumer expects.
        date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        converted_data.append({
            "Date": date_time_str,
            "Open": float(row[1]),
            "Close": float(row[2]),
            "High": float(row[3]),
            "Low": float(row[4]),
            "Volume": float(row[5]),
            "Amount": float(row[6]),
        })
    return converted_data
def getdatahttp(code, table_name):
    """Fetch K-line bars for one stock from the Eastmoney HTTP API.

    Args:
        code: Bare stock code. Codes starting with ``"6"`` are requested
            with the ``1.`` secid prefix, all others with ``0.``.
        table_name: ``"stock_5min_k"`` requests ``klt=5`` with timestamps to
            the minute; any other value requests ``klt=101`` with dates only.

    Returns:
        On success, a list of dicts with keys Date, Open, Close, High, Low,
        Volume and Amount (empty when the API returns no klines). On a
        request or decoding failure, a ``({"error": ...}, 500)`` tuple.
    """
    secid = ("1." if code.startswith("6") else "0.") + code
    is_5min = table_name == "stock_5min_k"
    klt = 5 if is_5min else 101
    end_date = datetime.now().strftime('%Y%m%d')
    # One URL for both bar sizes; only klt differs.
    api_url = (
        "https://push2his.eastmoney.com/api/qt/stock/kline/get"
        f"?secid={secid}"
        "&fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6"
        "&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61"
        f"&klt={klt}&fqt=1&end={end_date}&lmt=1488"
    )

    try:
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        # ValueError covers a body that is not valid JSON.
        raw_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        return {"error": f"API请求失败: {str(e)}"}, 500

    # "data" may be null in the response, so fall back to empty containers.
    klines = (raw_data.get('data') or {}).get('klines') or []

    converted_data = []
    for item in klines:
        parts = item.split(',')
        if len(parts) < 11:
            continue  # skip malformed rows
        if is_5min:
            local_time = datetime.strptime(parts[0] + ":00", "%Y-%m-%d %H:%M:%S")
        else:
            local_time = datetime.strptime(parts[0], "%Y-%m-%d")
        # The naive timestamp is treated as machine-local time, then
        # converted to UTC.
        date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        converted_data.append({
            "Date": date_time_str,
            "Open": float(parts[1]),
            "Close": float(parts[2]),
            "High": float(parts[3]),
            "Low": float(parts[4]),
            "Volume": float(parts[5]),
            "Amount": float(parts[6]),
        })
    return converted_data
def handle_request(code, table_name, format_type='json'):
    """Serve one stock's bars from SQLite as JSON or CSV.

    Args:
        code: Stock code to look up.
        table_name: K-line table to read from.
        format_type: ``'csv'`` returns every bar as CSV text; any other
            value returns only the most recent bar as JSON.

    Returns:
        A Flask-style response tuple: a 404 error payload when there are no
        bars, otherwise the body with status 200 (plus a ``text/csv``
        content-type header for CSV).
    """
    bars = getdatasqlite(code, table_name)
    if not bars:
        return {"error": "没有有效数据"}, 404

    if format_type != 'csv':
        # JSON mode exposes only the latest bar.
        return jsonify(bars[-1]), 200

    buffer = StringIO()
    csv_writer = csv.DictWriter(buffer, fieldnames=bars[0].keys())
    csv_writer.writeheader()
    for bar in bars:
        csv_writer.writerow(bar)
    return buffer.getvalue(), 200, {'Content-Type': 'text/csv'}
@app.route('/dayapi', methods=['GET'])
def get_daystock_data():
    """Handle ``GET /dayapi``: daily bars for the stock given by ``code``.

    Query parameters: ``code`` (required) and ``format`` (default
    ``'json'``). Responds 400 when ``code`` is missing or empty.
    """
    params = request.args
    code = params.get('code')
    format_type = params.get('format', default='json')
    if code:
        return handle_request(code, 'stock_day_k', format_type)
    return jsonify({"error": "缺少参数: code"}), 400
@app.route('/api', methods=['GET'])
def get_stock_data():
    """Handle ``GET /api``: 5-minute bars for the stock given by ``code``.

    Query parameters: ``code`` (required) and ``format`` (default
    ``'json'``). Responds 400 when ``code`` is missing or empty.
    """
    params = request.args
    code = params.get('code')
    format_type = params.get('format', default='json')
    if code:
        return handle_request(code, 'stock_5min_k', format_type)
    return jsonify({"error": "缺少参数: code"}), 400
if __name__ == '__main__':
    # The server listens on every interface, so debug mode must stay off:
    # the interactive debugger lets any client that can reach the port run
    # arbitrary code. Turn it on only for local development.
    app.run(host='0.0.0.0', port=80, debug=False)
访问 http://ip/dayapi?code={Value} 获取最新的一根日K线(JSON 格式)。
访问 http://ip/dayapi?code={Value}&format=csv 返回全部历史日K线(CSV 格式);5分钟K线对应的接口为 /api,参数相同。
如果以上访问成功,基础数据服务就搭建完成了;这个 Web 服务是供 QuantConnect Lean 调用的。