
每次查询数据都要连接、关闭数据库,频繁操作导致性能损耗严重,因此引入数据库连接池机制对查询数据过程进行优化。
下载并引入dbunits模块实现数据库连接池配置:
(.venv) PS D:\PythonProject2> pip install dbutils
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Collecting dbutils
Downloading https://pypi.tuna.tsinghua.edu.cn/packages/98/a7/4fe7da3082241028e62c390bf9357d60522dd03d9329e3a560045fe14dfd/dbutils-3.1.2-py3-none-any.whl (32 kB)
Installing collected packages: dbutils
Successfully installed dbutils-3.1.2
[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip
(.venv) PS D:\PythonProject2>
from dbutils.pooled_db import PooledDB
创建连接池:
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=10, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
host='127.0.0.1', port=3306, user='root', passwd='rootpassword', charset="utf8", db='day20'
)
连接池优化性能:
连接池配置最大10个连接,动态创建与回收闲置连接
阻塞模式等待连接释放,闲置连接超限自动清理
通过 conn = POOL.connection() 从池中获取连接,通过 conn.close() 将连接交还到连接池
完整代码如下:
import hashlib
import pymysql
from flask import Flask, request, jsonify
from dbutils.pooled_db import PooledDB
app = Flask(__name__)
# 创建连接池
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=10, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
host='127.0.0.1', port=3306, user='root', passwd='rootpassword', charset="utf8", db='day20'
)
# 连接MySQL,执行SQL查询语句,返回结果
def fetch_one(sql,params):
# conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='rootpassword', charset="utf8", db='day20')
conn = POOL.connection()
cursor = conn.cursor()
# cursor.execute("select * from user where token=%s", [token, ])
cursor.execute(sql, params)
result = cursor.fetchone()
cursor.close()
conn.close()# 不是关闭连接,而是将连接交还到连接池中
print("result", result)
return result
@app.route("/bili", methods=["POST"])
def bili():
"""
请求URL中携带/bili?token=...
请求的数据格式要求:{ "ordered_string":"......" }
:return:
"""
# 1.token是否为空
token = request.args.get("token")
print("token", token)
if not token:
return jsonify({"status":False , 'error':"认证失败"})
# 从数据库表中读取授权码的情况
# 连接MYSQL执行命令,验证token是否合法
result = fetch_one("select * from user where token=%s", [token, ])
if not result:
return jsonify({"status": False, "error": "认证失败"})
ordered_string = request.json.get("ordered_string")
if not ordered_string:
return jsonify({"status": False, "error": "参数错误"})
# 调用核心算法,生成sign签名
encrypt_string = ordered_string + "560c52ccd288fed045859ed18bffd973"
obj = hashlib.md5(encrypt_string.encode('utf-8'))
sign = obj.hexdigest()
# 返回签名
return jsonify({"status": True, "data": sign})
if __name__ == '__main__':
app.run(host="127.0.0.1",port=5000)
通过postman调用带token认证的bili方法进行测试: