server/general: disable autoflush

This commit is contained in:
rr- 2016-08-26 13:43:55 +02:00
parent 7451d16baf
commit bb369efa99
25 changed files with 137 additions and 20 deletions

View file

@ -5,7 +5,7 @@ from szurubooru import config
# pylint: disable=invalid-name
_engine = sqlalchemy.create_engine(config.config['database'])
sessionmaker = sqlalchemy.orm.sessionmaker(bind=_engine)
sessionmaker = sqlalchemy.orm.sessionmaker(bind=_engine, autoflush=False)
session = sqlalchemy.orm.scoped_session(sessionmaker)
_data = threading.local()

View file

@ -18,6 +18,7 @@ def test_retrieving_multiple(user_factory, comment_factory, context_factory):
comment1 = comment_factory(text='text 1')
comment2 = comment_factory(text='text 2')
db.session.add_all([comment1, comment2])
db.session.flush()
with patch('szurubooru.func.comments.serialize_comment'):
comments.serialize_comment.return_value = 'serialized comment'
result = api.comment_api.get_comments(

View file

@ -19,6 +19,7 @@ def test_info_api(
},
})
db.session.add_all([post_factory(), post_factory()])
db.session.flush()
expected_config_key = {
'userNameRegex': '1',

View file

@ -16,6 +16,7 @@ def inject_config(config_injector):
def test_reset_sending_email(context_factory, user_factory):
db.session.add(user_factory(
name='u1', rank=db.User.RANK_REGULAR, email='user@example.com'))
db.session.flush()
for initiating_user in ['u1', 'user@example.com']:
with patch('szurubooru.func.mailer.send_mail'):
assert api.password_reset_api.start_password_reset(
@ -39,6 +40,7 @@ def test_trying_to_reset_non_existing(context_factory):
def test_trying_to_reset_without_email(context_factory, user_factory):
db.session.add(
user_factory(name='u1', rank=db.User.RANK_REGULAR, email=None))
db.session.flush()
with pytest.raises(errors.ValidationError):
api.password_reset_api.start_password_reset(
context_factory(), {'user_name': 'u1'})
@ -49,6 +51,7 @@ def test_confirming_with_good_token(context_factory, user_factory):
name='u1', rank=db.User.RANK_REGULAR, email='user@example.com')
old_hash = user.password_hash
db.session.add(user)
db.session.flush()
context = context_factory(
params={'token': '4ac0be176fb364f13ee6b634c43220e2'})
result = api.password_reset_api.finish_password_reset(
@ -66,6 +69,7 @@ def test_trying_to_confirm_non_existing(context_factory):
def test_trying_to_confirm_without_token(context_factory, user_factory):
db.session.add(user_factory(
name='u1', rank=db.User.RANK_REGULAR, email='user@example.com'))
db.session.flush()
with pytest.raises(errors.ValidationError):
api.password_reset_api.finish_password_reset(
context_factory(params={}), {'user_name': 'u1'})
@ -74,6 +78,7 @@ def test_trying_to_confirm_without_token(context_factory, user_factory):
def test_trying_to_confirm_with_bad_token(context_factory, user_factory):
db.session.add(user_factory(
name='u1', rank=db.User.RANK_REGULAR, email='user@example.com'))
db.session.flush()
with pytest.raises(errors.ValidationError):
api.password_reset_api.finish_password_reset(
context_factory(params={'token': 'bad'}), {'user_name': 'u1'})

View file

@ -13,6 +13,7 @@ def test_deleting(user_factory, post_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
post = post_factory(id=1)
db.session.add(post)
db.session.flush()
with patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.delete'):
result = api.post_api.delete_post(

View file

@ -18,6 +18,7 @@ def test_featuring(user_factory, post_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
post = post_factory(id=1)
db.session.add(post)
db.session.flush()
assert not posts.get_post_by_id(1).is_featured
with patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.snapshots.modify'):

View file

@ -19,6 +19,7 @@ def test_retrieving_multiple(user_factory, post_factory, context_factory):
post1 = post_factory(id=1)
post2 = post_factory(id=2)
db.session.add_all([post1, post2])
db.session.flush()
with patch('szurubooru.func.posts.serialize_post'):
posts.serialize_post.return_value = 'serialized post'
result = api.post_api.get_posts(
@ -82,6 +83,7 @@ def test_trying_to_retrieve_multiple_without_privileges(
def test_retrieving_single(user_factory, post_factory, context_factory):
db.session.add(post_factory(id=1))
db.session.flush()
with patch('szurubooru.func.posts.serialize_post'):
posts.serialize_post.return_value = 'serialized post'
result = api.post_api.get_post(

View file

@ -25,6 +25,7 @@ def test_retrieving_multiple(user_factory, context_factory):
snapshot1 = snapshot_factory()
snapshot2 = snapshot_factory()
db.session.add_all([snapshot1, snapshot2])
db.session.flush()
result = api.snapshot_api.get_snapshots(
context_factory(
params={'query': '', 'page': 1},

View file

@ -16,6 +16,7 @@ def test_deleting(user_factory, tag_category_factory, context_factory):
category = tag_category_factory(name='category')
db.session.add(tag_category_factory(name='root'))
db.session.add(category)
db.session.flush()
with patch('szurubooru.func.snapshots.delete'), \
patch('szurubooru.func.tags.export_to_json'):
result = api.tag_category_api.delete_tag_category(

View file

@ -19,6 +19,7 @@ def test_retrieving_multiple(
tag_category_factory(name='c1'),
tag_category_factory(name='c2'),
])
db.session.flush()
result = api.tag_category_api.get_tag_categories(
context_factory(user=user_factory(rank=db.User.RANK_REGULAR)))
assert [cat['name'] for cat in result['results']] == ['c1', 'c2']
@ -27,6 +28,7 @@ def test_retrieving_multiple(
def test_retrieving_single(
user_factory, tag_category_factory, context_factory):
db.session.add(tag_category_factory(name='cat'))
db.session.flush()
result = api.tag_category_api.get_tag_category(
context_factory(user=user_factory(rank=db.User.RANK_REGULAR)),
{'category_name': 'cat'})

View file

@ -23,6 +23,7 @@ def test_simple_updating(user_factory, tag_category_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
category = tag_category_factory(name='name', color='black')
db.session.add(category)
db.session.flush()
with patch('szurubooru.func.tag_categories.serialize_category'), \
patch('szurubooru.func.tag_categories.update_category_name'), \
patch('szurubooru.func.tag_categories.update_category_color'), \

View file

@ -18,6 +18,7 @@ def test_retrieving_multiple(user_factory, tag_factory, context_factory):
tag1 = tag_factory(names=['t1'])
tag2 = tag_factory(names=['t2'])
db.session.add_all([tag1, tag2])
db.session.flush()
with patch('szurubooru.func.tags.serialize_tag'):
tags.serialize_tag.return_value = 'serialized tag'
result = api.tag_api.get_tags(
@ -44,6 +45,7 @@ def test_trying_to_retrieve_multiple_without_privileges(
def test_retrieving_single(user_factory, tag_factory, context_factory):
db.session.add(tag_factory(names=['tag']))
db.session.flush()
with patch('szurubooru.func.tags.serialize_tag'):
tags.serialize_tag.return_value = 'serialized tag'
result = api.tag_api.get_tag(

View file

@ -11,6 +11,7 @@ def inject_config(config_injector):
def test_get_tag_siblings(user_factory, tag_factory, context_factory):
db.session.add(tag_factory(names=['tag']))
db.session.flush()
with patch('szurubooru.func.tags.serialize_tag'), \
patch('szurubooru.func.tags.get_tag_siblings'):
tags.serialize_tag.side_effect = \

View file

@ -19,6 +19,7 @@ def test_retrieving_multiple(user_factory, context_factory):
user1 = user_factory(name='u1', rank=db.User.RANK_MODERATOR)
user2 = user_factory(name='u2', rank=db.User.RANK_MODERATOR)
db.session.add_all([user1, user2])
db.session.flush()
with patch('szurubooru.func.users.serialize_user'):
users.serialize_user.return_value = 'serialized user'
result = api.user_api.get_users(
@ -47,6 +48,7 @@ def test_retrieving_single(user_factory, context_factory):
user = user_factory(name='u1', rank=db.User.RANK_REGULAR)
auth_user = user_factory(rank=db.User.RANK_REGULAR)
db.session.add(user)
db.session.flush()
with patch('szurubooru.func.users.serialize_user'):
users.serialize_user.return_value = 'serialized user'
result = api.user_api.get_user(
@ -65,6 +67,7 @@ def test_trying_to_retrieve_single_without_privileges(
user_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_ANONYMOUS)
db.session.add(user_factory(name='u1', rank=db.User.RANK_REGULAR))
db.session.flush()
with pytest.raises(errors.AuthError):
api.user_api.get_user(
context_factory(user=auth_user), {'user_name': 'u1'})

View file

@ -71,6 +71,7 @@ def test_updating_user(context_factory, user_factory):
def test_omitting_optional_field(user_factory, context_factory, field):
user = user_factory(name='u1', rank=db.User.RANK_ADMINISTRATOR)
db.session.add(user)
db.session.flush()
params = {
'name': 'chewie',
'email': 'asd@asd.asd',
@ -97,6 +98,7 @@ def test_omitting_optional_field(user_factory, context_factory, field):
def test_trying_to_update_non_existing(user_factory, context_factory):
user = user_factory(name='u1', rank=db.User.RANK_ADMINISTRATOR)
db.session.add(user)
db.session.flush()
with pytest.raises(users.UserNotFoundError):
api.user_api.update_user(
context_factory(user=user), {'user_name': 'u2'})
@ -114,6 +116,7 @@ def test_trying_to_update_field_without_privileges(
user1 = user_factory(name='u1', rank=db.User.RANK_REGULAR)
user2 = user_factory(name='u2', rank=db.User.RANK_REGULAR)
db.session.add_all([user1, user2])
db.session.flush()
with pytest.raises(errors.AuthError):
api.user_api.update_user(
context_factory(

View file

@ -78,7 +78,8 @@ def query_logger():
@pytest.yield_fixture(scope='function', autouse=True)
def session(query_logger): # pylint: disable=unused-argument
db.sessionmaker = sqlalchemy.orm.sessionmaker(bind=_engine)
db.sessionmaker = sqlalchemy.orm.sessionmaker(
bind=_engine, autoflush=False)
db.session = sqlalchemy.orm.scoped_session(db.sessionmaker)
try:
yield db.session

View file

@ -196,6 +196,7 @@ def test_serialize_micro_post(post_factory, user_factory):
def test_get_post_count(post_factory):
previous_count = posts.get_post_count()
db.session.add_all([post_factory(), post_factory()])
db.session.flush()
new_count = posts.get_post_count()
assert previous_count == 0
assert new_count == 2
@ -299,8 +300,8 @@ def test_update_post_content(
})
post = post_factory(id=1)
db.session.add(post)
db.session.flush()
posts.update_post_content(post, read_asset(input_file))
db.session.flush()
assert post.mime_type == expected_mime_type
assert post.type == expected_type
assert post.checksum == 'crc'
@ -321,6 +322,7 @@ def test_update_post_content_to_existing_content(
db.session.add_all([post, another_post])
db.session.flush()
posts.update_post_content(post, read_asset('png.png'))
db.session.flush()
with pytest.raises(posts.PostAlreadyUploadedError):
posts.update_post_content(another_post, read_asset('png.png'))
@ -538,7 +540,9 @@ def test_feature_post(post_factory, user_factory):
post = post_factory()
user = user_factory()
previous_featured_post = posts.try_get_featured_post()
db.session.flush()
posts.feature_post(post, user)
db.session.flush()
new_featured_post = posts.try_get_featured_post()
assert previous_featured_post is None
assert new_featured_post == post
@ -547,6 +551,8 @@ def test_feature_post(post_factory, user_factory):
def test_delete(post_factory):
post = post_factory()
db.session.add(post)
db.session.flush()
assert posts.get_post_count() == 1
posts.delete(post)
db.session.flush()
assert posts.get_post_count() == 0

View file

@ -131,6 +131,7 @@ def test_create(tag_factory, user_factory):
with patch('szurubooru.func.snapshots.get_tag_snapshot'):
snapshots.get_tag_snapshot.return_value = 'mocked'
snapshots.create(tag, user_factory())
db.session.flush()
results = db.session.query(db.Snapshot).all()
assert len(results) == 1
assert results[0].operation == db.Snapshot.OPERATION_CREATED
@ -151,6 +152,7 @@ def test_modify_saves_non_empty_diffs(post_factory, user_factory):
post.notes = [db.PostNote(polygon=[(0, 0), (0, 1), (1, 1)], text='new')]
db.session.flush()
snapshots.modify(post, user)
db.session.flush()
results = db.session.query(db.Snapshot).all()
assert len(results) == 1
assert results[0].data == {
@ -178,6 +180,7 @@ def test_modify_doesnt_save_empty_diffs(tag_factory, user_factory):
db.session.add_all([tag, user])
db.session.commit()
snapshots.modify(tag, user)
db.session.flush()
assert db.session.query(db.Snapshot).count() == 0
@ -188,6 +191,7 @@ def test_delete(tag_factory, user_factory):
with patch('szurubooru.func.snapshots.get_tag_snapshot'):
snapshots.get_tag_snapshot.return_value = 'mocked'
snapshots.delete(tag, user_factory())
db.session.flush()
results = db.session.query(db.Snapshot).all()
assert len(results) == 1
assert results[0].operation == db.Snapshot.OPERATION_DELETED
@ -200,6 +204,7 @@ def test_merge(tag_factory, user_factory):
db.session.add_all([source_tag, target_tag])
db.session.flush()
snapshots.merge(source_tag, target_tag, user_factory())
db.session.flush()
result = db.session.query(db.Snapshot).one()
assert result.operation == db.Snapshot.OPERATION_MERGED
assert result.data == ['tag', 'target']

View file

@ -44,6 +44,7 @@ def test_create_category_when_first():
def test_create_category_when_subsequent(tag_category_factory):
db.session.add(tag_category_factory())
db.session.flush()
with patch('szurubooru.func.tag_categories.update_category_name'), \
patch('szurubooru.func.tag_categories.update_category_color'):
category = tag_categories.create_category('name', 'color')
@ -80,6 +81,7 @@ def test_update_category_name_reusing_other_name(
config_injector, tag_category_factory):
config_injector({'tag_category_name_regex': '.*'})
db.session.add(tag_category_factory(name='name'))
db.session.flush()
category = tag_category_factory()
with pytest.raises(tag_categories.TagCategoryAlreadyExistsError):
tag_categories.update_category_name(category, 'name')
@ -127,6 +129,7 @@ def test_update_category_color(attempt, tag_category_factory):
def test_try_get_category_by_name(tag_category_factory):
category = tag_category_factory(name='test')
db.session.add(category)
db.session.flush()
assert tag_categories.try_get_category_by_name('test') == category
assert tag_categories.try_get_category_by_name('TEST') == category
assert tag_categories.try_get_category_by_name('-') is None
@ -135,6 +138,7 @@ def test_try_get_category_by_name(tag_category_factory):
def test_get_category_by_name(tag_category_factory):
category = tag_category_factory(name='test')
db.session.add(category)
db.session.flush()
assert tag_categories.get_category_by_name('test') == category
assert tag_categories.get_category_by_name('TEST') == category
with pytest.raises(tag_categories.TagCategoryNotFoundError):
@ -145,6 +149,7 @@ def test_get_all_category_names(tag_category_factory):
category1 = tag_category_factory(name='cat1')
category2 = tag_category_factory(name='cat2')
db.session.add_all([category1, category2])
db.session.flush()
assert tag_categories.get_all_category_names() == ['cat1', 'cat2']
@ -152,6 +157,7 @@ def test_get_all_categories(tag_category_factory):
category1 = tag_category_factory(name='cat1')
category2 = tag_category_factory(name='cat2')
db.session.add_all([category1, category2])
db.session.flush()
assert tag_categories.get_all_categories() == [category1, category2]
@ -159,6 +165,7 @@ def test_try_get_default_category_when_no_default(tag_category_factory):
category1 = tag_category_factory(default=False)
category2 = tag_category_factory(default=False)
db.session.add_all([category1, category2])
db.session.flush()
actual_default_category = tag_categories.try_get_default_category()
assert actual_default_category == category1
assert actual_default_category != category2
@ -168,6 +175,7 @@ def test_try_get_default_category_when_default(tag_category_factory):
category1 = tag_category_factory(default=False)
category2 = tag_category_factory(default=True)
db.session.add_all([category1, category2])
db.session.flush()
actual_default_category = tag_categories.try_get_default_category()
assert actual_default_category == category2
assert actual_default_category != category1
@ -177,6 +185,7 @@ def test_try_get_default_category_from_cache(tag_category_factory):
category1 = tag_category_factory()
category2 = tag_category_factory()
db.session.add_all([category1, category2])
db.session.flush()
tag_categories.try_get_default_category()
db.session.query(db.TagCategory).delete()
assert tag_categories.try_get_default_category() == category1
@ -197,6 +206,7 @@ def test_set_default_category_with_previous_default(tag_category_factory):
category1 = tag_category_factory(default=True)
category2 = tag_category_factory()
db.session.add_all([category1, category2])
db.session.flush()
tag_categories.set_default_category(category2)
assert not category1.default
assert category2.default
@ -206,6 +216,7 @@ def test_set_default_category_without_previous_default(tag_category_factory):
category1 = tag_category_factory()
category2 = tag_category_factory()
db.session.add_all([category1, category2])
db.session.flush()
tag_categories.set_default_category(category2)
assert category2.default
@ -213,6 +224,7 @@ def test_set_default_category_without_previous_default(tag_category_factory):
def test_delete_category_with_no_other_categories(tag_category_factory):
category = tag_category_factory()
db.session.add(category)
db.session.flush()
with pytest.raises(tag_categories.TagCategoryIsInUseError):
tag_categories.delete_category(category)
@ -221,6 +233,7 @@ def test_delete_category_with_usages(tag_category_factory, tag_factory):
db.session.add(tag_category_factory())
category = tag_category_factory()
db.session.add(tag_factory(category=category))
db.session.flush()
with pytest.raises(tag_categories.TagCategoryIsInUseError):
tag_categories.delete_category(category)
@ -229,6 +242,7 @@ def test_delete_category(tag_category_factory):
db.session.add(tag_category_factory())
category = tag_category_factory(name='target')
db.session.add(category)
db.session.commit()
db.session.flush()
tag_categories.delete_category(category)
db.session.flush()
assert tag_categories.try_get_category_by_name('target') is None

View file

@ -35,6 +35,7 @@ def test_sort_tags(
category=tag_category_factory(
name=category_name, default=category_is_default)))
db.session.add_all(db_tags)
db.session.flush()
actual_tag_names = [tag.names[0].name for tag in tags.sort_tags(db_tags)]
assert actual_tag_names == expected_tag_names
@ -143,6 +144,7 @@ def test_export_to_json(
def test_try_get_tag_by_name(name_to_search, expected_to_find, tag_factory):
tag = tag_factory(names=['name', 'ALIAS'])
db.session.add(tag)
db.session.flush()
if expected_to_find:
assert tags.try_get_tag_by_name(name_to_search) == tag
else:
@ -159,6 +161,7 @@ def test_try_get_tag_by_name(name_to_search, expected_to_find, tag_factory):
def test_get_tag_by_name(name_to_search, expected_to_find, tag_factory):
tag = tag_factory(names=['name', 'ALIAS'])
db.session.add(tag)
db.session.flush()
if expected_to_find:
assert tags.get_tag_by_name(name_to_search) == tag
else:
@ -239,6 +242,7 @@ def test_get_or_create_tags_by_names(
tag_factory(names=['name2', 'ALIAS2'], category=category),
]
db.session.add_all(input_tags)
db.session.flush()
result = tags.get_or_create_tags_by_names(names)
expected_ids = [input_tags[i].tag_id for i in expected_indexes]
actual_ids = [tag.tag_id for tag in result[0]]
@ -302,6 +306,7 @@ def test_delete(tag_factory):
db.session.flush()
assert db.session.query(db.Tag).count() == 3
tags.delete(tag)
db.session.flush()
assert db.session.query(db.Tag).count() == 2
@ -309,9 +314,9 @@ def test_merge_tags_without_usages(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
db.session.add_all([source_tag, target_tag])
db.session.commit()
db.session.flush()
tags.merge_tags(source_tag, target_tag)
db.session.commit()
db.session.flush()
assert tags.try_get_tag_by_name('source') is None
tag = tags.get_tag_by_name('target')
assert tag is not None
@ -335,7 +340,7 @@ def test_merge_tags_with_usages(tag_factory, post_factory):
def test_merge_tags_with_itself(tag_factory):
source_tag = tag_factory(names=['source'])
db.session.add(source_tag)
db.session.commit()
db.session.flush()
with pytest.raises(tags.InvalidTagRelationError):
tags.merge_tags(source_tag, source_tag)
@ -348,9 +353,9 @@ def test_merge_tags_with_its_child_relation(tag_factory, post_factory):
post = post_factory()
post.tags = [source_tag, target_tag]
db.session.add_all([source_tag, post])
db.session.commit()
db.session.flush()
tags.merge_tags(source_tag, target_tag)
db.session.commit()
db.session.flush()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').post_count == 1
@ -363,9 +368,9 @@ def test_merge_tags_with_its_parent_relation(tag_factory, post_factory):
post = post_factory()
post.tags = [source_tag, target_tag]
db.session.add_all([source_tag, target_tag, post])
db.session.commit()
db.session.flush()
tags.merge_tags(source_tag, target_tag)
db.session.commit()
db.session.flush()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').post_count == 1
@ -377,7 +382,7 @@ def test_merge_tags_clears_relations(tag_factory):
referring_tag.suggestions = [source_tag]
referring_tag.implications = [source_tag]
db.session.add_all([source_tag, target_tag, referring_tag])
db.session.commit()
db.session.flush()
assert tags.try_get_tag_by_name('parent').implications != []
assert tags.try_get_tag_by_name('parent').suggestions != []
tags.merge_tags(source_tag, target_tag)
@ -393,11 +398,11 @@ def test_merge_tags_when_target_exists(tag_factory, post_factory):
post = post_factory()
post.tags = [source_tag, target_tag]
db.session.add_all([source_tag, target_tag, post])
db.session.commit()
db.session.flush()
assert source_tag.post_count == 1
assert target_tag.post_count == 1
tags.merge_tags(source_tag, target_tag)
db.session.commit()
db.session.flush()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').post_count == 1
@ -460,6 +465,7 @@ def test_update_tag_names_trying_to_use_taken_name(
db.session.add(existing_tag)
tag = tag_factory()
db.session.add(tag)
db.session.flush()
with pytest.raises(tags.TagAlreadyExistsError):
tags.update_tag_names(tag, ['a'])
with pytest.raises(tags.TagAlreadyExistsError):

View file

@ -172,6 +172,7 @@ def test_get_user_count(count, user_factory):
def test_try_get_user_by_name(user_factory):
user = user_factory(name='name', email='email')
db.session.add(user)
db.session.flush()
assert users.try_get_user_by_name('non-existing') is None
assert users.try_get_user_by_name('email') is None
assert users.try_get_user_by_name('name') is user
@ -181,6 +182,7 @@ def test_try_get_user_by_name(user_factory):
def test_get_user_by_name(user_factory):
user = user_factory(name='name', email='email')
db.session.add(user)
db.session.flush()
with pytest.raises(users.UserNotFoundError):
assert users.get_user_by_name('non-existing')
with pytest.raises(users.UserNotFoundError):
@ -192,6 +194,7 @@ def test_get_user_by_name(user_factory):
def test_try_get_user_by_name_or_email(user_factory):
user = user_factory(name='name', email='email')
db.session.add(user)
db.session.flush()
assert users.try_get_user_by_name_or_email('non-existing') is None
assert users.try_get_user_by_name_or_email('email') is user
assert users.try_get_user_by_name_or_email('EMAIL') is user
@ -202,6 +205,7 @@ def test_try_get_user_by_name_or_email(user_factory):
def test_get_user_by_name_or_email(user_factory):
user = user_factory(name='name', email='email')
db.session.add(user)
db.session.flush()
with pytest.raises(users.UserNotFoundError):
assert users.get_user_by_name_or_email('non-existing')
assert users.get_user_by_name_or_email('email') is user
@ -372,6 +376,7 @@ def test_update_user_rank_with_invalid_string(user_factory):
def test_update_user_rank_with_higher_rank_than_possible(user_factory):
db.session.add(user_factory())
db.session.flush()
user = user_factory()
auth_user = user_factory()
auth_user.rank = db.User.RANK_ANONYMOUS
@ -383,6 +388,7 @@ def test_update_user_rank_with_higher_rank_than_possible(user_factory):
def test_update_user_rank(user_factory):
db.session.add(user_factory())
db.session.flush()
user = user_factory()
auth_user = user_factory()
auth_user.rank = db.User.RANK_ADMINISTRATOR

View file

@ -33,6 +33,7 @@ def test_filter_by_creation_time(
comment2.creation_time = datetime(2014, 6, 1)
comment3.creation_time = datetime(2015, 1, 1)
db.session.add_all([comment1, comment2, comment3])
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -47,6 +48,7 @@ def test_filter_by_text(
comment1 = comment_factory(text='t1')
comment2 = comment_factory(text='t2')
db.session.add_all([comment1, comment2])
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -63,6 +65,7 @@ def test_filter_by_user(
expected_comment_text):
db.session.add(comment_factory(text='t2', user=user_factory(name='u2')))
db.session.add(comment_factory(text='t1', user=user_factory(name='u1')))
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -79,6 +82,7 @@ def test_filter_by_post(
expected_comment_text):
db.session.add(comment_factory(text='t1', post=post_factory(id=1)))
db.session.add(comment_factory(text='t2', post=post_factory(id=2)))
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -92,6 +96,7 @@ def test_anonymous(
verify_unpaged, comment_factory, input, expected_comment_text):
db.session.add(comment_factory(text='t1'))
db.session.add(comment_factory(text='t2'))
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -106,6 +111,7 @@ def test_sort_by_user(
expected_comment_text):
db.session.add(comment_factory(text='t2', user=user_factory(name='u2')))
db.session.add(comment_factory(text='t1', user=user_factory(name='u1')))
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -120,6 +126,7 @@ def test_sort_by_post(
expected_comment_text):
db.session.add(comment_factory(text='t1', post=post_factory(id=1)))
db.session.add(comment_factory(text='t2', post=post_factory(id=2)))
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -137,6 +144,7 @@ def test_sort_by_creation_time(
comment2.creation_time = datetime(1991, 1, 2)
comment3.creation_time = datetime(1991, 1, 3)
db.session.add_all([comment3, comment1, comment2])
db.session.flush()
verify_unpaged(input, expected_comment_text)
@ -155,4 +163,5 @@ def test_sort_by_last_edit_time(
comment2.last_edit_time = datetime(1991, 1, 2)
comment3.last_edit_time = datetime(1991, 1, 3)
db.session.add_all([comment3, comment1, comment2])
db.session.flush()
verify_unpaged(input, expected_comment_text)

View file

@ -85,6 +85,7 @@ def test_filter_by_id(verify_unpaged, post_factory, input, expected_post_ids):
post2 = post_factory(id=2)
post3 = post_factory(id=3)
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -106,6 +107,7 @@ def test_filter_by_tag(
post3.tags = [tag_factory(names=['t3'])]
post4.tags = [tag_factory(names=['t4a', 't4b'])]
db.session.add_all([post1, post2, post3, post4])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -127,6 +129,7 @@ def test_filter_by_score(
post=post,
user=user_factory()))
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -154,6 +157,7 @@ def test_filter_by_uploader(
post2.user = user_factory(name='u2')
post3.user = user_factory(name='u3')
db.session.add_all([post1, post2, post3, post4])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -178,6 +182,7 @@ def test_filter_by_commenter(
comment_factory(post=post3, user=user_factory(name='u3')),
post1, post2, post3,
])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -201,6 +206,7 @@ def test_filter_by_favorite(
fav_factory(post=post2, user=user_factory(name='u2')),
fav_factory(post=post3, user=user_factory(name='u3')),
post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -218,6 +224,7 @@ def test_filter_by_tag_count(
post2.tags = [tag_factory(), tag_factory()]
post3.tags = [tag_factory(), tag_factory(), tag_factory()]
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -243,6 +250,7 @@ def test_filter_by_comment_count(
comment_factory(post=post3),
comment_factory(post=post3),
post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -264,6 +272,7 @@ def test_filter_by_favorite_count(
fav_factory(post=post3),
fav_factory(post=post3),
post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -281,6 +290,7 @@ def test_filter_by_note_count(
post2.notes = [note_factory(), note_factory()]
post3.notes = [note_factory(), note_factory(), note_factory()]
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -302,6 +312,7 @@ def test_filter_by_feature_count(
post2.features = [feature_factory(), feature_factory()]
post3.features = [feature_factory(), feature_factory(), feature_factory()]
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -326,6 +337,7 @@ def test_filter_by_type(
post3.type = db.Post.TYPE_VIDEO
post4.type = db.Post.TYPE_FLASH
db.session.add_all([post1, post2, post3, post4])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -344,6 +356,7 @@ def test_filter_by_safety(
post2.safety = db.Post.SAFETY_SKETCHY
post3.safety = db.Post.SAFETY_UNSAFE
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -366,6 +379,7 @@ def test_filter_by_file_size(
post2.file_size = 101
post3.file_size = 102
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -392,6 +406,7 @@ def test_filter_by_image_size(
post2.canvas_height = 201
post3.canvas_height = 202
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -418,6 +433,7 @@ def test_filter_by_creation_time(
post2.creation_time = datetime(2015, 1, 1)
post3.creation_time = datetime(2016, 1, 1)
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -444,6 +460,7 @@ def test_filter_by_last_edit_time(
post2.last_edit_time = datetime(2015, 1, 1)
post3.last_edit_time = datetime(2016, 1, 1)
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -471,6 +488,7 @@ def test_filter_by_comment_date(
comment2.creation_time = datetime(2015, 1, 1)
comment3.creation_time = datetime(2016, 1, 1)
db.session.add_all([post1, post2, post3, comment1, comment2, comment3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -494,6 +512,7 @@ def test_filter_by_fav_date(
fav2.time = datetime(2015, 1, 1)
fav3.time = datetime(2016, 1, 1)
db.session.add_all([post1, post2, post3, fav1, fav2, fav3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -521,6 +540,7 @@ def test_filter_by_feature_date(
feature2.time = datetime(2015, 1, 1)
feature3.time = datetime(2016, 1, 1)
db.session.add_all([post1, post2, post3, feature1, feature2, feature3])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -560,6 +580,7 @@ def test_sort_tokens(verify_unpaged, post_factory, input):
post2 = post_factory(id=2)
post3 = post_factory(id=3)
db.session.add_all([post1, post2, post3])
db.session.flush()
verify_unpaged(input, [1, 2, 3])
@ -582,6 +603,7 @@ def test_anonymous(
post3.tags = [tag_factory(names=['t3'])]
post4.tags = [tag_factory(names=['t4a', 't4b'])]
db.session.add_all([post1, post2, post3, post4])
db.session.flush()
verify_unpaged(input, expected_post_ids)
@ -601,6 +623,7 @@ def test_own_liked(
score_factory(post=post3, user=auth_user, score=-1),
post1, post2, post3,
])
db.session.flush()
verify_unpaged('special:liked', [1])
verify_unpaged('-special:liked', [2, 3])
@ -621,6 +644,7 @@ def test_own_disliked(
score_factory(post=post3, user=auth_user, score=1),
post1, post2, post3,
])
db.session.flush()
verify_unpaged('special:disliked', [1])
verify_unpaged('-special:disliked', [2, 3])
@ -648,6 +672,7 @@ def test_own_fav(
fav_factory(post=post2, user=user_factory(name='unrelated')),
post1, post2,
])
db.session.flush()
verify_unpaged('special:fav', [1])
verify_unpaged('-special:fav', [2])
@ -668,5 +693,6 @@ def test_tumbleweed(
fav_factory(post=post3),
post1, post2, post3, post4,
])
db.session.flush()
verify_unpaged('special:tumbleweed', [4])
verify_unpaged('-special:tumbleweed', [1, 2, 3])

View file

@ -31,11 +31,13 @@ def test_filter_anonymous(
verify_unpaged, tag_factory, input, expected_tag_names):
db.session.add(tag_factory(names=['t1']))
db.session.add(tag_factory(names=['t2']))
db.session.flush()
verify_unpaged(input, expected_tag_names)
def test_filter_anonymous_starting_with_colon(verify_unpaged, tag_factory):
db.session.add(tag_factory(names=[':t']))
db.session.flush()
verify_unpaged(':t', [':t'])
@ -66,6 +68,7 @@ def test_filter_by_name(
db.session.add(tag_factory(names=['tag2']))
db.session.add(tag_factory(names=['tag3']))
db.session.add(tag_factory(names=['tag4', 'tag5', 'tag6']))
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -86,6 +89,7 @@ def test_filter_by_category(
tag2 = tag_factory(names=['t2'], category=cat1)
tag3 = tag_factory(names=['t3'], category=cat2)
db.session.add_all([tag1, tag2, tag3])
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -120,6 +124,7 @@ def test_filter_by_creation_time(
tag2.creation_time = datetime(2014, 6, 1)
tag3.creation_time = datetime(2015, 1, 1)
db.session.add_all([tag1, tag2, tag3])
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -138,6 +143,7 @@ def test_filter_by_edit_time(
tag2.last_edit_time = datetime(2015, 1, 1)
tag3.last_edit_time = datetime(2014, 1, 1)
db.session.add_all([tag1, tag2, tag3])
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -160,10 +166,10 @@ def test_filter_by_post_count(
tag1 = tag_factory(names=['t1'])
tag2 = tag_factory(names=['t2'])
db.session.add_all([post1, post2, tag1, tag2])
db.session.commit()
post1.tags.append(tag1)
post1.tags.append(tag2)
post2.tags.append(tag1)
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -193,10 +199,10 @@ def test_filter_by_suggestion_count(
tag1 = tag_factory(names=['t1'])
tag2 = tag_factory(names=['t2'])
db.session.add_all([sug1, sug3, tag2, sug2, tag1])
db.session.commit()
tag1.suggestions.append(sug1)
tag1.suggestions.append(sug2)
tag2.suggestions.append(sug3)
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -213,10 +219,10 @@ def test_filter_by_implication_count(
tag1 = tag_factory(names=['t1'])
tag2 = tag_factory(names=['t2'])
db.session.add_all([sug1, sug3, tag2, sug2, tag1])
db.session.commit()
tag1.implications.append(sug1)
tag1.implications.append(sug2)
tag2.implications.append(sug3)
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -232,6 +238,7 @@ def test_filter_by_implication_count(
def test_sort_by_name(verify_unpaged, tag_factory, input, expected_tag_names):
db.session.add(tag_factory(names=['t2']))
db.session.add(tag_factory(names=['t1']))
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -249,6 +256,7 @@ def test_sort_by_creation_time(
tag2.creation_time = datetime(1991, 1, 2)
tag3.creation_time = datetime(1991, 1, 3)
db.session.add_all([tag3, tag1, tag2])
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -268,6 +276,7 @@ def test_sort_by_last_edit_time(
tag2.last_edit_time = datetime(1991, 1, 2)
tag3.last_edit_time = datetime(1991, 1, 3)
db.session.add_all([tag3, tag1, tag2])
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -283,10 +292,10 @@ def test_sort_by_post_count(
tag1 = tag_factory(names=['t1'])
tag2 = tag_factory(names=['t2'])
db.session.add_all([post1, post2, tag1, tag2])
db.session.commit()
post1.tags.append(tag1)
post1.tags.append(tag2)
post2.tags.append(tag2)
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -301,10 +310,10 @@ def test_sort_by_suggestion_count(
tag1 = tag_factory(names=['t1'])
tag2 = tag_factory(names=['t2'])
db.session.add_all([sug1, sug3, tag2, sug2, tag1])
db.session.commit()
tag1.suggestions.append(sug1)
tag1.suggestions.append(sug2)
tag2.suggestions.append(sug3)
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -319,10 +328,10 @@ def test_sort_by_implication_count(
tag1 = tag_factory(names=['t1'])
tag2 = tag_factory(names=['t2'])
db.session.add_all([sug1, sug3, tag2, sug2, tag1])
db.session.commit()
tag1.implications.append(sug1)
tag1.implications.append(sug2)
tag2.implications.append(sug3)
db.session.flush()
verify_unpaged(input, expected_tag_names)
@ -341,4 +350,5 @@ def test_sort_by_category(
tag2 = tag_factory(names=['t2'], category=cat2)
tag3 = tag_factory(names=['t3'], category=cat1)
db.session.add_all([tag1, tag2, tag3])
db.session.flush()
verify_unpaged(input, expected_tag_names)

View file

@ -53,6 +53,7 @@ def test_filter_by_creation_time(
user2.creation_time = datetime(2014, 6, 1)
user3.creation_time = datetime(2015, 1, 1)
db.session.add_all([user1, user2, user3])
db.session.flush()
verify_unpaged(input, expected_user_names)
@ -79,6 +80,7 @@ def test_filter_by_name(
db.session.add(user_factory(name='user1'))
db.session.add(user_factory(name='user2'))
db.session.add(user_factory(name='user3'))
db.session.flush()
verify_unpaged(input, expected_user_names)
@ -92,6 +94,7 @@ def test_anonymous(
verify_unpaged, input, expected_user_names, user_factory):
db.session.add(user_factory(name='u1'))
db.session.add(user_factory(name='u2'))
db.session.flush()
verify_unpaged(input, expected_user_names)
@ -109,6 +112,7 @@ def test_combining_tokens(
user2.creation_time = datetime(2014, 6, 1)
user3.creation_time = datetime(2015, 1, 1)
db.session.add_all([user1, user2, user3])
db.session.flush()
verify_unpaged(input, expected_user_names)
@ -125,6 +129,7 @@ def test_paging(
expected_total_count, expected_user_names):
db.session.add(user_factory(name='u1'))
db.session.add(user_factory(name='u2'))
db.session.flush()
actual_count, actual_users = executor.execute(
'', page=page, page_size=page_size)
actual_user_names = [u.name for u in actual_users]
@ -145,6 +150,7 @@ def test_sort_by_name(
verify_unpaged, input, expected_user_names, user_factory):
db.session.add(user_factory(name='u2'))
db.session.add(user_factory(name='u1'))
db.session.flush()
verify_unpaged(input, expected_user_names)
@ -167,6 +173,7 @@ def test_sort_by_creation_time(
user2.creation_time = datetime(1991, 1, 2)
user3.creation_time = datetime(1991, 1, 3)
db.session.add_all([user3, user1, user2])
db.session.flush()
verify_unpaged(input, expected_user_names)
@ -186,6 +193,7 @@ def test_sort_by_last_login_time(
user2.last_login_time = datetime(1991, 1, 2)
user3.last_login_time = datetime(1991, 1, 3)
db.session.add_all([user3, user1, user2])
db.session.flush()
verify_unpaged(input, expected_user_names)
@ -194,6 +202,7 @@ def test_random_sort(executor, user_factory):
user2 = user_factory(name='u2')
user3 = user_factory(name='u3')
db.session.add_all([user3, user1, user2])
db.session.flush()
actual_count, actual_users = executor.execute(
'sort:random', page=1, page_size=100)
actual_user_names = [u.name for u in actual_users]