serve-ssh-certs/servecerts/auth.py

147 lines
4 KiB
Python
Raw Normal View History

import functools
import logging

from flask import (
    Blueprint, abort, flash, g, redirect, render_template, request, session,
    url_for
)
from flask_debugtoolbar import DebugToolbarExtension
from werkzeug.security import check_password_hash, generate_password_hash

from servecerts.db import get_db
# Blueprint that groups the authentication views; every route registered on
# it is served under the /auth URL prefix.
bp = Blueprint('auth', __name__, url_prefix='/auth')
@bp.route('/register', methods=('GET', 'POST'))
def register():
    """Render the registration form and create a user on a valid POST.

    On POST the submitted username, password, fullname and email must all be
    non-empty and the username must not already be present in the ``user``
    table. When validation succeeds the user is inserted with a hashed
    password and the client is redirected to the login view; otherwise the
    first validation error is flashed and the form is rendered again.
    """
    if request.method != 'POST':
        return render_template('auth/register.html')

    form = request.form
    username = form['username']
    password = form['password']
    fullname = form['fullname']
    email = form['email']
    db = get_db()

    # Required fields are checked in this order; the first empty one wins.
    required = (
        (username, 'Username is required.'),
        (password, 'Password is required.'),
        (fullname, 'Fullname is required.'),
        (email, 'Email is required.'),
    )
    error = next((message for value, message in required if not value), None)

    # Only hit the database once every required field is present.
    if error is None:
        existing = db.execute(
            'SELECT id FROM user WHERE username = ?', (username,)
        ).fetchone()
        if existing is not None:
            error = 'User {} is already registered.'.format(username)

    if error is not None:
        flash(error)
        return render_template('auth/register.html')

    db.execute(
        'INSERT INTO user (username, password, fullname, email) VALUES (?, ?, ?, ?)',
        (username, generate_password_hash(password), fullname, email)
    )
    db.commit()
    return redirect(url_for('auth.login'))
@bp.route('/<int:id>/update', methods=('GET', 'POST'))
def update(id):
    """Render and process the edit form for the account with the given id.

    Access is restricted to the logged-in owner of the account: anonymous
    visitors are redirected to the login view (the same response
    ``login_required`` gives) and any other logged-in user receives a 403.
    Without this check anyone could overwrite another account's password.

    On POST the username, password, fullname and email must all be non-empty
    and the username must not belong to a different account. On success the
    row is updated with a freshly hashed password and the client is
    redirected to ``pubkeys.index``; otherwise the first validation error is
    flashed and the form is rendered again.

    :param id: primary key of the user row, taken from the URL.
    :raises werkzeug.exceptions.Forbidden: the logged-in user is not ``id``.
    :raises werkzeug.exceptions.NotFound: no user row has this id.
    """
    # g.user is populated by load_logged_in_user before every request.
    if g.user is None:
        return redirect(url_for('auth.login'))
    if g.user['id'] != id:
        abort(403)

    user = get_user(id)

    if request.method == 'POST':
        username = request.form['username']
        fullname = request.form['fullname']
        password = request.form['password']
        email = request.form['email']
        db = get_db()
        error = None

        if not username:
            error = 'Username is required.'
        elif not password:
            error = 'Password is required.'
        elif not fullname:
            error = 'Fullname is required.'
        elif not email:
            error = 'Email is required.'
        elif db.execute(
            # Mirror the uniqueness check in register(), ignoring this
            # account's own row so keeping the current username is allowed.
            'SELECT id FROM user WHERE username = ? AND id != ?',
            (username, id)
        ).fetchone() is not None:
            error = 'User {} is already registered.'.format(username)

        if error is None:
            db.execute(
                'UPDATE user SET username = ?, password = ?, fullname = ?, email = ?'
                ' WHERE id = ?',
                (username, generate_password_hash(password), fullname, email, id)
            )
            db.commit()
            return redirect(url_for('pubkeys.index'))

        flash(error)

    return render_template('auth/update.html', user=user)
@bp.route('/login', methods=('GET', 'POST'))
def login():
    """Render the login form and start a session on valid credentials.

    On POST the user row is looked up by username and the submitted password
    is checked against the stored hash. On success the session is reset, the
    user's id is stored under ``user_id`` and the client is redirected to
    ``pubkeys.index``; otherwise an error is flashed and the form is rendered
    again.
    """
    if request.method != 'POST':
        return render_template('auth/login.html')

    username = request.form['username']
    password = request.form['password']
    row = get_db().execute(
        'SELECT * FROM user WHERE username = ?', (username,)
    ).fetchone()

    if row is None:
        failure = 'Incorrect username.'
    elif check_password_hash(row['password'], password):
        failure = None
    else:
        failure = 'Incorrect password.'

    if failure is not None:
        flash(failure)
        return render_template('auth/login.html')

    # Drop whatever the previous session held before storing the new user.
    session.clear()
    session['user_id'] = row['id']
    return redirect(url_for('pubkeys.index'))
@bp.before_app_request
def load_logged_in_user():
    """Populate ``g.user`` from the session before each request.

    ``g.user`` becomes the row fetched for the ``user_id`` stored in the
    session, or ``None`` when the session holds no ``user_id``.
    """
    current_id = session.get('user_id')
    if current_id is not None:
        g.user = get_db().execute(
            'SELECT * FROM user WHERE id = ?', (current_id,)
        ).fetchone()
        return
    g.user = None
@bp.route('/logout')
def logout():
    """Clear the session and redirect to ``pubkeys.index``."""
    session.clear()
    landing_url = url_for('pubkeys.index')
    return redirect(landing_url)
def login_required(view):
    """Decorate a view so that it is only run for logged-in users.

    The returned wrapper calls ``view`` with the keyword arguments it was
    given when ``g.user`` is set, and otherwise returns a redirect to the
    login view. ``functools.wraps`` keeps the wrapped view's metadata.
    """
    @functools.wraps(view)
    def guarded(**kwargs):
        if g.user is not None:
            return view(**kwargs)
        return redirect(url_for('auth.login'))

    return guarded
def get_user(id):
    """Return the user row with the given id, aborting with 404 if absent.

    ``abort`` must be imported from flask at module level; without that
    import the not-found branch raised ``NameError`` instead of a 404.

    :param id: primary key of the row to fetch from the ``user`` table.
    :returns: the row returned by ``fetchone()`` for that id.
    :raises werkzeug.exceptions.NotFound: no user row has this id.
    """
    row = get_db().execute(
        'SELECT * FROM user WHERE id = ?', (id,)
    ).fetchone()

    if row is None:
        abort(404, "User id {0} does not exist.".format(id))

    return row