需要的包
flask-login
werkzeug
itsdangerous
flask-mail
flask-bootstrap
flask-wtf
1、 app/models.py
import hashlib
from datetime import datetime

from flask import current_app
from flask import request
from flask.ext.login import UserMixin,AnonymousUserMixin
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from werkzeug.security import generate_password_hash, check_password_hash

from . import db
from . import login_manager
# Register AnonymousUser as the class login_manager uses for anonymous users.
# NOTE(review): AnonymousUser is only defined further down in this listing, so
# executed in this order the line raises NameError; in a real models.py it
# has to come after the AnonymousUser class definition.
login_manager.anonymous_user = AnonymousUser
class User(UserMixin, db.Model):
    """Account model stored in the ``users`` table.

    Holds the hashed password, the assigned role, the e-mail confirmation
    flag and the public profile fields shown on the user page.
    """

    # Must be the dunder name: a plain ``tablename`` attribute is just an
    # ordinary class attribute and does not name the table.
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(64), unique=True, index=True)
    username = db.Column(db.String(64), unique=True, index=True)
    password_hash = db.Column(db.String(128))
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    confirmed = db.Column(db.Boolean, default=False)
    name = db.Column(db.String(64))
    location = db.Column(db.String(64))
    about_me = db.Column(db.Text())
    # The callable (not its result) is passed so the timestamp is taken when
    # the default is applied rather than when the class is defined.
    member_since = db.Column(db.DateTime(), default=datetime.utcnow)
    last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
    avatar_hash = db.Column(db.String(32))

    def __init__(self, **kwargs):
        """Create a user, then fill in a role and avatar hash if missing.

        The role becomes the one with permissions ``0xff`` when the e-mail
        equals the ``FLASKY_ADMIN`` config value, otherwise the role flagged
        as default.
        """
        super(User, self).__init__(**kwargs)
        if self.role is None:
            if self.email == current_app.config['FLASKY_ADMIN']:
                self.role = Role.query.filter_by(permissions=0xff).first()
            if self.role is None:
                self.role = Role.query.filter_by(default=True).first()
        if self.email is not None and self.avatar_hash is None:
            self.avatar_hash = hashlib.md5(
                self.email.encode('utf-8')).hexdigest()
        # NOTE(review): ``followed`` and ``Follow`` are not defined in this
        # listing; presumably this makes every user follow themselves --
        # confirm against the full models.py.
        self.followed.append(Follow(followed=self))

    def change_email(self, token):
        """Store a new e-mail address and refresh the avatar hash.

        Returns True after the user has been added to the session.
        """
        # ...
        # NOTE(review): the code that derives ``new_email`` from ``token`` is
        # elided in these notes and must be restored before this can run.
        self.email = new_email
        self.avatar_hash = hashlib.md5(
            self.email.encode('utf-8')).hexdigest()
        db.session.add(self)
        return True

    def gravatar(self, size=100, default='identicon', rating='g'):
        """Build the Gravatar image URL for this user's e-mail.

        The HTTPS host is used when the current request is secure; ``size``,
        ``default`` and ``rating`` are passed through as the ``s``, ``d`` and
        ``r`` query parameters.
        """
        if request.is_secure:
            url = 'https://secure.gravatar.com/avatar'
        else:
            url = 'http://www.gravatar.com/avatar'
        # Named ``digest`` so the ``hash`` builtin is not shadowed.
        digest = hashlib.md5(self.email.encode('utf-8')).hexdigest()
        return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(
            url=url, hash=digest, size=size, default=default, rating=rating)

    def ping(self):
        """Set ``last_seen`` to the current UTC time and stage the change."""
        self.last_seen = datetime.utcnow()
        db.session.add(self)

    def can(self, permissions):
        """Return True if the user's role has every bit in ``permissions``."""
        return (self.role is not None
                and (self.role.permissions & permissions) == permissions)

    def is_administrator(self):
        """Return True if the user holds ``Permission.ADMINISTER``."""
        return self.can(Permission.ADMINISTER)

    @property
    def password(self):
        """Write-only attribute; reading it raises AttributeError."""
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        # Only the hash is kept; the plain text is never stored.
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Return True if ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

    def generate_confirmation_token(self, expiration=3600):
        """Return a signed token carrying this user's id.

        ``expiration`` is handed to the serializer as its second argument.
        """
        s = Serializer(current_app.config['SECRET_KEY'], expiration)
        return s.dumps({'confirm': self.id})

    def confirm(self, token):
        """Mark the account confirmed if ``token`` is valid for this user.

        Returns False when the token cannot be loaded or carries another
        user's id; otherwise sets ``confirmed`` and returns True.
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Any failure to load the token counts as "not confirmed", but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        db.session.add(self)
        return True


@login_manager.user_loader
def load_user(user_id):
    """Return the User whose primary key is ``int(user_id)``."""
    return User.query.get(int(user_id))
class AnonymousUser(AnonymousUserMixin):
    """Anonymous-user class that answers every permission check with False."""

    def can(self, permissions):
        """Deny unconditionally; ``permissions`` is not inspected."""
        return False

    def is_administrator(self):
        """Report that this user is never an administrator."""
        return False
class Permission:
    """Bit flags for individual permissions; combine them with ``|``."""

    FOLLOW = 1 << 0             # 0x01
    COMMENT = 1 << 1            # 0x02
    WRITE_ARTICLES = 1 << 2     # 0x04
    MODERATE_COMMENTS = 1 << 3  # 0x08
    ADMINISTER = 1 << 7         # 0x80
class Role(db.Model):
    """Named set of permission bits that users are assigned to."""

    # Must be the dunder name so the table is really called ``roles``, which
    # is what the ``roles.id`` foreign key on User refers to.
    __tablename__ = 'roles'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    default = db.Column(db.Boolean, default=False, index=True)
    permissions = db.Column(db.Integer)
    # Adds a ``role`` attribute to User through the backref.
    users = db.relationship('User', backref='role', lazy='dynamic')

    @staticmethod
    def insert_roles():
        """Create or update the User, Moderator and Administrator roles.

        Existing roles are looked up by name and updated in place, so the
        method can be run repeatedly. Commits the session at the end.
        """
        # role name -> (permission bits, is this the default role?)
        roles = {
            'User': (Permission.FOLLOW |
                     Permission.COMMENT |
                     Permission.WRITE_ARTICLES, True),
            'Moderator': (Permission.FOLLOW |
                          Permission.COMMENT |
                          Permission.WRITE_ARTICLES |
                          Permission.MODERATE_COMMENTS, False),
            'Administrator': (0xff, False),
        }
        for role_name, (permissions, is_default) in roles.items():
            role = Role.query.filter_by(name=role_name).first()
            if role is None:
                role = Role(name=role_name)
            role.permissions = permissions
            role.default = is_default
            db.session.add(role)
        db.session.commit()
2、app/auth/__init__.py ##蓝本
from flask import Blueprint

# ``__name__`` (with the underscores) is the import name of this package;
# a bare ``name`` is undefined here.
auth = Blueprint('auth', __name__)

# Imported last on purpose: the views module imports ``auth`` from this
# package, so the blueprint object must already exist at this point.
from . import views
3、 app/auth/views.py #视图
from flask.ext.login import logout_user,login_required
from flask import render_template, redirect, request, url_for, flash
from flask.ext.login import login_user,current_user
from . import auth
from ..models import User
from .forms import LoginForm
from ..email import send_email
@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and sign the user in on a valid submission.

    On success the user is redirected to the ``next`` query argument when it
    is a site-relative path, otherwise to the main index. On failure a flash
    message is queued and the form is rendered again.
    """
    form = LoginForm()
    if form.validate_on_submit():
        # LoginForm has no ``username`` field, so the account is looked up
        # by e-mail only.
        user = User.query.filter_by(email=form.email.data).first()
        if user is not None and user.verify_password(form.password.data):
            login_user(user, form.remember_me.data)
            next_url = request.args.get('next')
            # ``next`` comes from the query string: accept only paths on this
            # site ("/...") and reject "//host" or "/\host", which browsers
            # treat as another origin.
            if (not next_url or not next_url.startswith('/')
                    or next_url[1:2] in ('/', '\\')):
                next_url = url_for('main.index')
            return redirect(next_url)
        flash('Invalid username or password.')
    return render_template('auth/login.html', form=form)
# Registered on the ``auth`` blueprint: no ``app`` object is in scope in this
# views module, so ``@app.route`` would raise NameError on import.
@auth.route('/secret')
@login_required
def secret():
    """Example view that is only served through ``login_required``."""
    return 'Only authenticated users are allowed!'
@auth.route('/logout')
@login_required
def logout():
    """Log the current user out, queue a notice and go to the main index."""
    logout_user()
    flash('You have been logged out.')
    home_url = url_for('main.index')
    return redirect(home_url)
@auth.route('/register', methods=['GET', 'POST'])
def register():
    """Create an account from the registration form and mail a token.

    On a valid submission the user is committed, a confirmation e-mail is
    sent, and the browser is redirected to the login page. Otherwise the
    registration page is rendered.
    """
    # Neither name is among this module's imports, so bring them in here.
    from .. import db
    from .forms import RegistrationForm

    form = RegistrationForm()
    if form.validate_on_submit():
        user = User(email=form.email.data,
                    username=form.username.data,
                    password=form.password.data)
        db.session.add(user)
        # Commit before creating the token: the token embeds ``user.id``.
        db.session.commit()
        token = user.generate_confirmation_token()
        send_email(user.email, 'Confirm Your Account',
                   'auth/email/confirm', user=user, token=token)
        flash('A confirmation email has been sent to you by email.')
        return redirect(url_for('auth.login'))
    return render_template('auth/register.html', form=form)
@auth.route('/confirm/<token>')
@login_required
def confirm(token):
    """Confirm the logged-in user's account with ``token``.

    Already-confirmed users are sent straight to the main index; otherwise
    the outcome is flashed first.
    """
    if not current_user.confirmed:
        if current_user.confirm(token):
            flash('You have confirmed your account. Thanks!')
        else:
            flash('The confirmation link is invalid or has expired.')
    return redirect(url_for('main.index'))
@auth.before_app_request
def before_request():
    """Update ``last_seen`` and fence off unconfirmed accounts.

    For an authenticated user the last-seen time is refreshed. If the
    account is unconfirmed and the request targets anything other than an
    ``auth.`` endpoint or ``static``, the user is redirected to the
    unconfirmed page. Returns None to let the request proceed.
    """
    if not current_user.is_authenticated:
        return None
    current_user.ping()
    endpoint = request.endpoint
    # ``endpoint`` is None when no route matched; let such requests continue
    # instead of failing on the string checks.
    if (not current_user.confirmed
            and endpoint is not None
            and not endpoint.startswith('auth.')
            and endpoint != 'static'):
        return redirect(url_for('auth.unconfirmed'))
    return None
@auth.route('/unconfirmed')
def unconfirmed():
    """Render the "please confirm" page for unconfirmed users.

    Anonymous or already-confirmed users are redirected to the main index.
    """
    # Attribute access, matching ``current_user.is_authenticated`` as used
    # in before_request and the templates.
    if current_user.is_anonymous or current_user.confirmed:
        return redirect(url_for('main.index'))
    return render_template('auth/unconfirmed.html')
@auth.route('/confirm')
@login_required
def resend_confirmation():
    """Mail a new confirmation token to the logged-in user."""
    fresh_token = current_user.generate_confirmation_token()
    send_email(
        current_user.email,
        'Confirm Your Account',
        'auth/email/confirm',
        user=current_user,
        token=fresh_token,
    )
    flash('A new confirmation email has been sent to you by email.')
    return redirect(url_for('main.index'))
4 app/__init__.py #附加蓝本
# ``flask_login`` is the package's real module name (also used by the
# main/views.py imports later in these notes).
from flask_login import LoginManager

login_manager = LoginManager()
login_manager.session_protection = 'strong'
# Endpoint name of the login view.
login_manager.login_view = 'auth.login'


def create_app(config_name):
    """Application factory (app creation is elided in these notes)."""
    # ...
    login_manager.init_app(app)

    # Imported here, not at module level: the auth views import ``models``,
    # which imports ``login_manager`` from this package, so the blueprint
    # must not be loaded before ``login_manager`` exists.
    from .auth import auth as auth_blueprint
    app.register_blueprint(auth_blueprint, url_prefix='/auth')
    return app
5 app/auth/forms.py #登录表单
from flask.ext.wtf import Form
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import Required, Length, Email, Regexp, EqualTo
from wtforms import ValidationError
from ..models import User
class LoginForm(Form):
    """Login form: e-mail, password and a "keep me logged in" checkbox."""

    email = StringField(
        'Email',
        validators=[Required(), Length(1, 64), Email()],
    )
    password = PasswordField(
        'Password',
        validators=[Required()],
    )
    remember_me = BooleanField('Keep me logged in')
    submit = SubmitField('Log In')
class RegistrationForm(Form):
    """Sign-up form with uniqueness checks on e-mail and username."""

    email = StringField('Email',
                        validators=[Required(), Length(1, 64), Email()])
    # Username must start with a letter, then letters, digits, "_" or ".".
    username = StringField('Username', validators=[
        Required(), Length(1, 64),
        Regexp('^[A-Za-z][A-Za-z0-9_.]*$', 0,
               'Usernames must have only letters, '
               'numbers, dots or underscores')])
    password = PasswordField('Password', validators=[
        Required(),
        EqualTo('password2', message='Passwords must match.')])
    password2 = PasswordField('Confirm password', validators=[Required()])
    submit = SubmitField('Register')

    def validate_email(self, field):
        """Reject an e-mail address that already belongs to a user."""
        if User.query.filter_by(email=field.data).first():
            raise ValidationError('Email already registered.')

    def validate_username(self, field):
        """Reject a username that already belongs to a user."""
        if User.query.filter_by(username=field.data).first():
            raise ValidationError('Username already in use.')
6 app/templates/base.html ##基础模板中添加登录、登出
{% extends "bootstrap/base.html" %}
{% block title %}Flasky{% endblock %}
{% block head %}
{{ super() }}
<link rel="shortcut icon" href="{{ url_for('static', filename='favicon.ico') }}" type="image/x-icon">
<link rel="icon" href="{{ url_for('static', filename='favicon.ico') }}" type="image/x-icon">
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='styles.css') }}">
{% endblock %}
{% block navbar %}
<div class="navbar navbar-inverse" role="navigation">
<div class="container">
<div class="navbar-header">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand" href="{{ url_for('main.index') }}">Flasky</a>
</div>
<div class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<li><a href="{{ url_for('main.index') }}">Home</a></li>
{% if current_user.is_authenticated %}
<li><a href="{{ url_for('main.user', username=current_user.username) }}">Profile</a></li>
{% endif %}
</ul>
<ul class="nav navbar-nav navbar-right">
{% if current_user.can(Permission.MODERATE_COMMENTS) %}
<li><a href="{{ url_for('main.moderate') }}">Moderate Comments</a></li>
{% endif %}
{% if current_user.is_authenticated %}
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">
<img src="{{ current_user.gravatar(size=18) }}">
Account <b class="caret"></b>
</a>
<ul class="dropdown-menu">
<li><a href="{{ url_for('auth.change_password') }}">Change Password</a></li>
<li><a href="{{ url_for('auth.change_email_request') }}">Change Email</a></li>
<li><a href="{{ url_for('auth.logout') }}">Log Out</a></li>
</ul>
</li>
{% else %}
<li><a href="{{ url_for('auth.login') }}">Log In</a></li>
{% endif %}
</ul>
</div>
</div>
</div>
{% endblock %}
{% block content %}
<div class="container">
{% for message in get_flashed_messages() %}
<div class="alert alert-warning">
<button type="button" class="close" data-dismiss="alert">×</button>
{{ message }}
</div>
{% endfor %}
{% block page_content %}{% endblock %}
</div>
{% endblock %}
{% block scripts %}
{{ super() }}
{{ moment.include_moment() }}
{% endblock %}
7 app/templates/auth/login.html #登录页面
{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky - Login{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>Login</h1>
</div>
<div class="col-md-4">
{{ wtf.quick_form(form) }}
</div>
<p>
New user?
<a href="{{ url_for('auth.register') }}">
Click here to register
</a>
</p>
{% endblock %}
8 app/templates/index.html ##首页
{# is_authenticated is read as an attribute, the same way base.html does #}
Hello,
{% if current_user.is_authenticated %}
{{ current_user.username }}
{% else %}
Stranger
{% endif %}!
9 app/templates/auth/email/confirm.txt #确认邮件
Dear {{ user.username }},
Welcome to Flasky!
To confirm your account please click on the following link:
{{ url_for('auth.confirm', token=token, _external=True) }}
Sincerely,
The Flasky Team
Note: replies to this email address are not monitored.
10 app/decorators.py ##自定义权限修饰器
from functools import wraps
from flask import abort
from flask.ext.login import current_user
def permission_required(permission):
    """Return a view decorator that aborts with 403 without ``permission``.

    The wrapped function is called with its original positional and keyword
    arguments when ``current_user.can(permission)`` is true.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not current_user.can(permission):
                abort(403)
            # Unpack both collections so ``f`` sees the caller's arguments
            # rather than a single tuple.
            return f(*args, **kwargs)
        return decorated_function
    return decorator
def admin_required(f):
    """Decorate ``f`` so that it requires ``Permission.ADMINISTER``."""
    # Local import: ``Permission`` is not among this module's imports.
    from .models import Permission
    return permission_required(Permission.ADMINISTER)(f)
11、app/main/__init__.py
@main.app_context_processor
def inject_permissions():
    """Expose the ``Permission`` class to templates under that name."""
    return {'Permission': Permission}
12 app/main/views.py #主页面views.py
@main.route('/user/<username>')
def user(username):
    """Render a user's profile with one page of their posts, newest first.

    Responds with 404 when no user has the given username. The page number
    is read from the ``page`` query argument (default 1).
    """
    profile_user = User.query.filter_by(username=username).first_or_404()
    page = request.args.get('page', 1, type=int)
    newest_first = profile_user.posts.order_by(Post.timestamp.desc())
    per_page = current_app.config['FLASKY_POSTS_PER_PAGE']
    pagination = newest_first.paginate(
        page, per_page=per_page, error_out=False)
    return render_template(
        'user.html',
        user=profile_user,
        posts=pagination.items,
        pagination=pagination,
    )
13 app/templates/user.html
{% extends "base.html" %}
{% import "_macros.html" as macros %}
{% block title %}Flasky - {{ user.username }}{% endblock %}
{% block page_content %}
<div class="page-header">
<img class="img-rounded profile-thumbnail" src="{{ user.gravatar(size=256) }}">
<div class="profile-header">
<h1>{{ user.username }}</h1>
{% if user.name or user.location %}
<p>
{% if user.name %}{{ user.name }}
{% endif %}
{% if user.location %}
From <a href="http://maps.google.com/?q={{ user.location }}">{{ user.location }}</a>
{% endif %}
</p>
{% endif %}
{% if current_user.is_administrator() %}
<p><a href="mailto:{{ user.email }}">{{ user.email }}</a></p>
{% endif %}
{% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
<p>Member since {{ moment(user.member_since).format('L') }}. Last seen {{ moment(user.last_seen).fromNow() }}.</p>
<p>{{ user.posts.count() }} blog posts. {{ user.comments.count() }} comments.</p>
<p>
{% if current_user.can(Permission.FOLLOW) and user != current_user %}
{% if not current_user.is_following(user) %}
<a href="{{ url_for('.follow', username=user.username) }}" class="btn btn-primary">Follow</a>
{% else %}
<a href="{{ url_for('.unfollow', username=user.username) }}" class="btn btn-default">Unfollow</a>
{% endif %}
{% endif %}
<a href="{{ url_for('.followers', username=user.username) }}">Followers: <span class="badge">{{ user.followers.count() - 1 }}</span></a>
<a href="{{ url_for('.followed_by', username=user.username) }}">Following: <span class="badge">{{ user.followed.count() - 1 }}</span></a>
{% if current_user.is_authenticated and user != current_user and user.is_following(current_user) %}
| <span class="label label-default">Follows you</span>
{% endif %}
</p>
<p>
{% if user == current_user %}
<a class="btn btn-default" href="{{ url_for('.edit_profile') }}">Edit Profile</a>
{% endif %}
{% if current_user.is_administrator() %}
<a class="btn btn-danger" href="{{ url_for('.edit_profile_admin', id=user.id) }}">Edit Profile [Admin]</a>
{% endif %}
</p>
</div>
</div>
<h3>Posts by {{ user.username }}</h3>
{% include '_posts.html' %}
{% if pagination %}
<div class="pagination">
{{ macros.pagination_widget(pagination, '.user', username=user.username) }}
</div>
{% endif %}
{% endblock %}
14 app/main/forms.py
from flask_wtf import FlaskForm
# Parenthesized so the import can legally span two lines.
from wtforms import (StringField, TextAreaField, BooleanField, SelectField,
                     SubmitField)
from wtforms.validators import Required, Length, Email, Regexp
from wtforms import ValidationError
from flask_pagedown.fields import PageDownField
from ..models import Role, User
class NameForm(FlaskForm):
    """Single-field form asking for the visitor's name."""

    name = StringField(
        'What is your name?',
        validators=[Required()],
    )
    submit = SubmitField('Submit')
class EditProfileForm(FlaskForm):
    """Profile form: real name, location and a free-text "about me"."""

    name = StringField(
        'Real name',
        validators=[Length(0, 64)],
    )
    location = StringField(
        'Location',
        validators=[Length(0, 64)],
    )
    about_me = TextAreaField('About me')
    submit = SubmitField('Submit')
class EditProfileAdminForm(FlaskForm):
    """Profile form that also edits e-mail, username, confirmation and role.

    The form is bound to the user being edited so the uniqueness validators
    can accept that user's own current e-mail and username.
    """

    email = StringField('Email',
                        validators=[Required(), Length(1, 64), Email()])
    # Username must start with a letter, then letters, digits, "_" or ".".
    username = StringField('Username', validators=[
        Required(), Length(1, 64),
        Regexp('^[A-Za-z][A-Za-z0-9_.]*$', 0,
               'Usernames must have only letters, '
               'numbers, dots or underscores')])
    confirmed = BooleanField('Confirmed')
    # coerce=int: the submitted value is converted to an integer role id.
    role = SelectField('Role', coerce=int)
    name = StringField('Real name', validators=[Length(0, 64)])
    location = StringField('Location', validators=[Length(0, 64)])
    about_me = TextAreaField('About me')
    submit = SubmitField('Submit')

    def __init__(self, user, *args, **kwargs):
        """Build the form for ``user`` and load role choices from the DB."""
        super(EditProfileAdminForm, self).__init__(*args, **kwargs)
        # (id, name) pairs for every role, ordered by name.
        self.role.choices = [(role.id, role.name)
                             for role in Role.query.order_by(Role.name).all()]
        self.user = user

    def validate_email(self, field):
        """Reject an e-mail that changed and is already registered."""
        if (field.data != self.user.email
                and User.query.filter_by(email=field.data).first()):
            raise ValidationError('Email already registered.')

    def validate_username(self, field):
        """Reject a username that changed and is already in use."""
        if (field.data != self.user.username
                and User.query.filter_by(username=field.data).first()):
            raise ValidationError('Username already in use.')
class PostForm(FlaskForm):
    """Form for writing a post; the body is a required PageDown field."""

    body = PageDownField(
        "What's on your mind?",
        validators=[Required()],
    )
    submit = SubmitField('Submit')
class CommentForm(FlaskForm):
    """Form for entering a comment; the body is required."""

    body = StringField(
        'Enter your comment',
        validators=[Required()],
    )
    submit = SubmitField('Submit')
15 app/main/views.py
# Multi-line imports are parenthesized so they can legally span two lines.
from flask import (render_template, redirect, url_for, abort, flash, request,
                   current_app, make_response)
from flask_login import login_required, current_user
from flask_sqlalchemy import get_debug_queries
from . import main
from .forms import (EditProfileForm, EditProfileAdminForm, PostForm,
                    CommentForm)
from .. import db
from ..models import Permission, Role, User, Post, Comment
from ..decorators import admin_required, permission_required
@main.route('/edit-profile', methods=['GET', 'POST'])
@login_required
def edit_profile():
    """Let the logged-in user edit their name, location and about-me text.

    A valid submission copies the form values onto the user and redirects
    to their profile page; otherwise the form is pre-filled with the current
    values and rendered.
    """
    form = EditProfileForm()
    profile_fields = ('name', 'location', 'about_me')
    if form.validate_on_submit():
        for field_name in profile_fields:
            setattr(current_user, field_name, getattr(form, field_name).data)
        db.session.add(current_user)
        flash('Your profile has been updated.')
        return redirect(url_for('.user', username=current_user.username))
    for field_name in profile_fields:
        getattr(form, field_name).data = getattr(current_user, field_name)
    return render_template('edit_profile.html', form=form)
@main.route('/edit-profile/<int:id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_profile_admin(id):
    """Administrator view for editing the user with primary key ``id``.

    Responds with 404 when no such user exists. A valid submission copies
    the form values onto the user and redirects to that user's profile page;
    otherwise the form is pre-filled from the user and rendered.
    """
    user = User.query.get_or_404(id)
    form = EditProfileAdminForm(user=user)
    # Fields whose form name and model attribute name are the same.
    direct_fields = ('email', 'username', 'confirmed',
                     'name', 'location', 'about_me')
    if form.validate_on_submit():
        for field_name in direct_fields:
            setattr(user, field_name, getattr(form, field_name).data)
        # The form carries a role id; the model attribute is the Role object.
        user.role = Role.query.get(form.role.data)
        db.session.add(user)
        flash('The profile has been updated.')
        return redirect(url_for('.user', username=user.username))
    for field_name in direct_fields:
        getattr(form, field_name).data = getattr(user, field_name)
    form.role.data = user.role_id
    return render_template('edit_profile.html', form=form, user=user)
16 app/templates/user.html
{% extends "base.html" %}
{% import "_macros.html" as macros %}
{% block title %}Flasky - {{ user.username }}{% endblock %}
{% block page_content %}
<div class="page-header">
<img class="img-rounded profile-thumbnail" src="{{ user.gravatar(size=256) }}">
<div class="profile-header">
<h1>{{ user.username }}</h1>
{% if user.name or user.location %}
<p>
{% if user.name %}{{ user.name }}
{% endif %}
{% if user.location %}
From <a href="http://maps.google.com/?q={{ user.location }}">{{ user.location }}</a>
{% endif %}
</p>
{% endif %}
{% if current_user.is_administrator() %}
<p><a href="mailto:{{ user.email }}">{{ user.email }}</a></p>
{% endif %}
{% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
<p>Member since {{ moment(user.member_since).format('L') }}. Last seen {{ moment(user.last_seen).fromNow() }}.</p>
<p>{{ user.posts.count() }} blog posts. {{ user.comments.count() }} comments.</p>
<p>
{% if current_user.can(Permission.FOLLOW) and user != current_user %}
{% if not current_user.is_following(user) %}
<a href="{{ url_for('.follow', username=user.username) }}" class="btn btn-primary">Follow</a>
{% else %}
<a href="{{ url_for('.unfollow', username=user.username) }}" class="btn btn-default">Unfollow</a>
{% endif %}
{% endif %}
<a href="{{ url_for('.followers', username=user.username) }}">Followers: <span class="badge">{{ user.followers.count() - 1 }}</span></a>
<a href="{{ url_for('.followed_by', username=user.username) }}">Following: <span class="badge">{{ user.followed.count() - 1 }}</span></a>
{% if current_user.is_authenticated and user != current_user and user.is_following(current_user) %}
| <span class="label label-default">Follows you</span>
{% endif %}
</p>
<p>
{% if user == current_user %}
<a class="btn btn-default" href="{{ url_for('.edit_profile') }}">Edit Profile</a>
{% endif %}
{% if current_user.is_administrator() %}
<a class="btn btn-danger" href="{{ url_for('.edit_profile_admin', id=user.id) }}">Edit Profile [Admin]</a>
{% endif %}
</p>
</div>
</div>
<h3>Posts by {{ user.username }}</h3>
{% include '_posts.html' %}
{% if pagination %}
<div class="pagination">
{{ macros.pagination_widget(pagination, '.user', username=user.username) }}
</div>
{% endif %}
{% endblock %}
数据库: python manage.py db upgrade
手动添加用户:
(venv) $ python manage.py shell
u = User(email='john@example.com', username='john', password='cat')
db.session.add(u)
db.session.commit()
插入角色
(venv) $ python manage.py shell
Role.insert_roles()
Role.query.all()