diff --git a/server/szurubooru/api/user_api.py b/server/szurubooru/api/user_api.py index 8bb6ac86..e4134c63 100644 --- a/server/szurubooru/api/user_api.py +++ b/server/szurubooru/api/user_api.py @@ -1,9 +1,11 @@ ''' Exports UserListApi and UserDetailApi. ''' +import re import sqlalchemy from szurubooru.api.base_api import BaseApi -from szurubooru.errors import IntegrityError, ValidationError, NotFoundError +from szurubooru.errors import IntegrityError, ValidationError, NotFoundError, AuthError from szurubooru.services.search import UserSearchConfig, SearchExecutor +from szurubooru.util import is_valid_email def _serialize_user(authenticated_user, user): ret = { @@ -45,7 +47,7 @@ class UserListApi(BaseApi): self._auth_service.verify_privilege(context.user, 'users:create') try: - name = context.request['name'] + name = context.request['name'].strip() password = context.request['password'] email = context.request['email'].strip() except KeyError as ex: @@ -61,8 +63,12 @@ class UserListApi(BaseApi): class UserDetailApi(BaseApi): ''' API for individual users. ''' - def __init__(self, auth_service, user_service): + def __init__(self, config, auth_service, password_service, user_service): super().__init__() + self._available_access_ranks = config['service']['user_ranks'] + self._name_regex = config['service']['user_name_regex'] + self._password_regex = config['service']['password_regex'] + self._password_service = password_service self._auth_service = auth_service self._user_service = user_service @@ -76,5 +82,65 @@ class UserDetailApi(BaseApi): def put(self, request, context, user_name): ''' Updates an existing user. ''' - self._auth_service.verify_privilege(context.user, 'users:edit') - return {'message': 'Updating user ' + user_name} + user = self._user_service.get_by_name(context.session, user_name) + if not user: + raise NotFoundError('User %r not found.' % user_name) + + if context.user.user_id == user.user_id: + infix = 'self' + else: + infix = 'any' + + if 'name' in context.request: + self._auth_service.verify_privilege( + context.user, 'users:edit:%s:name' % infix) + name = context.request['name'].strip() + if not re.match(self._name_regex, name): + raise ValidationError( + 'Name must satisfy regex %r.' % self._name_regex) + user.name = name + + if 'password' in context.request: + password = context.request['password'] + self._auth_service.verify_privilege( + context.user, 'users:edit:%s:pass' % infix) + if not re.match(self._password_regex, password): + raise ValidationError( + 'Password must satisfy regex %r.' % self._password_regex) + user.password_salt = self._password_service.create_password() + user.password_hash = self._password_service.get_password_hash( + user.password_salt, password) + + if 'email' in context.request: + self._auth_service.verify_privilege( + context.user, 'users:edit:%s:email' % infix) + email = context.request['email'].strip() + if not is_valid_email(email): + raise ValidationError('%r is not a vaild email address.' % email) + # prefer nulls to empty strings in the DB + if not email: + email = None + user.email = email + + if 'accessRank' in context.request: + self._auth_service.verify_privilege( + context.user, 'users:edit:%s:rank' % infix) + rank = context.request['accessRank'].strip() + if not rank in self._available_access_ranks: + raise ValidationError( + 'Bad access rank. Valid access ranks: %r' \ + % self._available_access_ranks) + if self._available_access_ranks.index(context.user.access_rank) \ + < self._available_access_ranks.index(rank): + raise AuthError( + 'Trying to set higher access rank than one has') + user.access_rank = rank + + # TODO: avatar + + try: + context.session.commit() + except sqlalchemy.exc.IntegrityError: + raise IntegrityError('User %r already exists.' % name) + + return {'user': _serialize_user(context.user, user)} diff --git a/server/szurubooru/app.py b/server/szurubooru/app.py index 520aaa1f..59859e2d 100644 --- a/server/szurubooru/app.py +++ b/server/szurubooru/app.py @@ -67,8 +67,9 @@ def create_app(): auth_service = szurubooru.services.AuthService(config, password_service) user_service = szurubooru.services.UserService(config, password_service) - user_list = szurubooru.api.UserListApi(auth_service, user_service) - user = szurubooru.api.UserDetailApi(auth_service, user_service) + user_list_api = szurubooru.api.UserListApi(auth_service, user_service) + user_detail_api = szurubooru.api.UserDetailApi( + config, auth_service, password_service, user_service) app = falcon.API( request_type=_CustomRequest, @@ -85,7 +86,7 @@ def create_app(): app.add_error_handler(szurubooru.errors.SearchError, _on_search_error) app.add_error_handler(szurubooru.errors.NotFoundError, _on_not_found_error) - app.add_route('/users/', user_list) - app.add_route('/user/{user_name}', user) + app.add_route('/users/', user_list_api) + app.add_route('/user/{user_name}', user_detail_api) return app diff --git a/server/szurubooru/services/user_service.py b/server/szurubooru/services/user_service.py index 189a0272..c75cf950 100644 --- a/server/szurubooru/services/user_service.py +++ b/server/szurubooru/services/user_service.py @@ -4,6 +4,7 @@ import re from datetime import datetime from szurubooru.errors import ValidationError from szurubooru.model.user import User +from szurubooru.util import is_valid_email class UserService(object): ''' User management ''' @@ -25,6 +26,9 @@ class UserService(object): raise ValidationError( 'Password must satisfy regex %r.' % self._password_regex) + if not is_valid_email(email): + raise ValidationError('%r is not a vaild email address.' % email) + # prefer nulls to empty strings in the DB if not email: email = None diff --git a/server/szurubooru/tests/api/test_updating_user.py b/server/szurubooru/tests/api/test_updating_user.py new file mode 100644 index 00000000..1079865f --- /dev/null +++ b/server/szurubooru/tests/api/test_updating_user.py @@ -0,0 +1,143 @@ +from datetime import datetime +import szurubooru.services +from szurubooru.api.user_api import UserDetailApi +from szurubooru.errors import AuthError, ValidationError +from szurubooru.model.user import User +from szurubooru.tests.database_test_case import DatabaseTestCase +from szurubooru.util import dotdict + +class TestUserDetailApi(DatabaseTestCase): + def setUp(self): + super().setUp() + config = { + 'basic': { + 'secret': '', + }, + 'service': { + 'user_name_regex': '.{3,}', + 'password_regex': '.{3,}', + 'user_ranks': ['anonymous', 'regular_user', 'mod', 'admin'], + }, + 'privileges': { + 'users:edit:self:name': 'regular_user', + 'users:edit:self:pass': 'regular_user', + 'users:edit:self:email': 'regular_user', + 'users:edit:self:rank': 'mod', + + 'users:edit:any:name': 'mod', + 'users:edit:any:pass': 'mod', + 'users:edit:any:email': 'mod', + 'users:edit:any:rank': 'admin', + } + } + password_service = szurubooru.services.PasswordService(config) + auth_service = szurubooru.services.AuthService(config, password_service) + user_service = szurubooru.services.UserService(config, password_service) + self.auth_service = auth_service + self.api = UserDetailApi( + config, auth_service, password_service, user_service) + self.context = dotdict() + self.context.session = self.session + self.context.request = {} + self.request = dotdict() + self.request.context = self.context + + def _create_user(self, name, rank='admin'): + user = User() + user.name = name + user.password = 'dummy' + user.password_salt = 'dummy' + user.password_hash = 'dummy' + user.email = 'dummy' + user.access_rank = rank + user.creation_time = datetime.now() + user.avatar_style = User.AVATAR_GRAVATAR + return user + + def test_updating_nothing(self): + admin_user = self._create_user('u1', 'admin') + self.session.add(admin_user) + self.context.user = admin_user + self.api.put(self.request, self.context, 'u1') + admin_user = self.session.query(User).filter_by(name='u1').one() + self.assertEqual(admin_user.name, 'u1') + self.assertEqual(admin_user.email, 'dummy') + self.assertEqual(admin_user.access_rank, 'admin') + + def test_admin_updating_everything_for_themselves(self): + admin_user = self._create_user('u1', 'admin') + self.session.add(admin_user) + self.context.user = admin_user + self.context.request = { + 'name': 'chewie', + 'email': 'asd@asd.asd', + 'password': 'valid', + 'accessRank': 'mod', + } + self.api.put(self.request, self.context, 'u1') + admin_user = self.session.query(User).filter_by(name='chewie').one() + self.assertEqual(admin_user.name, 'chewie') + self.assertEqual(admin_user.email, 'asd@asd.asd') + self.assertEqual(admin_user.access_rank, 'mod') + self.assertTrue(self.auth_service.is_valid_password(admin_user, 'valid')) + self.assertFalse(self.auth_service.is_valid_password(admin_user, 'invalid')) + + def test_removing_email(self): + admin_user = self._create_user('u1', 'admin') + self.session.add(admin_user) + self.context.user = admin_user + self.context.request = {'email': ''} + self.api.put(self.request, self.context, 'u1') + admin_user = self.session.query(User).filter_by(name='u1').one() + self.assertEqual(admin_user.email, None) + + def test_invalid_inputs(self): + admin_user = self._create_user('u1', 'admin') + self.session.add(admin_user) + self.context.user = admin_user + self.context.request = {'name': '.'} + self.assertRaises( + ValidationError, self.api.put, self.request, self.context, 'u1') + self.context.request = {'password': '.'} + self.assertRaises( + ValidationError, self.api.put, self.request, self.context, 'u1') + self.context.request = {'accessRank': '.'} + self.assertRaises( + ValidationError, self.api.put, self.request, self.context, 'u1') + self.context.request = {'email': '.'} + self.assertRaises( + ValidationError, self.api.put, self.request, self.context, 'u1') + + def test_user_trying_to_update_someone_else(self): + user1 = self._create_user('u1', 'regular_user') + user2 = self._create_user('u2', 'regular_user') + self.session.add_all([user1, user2]) + self.context.user = user1 + for request in [ + {'name': 'whatever'}, + {'email': 'whatever'}, + {'accessRank': 'whatever'}, + {'password': 'whatever'}]: + self.context.request = request + self.assertRaises( + AuthError, self.api.put, self.request, self.context, user2.name) + + def test_user_trying_to_become_someone_else(self): + user1 = self._create_user('u1', 'regular_user') + user2 = self._create_user('u2', 'regular_user') + self.session.add_all([user1, user2]) + self.context.user = user1 + self.context.request = {'name': 'u2'} + self.assertRaises( + ValidationError, self.api.put, self.request, self.context, 'u1') + + def test_mods_trying_to_become_admin(self): + user1 = self._create_user('u1', 'mod') + user2 = self._create_user('u2', 'mod') + self.session.add_all([user1, user2]) + self.context.user = user1 + self.context.request = {'accessRank': 'admin'} + self.assertRaises( + AuthError, self.api.put, self.request, self.context, user1.name) + self.assertRaises( + AuthError, self.api.put, self.request, self.context, user2.name) diff --git a/server/szurubooru/util.py b/server/szurubooru/util.py index 90677d28..dee024f0 100644 --- a/server/szurubooru/util.py +++ b/server/szurubooru/util.py @@ -4,8 +4,12 @@ import datetime import re from szurubooru.errors import ValidationError +def is_valid_email(email): + ''' Validates given email address. ''' + return not email or re.match('^[^@]*@[^@]*\.[^@]*$', email) + class dotdict(dict): # pylint: disable=invalid-name - '''dot.notation access to dictionary attributes''' + ''' dot.notation access to dictionary attributes. ''' def __getattr__(self, attr): return self.get(attr) __setattr__ = dict.__setitem__