server/api: refactor + remove ID from user JSON
This commit is contained in:
parent
adecdd4cd9
commit
e4239a199c
23 changed files with 482 additions and 560 deletions
1
API.md
1
API.md
|
@ -459,7 +459,6 @@ Not yet implemented.
|
|||
|
||||
```json5
|
||||
{
|
||||
"id": 2,
|
||||
"name": "rr-",
|
||||
"email": "rr-@sakuya.pl", // available only if the request is authenticated by the same user
|
||||
"rank": "admin", // controlled by server's configuration
|
||||
|
|
|
@ -140,8 +140,13 @@ class Api {
|
|||
cookies.remove('auth');
|
||||
}
|
||||
|
||||
isLoggedIn() {
|
||||
return this.userName !== null;
|
||||
isLoggedIn(user) {
|
||||
if (user) {
|
||||
return this.userName !== null &&
|
||||
this.userName.toLowerCase() === user.name.toLowerCase();
|
||||
} else {
|
||||
return this.userName !== null;
|
||||
}
|
||||
}
|
||||
|
||||
getFullUrl(url) {
|
||||
|
|
|
@ -155,7 +155,7 @@ class UsersController {
|
|||
files.avatar = data.avatarContent;
|
||||
}
|
||||
|
||||
const isLoggedIn = api.isLoggedIn() && api.user.id == user.id;
|
||||
const isLoggedIn = api.isLoggedIn(user);
|
||||
return new Promise((resolve, reject) => {
|
||||
api.put('/user/' + user.name, data, files)
|
||||
.then(response => {
|
||||
|
@ -182,7 +182,7 @@ class UsersController {
|
|||
}
|
||||
|
||||
_delete(user) {
|
||||
const isLoggedIn = api.isLoggedIn() && api.user.id == user.id;
|
||||
const isLoggedIn = api.isLoggedIn(user);
|
||||
return new Promise((resolve, reject) => {
|
||||
api.delete('/user/' + user.name)
|
||||
.then(response => {
|
||||
|
@ -205,7 +205,7 @@ class UsersController {
|
|||
}
|
||||
|
||||
_show(user, section) {
|
||||
const isLoggedIn = api.isLoggedIn() && api.user.id == user.id;
|
||||
const isLoggedIn = api.isLoggedIn(user);
|
||||
const infix = isLoggedIn ? 'self' : 'any';
|
||||
|
||||
const myRankIdx = api.user ? config.ranks.indexOf(api.user.rank) : 0;
|
||||
|
|
|
@ -8,7 +8,7 @@ dummy-variables-rgx=_|dummy
|
|||
max-line-length=90
|
||||
|
||||
[messages control]
|
||||
disable=missing-docstring,no-self-use,too-few-public-methods
|
||||
disable=missing-docstring,no-self-use,too-few-public-methods,multiple-statements
|
||||
|
||||
[typecheck]
|
||||
generated-members=add|add_all
|
||||
|
|
|
@ -35,6 +35,7 @@ class Context(object):
|
|||
return default
|
||||
raise errors.ValidationError('Required paramter %r is missing.' % name)
|
||||
|
||||
# pylint: disable=redefined-builtin,too-many-arguments
|
||||
def get_param_as_int(
|
||||
self, name, required=False, min=None, max=None, default=None):
|
||||
if name in self.input:
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import datetime
|
||||
from szurubooru import errors
|
||||
from szurubooru.util import auth, tags
|
||||
from szurubooru.api.base_api import BaseApi
|
||||
|
||||
|
@ -49,8 +48,7 @@ class TagDetailApi(BaseApi):
|
|||
|
||||
if ctx.has_param('category'):
|
||||
auth.verify_privilege(ctx.user, 'tags:edit:category')
|
||||
tags.update_category(
|
||||
ctx.session, tag, ctx.get_param_as_string('category'))
|
||||
tags.update_category(tag, ctx.get_param_as_string('category'))
|
||||
|
||||
if ctx.has_param('suggestions'):
|
||||
auth.verify_privilege(ctx.user, 'tags:edit:suggestions')
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
import hashlib
|
||||
from szurubooru import config, errors, search
|
||||
from szurubooru import config, search
|
||||
from szurubooru.util import auth, users
|
||||
from szurubooru.api.base_api import BaseApi
|
||||
|
||||
def _serialize_user(authenticated_user, user):
|
||||
ret = {
|
||||
'id': user.user_id,
|
||||
'name': user.name,
|
||||
'rank': user.rank,
|
||||
'rankName': config.config['rank_names'].get(user.rank, 'Unknown'),
|
||||
|
@ -57,9 +56,7 @@ class UserListApi(BaseApi):
|
|||
password = ctx.get_param_as_string('password', required=True)
|
||||
email = ctx.get_param_as_string('email', required=True)
|
||||
|
||||
if users.get_by_name(ctx.session, name):
|
||||
raise errors.IntegrityError('User %r already exists.' % name)
|
||||
user = users.create_user(ctx.session, name, password, email)
|
||||
user = users.create_user(ctx.session, name, password, email, ctx.user)
|
||||
ctx.session.add(user)
|
||||
ctx.session.commit()
|
||||
return {'user': _serialize_user(ctx.user, user)}
|
||||
|
@ -69,13 +66,13 @@ class UserDetailApi(BaseApi):
|
|||
auth.verify_privilege(ctx.user, 'users:view')
|
||||
user = users.get_by_name(ctx.session, user_name)
|
||||
if not user:
|
||||
raise errors.NotFoundError('User %r not found.' % user_name)
|
||||
raise users.UserNotFoundError('User %r not found.' % user_name)
|
||||
return {'user': _serialize_user(ctx.user, user)}
|
||||
|
||||
def put(self, ctx, user_name):
|
||||
user = users.get_by_name(ctx.session, user_name)
|
||||
if not user:
|
||||
raise errors.NotFoundError('User %r not found.' % user_name)
|
||||
raise users.UserNotFoundError('User %r not found.' % user_name)
|
||||
|
||||
if ctx.user.user_id == user.user_id:
|
||||
infix = 'self'
|
||||
|
@ -84,10 +81,8 @@ class UserDetailApi(BaseApi):
|
|||
|
||||
if ctx.has_param('name'):
|
||||
auth.verify_privilege(ctx.user, 'users:edit:%s:name' % infix)
|
||||
other_user = users.get_by_name(ctx.session, ctx.get_param_as_string('name'))
|
||||
if other_user and other_user.user_id != user.user_id:
|
||||
raise errors.IntegrityError('User %r already exists.' % user.name)
|
||||
users.update_name(user, ctx.get_param_as_string('name'))
|
||||
users.update_name(
|
||||
ctx.session, user, ctx.get_param_as_string('name'), ctx.user)
|
||||
|
||||
if ctx.has_param('password'):
|
||||
auth.verify_privilege(ctx.user, 'users:edit:%s:pass' % infix)
|
||||
|
@ -114,7 +109,7 @@ class UserDetailApi(BaseApi):
|
|||
def delete(self, ctx, user_name):
|
||||
user = users.get_by_name(ctx.session, user_name)
|
||||
if not user:
|
||||
raise errors.NotFoundError('User %r not found.' % user_name)
|
||||
raise users.UserNotFoundError('User %r not found.' % user_name)
|
||||
|
||||
if ctx.user.user_id == user.user_id:
|
||||
infix = 'self'
|
||||
|
|
|
@ -13,52 +13,48 @@ def merge(left, right):
|
|||
left[key] = right[key]
|
||||
return left
|
||||
|
||||
class Config(object):
|
||||
''' Config parser and container. '''
|
||||
def __init__(self):
|
||||
with open('../config.yaml.dist') as handle:
|
||||
self.config = yaml.load(handle.read())
|
||||
def read_config():
|
||||
with open('../config.yaml.dist') as handle:
|
||||
ret = yaml.load(handle.read())
|
||||
if os.path.exists('../config.yaml'):
|
||||
with open('../config.yaml') as handle:
|
||||
self.config = merge(self.config, yaml.load(handle.read()))
|
||||
self._validate()
|
||||
ret = merge(ret, yaml.load(handle.read()))
|
||||
return ret
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.config[key]
|
||||
|
||||
def _validate(self):
|
||||
'''
|
||||
Check whether config doesn't contain errors that might prove
|
||||
lethal at runtime.
|
||||
'''
|
||||
all_ranks = self['ranks']
|
||||
for privilege, rank in self['privileges'].items():
|
||||
if rank not in all_ranks:
|
||||
raise errors.ConfigError(
|
||||
'Rank %r for privilege %r is missing' % (rank, privilege))
|
||||
for rank in ['anonymous', 'admin', 'nobody']:
|
||||
if rank not in all_ranks:
|
||||
raise errors.ConfigError('Protected rank %r is missing' % rank)
|
||||
if self['default_rank'] not in all_ranks:
|
||||
def validate_config(src):
|
||||
'''
|
||||
Check whether config doesn't contain errors that might prove
|
||||
lethal at runtime.
|
||||
'''
|
||||
all_ranks = src['ranks']
|
||||
for privilege, rank in src['privileges'].items():
|
||||
if rank not in all_ranks:
|
||||
raise errors.ConfigError(
|
||||
'Default rank %r is not on the list of known ranks' % (
|
||||
self['default_rank']))
|
||||
'Rank %r for privilege %r is missing' % (rank, privilege))
|
||||
for rank in ['anonymous', 'admin', 'nobody']:
|
||||
if rank not in all_ranks:
|
||||
raise errors.ConfigError('Protected rank %r is missing' % rank)
|
||||
if src['default_rank'] not in all_ranks:
|
||||
raise errors.ConfigError(
|
||||
'Default rank %r is not on the list of known ranks' % (
|
||||
src['default_rank']))
|
||||
|
||||
for key in ['base_url', 'api_url', 'data_url', 'data_dir']:
|
||||
if not self[key]:
|
||||
raise errors.ConfigError(
|
||||
'Service is not configured: %r is missing' % key)
|
||||
|
||||
if not os.path.isabs(self['data_dir']):
|
||||
for key in ['base_url', 'api_url', 'data_url', 'data_dir']:
|
||||
if not src[key]:
|
||||
raise errors.ConfigError(
|
||||
'data_dir must be an absolute path')
|
||||
'Service is not configured: %r is missing' % key)
|
||||
|
||||
for key in ['schema', 'host', 'port', 'user', 'pass', 'name']:
|
||||
if not self['database'][key]:
|
||||
raise errors.ConfigError(
|
||||
'Database is not configured: %r is missing' % key)
|
||||
if not os.path.isabs(src['data_dir']):
|
||||
raise errors.ConfigError(
|
||||
'data_dir must be an absolute path')
|
||||
|
||||
if not len(self['tag_categories']):
|
||||
raise errors.ConfigError('Must have at least one tag category')
|
||||
for key in ['schema', 'host', 'port', 'user', 'pass', 'name']:
|
||||
if not src['database'][key]:
|
||||
raise errors.ConfigError(
|
||||
'Database is not configured: %r is missing' % key)
|
||||
|
||||
config = Config() # pylint: disable=invalid-name
|
||||
if not len(src['tag_categories']):
|
||||
raise errors.ConfigError('Must have at least one tag category')
|
||||
|
||||
config = read_config() # pylint: disable=invalid-name
|
||||
validate_config(config)
|
||||
|
|
|
@ -53,7 +53,7 @@ class TagName(Base):
|
|||
__tablename__ = 'tag_name'
|
||||
tag_name_id = Column('tag_name_id', Integer, primary_key=True)
|
||||
tag_id = Column('tag_id', Integer, ForeignKey('tag.id'))
|
||||
name = Column('name', String(50), nullable=False, unique=True)
|
||||
name = Column('name', String(64), nullable=False, unique=True)
|
||||
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
|
|
@ -11,7 +11,7 @@ class User(Base):
|
|||
name = Column('name', String(50), nullable=False, unique=True)
|
||||
password_hash = Column('password_hash', String(64), nullable=False)
|
||||
password_salt = Column('password_salt', String(32))
|
||||
email = Column('email', String(200), nullable=True)
|
||||
email = Column('email', String(64), nullable=True)
|
||||
rank = Column('rank', String(32), nullable=False)
|
||||
creation_time = Column('creation_time', DateTime, nullable=False)
|
||||
last_login_time = Column('last_login_time', DateTime)
|
||||
|
|
|
@ -1,20 +1,7 @@
|
|||
class ConfigError(RuntimeError):
|
||||
''' A problem with configuration file. '''
|
||||
|
||||
class AuthError(RuntimeError):
|
||||
''' Generic authentication error '''
|
||||
|
||||
class IntegrityError(RuntimeError):
|
||||
''' Database integrity error (e.g. trying to edit nonexisting resource) '''
|
||||
|
||||
class ValidationError(RuntimeError):
|
||||
''' Validation error (e.g. trying to create user with invalid name) '''
|
||||
|
||||
class SearchError(RuntimeError):
|
||||
''' Search error (e.g. trying to use special: where it doesn't make sense) '''
|
||||
|
||||
class NotFoundError(RuntimeError):
|
||||
''' Error thrown when a resource (usually DB) couldn't be found. '''
|
||||
|
||||
class ProcessingError(RuntimeError):
|
||||
''' Error thrown by things such as thumbnail generator. '''
|
||||
class ConfigError(RuntimeError): pass
|
||||
class AuthError(RuntimeError): pass
|
||||
class IntegrityError(RuntimeError): pass
|
||||
class ValidationError(RuntimeError): pass
|
||||
class SearchError(RuntimeError): pass
|
||||
class NotFoundError(RuntimeError): pass
|
||||
class ProcessingError(RuntimeError): pass
|
||||
|
|
|
@ -21,13 +21,14 @@ class ContextAdapter(object):
|
|||
def process_request(self, request, _response):
|
||||
request.context.files = {}
|
||||
request.context.input = {}
|
||||
# pylint: disable=protected-access
|
||||
for key, value in request._params.items():
|
||||
request.context.input[key] = value
|
||||
|
||||
if request.content_length in (None, 0):
|
||||
return
|
||||
|
||||
if 'multipart/form-data' in (request.content_type or ''):
|
||||
if request.content_type and 'multipart/form-data' in request.content_type:
|
||||
# obscure, claims to "avoid a bug in cgi.FieldStorage"
|
||||
request.env.setdefault('QUERY_STRING', '')
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ def upgrade():
|
|||
'tag_name',
|
||||
sa.Column('tag_name_id', sa.Integer(), nullable=False),
|
||||
sa.Column('tag_id', sa.Integer(), nullable=True),
|
||||
sa.Column('name', sa.String(length=50), nullable=False),
|
||||
sa.Column('name', sa.String(length=64), nullable=False),
|
||||
sa.ForeignKeyConstraint(['tag_id'], ['tag.id']),
|
||||
sa.PrimaryKeyConstraint('tag_name_id'),
|
||||
sa.UniqueConstraint('name'))
|
||||
|
|
|
@ -20,7 +20,7 @@ def upgrade():
|
|||
sa.Column('name', sa.String(length=50), nullable=False),
|
||||
sa.Column('password_hash', sa.String(length=64), nullable=False),
|
||||
sa.Column('password_salt', sa.String(length=32), nullable=True),
|
||||
sa.Column('email', sa.String(length=200), nullable=True),
|
||||
sa.Column('email', sa.String(length=64), nullable=True),
|
||||
sa.Column('rank', sa.String(length=32), nullable=False),
|
||||
sa.Column('creation_time', sa.DateTime(), nullable=False),
|
||||
sa.Column('last_login_time', sa.DateTime()),
|
||||
|
|
|
@ -19,7 +19,7 @@ def test_ctx(
|
|||
config_injector({
|
||||
'tag_categories': ['meta', 'character', 'copyright'],
|
||||
'tag_name_regex': '^[^!]*$',
|
||||
'ranks': ['regular_user'],
|
||||
'ranks': ['anonymous', 'regular_user'],
|
||||
'privileges': {'tags:create': 'regular_user'},
|
||||
})
|
||||
ret = misc.dotdict()
|
||||
|
@ -86,12 +86,13 @@ def test_trying_to_create_tag_without_names(test_ctx):
|
|||
},
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
|
||||
def test_trying_to_create_tag_with_invalid_name(test_ctx):
|
||||
@pytest.mark.parametrize('names', [['!'], ['x' * 65]])
|
||||
def test_trying_to_create_tag_with_invalid_name(test_ctx, names):
|
||||
with pytest.raises(tags.InvalidNameError):
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'names': ['!'],
|
||||
'names': names,
|
||||
'category': 'meta',
|
||||
'suggestions': [],
|
||||
'implications': [],
|
||||
|
@ -244,7 +245,6 @@ def test_trying_to_create_tag_with_invalid_relation(test_ctx, input):
|
|||
}
|
||||
])
|
||||
def test_tag_trying_to_relate_to_itself(test_ctx, input):
|
||||
assert get_tag(test_ctx.session, 'tag') is None
|
||||
with pytest.raises(tags.RelationError):
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
|
@ -252,5 +252,14 @@ def test_tag_trying_to_relate_to_itself(test_ctx, input):
|
|||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
assert get_tag(test_ctx.session, 'tag') is None
|
||||
|
||||
# TODO: test bad privileges
|
||||
# TODO: test max length
|
||||
def test_trying_to_create_tag_without_privileges(test_ctx):
|
||||
with pytest.raises(errors.AuthError):
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'names': ['tag'],
|
||||
'category': 'meta',
|
||||
'suggestions': ['tag'],
|
||||
'implications': [],
|
||||
},
|
||||
user=test_ctx.user_factory(rank='anonymous')))
|
||||
|
|
|
@ -19,7 +19,7 @@ def test_ctx(
|
|||
config_injector({
|
||||
'tag_categories': ['meta', 'character', 'copyright'],
|
||||
'tag_name_regex': '^[^!]*$',
|
||||
'ranks': ['regular_user'],
|
||||
'ranks': ['anonymous', 'regular_user'],
|
||||
'privileges': {
|
||||
'tags:edit:names': 'regular_user',
|
||||
'tags:edit:category': 'regular_user',
|
||||
|
@ -110,6 +110,7 @@ def test_duplicating_names(test_ctx):
|
|||
@pytest.mark.parametrize('input', [
|
||||
{'names': []},
|
||||
{'names': ['!']},
|
||||
{'names': ['x' * 65]},
|
||||
])
|
||||
def test_trying_to_set_invalid_name(test_ctx, input):
|
||||
test_ctx.session.add(test_ctx.tag_factory(names=['tag1'], category='meta'))
|
||||
|
@ -252,5 +253,18 @@ def test_tag_trying_to_relate_to_itself(test_ctx, input):
|
|||
input=input, user=test_ctx.user_factory(rank='regular_user')),
|
||||
'tag1')
|
||||
|
||||
# TODO: test bad privileges
|
||||
# TODO: test max length
|
||||
@pytest.mark.parametrize('input', [
|
||||
{'names': 'whatever'},
|
||||
{'category': 'whatever'},
|
||||
{'suggestions': ['whatever']},
|
||||
{'implications': ['whatever']},
|
||||
])
|
||||
def test_trying_to_update_tag_without_privileges(test_ctx, input):
|
||||
test_ctx.session.add(test_ctx.tag_factory(names=['tag'], category='meta'))
|
||||
test_ctx.session.commit()
|
||||
with pytest.raises(errors.AuthError):
|
||||
test_ctx.api.put(
|
||||
test_ctx.context_factory(
|
||||
input=input,
|
||||
user=test_ctx.user_factory(rank='anonymous')),
|
||||
'tag')
|
||||
|
|
|
@ -1,18 +1,14 @@
|
|||
import datetime
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from szurubooru import api, db, errors
|
||||
from szurubooru.util import auth
|
||||
from szurubooru.util import auth, misc, users
|
||||
|
||||
def get_user(session, name):
|
||||
return session.query(db.User).filter_by(name=name).first()
|
||||
|
||||
@pytest.fixture
|
||||
def user_list_api():
|
||||
return api.UserListApi()
|
||||
|
||||
def test_creating_users(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_list_api):
|
||||
def test_ctx(
|
||||
session, config_injector, context_factory, user_factory):
|
||||
config_injector({
|
||||
'secret': '',
|
||||
'user_name_regex': '.{3,}',
|
||||
|
@ -23,94 +19,122 @@ def test_creating_users(
|
|||
'rank_names': {},
|
||||
'privileges': {'users:create': 'anonymous'},
|
||||
})
|
||||
ret = misc.dotdict()
|
||||
ret.session = session
|
||||
ret.context_factory = context_factory
|
||||
ret.user_factory = user_factory
|
||||
ret.api = api.UserListApi()
|
||||
return ret
|
||||
|
||||
user_list_api.post(
|
||||
context_factory(
|
||||
def test_creating_user(test_ctx, fake_datetime):
|
||||
fake_datetime(datetime.datetime(1969, 2, 12))
|
||||
result = test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'name': 'chewie1',
|
||||
'email': 'asd@asd.asd',
|
||||
'password': 'oks',
|
||||
},
|
||||
user=user_factory(rank='regular_user')))
|
||||
user_list_api.post(
|
||||
context_factory(
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
assert result == {
|
||||
'user': {
|
||||
'avatarStyle': 'gravatar',
|
||||
'avatarUrl': 'http://gravatar.com/avatar/' +
|
||||
'6f370c8c7109534c3d5c394123a477d7?d=retro&s=200',
|
||||
'creationTime': datetime.datetime(1969, 2, 12),
|
||||
'lastLoginTime': None,
|
||||
'name': 'chewie1',
|
||||
'rank': 'admin',
|
||||
'rankName': 'Unknown',
|
||||
}
|
||||
}
|
||||
user = get_user(test_ctx.session, 'chewie1')
|
||||
assert user.name == 'chewie1'
|
||||
assert user.email == 'asd@asd.asd'
|
||||
assert user.rank == 'admin'
|
||||
assert auth.is_valid_password(user, 'oks') is True
|
||||
assert auth.is_valid_password(user, 'invalid') is False
|
||||
|
||||
def test_first_user_becomes_admin_others_not(test_ctx):
|
||||
result1 = test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'name': 'chewie1',
|
||||
'email': 'asd@asd.asd',
|
||||
'password': 'oks',
|
||||
},
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
result2 = test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'name': 'chewie2',
|
||||
'email': 'asd@asd.asd',
|
||||
'password': 'sok',
|
||||
},
|
||||
user=user_factory(rank='regular_user')))
|
||||
|
||||
first_user = session.query(db.User).filter_by(name='chewie1').one()
|
||||
other_user = session.query(db.User).filter_by(name='chewie2').one()
|
||||
assert first_user.name == 'chewie1'
|
||||
assert first_user.email == 'asd@asd.asd'
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
assert result1['user']['rank'] == 'admin'
|
||||
assert result2['user']['rank'] == 'regular_user'
|
||||
first_user = get_user(test_ctx.session, 'chewie1')
|
||||
other_user = get_user(test_ctx.session, 'chewie2')
|
||||
assert first_user.rank == 'admin'
|
||||
assert auth.is_valid_password(first_user, 'oks') is True
|
||||
assert auth.is_valid_password(first_user, 'invalid') is False
|
||||
assert other_user.name == 'chewie2'
|
||||
assert other_user.email == 'asd@asd.asd'
|
||||
assert other_user.rank == 'regular_user'
|
||||
assert auth.is_valid_password(other_user, 'sok') is True
|
||||
assert auth.is_valid_password(other_user, 'invalid') is False
|
||||
|
||||
def test_creating_user_that_already_exists(
|
||||
config_injector, context_factory, user_factory, user_list_api):
|
||||
config_injector({
|
||||
'secret': '',
|
||||
'user_name_regex': '.{3,}',
|
||||
'password_regex': '.{3,}',
|
||||
'default_rank': 'regular_user',
|
||||
'thumbnails': {'avatar_width': 200},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'rank_names': {},
|
||||
'privileges': {'users:create': 'anonymous'},
|
||||
})
|
||||
user_list_api.post(
|
||||
context_factory(
|
||||
def test_creating_user_that_already_exists(test_ctx):
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'name': 'chewie',
|
||||
'email': 'asd@asd.asd',
|
||||
'password': 'oks',
|
||||
},
|
||||
user=user_factory(rank='regular_user')))
|
||||
with pytest.raises(errors.IntegrityError):
|
||||
user_list_api.post(
|
||||
context_factory(
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
with pytest.raises(users.UserAlreadyExistsError):
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'name': 'chewie',
|
||||
'email': 'asd@asd.asd',
|
||||
'password': 'oks',
|
||||
},
|
||||
user=user_factory(rank='regular_user')))
|
||||
with pytest.raises(errors.IntegrityError):
|
||||
user_list_api.post(
|
||||
context_factory(
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
with pytest.raises(users.UserAlreadyExistsError):
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'name': 'CHEWIE',
|
||||
'email': 'asd@asd.asd',
|
||||
'password': 'oks',
|
||||
},
|
||||
user=user_factory(rank='regular_user')))
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
|
||||
@pytest.mark.parametrize('field', ['name', 'email', 'password'])
|
||||
def test_missing_field(
|
||||
config_injector, context_factory, user_factory, user_list_api, field):
|
||||
config_injector({
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'privileges': {'users:create': 'anonymous'},
|
||||
})
|
||||
request = {
|
||||
def test_missing_field(test_ctx, field):
|
||||
input = {
|
||||
'name': 'chewie',
|
||||
'email': 'asd@asd.asd',
|
||||
'password': 'oks',
|
||||
}
|
||||
del request[field]
|
||||
del input[field]
|
||||
with pytest.raises(errors.ValidationError):
|
||||
user_list_api.post(
|
||||
context_factory(
|
||||
input=request, user=user_factory(rank='regular_user')))
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(
|
||||
input=input,
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
|
||||
@pytest.mark.parametrize('input', [
|
||||
{'name': '.'},
|
||||
{'name': 'x' * 51},
|
||||
{'password': '.'},
|
||||
{'rank': '.'},
|
||||
{'email': '.'},
|
||||
{'email': 'x' * 65},
|
||||
{'avatarStyle': 'manual'},
|
||||
])
|
||||
def test_invalid_inputs(test_ctx, input):
|
||||
user = test_ctx.user_factory(name='u1', rank='admin')
|
||||
test_ctx.session.add(user)
|
||||
with pytest.raises(errors.ValidationError):
|
||||
test_ctx.api.post(
|
||||
test_ctx.context_factory(input=input, user=user))
|
||||
|
||||
# TODO: test too long name
|
||||
# TODO: test bad password, email or name
|
||||
# TODO: support avatar and avatarStyle
|
||||
|
|
|
@ -1,17 +1,10 @@
|
|||
import pytest
|
||||
from datetime import datetime
|
||||
from szurubooru import api, db, errors
|
||||
from szurubooru.util import misc, users
|
||||
|
||||
@pytest.fixture
|
||||
def user_detail_api():
|
||||
return api.UserDetailApi()
|
||||
|
||||
def test_removing_oneself(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
def test_ctx(session, config_injector, context_factory, user_factory):
|
||||
config_injector({
|
||||
'privileges': {
|
||||
'users:delete:self': 'regular_user',
|
||||
|
@ -19,47 +12,37 @@ def test_removing_oneself(
|
|||
},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
user1 = user_factory(name='u1', rank='regular_user')
|
||||
user2 = user_factory(name='u2', rank='regular_user')
|
||||
session.add_all([user1, user2])
|
||||
session.commit()
|
||||
ret = misc.dotdict()
|
||||
ret.session = session
|
||||
ret.context_factory = context_factory
|
||||
ret.user_factory = user_factory
|
||||
ret.api = api.UserDetailApi()
|
||||
return ret
|
||||
|
||||
def test_removing_oneself(test_ctx):
|
||||
user1 = test_ctx.user_factory(name='u1', rank='regular_user')
|
||||
user2 = test_ctx.user_factory(name='u2', rank='regular_user')
|
||||
test_ctx.session.add_all([user1, user2])
|
||||
test_ctx.session.commit()
|
||||
with pytest.raises(errors.AuthError):
|
||||
user_detail_api.delete(context_factory(user=user1), 'u2')
|
||||
user_detail_api.delete(context_factory(user=user1), 'u1')
|
||||
assert [u.name for u in session.query(db.User).all()] == ['u2']
|
||||
test_ctx.api.delete(test_ctx.context_factory(user=user1), 'u2')
|
||||
result = test_ctx.api.delete(test_ctx.context_factory(user=user1), 'u1')
|
||||
assert result == {}
|
||||
assert [u.name for u in test_ctx.session.query(db.User).all()] == ['u2']
|
||||
|
||||
def test_removing_someone_else(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'privileges': {
|
||||
'users:delete:self': 'regular_user',
|
||||
'users:delete:any': 'mod',
|
||||
},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
user1 = user_factory(name='u1', rank='regular_user')
|
||||
user2 = user_factory(name='u2', rank='regular_user')
|
||||
mod_user = user_factory(rank='mod')
|
||||
session.add_all([user1, user2])
|
||||
session.commit()
|
||||
user_detail_api.delete(context_factory(user=mod_user), 'u1')
|
||||
user_detail_api.delete(context_factory(user=mod_user), 'u2')
|
||||
assert session.query(db.User).all() == []
|
||||
def test_removing_someone_else(test_ctx):
|
||||
user1 = test_ctx.user_factory(name='u1', rank='regular_user')
|
||||
user2 = test_ctx.user_factory(name='u2', rank='regular_user')
|
||||
mod_user = test_ctx.user_factory(rank='mod')
|
||||
test_ctx.session.add_all([user1, user2])
|
||||
test_ctx.session.commit()
|
||||
test_ctx.api.delete(test_ctx.context_factory(user=mod_user), 'u1')
|
||||
test_ctx.api.delete(test_ctx.context_factory(user=mod_user), 'u2')
|
||||
assert test_ctx.session.query(db.User).all() == []
|
||||
|
||||
def test_removing_non_existing(
|
||||
context_factory, config_injector, user_factory, user_detail_api):
|
||||
config_injector({
|
||||
'privileges': {
|
||||
'users:delete:self': 'regular_user',
|
||||
'users:delete:any': 'mod',
|
||||
},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
with pytest.raises(errors.NotFoundError):
|
||||
user_detail_api.delete(
|
||||
context_factory(user=user_factory(rank='regular_user')), 'bad')
|
||||
def test_removing_non_existing(test_ctx):
|
||||
with pytest.raises(users.UserNotFoundError):
|
||||
test_ctx.api.delete(
|
||||
test_ctx.context_factory(
|
||||
user=test_ctx.user_factory(rank='regular_user')), 'bad')
|
||||
|
||||
|
|
|
@ -1,116 +1,91 @@
|
|||
import datetime
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from szurubooru import api, db, errors
|
||||
from szurubooru.util import misc, users
|
||||
|
||||
@pytest.fixture
|
||||
def user_list_api():
|
||||
return api.UserListApi()
|
||||
|
||||
@pytest.fixture
|
||||
def user_detail_api():
|
||||
return api.UserDetailApi()
|
||||
|
||||
def test_retrieving_multiple(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_list_api):
|
||||
def test_ctx(session, context_factory, config_injector, user_factory):
|
||||
config_injector({
|
||||
'privileges': {'users:list': 'regular_user'},
|
||||
'privileges': {
|
||||
'users:list': 'regular_user',
|
||||
'users:view': 'regular_user',
|
||||
},
|
||||
'thumbnails': {'avatar_width': 200},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'rank_names': {},
|
||||
'rank_names': {'regular_user': 'Peasant'},
|
||||
})
|
||||
user1 = user_factory(name='u1', rank='mod')
|
||||
user2 = user_factory(name='u2', rank='mod')
|
||||
session.add_all([user1, user2])
|
||||
result = user_list_api.get(
|
||||
context_factory(
|
||||
ret = misc.dotdict()
|
||||
ret.session = session
|
||||
ret.context_factory = context_factory
|
||||
ret.user_factory = user_factory
|
||||
ret.list_api = api.UserListApi()
|
||||
ret.detail_api = api.UserDetailApi()
|
||||
return ret
|
||||
|
||||
def test_retrieving_multiple(test_ctx):
|
||||
user1 = test_ctx.user_factory(name='u1', rank='mod')
|
||||
user2 = test_ctx.user_factory(name='u2', rank='mod')
|
||||
test_ctx.session.add_all([user1, user2])
|
||||
result = test_ctx.list_api.get(
|
||||
test_ctx.context_factory(
|
||||
input={'query': '', 'page': 1},
|
||||
user=user_factory(rank='regular_user')))
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
assert result['query'] == ''
|
||||
assert result['page'] == 1
|
||||
assert result['pageSize'] == 100
|
||||
assert result['total'] == 2
|
||||
assert [u['name'] for u in result['users']] == ['u1', 'u2']
|
||||
|
||||
def test_retrieving_multiple_without_privileges(
|
||||
context_factory, config_injector, user_factory, user_list_api):
|
||||
config_injector({
|
||||
'privileges': {'users:list': 'regular_user'},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
def test_retrieving_multiple_without_privileges(test_ctx):
|
||||
with pytest.raises(errors.AuthError):
|
||||
user_list_api.get(
|
||||
context_factory(
|
||||
test_ctx.list_api.get(
|
||||
test_ctx.context_factory(
|
||||
input={'query': '', 'page': 1},
|
||||
user=user_factory(rank='anonymous')))
|
||||
user=test_ctx.user_factory(rank='anonymous')))
|
||||
|
||||
def test_retrieving_multiple_with_privileges(
|
||||
context_factory, config_injector, user_factory, user_list_api):
|
||||
config_injector({
|
||||
'privileges': {'users:list': 'regular_user'},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
result = user_list_api.get(
|
||||
context_factory(
|
||||
def test_retrieving_multiple_with_privileges(test_ctx):
|
||||
result = test_ctx.list_api.get(
|
||||
test_ctx.context_factory(
|
||||
input={'query': 'asd', 'page': 1},
|
||||
user=user_factory(rank='regular_user')))
|
||||
user=test_ctx.user_factory(rank='regular_user')))
|
||||
assert result['query'] == 'asd'
|
||||
assert result['page'] == 1
|
||||
assert result['pageSize'] == 100
|
||||
assert result['total'] == 0
|
||||
assert [u['name'] for u in result['users']] == []
|
||||
|
||||
def test_retrieving_single(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'privileges': {'users:view': 'regular_user'},
|
||||
'thumbnails': {'avatar_width': 200},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'rank_names': {},
|
||||
})
|
||||
user = user_factory(name='u1', rank='regular_user')
|
||||
session.add(user)
|
||||
result = user_detail_api.get(
|
||||
context_factory(
|
||||
def test_retrieving_single(test_ctx):
|
||||
test_ctx.session.add(test_ctx.user_factory(name='u1', rank='regular_user'))
|
||||
result = test_ctx.detail_api.get(
|
||||
test_ctx.context_factory(
|
||||
input={'query': '', 'page': 1},
|
||||
user=user_factory(rank='regular_user')),
|
||||
user=test_ctx.user_factory(rank='regular_user')),
|
||||
'u1')
|
||||
assert result['user']['id'] == user.user_id
|
||||
assert result['user']['name'] == 'u1'
|
||||
assert result['user']['rank'] == 'regular_user'
|
||||
assert result['user']['creationTime'] == datetime(1997, 1, 1)
|
||||
assert result['user']['lastLoginTime'] == None
|
||||
assert result['user']['avatarStyle'] == 'gravatar'
|
||||
assert result == {
|
||||
'user': {
|
||||
'name': 'u1',
|
||||
'rank': 'regular_user',
|
||||
'rankName': 'Peasant',
|
||||
'creationTime': datetime.datetime(1997, 1, 1),
|
||||
'lastLoginTime': None,
|
||||
'avatarStyle': 'gravatar',
|
||||
'avatarUrl': 'http://gravatar.com/avatar/' +
|
||||
'275876e34cf609db118f3d84b799a790?d=retro&s=200',
|
||||
}
|
||||
}
|
||||
|
||||
def test_retrieving_non_existing(
|
||||
context_factory, config_injector, user_factory, user_detail_api):
|
||||
config_injector({
|
||||
'privileges': {'users:view': 'regular_user'},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
with pytest.raises(errors.NotFoundError):
|
||||
user_detail_api.get(
|
||||
context_factory(
|
||||
def test_retrieving_non_existing(test_ctx):
|
||||
with pytest.raises(users.UserNotFoundError):
|
||||
test_ctx.detail_api.get(
|
||||
test_ctx.context_factory(
|
||||
input={'query': '', 'page': 1},
|
||||
user=user_factory(rank='regular_user')),
|
||||
user=test_ctx.user_factory(rank='regular_user')),
|
||||
'-')
|
||||
|
||||
def test_retrieving_single_without_privileges(
|
||||
context_factory, config_injector, user_factory, user_detail_api):
|
||||
config_injector({
|
||||
'privileges': {'users:view': 'regular_user'},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
def test_retrieving_single_without_privileges(test_ctx):
|
||||
with pytest.raises(errors.AuthError):
|
||||
user_detail_api.get(
|
||||
context_factory(
|
||||
test_ctx.detail_api.get(
|
||||
test_ctx.context_factory(
|
||||
input={'query': '', 'page': 1},
|
||||
user=user_factory(rank='anonymous')),
|
||||
user=test_ctx.user_factory(rank='anonymous')),
|
||||
'-')
|
||||
|
|
|
@ -1,22 +1,19 @@
|
|||
import datetime
|
||||
import pytest
|
||||
from szurubooru import api, db, errors
|
||||
from szurubooru.util import auth
|
||||
from szurubooru import api, config, db, errors
|
||||
from szurubooru.util import auth, misc, users
|
||||
|
||||
def get_user(session, name):
|
||||
return session.query(db.User).filter_by(name=name).first()
|
||||
|
||||
@pytest.fixture
|
||||
def user_detail_api():
|
||||
return api.UserDetailApi()
|
||||
|
||||
def test_updating_user(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
def test_ctx(
|
||||
session, config_injector, context_factory, user_factory):
|
||||
config_injector({
|
||||
'secret': '',
|
||||
'user_name_regex': '.{3,}',
|
||||
'password_regex': '.{3,}',
|
||||
'thumbnails': {'avatar_width': 200},
|
||||
'thumbnails': {'avatar_width': 200, 'avatar_height': 200},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'rank_names': {},
|
||||
'privileges': {
|
||||
|
@ -25,12 +22,25 @@ def test_updating_user(
|
|||
'users:edit:self:email': 'regular_user',
|
||||
'users:edit:self:rank': 'mod',
|
||||
'users:edit:self:avatar': 'mod',
|
||||
'users:edit:any:name': 'mod',
|
||||
'users:edit:any:pass': 'mod',
|
||||
'users:edit:any:email': 'mod',
|
||||
'users:edit:any:rank': 'admin',
|
||||
'users:edit:any:avatar': 'admin',
|
||||
},
|
||||
})
|
||||
user = user_factory(name='u1', rank='admin')
|
||||
session.add(user)
|
||||
user_detail_api.put(
|
||||
context_factory(
|
||||
ret = misc.dotdict()
|
||||
ret.session = session
|
||||
ret.context_factory = context_factory
|
||||
ret.user_factory = user_factory
|
||||
ret.api = api.UserDetailApi()
|
||||
return ret
|
||||
|
||||
def test_updating_user(test_ctx):
|
||||
user = test_ctx.user_factory(name='u1', rank='admin')
|
||||
test_ctx.session.add(user)
|
||||
result = test_ctx.api.put(
|
||||
test_ctx.context_factory(
|
||||
input={
|
||||
'name': 'chewie',
|
||||
'email': 'asd@asd.asd',
|
||||
|
@ -40,7 +50,20 @@ def test_updating_user(
|
|||
},
|
||||
user=user),
|
||||
'u1')
|
||||
user = session.query(db.User).filter_by(name='chewie').one()
|
||||
assert result == {
|
||||
'user': {
|
||||
'avatarStyle': 'gravatar',
|
||||
'avatarUrl': 'http://gravatar.com/avatar/' +
|
||||
'6f370c8c7109534c3d5c394123a477d7?d=retro&s=200',
|
||||
'creationTime': datetime.datetime(1997, 1, 1),
|
||||
'lastLoginTime': None,
|
||||
'email': 'asd@asd.asd',
|
||||
'name': 'chewie',
|
||||
'rank': 'mod',
|
||||
'rankName': 'Unknown',
|
||||
}
|
||||
}
|
||||
user = get_user(test_ctx.session, 'chewie')
|
||||
assert user.name == 'chewie'
|
||||
assert user.email == 'asd@asd.asd'
|
||||
assert user.rank == 'mod'
|
||||
|
@ -48,192 +71,98 @@ def test_updating_user(
|
|||
assert auth.is_valid_password(user, 'oks') is True
|
||||
assert auth.is_valid_password(user, 'invalid') is False
|
||||
|
||||
def test_update_changing_nothing(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'thumbnails': {'avatar_width': 200},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'rank_names': {},
|
||||
})
|
||||
user = user_factory(name='u1', rank='admin')
|
||||
session.add(user)
|
||||
user_detail_api.put(context_factory(user=user), 'u1')
|
||||
user = session.query(db.User).filter_by(name='u1').one()
|
||||
def test_update_changing_nothing(test_ctx):
|
||||
user = test_ctx.user_factory(name='u1', rank='admin')
|
||||
test_ctx.session.add(user)
|
||||
test_ctx.api.put(test_ctx.context_factory(user=user), 'u1')
|
||||
user = get_user(test_ctx.session, 'u1')
|
||||
assert user.name == 'u1'
|
||||
assert user.email == 'dummy'
|
||||
assert user.rank == 'admin'
|
||||
|
||||
def test_updating_non_existing_user(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
})
|
||||
user = user_factory(name='u1', rank='admin')
|
||||
session.add(user)
|
||||
with pytest.raises(errors.NotFoundError):
|
||||
user_detail_api.put(context_factory(user=user), 'u2')
|
||||
def test_updating_non_existing_user(test_ctx):
|
||||
user = test_ctx.user_factory(name='u1', rank='admin')
|
||||
test_ctx.session.add(user)
|
||||
with pytest.raises(users.UserNotFoundError):
|
||||
test_ctx.api.put(test_ctx.context_factory(user=user), 'u2')
|
||||
|
||||
def test_removing_email(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'thumbnails': {'avatar_width': 200},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'rank_names': {},
|
||||
'privileges': {'users:edit:self:email': 'regular_user'},
|
||||
})
|
||||
user = user_factory(name='u1', rank='admin')
|
||||
session.add(user)
|
||||
user_detail_api.put(
|
||||
context_factory(input={'email': ''}, user=user), 'u1')
|
||||
assert session.query(db.User).filter_by(name='u1').one().email is None
|
||||
def test_removing_email(test_ctx):
|
||||
user = test_ctx.user_factory(name='u1', rank='admin')
|
||||
test_ctx.session.add(user)
|
||||
test_ctx.api.put(
|
||||
test_ctx.context_factory(input={'email': ''}, user=user), 'u1')
|
||||
assert get_user(test_ctx.session, 'u1').email is None
|
||||
|
||||
@pytest.mark.parametrize('request', [
|
||||
@pytest.mark.parametrize('input', [
|
||||
{'name': '.'},
|
||||
{'name': 'x' * 51},
|
||||
{'password': '.'},
|
||||
{'rank': '.'},
|
||||
{'email': '.'},
|
||||
{'email': 'x' * 65},
|
||||
{'avatarStyle': 'manual'},
|
||||
])
|
||||
def test_invalid_inputs(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api,
|
||||
request):
|
||||
config_injector({
|
||||
'user_name_regex': '.{3,}',
|
||||
'password_regex': '.{3,}',
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'privileges': {
|
||||
'users:edit:self:name': 'regular_user',
|
||||
'users:edit:self:pass': 'regular_user',
|
||||
'users:edit:self:email': 'regular_user',
|
||||
'users:edit:self:rank': 'mod',
|
||||
'users:edit:self:avatar': 'mod',
|
||||
},
|
||||
})
|
||||
user = user_factory(name='u1', rank='admin')
|
||||
session.add(user)
|
||||
def test_invalid_inputs(test_ctx, input):
|
||||
user = test_ctx.user_factory(name='u1', rank='admin')
|
||||
test_ctx.session.add(user)
|
||||
with pytest.raises(errors.ValidationError):
|
||||
user_detail_api.put(context_factory(input=request, user=user), 'u1')
|
||||
test_ctx.api.put(
|
||||
test_ctx.context_factory(input=input, user=user), 'u1')
|
||||
|
||||
@pytest.mark.parametrize('request', [
|
||||
@pytest.mark.parametrize('input', [
|
||||
{'name': 'whatever'},
|
||||
{'email': 'whatever'},
|
||||
{'rank': 'whatever'},
|
||||
{'password': 'whatever'},
|
||||
{'avatarStyle': 'whatever'},
|
||||
])
|
||||
def test_user_trying_to_update_someone_else(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api,
|
||||
request):
|
||||
config_injector({
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'privileges': {
|
||||
'users:edit:any:name': 'mod',
|
||||
'users:edit:any:pass': 'mod',
|
||||
'users:edit:any:email': 'mod',
|
||||
'users:edit:any:rank': 'admin',
|
||||
'users:edit:any:avatar': 'admin',
|
||||
},
|
||||
})
|
||||
user1 = user_factory(name='u1', rank='regular_user')
|
||||
user2 = user_factory(name='u2', rank='regular_user')
|
||||
session.add_all([user1, user2])
|
||||
def test_user_trying_to_update_someone_else(test_ctx, input):
|
||||
user1 = test_ctx.user_factory(name='u1', rank='regular_user')
|
||||
user2 = test_ctx.user_factory(name='u2', rank='regular_user')
|
||||
test_ctx.session.add_all([user1, user2])
|
||||
with pytest.raises(errors.AuthError):
|
||||
user_detail_api.put(
|
||||
context_factory(input=request, user=user1), user2.name)
|
||||
test_ctx.api.put(
|
||||
test_ctx.context_factory(input=input, user=user1), user2.name)
|
||||
|
||||
def test_user_trying_to_become_someone_else(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'privileges': {'users:edit:self:name': 'regular_user'},
|
||||
})
|
||||
user1 = user_factory(name='me', rank='regular_user')
|
||||
user2 = user_factory(name='her', rank='regular_user')
|
||||
session.add_all([user1, user2])
|
||||
with pytest.raises(errors.IntegrityError):
|
||||
user_detail_api.put(
|
||||
context_factory(input={'name': 'her'}, user=user1),
|
||||
def test_user_trying_to_become_someone_else(test_ctx):
|
||||
user1 = test_ctx.user_factory(name='me', rank='regular_user')
|
||||
user2 = test_ctx.user_factory(name='her', rank='regular_user')
|
||||
test_ctx.session.add_all([user1, user2])
|
||||
with pytest.raises(users.UserAlreadyExistsError):
|
||||
test_ctx.api.put(
|
||||
test_ctx.context_factory(input={'name': 'her'}, user=user1),
|
||||
'me')
|
||||
with pytest.raises(errors.IntegrityError):
|
||||
user_detail_api.put(
|
||||
context_factory(input={'name': 'HER'}, user=user1), 'me')
|
||||
with pytest.raises(users.UserAlreadyExistsError):
|
||||
test_ctx.api.put(
|
||||
test_ctx.context_factory(input={'name': 'HER'}, user=user1), 'me')
|
||||
|
||||
def test_mods_trying_to_become_admin(
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'privileges': {
|
||||
'users:edit:self:rank': 'mod',
|
||||
'users:edit:any:rank': 'admin',
|
||||
},
|
||||
})
|
||||
user1 = user_factory(name='u1', rank='mod')
|
||||
user2 = user_factory(name='u2', rank='mod')
|
||||
session.add_all([user1, user2])
|
||||
context = context_factory(input={'rank': 'admin'}, user=user1)
|
||||
def test_mods_trying_to_become_admin(test_ctx):
|
||||
user1 = test_ctx.user_factory(name='u1', rank='mod')
|
||||
user2 = test_ctx.user_factory(name='u2', rank='mod')
|
||||
test_ctx.session.add_all([user1, user2])
|
||||
context = test_ctx.context_factory(input={'rank': 'admin'}, user=user1)
|
||||
with pytest.raises(errors.AuthError):
|
||||
user_detail_api.put(context, user1.name)
|
||||
test_ctx.api.put(context, user1.name)
|
||||
with pytest.raises(errors.AuthError):
|
||||
user_detail_api.put(context, user2.name)
|
||||
test_ctx.api.put(context, user2.name)
|
||||
|
||||
def test_uploading_avatar(
|
||||
tmpdir,
|
||||
session,
|
||||
config_injector,
|
||||
context_factory,
|
||||
user_factory,
|
||||
user_detail_api):
|
||||
config_injector({
|
||||
'data_dir': str(tmpdir.mkdir('data')),
|
||||
'data_url': 'http://example.com/data/',
|
||||
'thumbnails': {'avatar_width': 200, 'avatar_height': 200},
|
||||
'ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||
'rank_names': {},
|
||||
'privileges': {'users:edit:self:avatar': 'mod'},
|
||||
})
|
||||
user = user_factory(name='u1', rank='mod')
|
||||
session.add(user)
|
||||
def test_uploading_avatar(test_ctx, tmpdir):
|
||||
config.config['data_dir'] = str(tmpdir.mkdir('data'))
|
||||
config.config['data_url'] = 'http://example.com/data/'
|
||||
|
||||
user = test_ctx.user_factory(name='u1', rank='mod')
|
||||
test_ctx.session.add(user)
|
||||
empty_pixel = \
|
||||
b'\x47\x49\x46\x38\x39\x61\x01\x00\x01\x00\x80\x01\x00\x00\x00\x00' \
|
||||
b'\xff\xff\xff\x21\xf9\x04\x01\x00\x00\x01\x00\x2c\x00\x00\x00\x00' \
|
||||
b'\x01\x00\x01\x00\x00\x02\x02\x4c\x01\x00\x3b'
|
||||
response = user_detail_api.put(
|
||||
context_factory(
|
||||
response = test_ctx.api.put(
|
||||
test_ctx.context_factory(
|
||||
input={'avatarStyle': 'manual'},
|
||||
files={'avatar': empty_pixel},
|
||||
user=user),
|
||||
'u1')
|
||||
user = session.query(db.User).filter_by(name='u1').one()
|
||||
user = get_user(test_ctx.session, 'u1')
|
||||
assert user.avatar_style == user.AVATAR_MANUAL
|
||||
assert response['user']['avatarUrl'] == \
|
||||
'http://example.com/data/avatars/u1.jpg'
|
||||
|
||||
# TODO: test too long name
|
||||
|
|
|
@ -70,3 +70,6 @@ def icase_unique(source):
|
|||
target.append(source_item)
|
||||
target_low.append(source_item.lower())
|
||||
return target
|
||||
|
||||
def value_exceeds_column_size(value, column):
|
||||
return len(value) > column.property.columns[0].type.length
|
||||
|
|
|
@ -4,27 +4,11 @@ import sqlalchemy
|
|||
from szurubooru import config, db, errors
|
||||
from szurubooru.util import misc
|
||||
|
||||
class TagNotFoundError(errors.NotFoundError):
|
||||
def __init__(self, tag):
|
||||
super().__init__('Tag %r not found')
|
||||
|
||||
class TagAlreadyExistsError(errors.ValidationError):
|
||||
def __init__(self):
|
||||
super().__init__('One of names is already used by another tag.')
|
||||
|
||||
class InvalidNameError(errors.ValidationError):
|
||||
def __init__(self, message):
|
||||
super().__init__(message)
|
||||
|
||||
class InvalidCategoryError(errors.ValidationError):
|
||||
def __init__(self, category, valid_categories):
|
||||
super().__init__(
|
||||
'Category %r is invalid. Valid categories: %r.' % (
|
||||
category, valid_categories))
|
||||
|
||||
class RelationError(errors.ValidationError):
|
||||
def __init__(self, message):
|
||||
super().__init__(message)
|
||||
class TagNotFoundError(errors.NotFoundError): pass
|
||||
class TagAlreadyExistsError(errors.ValidationError): pass
|
||||
class InvalidNameError(errors.ValidationError): pass
|
||||
class InvalidCategoryError(errors.ValidationError): pass
|
||||
class RelationError(errors.ValidationError): pass
|
||||
|
||||
def _verify_name_validity(name):
|
||||
name_regex = config.config['tag_name_regex']
|
||||
|
@ -82,14 +66,16 @@ def create_tag(session, names, category, suggestions, implications):
|
|||
tag = db.Tag()
|
||||
tag.creation_time = datetime.datetime.now()
|
||||
update_names(session, tag, names)
|
||||
update_category(session, tag, category)
|
||||
update_category(tag, category)
|
||||
update_suggestions(session, tag, suggestions)
|
||||
update_implications(session, tag, implications)
|
||||
return tag
|
||||
|
||||
def update_category(session, tag, category):
|
||||
def update_category(tag, category):
|
||||
if not category in config.config['tag_categories']:
|
||||
raise InvalidCategoryError(category, config.config['tag_categories'])
|
||||
raise InvalidCategoryError(
|
||||
'Category %r is invalid. Valid categories: %r.' % (
|
||||
category, config.config['tag_categories']))
|
||||
tag.category = category
|
||||
|
||||
def update_names(session, tag, names):
|
||||
|
@ -100,23 +86,24 @@ def update_names(session, tag, names):
|
|||
_verify_name_validity(name)
|
||||
expr = sqlalchemy.sql.false()
|
||||
for name in names:
|
||||
if misc.value_exceeds_column_size(name, db.TagName.name):
|
||||
raise InvalidNameError('Name is too long.')
|
||||
expr = expr | db.TagName.name.ilike(name)
|
||||
if tag.tag_id:
|
||||
expr = expr & (db.TagName.tag_id != tag.tag_id)
|
||||
existing_tags = session.query(db.TagName).filter(expr).all()
|
||||
if len(existing_tags):
|
||||
raise TagAlreadyExistsError()
|
||||
raise TagAlreadyExistsError(
|
||||
'One of names is already used by another tag.')
|
||||
tag_names_to_remove = []
|
||||
for tag_name in tag.names:
|
||||
if tag_name.name.lower() not in [name.lower() for name in names]:
|
||||
if not _check_name_intersection([tag_name.name], names):
|
||||
tag_names_to_remove.append(tag_name)
|
||||
for tag_name in tag_names_to_remove:
|
||||
tag.names.remove(tag_name)
|
||||
for name in names:
|
||||
if name.lower() not in [tag_name.name.lower() for tag_name in tag.names]:
|
||||
tag_name = db.TagName(name)
|
||||
session.add(tag_name)
|
||||
tag.names.append(tag_name)
|
||||
if not _check_name_intersection(_get_plain_names(tag), [name]):
|
||||
tag.names.append(db.TagName(name))
|
||||
|
||||
def update_implications(session, tag, relations):
|
||||
if _check_name_intersection(_get_plain_names(tag), relations):
|
||||
|
|
|
@ -1,79 +1,16 @@
|
|||
import datetime
|
||||
import re
|
||||
from datetime import datetime
|
||||
from sqlalchemy import func
|
||||
from szurubooru import config, db, errors
|
||||
from szurubooru.util import auth, misc, files, images
|
||||
|
||||
def create_user(session, name, password, email):
|
||||
user = db.User()
|
||||
update_name(user, name)
|
||||
update_password(user, password)
|
||||
update_email(user, email)
|
||||
if not session.query(db.User).count():
|
||||
user.rank = 'admin'
|
||||
else:
|
||||
user.rank = config.config['default_rank']
|
||||
user.creation_time = datetime.now()
|
||||
user.avatar_style = db.User.AVATAR_GRAVATAR
|
||||
return user
|
||||
|
||||
def update_name(user, name):
|
||||
name = name.strip()
|
||||
name_regex = config.config['user_name_regex']
|
||||
if not re.match(name_regex, name):
|
||||
raise errors.ValidationError(
|
||||
'Name must satisfy regex %r.' % name_regex)
|
||||
user.name = name
|
||||
|
||||
def update_password(user, password):
|
||||
password_regex = config.config['password_regex']
|
||||
if not re.match(password_regex, password):
|
||||
raise errors.ValidationError(
|
||||
'Password must satisfy regex %r.' % password_regex)
|
||||
user.password_salt = auth.create_password()
|
||||
user.password_hash = auth.get_password_hash(user.password_salt, password)
|
||||
|
||||
def update_email(user, email):
|
||||
email = email.strip() or None
|
||||
if not misc.is_valid_email(email):
|
||||
raise errors.ValidationError(
|
||||
'%r is not a vaild email address.' % email)
|
||||
user.email = email
|
||||
|
||||
def update_rank(user, rank, authenticated_user):
|
||||
rank = rank.strip()
|
||||
available_ranks = config.config['ranks']
|
||||
if not rank in available_ranks:
|
||||
raise errors.ValidationError(
|
||||
'Bad rank %r. Valid ranks: %r' % (rank, available_ranks))
|
||||
if available_ranks.index(authenticated_user.rank) \
|
||||
< available_ranks.index(rank):
|
||||
raise errors.AuthError('Trying to set higher rank than your own.')
|
||||
user.rank = rank
|
||||
|
||||
def update_avatar(user, avatar_style, avatar_content):
|
||||
if avatar_style == 'gravatar':
|
||||
user.avatar_style = user.AVATAR_GRAVATAR
|
||||
elif avatar_style == 'manual':
|
||||
user.avatar_style = user.AVATAR_MANUAL
|
||||
if not avatar_content:
|
||||
raise errors.ValidationError('Avatar content missing.')
|
||||
image = images.Image(avatar_content)
|
||||
image.resize_fill(
|
||||
int(config.config['thumbnails']['avatar_width']),
|
||||
int(config.config['thumbnails']['avatar_height']))
|
||||
files.save('avatars/' + user.name.lower() + '.jpg', image.to_jpeg())
|
||||
else:
|
||||
raise errors.ValidationError('Unknown avatar style: %r' % avatar_style)
|
||||
|
||||
def bump_login_time(user):
|
||||
user.last_login_time = datetime.now()
|
||||
|
||||
def reset_password(user):
|
||||
password = auth.create_password()
|
||||
user.password_salt = auth.create_password()
|
||||
user.password_hash = auth.get_password_hash(user.password_salt, password)
|
||||
return password
|
||||
class UserNotFoundError(errors.NotFoundError): pass
|
||||
class UserAlreadyExistsError(errors.ValidationError): pass
|
||||
class InvalidNameError(errors.ValidationError): pass
|
||||
class InvalidEmailError(errors.ValidationError): pass
|
||||
class InvalidPasswordError(errors.ValidationError): pass
|
||||
class InvalidRankError(errors.ValidationError): pass
|
||||
class InvalidAvatarError(errors.ValidationError): pass
|
||||
|
||||
def get_by_name(session, name):
|
||||
return session.query(db.User) \
|
||||
|
@ -86,3 +23,82 @@ def get_by_name_or_email(session, name_or_email):
|
|||
(func.lower(db.User.name) == func.lower(name_or_email))
|
||||
| (func.lower(db.User.email) == func.lower(name_or_email))) \
|
||||
.first()
|
||||
|
||||
def create_user(session, name, password, email, auth_user):
|
||||
user = db.User()
|
||||
update_name(session, user, name, auth_user)
|
||||
update_password(user, password)
|
||||
update_email(user, email)
|
||||
if not session.query(db.User).count():
|
||||
user.rank = 'admin'
|
||||
else:
|
||||
user.rank = config.config['default_rank']
|
||||
user.creation_time = datetime.datetime.now()
|
||||
user.avatar_style = db.User.AVATAR_GRAVATAR
|
||||
return user
|
||||
|
||||
def update_name(session, user, name, auth_user):
|
||||
if misc.value_exceeds_column_size(name, db.User.name):
|
||||
raise InvalidNameError('User name is too long.')
|
||||
other_user = get_by_name(session, name)
|
||||
if other_user and other_user.user_id != auth_user.user_id:
|
||||
raise UserAlreadyExistsError('User %r already exists.' % name)
|
||||
name = name.strip()
|
||||
name_regex = config.config['user_name_regex']
|
||||
if not re.match(name_regex, name):
|
||||
raise InvalidNameError(
|
||||
'User name %r must satisfy regex %r.' % (name, name_regex))
|
||||
user.name = name
|
||||
|
||||
def update_password(user, password):
|
||||
password_regex = config.config['password_regex']
|
||||
if not re.match(password_regex, password):
|
||||
raise InvalidPasswordError(
|
||||
'Password must satisfy regex %r.' % password_regex)
|
||||
user.password_salt = auth.create_password()
|
||||
user.password_hash = auth.get_password_hash(user.password_salt, password)
|
||||
|
||||
def update_email(user, email):
|
||||
email = email.strip() or None
|
||||
if email and misc.value_exceeds_column_size(email, db.User.email):
|
||||
raise InvalidEmailError('Email is too long.')
|
||||
if not misc.is_valid_email(email):
|
||||
raise InvalidEmailError('E-mail is invalid.')
|
||||
user.email = email
|
||||
|
||||
def update_rank(user, rank, authenticated_user):
|
||||
rank = rank.strip()
|
||||
available_ranks = config.config['ranks']
|
||||
if not rank in available_ranks:
|
||||
raise InvalidRankError(
|
||||
'Rank %r is invalid. Valid ranks: %r' % (rank, available_ranks))
|
||||
if available_ranks.index(authenticated_user.rank) \
|
||||
< available_ranks.index(rank):
|
||||
raise errors.AuthError('Trying to set higher rank than your own.')
|
||||
user.rank = rank
|
||||
|
||||
def update_avatar(user, avatar_style, avatar_content):
|
||||
if avatar_style == 'gravatar':
|
||||
user.avatar_style = user.AVATAR_GRAVATAR
|
||||
elif avatar_style == 'manual':
|
||||
user.avatar_style = user.AVATAR_MANUAL
|
||||
if not avatar_content:
|
||||
raise InvalidAvatarError('Avatar content missing.')
|
||||
image = images.Image(avatar_content)
|
||||
image.resize_fill(
|
||||
int(config.config['thumbnails']['avatar_width']),
|
||||
int(config.config['thumbnails']['avatar_height']))
|
||||
files.save('avatars/' + user.name.lower() + '.jpg', image.to_jpeg())
|
||||
else:
|
||||
raise InvalidAvatarError(
|
||||
'Avatar style %r is invalid. Valid avatar styles: %r.' % (
|
||||
avatar_style, ['gravatar', 'manual']))
|
||||
|
||||
def bump_login_time(user):
|
||||
user.last_login_time = datetime.datetime.now()
|
||||
|
||||
def reset_password(user):
|
||||
password = auth.create_password()
|
||||
user.password_salt = auth.create_password()
|
||||
user.password_hash = auth.get_password_hash(user.password_salt, password)
|
||||
return password
|
||||
|
|
Loading…
Reference in a new issue