03a513b8b6
Added the possibility to delete single posts or a range of multiple posts at once using the command as following:<br/> `docker-compose run server ./szuru-admin --delete posts 35 36 40-45`<br/> Each space represents a single post id, using a `-` between two ids will delete all posts within range, including the first and the last one
197 lines
6.1 KiB
Python
Executable file
197 lines
6.1 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
"""
|
|
Collection of CLI commands for an administrator to use
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from argparse import ArgumentParser
|
|
from getpass import getpass
|
|
from sys import stderr
|
|
|
|
from szurubooru import config, db, errors, model
|
|
from szurubooru.func import files, images, snapshots
|
|
from szurubooru.func import posts as postfuncs
|
|
from szurubooru.func import users as userfuncs
|
|
|
|
|
|
def reset_password(username: str) -> None:
|
|
user = userfuncs.get_user_by_name_or_email(username)
|
|
|
|
new_password = getpass("Enter new password for '%s': " % user.name)
|
|
check_password = getpass("Re-enter password: ")
|
|
|
|
if check_password != new_password:
|
|
raise errors.ValidationError("Passwords do not match")
|
|
|
|
userfuncs.update_user_password(user, new_password)
|
|
db.get_session().commit()
|
|
print("Sucessfully changed password for '%s'" % user.name)
|
|
|
|
|
|
def check_audio() -> None:
|
|
post_list = (
|
|
db.session.query(model.Post)
|
|
.filter(model.Post.type == model.Post.TYPE_VIDEO)
|
|
.order_by(model.Post.post_id)
|
|
.all()
|
|
)
|
|
|
|
for post in post_list:
|
|
print("Checking post %d ..." % post.post_id, end="\r", file=stderr)
|
|
content = files.get(postfuncs.get_post_content_path(post))
|
|
|
|
has_existing_flag = model.Post.FLAG_SOUND in post.flags
|
|
try:
|
|
has_sound_data = images.Image(content).check_for_sound()
|
|
except errors.ProcessingError:
|
|
print(
|
|
"Post %d caused an error when checking for sound"
|
|
% post.post_id
|
|
)
|
|
|
|
if has_sound_data and not has_existing_flag:
|
|
print("Post %d has sound data but is not flagged" % post.post_id)
|
|
if not has_sound_data and has_existing_flag:
|
|
print("Post %d has no sound data but is flagged" % post.post_id)
|
|
|
|
|
|
def reset_filenames() -> None:
|
|
regex = re.compile(r"(\d+)_[0-9a-f]{16}\.(\S+)")
|
|
|
|
def convert_to_new_filename(old_name: str) -> str:
|
|
matches = regex.match(old_name)
|
|
if not matches:
|
|
return None
|
|
post_id = int(matches.group(1))
|
|
post_ext = matches.group(2)
|
|
return "%d_%s.%s" % (
|
|
post_id,
|
|
postfuncs.get_post_security_hash(post_id),
|
|
post_ext,
|
|
)
|
|
|
|
def rename_in_dir(dir: str) -> None:
|
|
for old_path in os.listdir(config.config["data_dir"] + dir):
|
|
new_path = convert_to_new_filename(old_path)
|
|
if not new_path:
|
|
continue
|
|
if old_path != new_path:
|
|
print("%s -> %s" % (dir + old_path, dir + new_path))
|
|
os.rename(
|
|
config.config["data_dir"] + dir + old_path,
|
|
config.config["data_dir"] + dir + new_path,
|
|
)
|
|
|
|
rename_in_dir("posts/")
|
|
rename_in_dir("generated-thumbnails/")
|
|
rename_in_dir("posts/custom-thumbnails/")
|
|
|
|
|
|
def regenerate_thumbnails() -> None:
|
|
for post in db.session.query(model.Post).all():
|
|
print("Generating tumbnail for post %d ..." % post.post_id, end="\r")
|
|
try:
|
|
postfuncs.generate_post_thumbnail(post)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def delete_posts(parameters: list) -> None:
|
|
verification: str = input("Do you really want to delete all posts with the given ID's [y/n]: ").lower()
|
|
|
|
if "y" != verification:
|
|
return
|
|
|
|
def delete_one_post(post_id: int) -> None:
|
|
print("Deleting post %d" % post_id)
|
|
|
|
try:
|
|
post: model.Post = postfuncs.get_post_by_id(post_id)
|
|
except postfuncs.PostNotFoundError:
|
|
print("Post with ID %d not found" % post_id)
|
|
return
|
|
|
|
postfuncs.delete(post)
|
|
snapshots.delete(post, None)
|
|
|
|
def delete_multiple_posts(start_id: int, end_id: int) -> None:
|
|
for post_id in range(start_id, end_id + 1):
|
|
delete_one_post(post_id)
|
|
|
|
for parameter in parameters:
|
|
try:
|
|
if "-" not in parameter:
|
|
delete_one_post(int(parameter))
|
|
continue
|
|
|
|
post_range: list = [int(number) for number in parameter.split("-", 2)]
|
|
delete_multiple_posts(*post_range)
|
|
except ValueError:
|
|
print("One of the specified parameters is not a number")
|
|
return
|
|
|
|
db.get_session().commit()
|
|
|
|
print("All posts were deleted")
|
|
|
|
|
|
def main() -> None:
|
|
parser_top = ArgumentParser(
|
|
description="Collection of CLI commands for an administrator to use",
|
|
epilog="Look at README.md for more info",
|
|
)
|
|
parser = parser_top.add_mutually_exclusive_group(required=True)
|
|
parser.add_argument(
|
|
"--change-password",
|
|
metavar="<username>",
|
|
help="change the password of specified user",
|
|
)
|
|
parser.add_argument(
|
|
"--check-all-audio",
|
|
action="store_true",
|
|
help="check the audio flags of all posts, "
|
|
"noting discrepancies, without modifying posts",
|
|
)
|
|
parser.add_argument(
|
|
"--reset-filenames",
|
|
action="store_true",
|
|
help="reset and rename the content and thumbnail "
|
|
"filenames in case of a lost/changed secret key",
|
|
)
|
|
parser.add_argument(
|
|
"--regenerate-thumbnails",
|
|
action="store_true",
|
|
help="regenerate the thumbnails for posts if the "
|
|
"thumbnail files are missing",
|
|
)
|
|
parser.add_argument(
|
|
"--delete-posts",
|
|
metavar="<post_ids>",
|
|
nargs='+',
|
|
help="Delete all posts with the specified ID, separated by a space. "
|
|
"Multiple posts can be deleted at once by specifying them as follows, "
|
|
"including the upper and lower limits: 37-47"
|
|
)
|
|
command = parser_top.parse_args()
|
|
|
|
try:
|
|
if command.change_password:
|
|
reset_password(command.change_password)
|
|
elif command.check_all_audio:
|
|
check_audio()
|
|
elif command.reset_filenames:
|
|
reset_filenames()
|
|
elif command.regenerate_thumbnails:
|
|
regenerate_thumbnails()
|
|
elif command.delete_posts:
|
|
delete_posts(command.delete_posts)
|
|
except errors.BaseError as e:
|
|
print(e, file=stderr)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|