Python Flask
XWenXiang 人气:01. 数据库连接池
使用 pymsql 链接数据库
导入:pip3 install dbutils
pool.py 创建数据库连接池
from dbutils.pooled_db import PooledDB import pymysql POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。 ping=0, # ping MySQL服务端,检查是否服务可用。 host='127.0.0.1', port=3306, user='???', password='???', database='???', charset='utf8' )
使用连接池
from flask import Flask # 导入类对象 from pool import POOL import pymysql app = Flask(__name__) app.debug = True # 定义在外面,全局conn,cursor会出现数据错乱,正常情况应该放在视图函数内部建立链接,但是会影响 # 性能,所以需要建立数据库连接池,限制连接的数量 # conn = pymysql.connect( # host='127.0.0.1', # port=3306, # database='数据库', # user='用户', # password='密码' # ) # cursor = conn.cursor() @app.route('/user') def user(): # 从池中拿链接,创建出cursor对象 conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from index_user') print(cursor.fetchall()) # 并未关闭连接,只是将连接重新放回池子 conn.close() return 'hello' @app.route('/username') def username(): # 该方式对数据库的连接会一直增加,有可能造成数据压力过大 conn = pymysql.connect( host='127.0.0.1', port=3306, database='???', user='???', password='???') cursor = conn.cursor() cursor.execute('select username from index_user') print(cursor.fetchall()) return 'hello' if __name__ == '__main__': app.run()
2. wtfroms
其类似于 django 的 forms 组件,不过在 flask 中使用第三方 wtfroms 完成 forms 的功能。
安装:pip3 install wtforms
from flask import Flask,request,render_template app = Flask(__name__) from wtforms.fields import core from wtforms import Form from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件 render_kw={'class': 'form-control'} ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) class RegisterForm(Form): name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='alex' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validators.DataRequired(message='重复密码不能为空.'), validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # “1” “2” ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证 @app.route('/login',methods=['GET','POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run()
login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
register.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body> </html>
3. 信号
Flask 信号,是由 signal 翻译过来的。
Flask框架中的信号基于 blinker,其主要就是让开发者可是在 Flask 请求过程中定制一些用户行为。
在某种情况下会触发某个函数执行( aop理念,面向切面编程的理念 ),例如在用户注册后统计用户的数量
安装:pip3 install blinker
3.1 内置信号
# 请求到来前执行
request_started = _signals.signal('request-started')# 请求结束后执行
request_finished = _signals.signal('request-finished')# 模板渲染前执行
before_render_template = _signals.signal('before-render-template')# 模板渲染后执行
template_rendered = _signals.signal('template-rendered')# 请求执行出现异常时执行
got_request_exception = _signals.signal('got-request-exception')# 请求执行完毕后自动执行(无论成功与否)
request_tearing_down = _signals.signal('request-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文push时执行
appcontext_pushed = _signals.signal('appcontext-pushed')# 应用上下文pop时执行
appcontext_popped = _signals.signal('appcontext-popped')# 调用flask在其中添加数据时,自动触发
message_flashed = _signals.signal('message-flashed')
3.2 使用信号
from flask import Flask, render_template from flask import signals app = Flask(__name__) app.debug = True # 第一步:编写需要执行的函数 def render_before(*args, **kwargs): print(args) print(kwargs) print('模板渲染前执行') # 第二步:绑定信号 signals.before_render_template.connect(render_before) # 模板渲染前执行 # 第三步:自动触发信号 @app.route('/home') def home(): return render_template('home.html') if __name__ == '__main__': app.run()
3.3 自定义信号
from flask import Flask from flask.signals import _signals app = Flask(import_name=__name__) # 第一步:自定义信号 x = _signals.signal('x') # 第二步:编写函数 def func(sender, *args, **kwargs): print(sender) print(args) print(kwargs) # 第三步:自定义信号中注册函数 x.connect(func) @app.route("/index") def index(): # 第四步:触发信号 x.send('123123', k1='v1') return 'Index' if __name__ == '__main__': app.run()
4. 多app应用
在一个 flask 项目,可以支持多个 app 对象。
from flask import Flask from werkzeug.middleware.dispatcher import DispatcherMiddleware from werkzeug.serving import run_simple # 创建多个 app app1 = Flask('app01') app2 = Flask('app02') # 第一个 app 的根路径 @app1.route('/') def home(): return "app1" # 第二个 app 的根路径 @app2.route('/') def sec(): return "app2" # 访问 /sec 才是访问第二个 app 的根路径 dm = DispatcherMiddleware(app1, { '/sec': app2, }) if __name__ == "__main__": # 请求来了,会执行 dm(),触发DispatcherMiddleware的__call__ run_simple('localhost', 5000, dm)
5. flask-script
原本启动项目右键运行即可,若要以命令的方式,类似于 django 的 python3 manage.py runserver
命令可以使用 flask-script 实现。
安装:pip3 install flask-script
5.1 快速使用
from flask_script import Manager app = Flask(__name__) manager=Manager(app) if __name__ == '__main__': manager.run() # 以后在执行,直接:python3 manage.py runserver # python3 manage.py runserver --help
有可能会出现如下错误,由于 flask 的版本太高只需要将 flask 的版本降低即可
Traceback (most recent call last):
File "manage.py", line 3, in <module>
from flask_script import Manager
File "D:\python36\lib\site-packages\flask_script\__init__.py", line 15, in <module>
from flask._compat import text_type
ModuleNotFoundError: No module named 'flask._compat'
5.2 自定制命令
可以编一个功能,通过execl,把execl的数据存同步到某个数据表中得命令
from flask import Flask from flask_script import Manager app = Flask('__name__') # 基本使用,多了runserver命令 manager = Manager(app) @manager.command def dbinit(): """ python manage.py dbinit """ print("数据库初始化完成") @manager.option('-n', '--name', dest='name') @manager.option('-a', '--age', dest='age') def user(name, age): """ 自定义命令(-n也可以写成--name) 执行: python manage.py user -n xwx -a 12 执行: python manage.py user --name xwx --age 12 """ print(name, age) @app.route('/index') def index(): return "index" if __name__ == "__main__": manager.run()
加载全部内容