Address pull request comments

* Added column password_revision. This field will default to 0, which all passwords will have till they're updated. After that each password hash method has a revision.
* appeased pycodestyle
This commit is contained in:
ReAnzu 2018-03-01 02:24:15 -06:00
parent 0cd5600163
commit 3276662c39
8 changed files with 87 additions and 60 deletions

View file

@ -1,3 +1,5 @@
from typing import Tuple
import hashlib import hashlib
import random import random
from collections import OrderedDict from collections import OrderedDict
@ -5,7 +7,7 @@ from nacl.exceptions import InvalidkeyError
from szurubooru import config, model, errors, db from szurubooru import config, model, errors, db
from szurubooru.func import util from szurubooru.func import util
from nacl.pwhash import argon2id, verify from nacl import pwhash
RANK_MAP = OrderedDict([ RANK_MAP = OrderedDict([
@ -19,29 +21,29 @@ RANK_MAP = OrderedDict([
]) ])
def get_password_hash(salt: str, password: str) -> str: def get_password_hash(salt: str, password: str) -> Tuple[str, int]:
""" Retrieve argon2id password hash.""" ''' Retrieve argon2id password hash. '''
return argon2id.str( return pwhash.argon2id.str(
(config.config['secret'] + salt + password).encode('utf8') (config.config['secret'] + salt + password).encode('utf8')
).decode('utf8') ).decode('utf8'), 3
def get_sha256_legacy_password_hash(salt: str, password: str) -> str: def get_sha256_legacy_password_hash(salt: str, password: str) -> Tuple[str, int]:
""" Retrieve old-style sha256 password hash.""" ''' Retrieve old-style sha256 password hash. '''
digest = hashlib.sha256() digest = hashlib.sha256()
digest.update(config.config['secret'].encode('utf8')) digest.update(config.config['secret'].encode('utf8'))
digest.update(salt.encode('utf8')) digest.update(salt.encode('utf8'))
digest.update(password.encode('utf8')) digest.update(password.encode('utf8'))
return digest.hexdigest() return digest.hexdigest(), 2
def get_sha1_legacy_password_hash(salt: str, password: str) -> str: def get_sha1_legacy_password_hash(salt: str, password: str) -> Tuple[str, int]:
""" Retrieve old-style sha1 password hash.""" ''' Retrieve old-style sha1 password hash. '''
digest = hashlib.sha1() digest = hashlib.sha1()
digest.update(b'1A2/$_4xVa') digest.update(b'1A2/$_4xVa')
digest.update(salt.encode('utf8')) digest.update(salt.encode('utf8'))
digest.update(password.encode('utf8')) digest.update(password.encode('utf8'))
return digest.hexdigest() return digest.hexdigest(), 1
def create_password() -> str: def create_password() -> str:
@ -59,16 +61,19 @@ def is_valid_password(user: model.User, password: str) -> bool:
salt, valid_hash = user.password_salt, user.password_hash salt, valid_hash = user.password_salt, user.password_hash
try: try:
return verify(user.password_hash.encode('utf8'), return pwhash.verify(
(config.config['secret'] + salt + password).encode('utf8')) user.password_hash.encode('utf8'),
(config.config['secret'] + salt + password).encode('utf8'))
except InvalidkeyError: except InvalidkeyError:
possible_hashes = [ possible_hashes = [
get_sha256_legacy_password_hash(salt, password), get_sha256_legacy_password_hash(salt, password)[0],
get_sha1_legacy_password_hash(salt, password) get_sha1_legacy_password_hash(salt, password)[0]
] ]
if valid_hash in possible_hashes: if valid_hash in possible_hashes:
# Convert the user password hash to the new hash # Convert the user password hash to the new hash
user.password_hash = get_password_hash(salt, password) new_hash, revision = get_password_hash(salt, password)
user.password_hash = new_hash
user.password_revision = revision
db.session.commit() db.session.commit()
return True return True

View file

@ -243,7 +243,9 @@ def update_user_password(user: model.User, password: str) -> None:
raise InvalidPasswordError( raise InvalidPasswordError(
'Password must satisfy regex %r.' % password_regex) 'Password must satisfy regex %r.' % password_regex)
user.password_salt = auth.create_password() user.password_salt = auth.create_password()
user.password_hash = auth.get_password_hash(user.password_salt, password) hash, revision = auth.get_password_hash(user.password_salt, password)
user.password_hash = hash
user.password_revision = revision
def update_user_email(user: model.User, email: str) -> None: def update_user_email(user: model.User, email: str) -> None:
@ -308,5 +310,7 @@ def reset_user_password(user: model.User) -> str:
assert user assert user
password = auth.create_password() password = auth.create_password()
user.password_salt = auth.create_password() user.password_salt = auth.create_password()
user.password_hash = auth.get_password_hash(user.password_salt, password) hash, revision = auth.get_password_hash(user.password_salt, password)
user.password_hash = hash
user.password_revision = revision
return password return password

View file

@ -1,30 +0,0 @@
'''
Alter the password_hash field to work with larger output. Particularly libsodium output for greater password security.
Revision ID: 9ef1a1643c2a
Created at: 2018-02-24 23:00:32.848575
'''
import sqlalchemy as sa
from alembic import op
revision = '9ef1a1643c2a'
down_revision = '02ef5f73f4ab'
branch_labels = None
depends_on = None
def upgrade():
op.alter_column('user', 'password_hash',
existing_type=sa.VARCHAR(length=64),
type_=sa.Unicode(length=128),
existing_nullable=False)
def downgrade():
op.alter_column('user', 'password_hash',
existing_type=sa.Unicode(length=128),
type_=sa.VARCHAR(length=64),
existing_nullable=False)

View file

@ -0,0 +1,34 @@
'''
Alter the password_hash field to work with larger output.
Particularly libsodium output for greater password security.
Revision ID: 9ef1a1643c2a
Created at: 2018-02-24 23:00:32.848575
'''
import sqlalchemy as sa
from alembic import op
revision = '9ef1a1643c2a'
down_revision = '02ef5f73f4ab'
branch_labels = None
depends_on = None
def upgrade():
op.alter_column('user', 'password_hash',
existing_type=sa.VARCHAR(length=64),
type_=sa.Unicode(length=128),
existing_nullable=False)
op.add_column('user', sa.Column('password_revision',
sa.SmallInteger(),
nullable=False))
def downgrade():
op.alter_column('user', 'password_hash',
existing_type=sa.Unicode(length=128),
type_=sa.VARCHAR(length=64),
existing_nullable=False)
op.drop_column('user', 'password_revision')

View file

@ -25,6 +25,8 @@ class User(Base):
name = sa.Column('name', sa.Unicode(50), nullable=False, unique=True) name = sa.Column('name', sa.Unicode(50), nullable=False, unique=True)
password_hash = sa.Column('password_hash', sa.Unicode(128), nullable=False) password_hash = sa.Column('password_hash', sa.Unicode(128), nullable=False)
password_salt = sa.Column('password_salt', sa.Unicode(32)) password_salt = sa.Column('password_salt', sa.Unicode(32))
password_revision = sa.Column('password_revision', sa.SmallInteger,
default=0, nullable=False)
email = sa.Column('email', sa.Unicode(64), nullable=True) email = sa.Column('email', sa.Unicode(64), nullable=True)
rank = sa.Column('rank', sa.Unicode(32), nullable=False) rank = sa.Column('rank', sa.Unicode(32), nullable=False)
avatar_style = sa.Column( avatar_style = sa.Column(

View file

@ -115,7 +115,11 @@ def config_injector():
@pytest.fixture @pytest.fixture
def user_factory(): def user_factory():
def factory(name=None, rank=model.User.RANK_REGULAR, email='dummy', password_salt=None, password_hash=None): def factory(name=None,
rank=model.User.RANK_REGULAR,
email='dummy',
password_salt=None,
password_hash=None):
user = model.User() user = model.User()
user.name = name or get_unique_name() user.name = name or get_unique_name()
user.password_salt = password_salt or 'dummy' user.password_salt = password_salt or 'dummy'

View file

@ -9,29 +9,35 @@ def inject_config(config_injector):
def test_get_password_hash(): def test_get_password_hash():
salt, password = ('testSalt', 'pass') salt, password = ('testSalt', 'pass')
result = auth.get_password_hash(salt, password) result, revision = auth.get_password_hash(salt, password)
assert result assert result
hash_parts = list(filter(lambda e: e is not None and e != '', result.split('$'))) assert revision == 3
hash_parts = list(
filter(lambda e: e is not None and e != '', result.split('$')))
assert len(hash_parts) == 5 assert len(hash_parts) == 5
assert hash_parts[0] == 'argon2id' assert hash_parts[0] == 'argon2id'
def test_get_sha256_legacy_password_hash(): def test_get_sha256_legacy_password_hash():
salt, password = ('testSalt', 'pass') salt, password = ('testSalt', 'pass')
result = auth.get_sha256_legacy_password_hash(salt, password) result, revision = auth.get_sha256_legacy_password_hash(salt, password)
assert result == '2031ac9631353ac9303719a7f808a24f79aa1d71712c98523e4bb4cce579428a' hash = '2031ac9631353ac9303719a7f808a24f79aa1d71712c98523e4bb4cce579428a'
assert result == hash
assert revision == 2
def test_get_sha1_legacy_password_hash(): def test_get_sha1_legacy_password_hash():
salt, password = ('testSalt', 'pass') salt, password = ('testSalt', 'pass')
result = auth.get_sha1_legacy_password_hash(salt, password) result, revision = auth.get_sha1_legacy_password_hash(salt, password)
assert result == '1eb1f953d9be303a1b54627e903e6124cfb1245b' assert result == '1eb1f953d9be303a1b54627e903e6124cfb1245b'
assert revision == 1
def test_is_valid_password_auto_upgrades_user_password_hash_on_success_of_legacy_hash(user_factory): def test_is_valid_password_auto_upgrades_user_password_hash(user_factory):
salt, password = ('testSalt', 'pass') salt, password = ('testSalt', 'pass')
legacy_password_hash = auth.get_sha256_legacy_password_hash(salt, password) hash, revision = auth.get_sha256_legacy_password_hash(salt, password)
user = user_factory(password_salt=salt, password_hash=legacy_password_hash) user = user_factory(password_salt=salt, password_hash=hash)
result = auth.is_valid_password(user, password) result = auth.is_valid_password(user, password)
assert result is True assert result is True
assert user.password_hash != legacy_password_hash assert user.password_hash != hash
assert user.password_revision > revision

View file

@ -320,10 +320,11 @@ def test_update_user_password(user_factory, config_injector):
with patch('szurubooru.func.auth.create_password'), \ with patch('szurubooru.func.auth.create_password'), \
patch('szurubooru.func.auth.get_password_hash'): patch('szurubooru.func.auth.get_password_hash'):
auth.create_password.return_value = 'salt' auth.create_password.return_value = 'salt'
auth.get_password_hash.return_value = 'hash' auth.get_password_hash.return_value = ('hash', 3)
users.update_user_password(user, 'a') users.update_user_password(user, 'a')
assert user.password_salt == 'salt' assert user.password_salt == 'salt'
assert user.password_hash == 'hash' assert user.password_hash == 'hash'
assert user.password_revision == 3
def test_update_user_email_with_too_long_string(user_factory): def test_update_user_email_with_too_long_string(user_factory):
@ -447,7 +448,8 @@ def test_reset_user_password(user_factory):
patch('szurubooru.func.auth.get_password_hash'): patch('szurubooru.func.auth.get_password_hash'):
user = user_factory() user = user_factory()
auth.create_password.return_value = 'salt' auth.create_password.return_value = 'salt'
auth.get_password_hash.return_value = 'hash' auth.get_password_hash.return_value = ('hash', 3)
users.reset_user_password(user) users.reset_user_password(user)
assert user.password_salt == 'salt' assert user.password_salt == 'salt'
assert user.password_hash == 'hash' assert user.password_hash == 'hash'
assert user.password_revision == 3