项目结构

1
2
3
4
5
6
7
8
9
项目名
├─ 包名 【源码】
│ ├─ static 【静态资源、文件】
│ ├─ templates 【模板】
│ ├─ __init__.py 【初始化】
├─ instance
├─ tests 【测试】
├─ MANIFEST.in
├─ setup.py

应用设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
from flask import Flask

def create_app(app_config = None):
# 创建 Flask 实例
app = Flask(__name__, instance_relative_config = True)

# 设置默认配置
app.config.from_mapping(
SECRET_KEY = 'DEV',
DATABASE = os.path.join(app.instance_path, 'sqlite')
)

if app_config is None:
app.config.from_pyfile('config.py', silent = True)
else:
app.config.from_mapping(app_config)

# 验证 instance 是否存在
try:
os.makedirs(app.instance_path)
except OSError:
pass

@app.route('/')
def main():
return '应用配置成功...'

# close_db、init_db_command 需要注册,否则无法使用
from . import db
db.init_app(app)

return app

定义操作数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#encoding: utf-8
import sqlite3

import click
from flask import current_app, g
from flask.cli import with_appcontext

def get_db():
if 'db' not in g:
g.db = sqlite3.connect(
current_app.config['DATABASE'],
detect_types = sqlite3.PARSE_DECLTYPES
)
g.db.row_factory = sqlite3.Row
return g.db

def close_db(e = None):
db = g.pop('db', None)

if db is not None:
db.close()

def init_db():
db = get_db()
with current_app.open_resource('schema.sql') as f:
db.executescript(f.read().decode('utf-8'))

@click.command('init_db') # 定义一个命令行
@with_appcontext
def init_db_command():
init_db()
click.echo('Initialized the database...')

函数应用注册

1
2
3
4
5
def init_app(app):
# 告诉 Flask 返回响应后进行清理时调用此函数
app.teardown_appcontext(close_db)
# 添加一个新的可以和 flask 一起工作的命令
app.cli.add_command(init_db_command)

schema.sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS post;

CREATE TABLE users(
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);

CREATE TABLE post(
id INTEGER PRIMARY KEY AUTOINCREMENT,
author_id INTEGER NOT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
title TEXT NOT NULL,
context TEXT NOT NULL,
FOREIGN KEY (author_id) REFERENCES users (id)
);

蓝图和视图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#encoding utf-8

import funtools

from flask import(
Blueprint, flash, g, redirect, render_template, request, sesstion, url_for
)
from werkzeug.security import check_password_hash, generate_passwork_hash

from db import get_db

bp = Blueprint('auth', __name__, url_prefix = '/auth')

'''
注册
2020-08-18
'''
@bp.route('/register', methods = ('GET', 'post'))
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
error = None

if not username:
error = '用户名不能为空...'
elif not password:
error = '密码不能为空...'
elif db.execute(
'select id from users where username = ?', (username,)
).fetchone() is not None:
error = '用户 {} 已经存在...'.format(username)

if error is None:
db.execute(
'insert into users (username, password) values (?, ?)',
(username, generate_password_hash(password))
)
db.commit()
return redirect(url_for('auth.login'))

flash(error)
return render_template('register.html')

'''
登录
2020-08-18
'''
@bp.route('/login', methods = ('GET', 'POST'))
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
error = None
user = db.execute(
'select * from users where username = ?', (username,)
).fetchone()

if user is None:
error = '用户名错误...'
elif not check_password_hash(user['password'], password):
error = '密码错误...'

if error is None:
session.clear()
session['user_id'] = user['id']
return redirect(url_for('index'))

flash(error)
return render_template('login.html')

'''
注销
2020-08-18
'''
@bp.route('logout')
def logout():
session.clear()
return redirect(url_for('index'))

'''
登录验证
2020-08-18
'''
def login_required(view):
@functools.wraps(view)
def wrapped_view(**kwargs):
if g.user is None:
return redirect(url_for('auth.login'))
return view(**kwargs)
return wrapped_view

'''
拦截函数
2020-08-18
'''
@bp.before_app_request
def load_logged_in_user():
user_id = session.get('user_id')

if user_id is None:
g.user = None
else:
g.user = get_db().execute(
'select * from user where id = ?', (user_id,)
).fetchone()

模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{% extends 'base.html' %}

{% block header %}
<h4>{% block title %} 列表 {% endblock %}</h4>
{% if g.user %}
<a class="add" href="{{ url_for('blog.create') }}">新建</a>
{% endif %}
{% endblock %}

{% block content %}
{% for post in posts %}
<article class="post">
<div class="title">
<h2>{{ post['title'] }}</h2>
<p>{{ post['username'] }} {{ post['created'].strftime('%Y-%m-%d') }}</p>
</div>
{% if g.user['id'] == post['author_id'] %}
<a class="edit" href="{{ url_for('blog.update', id = post['id']) }}">修改</a>
{% endif %}
<p class="context">{{ post['context'] }}</p>
</article>
{% if not loop.last %}
<hr/>
{% endif %}
{% endfor %}
{% endblock %}

测试

pip install pytest coverage

conftest.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
import tempfile

import pytest
from flaskr import create_app
from flaskr.db import get_db, init_db

with open(os.path.join(os.path.dirname(__file__), 'data.sql'), 'rb') as f:
_data_sql = f.read().decode('utf8')


@pytest.fixture
def app():
db_fd, db_path = tempfile.mkstemp()

app = create_app({
'TESTING': True,
'DATABASE': db_path,
})

with app.app_context():
init_db()
get_db().executescript(_data_sql)

yield app

os.close(db_fd)
os.unlink(db_path)


@pytest.fixture
def client(app):
return app.test_client()


@pytest.fixture
def runner(app):
return app.test_cli_runner()

class AuthActions(object):
def __init__(self, client):
self._client = client

def login(self, username='test', password='test'):
return self._client.post(
'/auth/login',
data={'username': username, 'password': password}
)

def logout(self):
return self._client.get('/auth/logout')


@pytest.fixture
def auth(client):
return AuthActions(client)

test_factory.py

1
2
3
4
5
6
7
8
9
10
11
from flaskr import create_app


def test_config():
assert not create_app().testing
assert create_app({'TESTING': True}).testing


def test_hello(client):
response = client.get('/hello')
assert response.data == b'Hello, World!'

test_db.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import sqlite3

import pytest
from flaskr.db import get_db


def test_get_close_db(app):
with app.app_context():
db = get_db()
assert db is get_db()

with pytest.raises(sqlite3.ProgrammingError) as e:
db.execute('SELECT 1')

assert 'closed' in str(e.value)

def test_init_db_command(runner, monkeypatch):
class Recorder(object):
called = False

def fake_init_db():
Recorder.called = True

monkeypatch.setattr('flaskr.db.init_db', fake_init_db)
result = runner.invoke(args=['init-db'])
assert 'Initialized' in result.output
assert Recorder.called

test_auth.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import pytest
from flask import g, session
from flaskr.db import get_db


def test_register(client, app):
assert client.get('/auth/register').status_code == 200
response = client.post(
'/auth/register', data={'username': 'a', 'password': 'a'}
)
assert 'http://localhost/auth/login' == response.headers['Location']

with app.app_context():
assert get_db().execute(
"select * from user where username = 'a'",
).fetchone() is not None


@pytest.mark.parametrize(('username', 'password', 'message'), (
('', '', b'Username is required.'),
('a', '', b'Password is required.'),
('test', 'test', b'already registered'),
))
def test_register_validate_input(client, username, password, message):
response = client.post(
'/auth/register',
data={'username': username, 'password': password}
)
assert message in response.data

def test_login(client, auth):
assert client.get('/auth/login').status_code == 200
response = auth.login()
assert response.headers['Location'] == 'http://localhost/'

with client:
client.get('/')
assert session['user_id'] == 1
assert g.user['username'] == 'test'


@pytest.mark.parametrize(('username', 'password', 'message'), (
('a', 'test', b'Incorrect username.'),
('test', 'a', b'Incorrect password.'),
))
def test_login_validate_input(auth, username, password, message):
response = auth.login(username, password)
assert message in response.data

运行测试

1
2
3
4
5
6
7
[tool:pytest]
testpaths = tests

[coverage:run]
branch = True
source =
blog_source

pytest
coverage report
coverage html

部署

python setup.py bdist_wheel

文件名由项目名称、版 本号和一些关于项目安装要求的标记组成。

配置秘钥

python -c ‘import os; print(os.urandom(16))’

产品服务器

pip install waitress

waitress-serve –call ‘app:create_app’

常用扩展

pip install flask
pip install flask-login
pip install flask-openid
pip install flask-mail
pip install flask-sqlalchemy
pip install sqlalchemy-migrate
pip install flask-whooshalchemy
pip install flask-wtf
pip install flask-babel
pip install guess_language
pip install flipflop
pip install coverage

  • pip install –no-deps lamson chardet flask-mail