Fixed existing tests, added new tests around endpoints, authentication, and password hash hardening

This commit is contained in:
ReAnzu 2018-02-27 22:29:38 -06:00
parent 187ab77ebd
commit d9b3160437
17 changed files with 355 additions and 18 deletions

View file

@ -33,13 +33,18 @@ def create_user_token(ctx: rest.Context, params: Dict[str, str] = {}) -> rest.Re
@rest.routes.put('/user-token/(?P<user_name>[^/]+)/(?P<user_token>[^/]+)/?') @rest.routes.put('/user-token/(?P<user_name>[^/]+)/(?P<user_token>[^/]+)/?')
def edit_user_token(ctx: rest.Context, params: Dict[str, str] = {}) -> rest.Response: def update_user_token(ctx: rest.Context, params: Dict[str, str] = {}) -> rest.Response:
user = users.get_user_by_name(params['user_name']) user = users.get_user_by_name(params['user_name'])
infix = 'self' if ctx.user.user_id == user.user_id else 'any' infix = 'self' if ctx.user.user_id == user.user_id else 'any'
auth.verify_privilege(ctx.user, 'user_tokens:edit:%s' % infix) auth.verify_privilege(ctx.user, 'user_tokens:edit:%s' % infix)
user_token = user_tokens.get_user_token_by_user_and_token(user, params['user_token']) user_token = user_tokens.get_user_token_by_user_and_token(user, params['user_token'])
versions.verify_version(user_token, ctx) versions.verify_version(user_token, ctx)
versions.bump_version(user_token) versions.bump_version(user_token)
if ctx.has_param('enabled'):
auth.verify_privilege(ctx.user, 'user_tokens:edit:%s' % infix)
user_tokens.update_user_token_enabled(user_token, ctx.get_param_as_bool('enabled'))
user_tokens.update_user_token_edit_time(user_token)
ctx.session.commit()
return _serialize(ctx, user_token) return _serialize(ctx, user_token)

View file

@ -722,12 +722,14 @@ def merge_posts(
merge_favorites(source_post.post_id, target_post.post_id) merge_favorites(source_post.post_id, target_post.post_id)
merge_relations(source_post.post_id, target_post.post_id) merge_relations(source_post.post_id, target_post.post_id)
delete(source_post) content = None
db.session.flush()
if replace_content: if replace_content:
content = files.get(get_post_content_path(source_post)) content = files.get(get_post_content_path(source_post))
delete(source_post)
db.session.flush()
if content is not None:
update_post_content(target_post, content) update_post_content(target_post, content)

View file

@ -1,8 +1,6 @@
from datetime import datetime from datetime import datetime
from typing import Any, Optional, List, Dict, Callable from typing import Any, Optional, List, Dict, Callable
import sqlalchemy as sa
from szurubooru import db, model, rest from szurubooru import db, model, rest
from szurubooru.func import auth, serialization, users from szurubooru.func import auth, serialization, users
@ -22,7 +20,7 @@ class UserTokenSerializer(serialization.BaseSerializer):
'enabled': self.serialize_enabled, 'enabled': self.serialize_enabled,
'version': self.serialize_version, 'version': self.serialize_version,
'creationTime': self.serialize_creation_time, 'creationTime': self.serialize_creation_time,
'lastLoginTime': self.serialize_last_edit_time, 'lastEditTime': self.serialize_last_edit_time,
} }
def serialize_user(self) -> Any: def serialize_user(self) -> Any:
@ -76,3 +74,13 @@ def create_user_token(user: model.User) -> model.UserToken:
db.session.add(user_token) db.session.add(user_token)
db.session.commit() db.session.commit()
return user_token return user_token
def update_user_token_enabled(user_token: model.UserToken, enabled: bool) -> None:
assert user_token
user_token.enabled = enabled if enabled is not None else True
def update_user_token_edit_time(user_token: model.UserToken) -> None:
assert user_token
user_token.last_edit_time = datetime.utcnow()

View file

@ -49,7 +49,6 @@ def _get_user(ctx: rest.Context) -> Optional[model.User]:
msg.format(ctx.get_header('Authorization'), str(err))) msg.format(ctx.get_header('Authorization'), str(err)))
@rest.middleware.pre_hook
def process_request(ctx: rest.Context) -> None: def process_request(ctx: rest.Context) -> None:
''' Bind the user to request. Update last login time if needed. ''' ''' Bind the user to request. Update last login time if needed. '''
auth_user = _get_user(ctx) auth_user = _get_user(ctx)
@ -58,3 +57,8 @@ def process_request(ctx: rest.Context) -> None:
if ctx.get_param_as_bool('bump-login', default=False) and ctx.user.user_id: if ctx.get_param_as_bool('bump-login', default=False) and ctx.user.user_id:
users.bump_user_login_time(ctx.user) users.bump_user_login_time(ctx.user)
ctx.session.commit() ctx.session.commit()
@rest.middleware.pre_hook
def process_request_hook(ctx: rest.Context) -> None:
process_request(ctx)

View file

@ -7,9 +7,9 @@ pre_hooks = [] # type: List[Callable[[Context], None]]
post_hooks = [] # type: List[Callable[[Context], None]] post_hooks = [] # type: List[Callable[[Context], None]]
def pre_hook(handler: Callable) -> None: def pre_hook(handler: Callable) -> Callable:
pre_hooks.append(handler) pre_hooks.append(handler)
def post_hook(handler: Callable) -> None: def post_hook(handler: Callable) -> Callable:
post_hooks.insert(0, handler) post_hooks.insert(0, handler)

View file

@ -0,0 +1,29 @@
from unittest.mock import patch
import pytest
from szurubooru import api
from szurubooru.func import user_tokens, users
@pytest.fixture(autouse=True)
def inject_config(config_injector):
config_injector({'privileges': {'user_tokens:create:self': 'regular'}})
def test_creating_user_token(user_token_factory, context_factory, fake_datetime):
user_token = user_token_factory()
with patch('szurubooru.func.user_tokens.create_user_token'), \
patch('szurubooru.func.user_tokens.serialize_user_token'), \
patch('szurubooru.func.users.get_user_by_name'), \
fake_datetime('1969-02-12'):
users.get_user_by_name.return_value = user_token.user
user_tokens.serialize_user_token.return_value = 'serialized user token'
user_tokens.create_user_token.return_value = user_token
result = api.user_token_api.create_user_token(
context_factory(
user=user_token.user),
{'user_name': user_token.user.name})
assert result == 'serialized user token'
user_tokens.create_user_token.assert_called_once_with(
user_token.user)

View file

@ -0,0 +1,29 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db
from szurubooru.func import user_tokens, users
@pytest.fixture(autouse=True)
def inject_config(config_injector):
config_injector({'privileges': {'user_tokens:delete:self': 'regular'}})
def test_deleting_user_token(user_token_factory, context_factory, fake_datetime):
user_token = user_token_factory()
db.session.add(user_token)
db.session.commit()
with patch('szurubooru.func.user_tokens.get_user_token_by_user_and_token'), \
patch('szurubooru.func.users.get_user_by_name'), \
fake_datetime('1969-02-12'):
users.get_user_by_name.return_value = user_token.user
user_tokens.get_user_token_by_user_and_token.return_value = user_token
result = api.user_token_api.delete_user_token(
context_factory(
user=user_token.user),
{'user_name': user_token.user.name, 'user_token': user_token.token})
assert result == {}
user_tokens.get_user_token_by_user_and_token.assert_called_once_with(
user_token.user, user_token.token)

View file

@ -0,0 +1,31 @@
from unittest.mock import patch
import pytest
from szurubooru import api
from szurubooru.func import user_tokens, users
@pytest.fixture(autouse=True)
def inject_config(config_injector):
config_injector({'privileges': {'user_tokens:list:self': 'regular'}})
def test_retrieving_user_tokens(user_token_factory, context_factory, fake_datetime):
user_token1 = user_token_factory()
user_token2 = user_token_factory(user=user_token1.user)
user_token3 = user_token_factory(user=user_token1.user)
with patch('szurubooru.func.user_tokens.get_user_tokens'), \
patch('szurubooru.func.user_tokens.serialize_user_token'), \
patch('szurubooru.func.users.get_user_by_name'), \
fake_datetime('1969-02-12'):
users.get_user_by_name.return_value = user_token1.user
user_tokens.serialize_user_token.return_value = 'serialized user token'
user_tokens.get_user_tokens.return_value = [user_token1, user_token2, user_token3]
result = api.user_token_api.get_user_tokens(
context_factory(
user=user_token1.user),
{'user_name': user_token1.user.name})
assert result == {'results': ['serialized user token', 'serialized user token', 'serialized user token']}
user_tokens.get_user_tokens.assert_called_once_with(
user_token1.user)

View file

@ -0,0 +1,41 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db
from szurubooru.func import user_tokens, users
@pytest.fixture(autouse=True)
def inject_config(config_injector):
config_injector({'privileges': {'user_tokens:edit:self': 'regular'}})
def test_edit_user_token(user_token_factory, context_factory, fake_datetime):
user_token = user_token_factory()
db.session.add(user_token)
db.session.commit()
with patch('szurubooru.func.user_tokens.get_user_token_by_user_and_token'), \
patch('szurubooru.func.user_tokens.update_user_token_enabled'), \
patch('szurubooru.func.user_tokens.update_user_token_edit_time'), \
patch('szurubooru.func.user_tokens.serialize_user_token'), \
patch('szurubooru.func.users.get_user_by_name'), \
fake_datetime('1969-02-12'):
users.get_user_by_name.return_value = user_token.user
user_tokens.serialize_user_token.return_value = 'serialized user token'
user_tokens.get_user_token_by_user_and_token.return_value = user_token
result = api.user_token_api.update_user_token(
context_factory(
params={
'version': user_token.version,
'enabled': False,
},
user=user_token.user),
{'user_name': user_token.user.name, 'user_token': user_token.token})
assert result == 'serialized user token'
user_tokens.get_user_token_by_user_and_token.assert_called_once_with(
user_token.user, user_token.token)
user_tokens.update_user_token_enabled.assert_called_once_with(
user_token, False)
user_tokens.update_user_token_edit_time.assert_called_once_with(
user_token)

View file

@ -93,11 +93,11 @@ def session(query_logger): # pylint: disable=unused-argument
@pytest.fixture @pytest.fixture
def context_factory(session): def context_factory(session):
def factory(params=None, files=None, user=None): def factory(params=None, files=None, user=None, headers=None):
ctx = rest.Context( ctx = rest.Context(
method=None, method=None,
url=None, url=None,
headers={}, headers=headers or {},
params=params or {}, params=params or {},
files=files or {}) files=files or {})
ctx.session = session ctx.session = session
@ -115,11 +115,11 @@ def config_injector():
@pytest.fixture @pytest.fixture
def user_factory(): def user_factory():
def factory(name=None, rank=model.User.RANK_REGULAR, email='dummy'): def factory(name=None, rank=model.User.RANK_REGULAR, email='dummy', password_salt=None, password=None):
user = model.User() user = model.User()
user.name = name or get_unique_name() user.name = name or get_unique_name()
user.password_salt = 'dummy' user.password_salt = password_salt or 'dummy'
user.password_hash = 'dummy' user.password_hash = password or 'dummy'
user.email = email user.email = email
user.rank = rank user.rank = rank
user.creation_time = datetime(1997, 1, 1) user.creation_time = datetime(1997, 1, 1)
@ -128,6 +128,21 @@ def user_factory():
return factory return factory
@pytest.fixture
def user_token_factory(user_factory):
def factory(user=None, token=None, enabled=None, creation_time=None):
if user is None:
user = user_factory()
db.session.add(user)
user_token = model.UserToken()
user_token.user = user
user_token.token = token or 'dummy'
user_token.enabled = enabled or True
user_token.creation_time = creation_time or datetime(1997, 1, 1)
return user_token
return factory
@pytest.fixture @pytest.fixture
def tag_category_factory(): def tag_category_factory():
def factory(name=None, color='dummy', default=False): def factory(name=None, color='dummy', default=False):

View file

@ -0,0 +1,39 @@
from szurubooru.func import auth
import pytest
@pytest.fixture(autouse=True)
def inject_config(config_injector):
config_injector({'secret': 'testSecret'})
def test_get_sha256_legacy_password_hash():
salt, password = ('testSalt', 'pass')
result = auth.get_sha256_legacy_password_hash(salt, password)
assert result == '2031ac9631353ac9303719a7f808a24f79aa1d71712c98523e4bb4cce579428a'
def test_get_sha1_legacy_password_hash():
salt, password = ('testSalt', 'pass')
result = auth.get_sha1_legacy_password_hash(salt, password)
assert result == '1eb1f953d9be303a1b54627e903e6124cfb1245b'
def test_is_valid_password(user_factory):
salt, password = ('testSalt', 'pass')
user = user_factory(password_salt=salt, password=password)
legacy_password_hash = auth.get_sha256_legacy_password_hash(salt, password)
user.password_hash = legacy_password_hash
result = auth.is_valid_password(user, password)
assert result is True
assert user.password_hash != legacy_password_hash
def test_is_valid_token(user_token_factory):
user_token = user_token_factory()
assert auth.is_valid_token(user_token)
def test_generate_authorization_token():
result = auth.generate_authorization_token()
assert result != auth.generate_authorization_token()

View file

@ -936,6 +936,6 @@ def test_merge_posts_replaces_content(
assert posts.try_get_post_by_id(source_post.post_id) is None assert posts.try_get_post_by_id(source_post.post_id) is None
post = posts.get_post_by_id(target_post.post_id) post = posts.get_post_by_id(target_post.post_id)
assert post is not None assert post is not None
assert os.path.exists(source_path) assert not os.path.exists(source_path)
assert os.path.exists(target_path1) assert os.path.exists(target_path1)
assert not os.path.exists(target_path2) assert not os.path.exists(target_path2)

View file

@ -116,7 +116,7 @@ def test_update_category_color_with_too_long_string(tag_category_factory):
def test_update_category_color_with_invalid_string(tag_category_factory): def test_update_category_color_with_invalid_string(tag_category_factory):
category = tag_category_factory() category = tag_category_factory()
with pytest.raises(tag_categories.InvalidTagCategoryColorError): with pytest.raises(tag_categories.InvalidTagCategoryColorError):
tag_categories.update_category_color(category, 'NOPE') tag_categories.update_category_color(category, 'NOPE#')
@pytest.mark.parametrize('attempt', ['#aaaaaa', '#012345', '012345', 'red']) @pytest.mark.parametrize('attempt', ['#aaaaaa', '#012345', '012345', 'red'])

View file

@ -0,0 +1,72 @@
from datetime import datetime
from unittest.mock import patch
from szurubooru import db
from szurubooru.func import user_tokens, users, auth
def test_serialize_user_token(user_token_factory):
user_token = user_token_factory()
db.session.add(user_token)
db.session.flush()
with patch('szurubooru.func.users.get_avatar_url'):
users.get_avatar_url.return_value = 'https://example.com/avatar.png'
result = user_tokens.serialize_user_token(user_token, user_token.user)
assert result == {'creationTime': datetime(1997, 1, 1, 0, 0),
'enabled': True,
'lastEditTime': None,
'token': 'dummy',
'user': {
'avatarUrl': 'https://example.com/avatar.png',
'name': user_token.user.name},
'version': 1}
def test_serialize_user_token_none():
result = user_tokens.serialize_user_token(None, None)
assert result is None
def test_get_user_token_by_user_and_token(user_token_factory):
user_token = user_token_factory()
db.session.add(user_token)
db.session.flush()
db.session.commit()
result = user_tokens.get_user_token_by_user_and_token(user_token.user, user_token.token)
assert result == user_token
def test_get_user_tokens(user_token_factory):
user_token1 = user_token_factory()
user_token2 = user_token_factory(user=user_token1.user)
db.session.add(user_token1)
db.session.add(user_token2)
db.session.flush()
db.session.commit()
result = user_tokens.get_user_tokens(user_token1.user)
assert result == [user_token1, user_token2]
def test_create_user_token(user_factory):
user = user_factory()
db.session.add(user)
db.session.flush()
db.session.commit()
with patch('szurubooru.func.auth.generate_authorization_token'):
auth.generate_authorization_token.return_value = 'test'
result = user_tokens.create_user_token(user)
assert result.token == 'test'
assert result.user == user
def test_update_user_token_enabled(user_token_factory):
user_token = user_token_factory()
user_tokens.update_user_token_enabled(user_token, False)
assert user_token.enabled is False
def test_update_user_token_edit_time(user_token_factory):
user_token = user_token_factory()
assert user_token.last_edit_time is None
user_tokens.update_user_token_edit_time(user_token)
assert user_token.last_edit_time is not None

View file

@ -0,0 +1,48 @@
from unittest.mock import patch
from szurubooru.func import auth, users, user_tokens
from szurubooru.middleware import authenticator
from szurubooru.rest import errors
import pytest
def test_process_request_no_header(context_factory):
ctx = context_factory()
authenticator.process_request(ctx)
assert ctx.user.name is None
def test_process_request_basic_auth_valid(context_factory, user_factory):
user = user_factory()
ctx = context_factory(headers={
'Authorization': "Basic dGVzdFVzZXI6dGVzdFBhc3N3b3Jk"
})
with patch('szurubooru.func.auth.is_valid_password'), \
patch('szurubooru.func.users.get_user_by_name'):
users.get_user_by_name.return_value = user
auth.is_valid_password.return_value = True
authenticator.process_request(ctx)
assert ctx.user == user
def test_process_request_token_auth_valid(context_factory, user_token_factory):
user_token = user_token_factory()
ctx = context_factory(headers={
'Authorization': "Token dGVzdFVzZXI6dGVzdFRva2Vu"
})
with patch('szurubooru.func.auth.is_valid_token'), \
patch('szurubooru.func.users.get_user_by_name'), \
patch('szurubooru.func.user_tokens.get_user_token_by_user_and_token'):
users.get_user_by_name.return_value = user_token.user
user_tokens.get_user_token_by_user_and_token.return_value = user_token
auth.is_valid_password.return_value = True
authenticator.process_request(ctx)
assert ctx.user == user_token.user
def test_process_request_bad_header(context_factory):
ctx = context_factory(headers={
'Authorization': "Secret SuperSecretValue"
})
with pytest.raises(errors.HttpBadRequest):
authenticator.process_request(ctx)

View file

@ -0,0 +1,14 @@
from datetime import datetime
from szurubooru import db
def test_saving_user_token(user_token_factory):
user_token = user_token_factory()
db.session.add(user_token)
db.session.flush()
db.session.refresh(user_token)
assert not db.session.dirty
assert user_token.user is not None
assert user_token.token == 'dummy'
assert user_token.enabled is True
assert user_token.creation_time == datetime(1997, 1, 1)