Added auto conversion options to the config for gif to mp4 and webm

* webm conversion is slow, but better quality than mp4 conversion and with a typically smaller file size
* tags are copied over from the original upload
* Snapshots are generated for the new auto posts
This commit is contained in:
ReAnzu 2018-02-24 01:06:11 -06:00
parent a1fbeb91a0
commit ac8d683581
6 changed files with 153 additions and 25 deletions

View file

@ -27,6 +27,12 @@ thumbnails:
post_height: 300 post_height: 300
convert:
gif:
to_webm: false
to_mp4: false
# used to send password reset e-mails # used to send password reset e-mails
smtp: smtp:
host: # example: localhost host: # example: localhost

View file

@ -1,4 +1,4 @@
from typing import Optional, Dict from typing import Optional, Dict, List
from datetime import datetime from datetime import datetime
from szurubooru import db, model, errors, rest, search from szurubooru import db, model, errors, rest, search
from szurubooru.func import ( from szurubooru.func import (
@ -69,13 +69,20 @@ def create_post(
posts.update_post_thumbnail(post, ctx.get_file('thumbnail')) posts.update_post_thumbnail(post, ctx.get_file('thumbnail'))
ctx.session.add(post) ctx.session.add(post)
ctx.session.flush() ctx.session.flush()
snapshots.create(post, None if anonymous else ctx.user) create_snapshots_for_post(post, new_tags, None if anonymous else ctx.user)
for tag in new_tags: alternate_format_posts = posts.generate_alternate_formats(post, content)
snapshots.create(tag, None if anonymous else ctx.user) for alternate_post, alternate_post_new_tags in alternate_format_posts:
create_snapshots_for_post(alternate_post, alternate_post_new_tags, None if anonymous else ctx.user)
ctx.session.commit() ctx.session.commit()
return _serialize_post(ctx, post) return _serialize_post(ctx, post)
def create_snapshots_for_post(post: model.Post, new_tags: List[model.Tag], user: Optional[model.User]):
snapshots.create(post, user)
for tag in new_tags:
snapshots.create(tag, user)
@rest.routes.get('/post/(?P<post_id>[^/]+)/?') @rest.routes.get('/post/(?P<post_id>[^/]+)/?')
def get_post(ctx: rest.Context, params: Dict[str, str]) -> rest.Response: def get_post(ctx: rest.Context, params: Dict[str, str]) -> rest.Response:
auth.verify_privilege(ctx.user, 'posts:view') auth.verify_privilege(ctx.user, 'posts:view')

View file

@ -79,6 +79,72 @@ class Image:
'-', '-',
]) ])
def to_webm(self) -> bytes:
with util.create_temp_file_path(suffix='.log') as phase_log_path:
# Pass 1
self._execute([
'-i', '{path}',
'-pass', '1',
'-passlogfile', phase_log_path,
'-vcodec', 'libvpx-vp9',
'-crf', '4',
'-b:v', '2500K',
'-acodec', 'libvorbis',
'-f', 'webm',
'-y', '/dev/null'
])
# Pass 2
return self._execute([
'-i', '{path}',
'-pass', '2',
'-passlogfile', phase_log_path,
'-vcodec', 'libvpx-vp9',
'-crf', '4',
'-b:v', '2500K',
'-acodec', 'libvorbis',
'-f', 'webm',
'-'
])
def to_mp4(self) -> bytes:
with util.create_temp_file_path(suffix='.dat') as mp4_temp_path:
width = self.width
height = self.height
altered_dimensions = False
if self.width % 2 != 0:
width = self.width - 1
altered_dimensions = True
if self.height % 2 != 0:
height = self.height - 1
altered_dimensions = True
args = [
'-i', '{path}',
'-vcodec', 'libx264',
'-preset', 'slow',
'-crf', '22',
'-b:v', '200K',
'-profile:v', 'main',
'-pix_fmt', 'yuv420p',
'-acodec', 'aac',
'-f', 'mp4'
]
if altered_dimensions:
args = args + [
'-filter:v', 'scale=\'%d:%d\'' % (width, height)
]
self._execute(args + ['-y', mp4_temp_path])
with open(mp4_temp_path, 'rb') as mp4_temp:
return mp4_temp.read()
def _execute(self, cli: List[str], program: str = 'ffmpeg') -> bytes: def _execute(self, cli: List[str], program: str = 'ffmpeg') -> bytes:
extension = mime.get_extension(mime.get_mime_type(self.content)) extension = mime.get_extension(mime.get_mime_type(self.content))
assert extension assert extension

View file

@ -1,58 +1,66 @@
import re import re
from typing import Optional from typing import Optional
APPLICATION_SWF = 'application/x-shockwave-flash'
IMAGE_JPEG = 'image/jpeg'
IMAGE_PNG = 'image/png'
IMAGE_GIF = 'image/gif'
VIDEO_WEBM = 'video/webm'
VIDEO_MP4 = 'video/mp4'
APPLICATION_OCTET_STREAM = 'application/octet-stream'
def get_mime_type(content: bytes) -> str: def get_mime_type(content: bytes) -> str:
if not content: if not content:
return 'application/octet-stream' return APPLICATION_OCTET_STREAM
if content[0:3] in (b'CWS', b'FWS', b'ZWS'): if content[0:3] in (b'CWS', b'FWS', b'ZWS'):
return 'application/x-shockwave-flash' return APPLICATION_SWF
if content[0:3] == b'\xFF\xD8\xFF': if content[0:3] == b'\xFF\xD8\xFF':
return 'image/jpeg' return IMAGE_JPEG
if content[0:6] == b'\x89PNG\x0D\x0A': if content[0:6] == b'\x89PNG\x0D\x0A':
return 'image/png' return IMAGE_PNG
if content[0:6] in (b'GIF87a', b'GIF89a'): if content[0:6] in (b'GIF87a', b'GIF89a'):
return 'image/gif' return IMAGE_GIF
if content[0:4] == b'\x1A\x45\xDF\xA3': if content[0:4] == b'\x1A\x45\xDF\xA3':
return 'video/webm' return VIDEO_WEBM
if content[4:12] in (b'ftypisom', b'ftypmp42'): if content[4:12] in (b'ftypisom', b'ftypmp42'):
return 'video/mp4' return VIDEO_MP4
return 'application/octet-stream' return APPLICATION_OCTET_STREAM
def get_extension(mime_type: str) -> Optional[str]: def get_extension(mime_type: str) -> Optional[str]:
extension_map = { extension_map = {
'application/x-shockwave-flash': 'swf', APPLICATION_SWF: 'swf',
'image/gif': 'gif', IMAGE_GIF: 'gif',
'image/jpeg': 'jpg', IMAGE_JPEG: 'jpg',
'image/png': 'png', IMAGE_PNG: 'png',
'video/mp4': 'mp4', VIDEO_MP4: 'mp4',
'video/webm': 'webm', VIDEO_WEBM: 'webm',
'application/octet-stream': 'dat', APPLICATION_OCTET_STREAM: 'dat',
} }
return extension_map.get((mime_type or '').strip().lower(), None) return extension_map.get((mime_type or '').strip().lower(), None)
def is_flash(mime_type: str) -> bool: def is_flash(mime_type: str) -> bool:
return mime_type.lower() == 'application/x-shockwave-flash' return mime_type.lower() == APPLICATION_SWF
def is_video(mime_type: str) -> bool: def is_video(mime_type: str) -> bool:
return mime_type.lower() in ('application/ogg', 'video/mp4', 'video/webm') return mime_type.lower() in ('application/ogg', VIDEO_MP4, VIDEO_WEBM)
def is_image(mime_type: str) -> bool: def is_image(mime_type: str) -> bool:
return mime_type.lower() in ('image/jpeg', 'image/png', 'image/gif') return mime_type.lower() in (IMAGE_JPEG, IMAGE_PNG, IMAGE_GIF)
def is_animated_gif(content: bytes) -> bool: def is_animated_gif(content: bytes) -> bool:
pattern = b'\x21\xF9\x04[\x00-\xFF]{4}\x00[\x2C\x21]' pattern = b'\x21\xF9\x04[\x00-\xFF]{4}\x00[\x2C\x21]'
return get_mime_type(content) == 'image/gif' \ return get_mime_type(content) == IMAGE_GIF \
and len(re.findall(pattern, content)) > 1 and len(re.findall(pattern, content)) > 1

View file

@ -5,7 +5,7 @@ import sqlalchemy as sa
from szurubooru import config, db, model, errors, rest from szurubooru import config, db, model, errors, rest
from szurubooru.func import ( from szurubooru.func import (
users, scores, comments, tags, util, users, scores, comments, tags, util,
mime, images, files, image_hash, serialization) mime, images, files, image_hash, serialization, snapshots)
EMPTY_PIXEL = ( EMPTY_PIXEL = (
@ -364,7 +364,7 @@ def create_post(
update_post_content(post, content) update_post_content(post, content)
new_tags = update_post_tags(post, tag_names) new_tags = update_post_tags(post, tag_names)
return (post, new_tags) return post, new_tags
def update_post_safety(post: model.Post, safety: str) -> None: def update_post_safety(post: model.Post, safety: str) -> None:
@ -429,6 +429,37 @@ def _sync_post_content(post: model.Post) -> None:
generate_post_thumbnail(post) generate_post_thumbnail(post)
def generate_alternate_formats(post: model.Post, content: bytes) -> List[Tuple[model.Post, List[model.Tag]]]:
assert post
assert content
new_posts = []
if mime.is_animated_gif(content):
tag_names = [tag_name.name for tag_name in [tag.names for tag in post.tags]]
if config.config['convert']['gif']['to_mp4']:
mp4_post, new_tags = create_post(images.Image(content).to_mp4(), tag_names, post.user)
update_post_flags(mp4_post, ['loop'])
update_post_safety(mp4_post, post.safety)
update_post_source(mp4_post, post.source)
new_posts += [(mp4_post, new_tags)]
if config.config['convert']['gif']['to_webm']:
webm_post, new_tags = create_post(images.Image(content).to_webm(), tag_names, post.user)
update_post_flags(webm_post, ['loop'])
update_post_safety(webm_post, post.safety)
update_post_source(webm_post, post.source)
new_posts += [(webm_post, new_tags)]
db.session.flush()
new_posts = list(filter(lambda i: i[0] is not None, new_posts))
new_relations = [p[0].post_id for p in new_posts]
update_post_relations(post, new_relations) if len(new_relations) > 0 else None
return new_posts
def update_post_content(post: model.Post, content: Optional[bytes]) -> None: def update_post_content(post: model.Post, content: Optional[bytes]) -> None:
assert post assert post
if not content: if not content:

View file

@ -41,6 +41,16 @@ def create_temp_file(**kwargs: Any) -> Generator:
os.remove(path) os.remove(path)
@contextmanager
def create_temp_file_path(**kwargs: Any) -> Generator:
(descriptor, path) = tempfile.mkstemp(**kwargs)
os.close(descriptor)
try:
yield path
finally:
os.remove(path)
def unalias_dict(source: List[Tuple[List[str], T]]) -> Dict[str, T]: def unalias_dict(source: List[Tuple[List[str], T]]) -> Dict[str, T]:
output_dict = {} # type: Dict[str, T] output_dict = {} # type: Dict[str, T]
for aliases, value in source: for aliases, value in source: