server: metadata.py -> images.py [WIP, broken]
[server] `func/images.py`: - `Image` is now `Image` and `Video`, with their own post-type-specific operations; some of them from `func/metadata.py`. as a result, post type checking will need to be done outside of the classes - apply orientation to [fixed] thumbnail generation and image conversion - moved `_execute` to a global function; it now purely uses streams instead of tempfiles - `func/images.py::Video::to_mp4()` removed code that downscales resolution to an even number since ffmpeg does it automatically - use fstrings instead of .format and % in certain places - DEPRECATING FLASH FILES; there really is no place for them in a photo-oriented booru such as this. will slowly phase it out overtime. added an error message for it - changed EXIF orientation reference URL [server] `func/posts.py`: - `update_post_content()`: updated to use `func/images.py`'s new features
This commit is contained in:
parent
a4ea05a0e4
commit
aa03eaba44
3 changed files with 321 additions and 396 deletions
|
@ -3,10 +3,12 @@ import logging
|
|||
import math
|
||||
import re
|
||||
import shlex
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from subprocess import PIPE, Popen
|
||||
from typing import List
|
||||
|
||||
from exif import Image as EXIFImage
|
||||
from PIL import Image as PILImage
|
||||
|
||||
from szurubooru import errors
|
||||
|
@ -15,6 +17,19 @@ from szurubooru.func import mime, util
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Refer to: https://www.impulseadventure.com/photo/images/orient_flag.gif
|
||||
# and https://ffmpeg.org/ffmpeg-filters.html#transpose-1
|
||||
EXIF_ORIENTATION_TRANSPOSES = (
|
||||
"transpose=clock_flip,transpose=cclock", # Starts from orientation 2
|
||||
"transpose=clock,transpose=clock",
|
||||
"transpose=clock_flip,transpose=clock",
|
||||
"transpose=cclock_flip,transpose=clock,transpose=clock",
|
||||
"transpose=clock",
|
||||
"transpose=clock_flip,transpose=clock,transpose=clock",
|
||||
"transpose=cclock",
|
||||
)
|
||||
|
||||
|
||||
def convert_heif_to_png(content: bytes) -> bytes:
|
||||
img = PILImage.open(BytesIO(content))
|
||||
img_byte_arr = BytesIO()
|
||||
|
@ -22,105 +37,224 @@ def convert_heif_to_png(content: bytes) -> bytes:
|
|||
return img_byte_arr.getvalue()
|
||||
|
||||
|
||||
def _execute_ffmpeg(
|
||||
content: bytes,
|
||||
cli: List[str],
|
||||
program: str = "ffmpeg",
|
||||
ignore_error_if_data: bool = False,
|
||||
get_logs: bool = False,
|
||||
) -> bytes:
|
||||
mime_type = mime.get_mime_type(content)
|
||||
if mime.is_heif(mime_type):
|
||||
# FFmpeg does not support HEIF.
|
||||
# https://trac.ffmpeg.org/ticket/6521
|
||||
content = convert_heif_to_png(content)
|
||||
|
||||
cli = [program, "-loglevel", "32" if get_logs else "24"] + cli
|
||||
|
||||
proc = Popen(cli, stdout=PIPE, stdin=PIPE, stderr=PIPE)
|
||||
out, err = proc.communicate(input=content)
|
||||
|
||||
if proc.returncode != 0:
|
||||
args = " ".join(shlex.quote(arg) for arg in cli)
|
||||
logger.warning(
|
||||
f"Failed to execute {program} command (cli={args}, err={err})"
|
||||
)
|
||||
if (len(out) > 0 and not ignore_error_if_data) or len(out) == 0:
|
||||
raise errors.ProcessingError(
|
||||
"Error while processing media.\n" + err.decode("utf-8")
|
||||
)
|
||||
return err if get_logs else out
|
||||
|
||||
|
||||
class Image:
|
||||
def __init__(self, content: bytes) -> None:
|
||||
self.content = content
|
||||
|
||||
self.width = None
|
||||
self.height = None
|
||||
self.orientation = 1
|
||||
self.date_taken = None
|
||||
self.camera = None
|
||||
|
||||
self._reload_info()
|
||||
|
||||
@property
|
||||
def width(self) -> int:
|
||||
return self.info["streams"][0]["width"]
|
||||
|
||||
@property
|
||||
def height(self) -> int:
|
||||
return self.info["streams"][0]["height"]
|
||||
|
||||
@property
|
||||
def frames(self) -> int:
|
||||
return self.info["streams"][0]["nb_read_frames"]
|
||||
|
||||
def resize_fill(self, width: int, height: int) -> None:
|
||||
def to_thumbnail(self, width: int, height: int) -> bytes:
|
||||
width_greater = self.width > self.height
|
||||
width, height = (-1, height) if width_greater else (width, -1)
|
||||
|
||||
cli = ["-i", "-"]
|
||||
self._add_orientation_filters(cli, f"scale='{width}:{height}'")
|
||||
cli += ["-f", "image2", "-vframes", "1", "-vcodec", "png", "-"]
|
||||
|
||||
content = _execute_ffmpeg(self.content, cli, ignore_error_if_data=True)
|
||||
|
||||
if not content:
|
||||
raise errors.ProcessingError(
|
||||
"Error while creating thumbnail from image."
|
||||
)
|
||||
|
||||
return content
|
||||
|
||||
def to_png(self) -> bytes:
|
||||
cli = ["-i", "-"]
|
||||
self._add_orientation_filters(cli)
|
||||
cli += ["-f", "image2", "-vframes", "1", "-vcodec", "mjpeg", "-"]
|
||||
|
||||
return _execute_ffmpeg(self.content, cli, ignore_error_if_data=True)
|
||||
|
||||
def to_jpeg(self) -> bytes:
|
||||
cli = [
|
||||
"-f",
|
||||
"lavfi",
|
||||
"-i",
|
||||
"{path}",
|
||||
f"color=white:s={self.width}x{self.height}",
|
||||
"-i",
|
||||
"-",
|
||||
]
|
||||
self._add_orientation_filters(cli)
|
||||
cli += [
|
||||
"-f",
|
||||
"image2",
|
||||
"-filter:v",
|
||||
"scale='{width}:{height}'".format(width=width, height=height),
|
||||
"-map",
|
||||
"0:v:0",
|
||||
"-filter_complex",
|
||||
"overlay",
|
||||
"-vframes",
|
||||
"1",
|
||||
"-vcodec",
|
||||
"png",
|
||||
"mjpeg",
|
||||
"-",
|
||||
]
|
||||
if (
|
||||
"duration" in self.info["format"]
|
||||
and self.info["format"]["format_name"] != "swf"
|
||||
):
|
||||
duration = float(self.info["format"]["duration"])
|
||||
if duration > 3:
|
||||
cli = [
|
||||
"-ss",
|
||||
"%d" % math.floor(duration * 0.3),
|
||||
] + cli
|
||||
content = self._execute(cli, ignore_error_if_data=True)
|
||||
if not content:
|
||||
raise errors.ProcessingError("Error while resizing image.")
|
||||
|
||||
return _execute_ffmpeg(self.content, cli, ignore_error_if_data=True)
|
||||
|
||||
def check_for_sound(self) -> bool:
|
||||
return False
|
||||
|
||||
def _add_orientation_filters(
|
||||
self, cmd: List[str], extra_filters=""
|
||||
) -> None:
|
||||
if not extra_filters and self.orientation == 1:
|
||||
return
|
||||
|
||||
transpose = EXIF_ORIENTATION_TRANSPOSES[self.orientation - 2]
|
||||
if extra_filters:
|
||||
transpose += "," + extra_filters
|
||||
|
||||
cmd.append("-vf")
|
||||
cmd.append(transpose)
|
||||
|
||||
def _extract_from_exif(self) -> None:
|
||||
tags = EXIFImage(self.content)
|
||||
|
||||
if tags.has_exif and tags.list_all():
|
||||
self.orientation = tags["orientation"]
|
||||
|
||||
# 5, 6, 7, and 8 are orientation values where the image is rotated
|
||||
# 90 degrees CW or CCW.
|
||||
if self.orientation in (5, 6, 7, 8):
|
||||
self.width = tags["pixel_y_dimension"]
|
||||
self.height = tags["pixel_x_dimension"]
|
||||
else:
|
||||
self.width = tags["pixel_x_dimension"]
|
||||
self.height = tags["pixel_y_dimension"]
|
||||
|
||||
for option in ("datetime", "datetime_original"):
|
||||
if option in tags.list_all():
|
||||
self.date_taken = datetime.strptime(
|
||||
tags[option],
|
||||
"%Y:%m:%d %H:%M:%S",
|
||||
)
|
||||
|
||||
camera_string = []
|
||||
|
||||
for option in ("make", "model"):
|
||||
if option in tags.list_all():
|
||||
camera_string.append(tags[option])
|
||||
|
||||
if camera_string:
|
||||
self.camera_string = " ".join(camera_string)
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
def _extract_using_ffmpeg(self) -> None:
|
||||
cmd = ["-i", "-", "-print_format", "json", "-show_streams"]
|
||||
info = json.loads(
|
||||
_execute_ffmpeg(self.content, cmd, program="ffprobe").decode(
|
||||
"utf-8"
|
||||
)
|
||||
)
|
||||
|
||||
assert "streams" in info
|
||||
if len(info["streams"]) > 0:
|
||||
self.width = info["streams"][0]["width"]
|
||||
self.height = info["streams"][0]["height"]
|
||||
|
||||
def _reload_info(self) -> None:
|
||||
try:
|
||||
self._extract_from_exif()
|
||||
except Exception:
|
||||
self._extract_using_ffmpeg()
|
||||
|
||||
assert self.width > 0
|
||||
assert self.height > 0
|
||||
if (not self.width) or (not self.height):
|
||||
logger.warning("Error processing this image.")
|
||||
raise errors.ProcessingError("Error processing this image.")
|
||||
|
||||
|
||||
class Video:
|
||||
def __init__(self, content: bytes) -> None:
|
||||
self.content = content
|
||||
|
||||
self.width = None
|
||||
self.height = None
|
||||
self.date_taken = None
|
||||
self.camera = None
|
||||
self.frames = 0
|
||||
self.duration = 0
|
||||
|
||||
self._reload_info()
|
||||
|
||||
def to_png(self) -> bytes:
|
||||
return self._execute(
|
||||
[
|
||||
"-i",
|
||||
"{path}",
|
||||
"-f",
|
||||
"image2",
|
||||
"-map",
|
||||
"0:v:0",
|
||||
"-vframes",
|
||||
"1",
|
||||
"-vcodec",
|
||||
"png",
|
||||
"-",
|
||||
]
|
||||
)
|
||||
def to_thumbnail(self, width: int, height: int) -> bytes:
|
||||
width_greater = self.width > self.height
|
||||
width, height = (-1, height) if width_greater else (width, -1)
|
||||
|
||||
def to_jpeg(self) -> bytes:
|
||||
return self._execute(
|
||||
[
|
||||
"-f",
|
||||
"lavfi",
|
||||
"-i",
|
||||
"color=white:s=%dx%d" % (self.width, self.height),
|
||||
"-i",
|
||||
"{path}",
|
||||
"-f",
|
||||
"image2",
|
||||
"-filter_complex",
|
||||
"overlay",
|
||||
"-map",
|
||||
"0:v:0",
|
||||
"-vframes",
|
||||
"1",
|
||||
"-vcodec",
|
||||
"mjpeg",
|
||||
"-",
|
||||
]
|
||||
)
|
||||
cli = []
|
||||
|
||||
if float(self.duration) > 3.0:
|
||||
cli += ["-ss", math.floor(self.duration * 0.3)]
|
||||
|
||||
cli += [
|
||||
"-i",
|
||||
"-",
|
||||
"-f",
|
||||
"image2",
|
||||
"-vf",
|
||||
f"scale={width}:{height}",
|
||||
"-vframes",
|
||||
"1",
|
||||
"-vcodec",
|
||||
"mjpeg",
|
||||
"-",
|
||||
]
|
||||
|
||||
content = _execute_ffmpeg(self.content, cli, ignore_error_if_data=True)
|
||||
|
||||
if not content:
|
||||
raise errors.ProcessingError(
|
||||
"Error while creating thumbnail from video."
|
||||
)
|
||||
|
||||
return content
|
||||
|
||||
def to_webm(self) -> bytes:
|
||||
with util.create_temp_file_path(suffix=".log") as phase_log_path:
|
||||
# Pass 1
|
||||
self._execute(
|
||||
_execute_ffmpeg(
|
||||
self.content,
|
||||
[
|
||||
"-i",
|
||||
"{path}",
|
||||
"-",
|
||||
"-pass",
|
||||
"1",
|
||||
"-passlogfile",
|
||||
|
@ -137,14 +271,15 @@ class Image:
|
|||
"webm",
|
||||
"-y",
|
||||
"/dev/null",
|
||||
]
|
||||
],
|
||||
)
|
||||
|
||||
# Pass 2
|
||||
return self._execute(
|
||||
return _execute_ffmpeg(
|
||||
self.content,
|
||||
[
|
||||
"-i",
|
||||
"{path}",
|
||||
"-",
|
||||
"-pass",
|
||||
"2",
|
||||
"-passlogfile",
|
||||
|
@ -160,58 +295,49 @@ class Image:
|
|||
"-f",
|
||||
"webm",
|
||||
"-",
|
||||
]
|
||||
],
|
||||
)
|
||||
|
||||
def to_mp4(self) -> bytes:
|
||||
# I would like to know why making ffmpeg output to a tempfile is
|
||||
# necessary here and not when converting webms for example
|
||||
with util.create_temp_file_path(suffix=".dat") as mp4_temp_path:
|
||||
width = self.width
|
||||
height = self.height
|
||||
altered_dimensions = False
|
||||
_execute_ffmpeg(
|
||||
self.content,
|
||||
[
|
||||
"-i",
|
||||
"-",
|
||||
"-vcodec",
|
||||
"libx264",
|
||||
"-preset",
|
||||
"slow",
|
||||
"-crf",
|
||||
"22",
|
||||
"-b:v",
|
||||
"200K",
|
||||
"-profile:v",
|
||||
"main",
|
||||
"-pix_fmt",
|
||||
"yuv420p",
|
||||
"-acodec",
|
||||
"aac",
|
||||
"-f",
|
||||
"mp4",
|
||||
"-y",
|
||||
mp4_temp_path,
|
||||
],
|
||||
)
|
||||
|
||||
if self.width % 2 != 0:
|
||||
width = self.width - 1
|
||||
altered_dimensions = True
|
||||
|
||||
if self.height % 2 != 0:
|
||||
height = self.height - 1
|
||||
altered_dimensions = True
|
||||
|
||||
args = [
|
||||
"-i",
|
||||
"{path}",
|
||||
"-vcodec",
|
||||
"libx264",
|
||||
"-preset",
|
||||
"slow",
|
||||
"-crf",
|
||||
"22",
|
||||
"-b:v",
|
||||
"200K",
|
||||
"-profile:v",
|
||||
"main",
|
||||
"-pix_fmt",
|
||||
"yuv420p",
|
||||
"-acodec",
|
||||
"aac",
|
||||
"-f",
|
||||
"mp4",
|
||||
]
|
||||
|
||||
if altered_dimensions:
|
||||
args += ["-filter:v", "scale='%d:%d'" % (width, height)]
|
||||
|
||||
self._execute(args + ["-y", mp4_temp_path])
|
||||
|
||||
with open(mp4_temp_path, "rb") as mp4_temp:
|
||||
return mp4_temp.read()
|
||||
with open(mp4_temp_path, "rb") as data:
|
||||
return data.read()
|
||||
|
||||
def check_for_sound(self) -> bool:
|
||||
audioinfo = json.loads(
|
||||
self._execute(
|
||||
_execute_ffmpeg(
|
||||
self.content,
|
||||
[
|
||||
"-i",
|
||||
"{path}",
|
||||
"-",
|
||||
"-of",
|
||||
"json",
|
||||
"-select_streams",
|
||||
|
@ -221,17 +347,19 @@ class Image:
|
|||
program="ffprobe",
|
||||
).decode("utf-8")
|
||||
)
|
||||
|
||||
assert "streams" in audioinfo
|
||||
if len(audioinfo["streams"]) < 1:
|
||||
return False
|
||||
|
||||
log = self._execute(
|
||||
log = _execute_ffmpeg(
|
||||
self.content,
|
||||
[
|
||||
"-hide_banner",
|
||||
"-progress",
|
||||
"-",
|
||||
"-i",
|
||||
"{path}",
|
||||
"-",
|
||||
"-af",
|
||||
"volumedetect",
|
||||
"-max_muxing_queue_size",
|
||||
|
@ -255,66 +383,52 @@ class Image:
|
|||
# -91.0 dB is the minimum for 16-bit audio, assume sound if > -80.0 dB
|
||||
return meanvol > -80.0
|
||||
|
||||
def _execute(
|
||||
self,
|
||||
cli: List[str],
|
||||
program: str = "ffmpeg",
|
||||
ignore_error_if_data: bool = False,
|
||||
get_logs: bool = False,
|
||||
) -> bytes:
|
||||
mime_type = mime.get_mime_type(self.content)
|
||||
if mime.is_heif(mime_type):
|
||||
# FFmpeg does not support HEIF.
|
||||
# https://trac.ffmpeg.org/ticket/6521
|
||||
self.content = convert_heif_to_png(self.content)
|
||||
extension = mime.get_extension(mime_type)
|
||||
assert extension
|
||||
with util.create_temp_file(suffix="." + extension) as handle:
|
||||
handle.write(self.content)
|
||||
handle.flush()
|
||||
cli = [program, "-loglevel", "32" if get_logs else "24"] + cli
|
||||
cli = [part.format(path=handle.name) for part in cli]
|
||||
proc = subprocess.Popen(
|
||||
cli,
|
||||
stdout=subprocess.PIPE,
|
||||
stdin=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
out, err = proc.communicate(input=self.content)
|
||||
if proc.returncode != 0:
|
||||
logger.warning(
|
||||
"Failed to execute ffmpeg command (cli=%r, err=%r)",
|
||||
" ".join(shlex.quote(arg) for arg in cli),
|
||||
err,
|
||||
)
|
||||
if (len(out) > 0 and not ignore_error_if_data) or len(
|
||||
out
|
||||
) == 0:
|
||||
raise errors.ProcessingError(
|
||||
"Error while processing image.\n" + err.decode("utf-8")
|
||||
)
|
||||
return err if get_logs else out
|
||||
def _reload_info(self):
|
||||
cmd = [
|
||||
"-i",
|
||||
"-",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-show_streams",
|
||||
"-show_format",
|
||||
]
|
||||
|
||||
def _reload_info(self) -> None:
|
||||
self.info = json.loads(
|
||||
self._execute(
|
||||
[
|
||||
"-i",
|
||||
"{path}",
|
||||
"-of",
|
||||
"json",
|
||||
"-select_streams",
|
||||
"v",
|
||||
"-show_format",
|
||||
"-show_streams",
|
||||
],
|
||||
info = json.loads(
|
||||
_execute_ffmpeg(
|
||||
self.content,
|
||||
cmd,
|
||||
program="ffprobe",
|
||||
).decode("utf-8")
|
||||
)
|
||||
assert "format" in self.info
|
||||
assert "streams" in self.info
|
||||
if len(self.info["streams"]) < 1:
|
||||
logger.warning("The video contains no video streams.")
|
||||
|
||||
assert "streams" in info
|
||||
if len(info["streams"]) < 1:
|
||||
logger.warning("This video contains no video streams.")
|
||||
raise errors.ProcessingError(
|
||||
"The video contains no video streams."
|
||||
)
|
||||
|
||||
self.width = info["streams"][0]["width"]
|
||||
self.height = info["streams"][0]["height"]
|
||||
|
||||
assert "format" in info
|
||||
assert "tags" in info["format"]
|
||||
|
||||
self.date_taken = info["format"]["tags"]["creation_time"]
|
||||
|
||||
# List of tuples where only one value can be valid
|
||||
option_tuples = (
|
||||
("manufacturer", "com.android.manufacturer"),
|
||||
("model", "com.android.model"),
|
||||
)
|
||||
|
||||
camera_string = []
|
||||
|
||||
for option_tuple in option_tuples:
|
||||
for option in option_tuple:
|
||||
if option in info["format"]["tags"]:
|
||||
camera_string.append(info["format"]["tags"][option])
|
||||
break
|
||||
|
||||
if camera_string:
|
||||
self.camera = " ".join(camera_string)
|
||||
|
|
|
@ -1,183 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from subprocess import PIPE, Popen
|
||||
from typing import Optional, Tuple, Union
|
||||
|
||||
from exif import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
BASE_FFPROBE_COMMAND = [
|
||||
"ffprobe",
|
||||
"-loglevel",
|
||||
"8",
|
||||
"-print_format",
|
||||
"json",
|
||||
"-show_format",
|
||||
"-show_streams",
|
||||
]
|
||||
|
||||
|
||||
def _open_image(content: bytes) -> Image:
|
||||
tags = Image(content)
|
||||
|
||||
if not tags.has_exif or not tags.list_all():
|
||||
raise Exception
|
||||
|
||||
return tags
|
||||
|
||||
|
||||
def _run_ffprobe(content: Union[bytes, str]) -> Image:
|
||||
if isinstance(content, bytes):
|
||||
proc = Popen(
|
||||
BASE_FFPROBE_COMMAND + ["-"],
|
||||
stdin=PIPE,
|
||||
stdout=PIPE,
|
||||
stderr=PIPE,
|
||||
)
|
||||
|
||||
output = proc.communicate(input=content)[0]
|
||||
else:
|
||||
proc = Popen(
|
||||
BASE_FFPROBE_COMMAND + [content],
|
||||
stdout=PIPE,
|
||||
stderr=PIPE,
|
||||
)
|
||||
|
||||
output = proc.communicate()[0]
|
||||
|
||||
return json.loads(output)["format"]["tags"]
|
||||
|
||||
|
||||
def resolve_image_date_taken(
|
||||
content: Union[bytes, Image]
|
||||
) -> Optional[datetime]:
|
||||
try:
|
||||
if isinstance(content, Image):
|
||||
tags = content
|
||||
else:
|
||||
tags = _open_image(content)
|
||||
|
||||
resolved = None
|
||||
|
||||
for option in ("datetime", "datetime_original"):
|
||||
if option in tags.list_all():
|
||||
resolved = tags[option]
|
||||
break
|
||||
|
||||
if not resolved:
|
||||
raise Exception
|
||||
except Exception:
|
||||
return None
|
||||
else:
|
||||
return datetime.strptime(resolved, "%Y:%m:%d %H:%M:%S")
|
||||
|
||||
|
||||
def resolve_video_date_taken(
|
||||
content: Union[bytes, str, dict]
|
||||
) -> Optional[datetime]:
|
||||
try:
|
||||
if isinstance(content, dict):
|
||||
tags = content
|
||||
else:
|
||||
tags = _run_ffprobe(content)
|
||||
|
||||
creation_time = tags["creation_time"]
|
||||
except Exception:
|
||||
return None
|
||||
else:
|
||||
return datetime.fromisoformat(creation_time.rstrip("Z"))
|
||||
|
||||
|
||||
def resolve_image_camera(content: Union[bytes, Image]) -> Optional[str]:
|
||||
try:
|
||||
if isinstance(content, Image):
|
||||
tags = content
|
||||
else:
|
||||
tags = _open_image(content)
|
||||
|
||||
camera_string = []
|
||||
|
||||
for option in ("make", "model"):
|
||||
if option in tags.list_all():
|
||||
camera_string.append(tags[option])
|
||||
|
||||
if not camera_string:
|
||||
raise Exception
|
||||
except Exception:
|
||||
return None
|
||||
else:
|
||||
return " ".join(camera_string)
|
||||
|
||||
|
||||
def resolve_video_camera(content: Union[bytes, str, dict]) -> Optional[str]:
|
||||
try:
|
||||
if isinstance(content, dict):
|
||||
tags = content
|
||||
else:
|
||||
tags = _run_ffprobe(content)
|
||||
|
||||
# List of tuples where only one value can be valid
|
||||
option_tuples = (
|
||||
("manufacturer", "com.android.manufacturer"),
|
||||
("model", "com.android.model"),
|
||||
)
|
||||
|
||||
camera_string = []
|
||||
|
||||
for option_tuple in option_tuples:
|
||||
for option in option_tuple:
|
||||
if option in tags:
|
||||
camera_string.append(tags[option])
|
||||
break
|
||||
|
||||
if not camera_string:
|
||||
raise Exception
|
||||
except Exception:
|
||||
return None
|
||||
else:
|
||||
return " ".join(camera_string)
|
||||
|
||||
|
||||
def resolve_real_image_dimensions(
|
||||
content: Union[bytes, Image]
|
||||
) -> Optional[Tuple[int, int]]:
|
||||
try:
|
||||
if isinstance(content, Image):
|
||||
tags = content
|
||||
else:
|
||||
tags = _open_image(content)
|
||||
|
||||
orig_w = tags["pixel_x_dimension"]
|
||||
orig_h = tags["pixel_y_dimension"]
|
||||
|
||||
# read: https://jdhao.github.io/2019/07/31/image_rotation_exif_info/
|
||||
# 8, 6, 5, 7 are orientation values where the image is rotated 90
|
||||
# degrees CW or CCW. in this case, we swap the two dimensions.
|
||||
if tags["orientation"] in (8, 6, 5, 7):
|
||||
dimensions = (orig_h, orig_w)
|
||||
else:
|
||||
dimensions = (orig_w, orig_h)
|
||||
except Exception:
|
||||
return (0, 0)
|
||||
else:
|
||||
return dimensions
|
||||
|
||||
|
||||
def resolve_video_dimensions(
|
||||
content: Union[bytes, str, dict]
|
||||
) -> Optional[Tuple[int, int]]:
|
||||
try:
|
||||
if isinstance(content, dict):
|
||||
tags = content
|
||||
else:
|
||||
tags = _run_ffprobe(content)
|
||||
|
||||
stream = tags["format"]["streams"][0]
|
||||
dimensions = (stream["width"], stream["height"])
|
||||
except Exception:
|
||||
return (0, 0)
|
||||
else:
|
||||
return dimensions
|
|
@ -11,7 +11,6 @@ from szurubooru.func import (
|
|||
files,
|
||||
image_hash,
|
||||
images,
|
||||
metadata,
|
||||
mime,
|
||||
pools,
|
||||
scores,
|
||||
|
@ -510,7 +509,7 @@ def generate_alternate_formats(
|
|||
|
||||
if config.config["convert"]["gif"]["to_mp4"]:
|
||||
mp4_post, new_tags = create_post(
|
||||
images.Image(content).to_mp4(), tag_names, post.user
|
||||
images.Video(content).to_mp4(), tag_names, post.user
|
||||
)
|
||||
update_post_flags(mp4_post, ["loop"])
|
||||
update_post_safety(mp4_post, post.safety)
|
||||
|
@ -519,7 +518,7 @@ def generate_alternate_formats(
|
|||
|
||||
if config.config["convert"]["gif"]["to_webm"]:
|
||||
webm_post, new_tags = create_post(
|
||||
images.Image(content).to_webm(), tag_names, post.user
|
||||
images.Video(content).to_webm(), tag_names, post.user
|
||||
)
|
||||
update_post_flags(webm_post, ["loop"])
|
||||
update_post_safety(webm_post, post.safety)
|
||||
|
@ -542,7 +541,7 @@ def get_default_flags(content: bytes) -> List[str]:
|
|||
ret = []
|
||||
if mime.is_video(mime.get_mime_type(content)):
|
||||
ret.append(model.Post.FLAG_LOOP)
|
||||
if images.Image(content).check_for_sound():
|
||||
if images.Video(content).check_for_sound():
|
||||
ret.append(model.Post.FLAG_SOUND)
|
||||
return ret
|
||||
|
||||
|
@ -621,7 +620,10 @@ def update_post_content(post: model.Post, content: Optional[bytes]) -> None:
|
|||
update_signature = False
|
||||
post.mime_type = mime.get_mime_type(content)
|
||||
if mime.is_flash(post.mime_type):
|
||||
post.type = model.Post.TYPE_FLASH
|
||||
raise InvalidPostContentError(
|
||||
"Flash animations are deprecated in this build and are slowly "
|
||||
+ "being phased out."
|
||||
)
|
||||
elif mime.is_image(post.mime_type):
|
||||
update_signature = True
|
||||
if mime.is_animated_gif(content):
|
||||
|
@ -631,9 +633,7 @@ def update_post_content(post: model.Post, content: Optional[bytes]) -> None:
|
|||
elif mime.is_video(post.mime_type):
|
||||
post.type = model.Post.TYPE_VIDEO
|
||||
else:
|
||||
raise InvalidPostContentError(
|
||||
"Unhandled file type: %r" % post.mime_type
|
||||
)
|
||||
raise InvalidPostContentError(f"Unhandled file type: {post.mime_type}")
|
||||
|
||||
post.checksum = util.get_sha1(content)
|
||||
post.checksum_md5 = util.get_md5(content)
|
||||
|
@ -655,43 +655,32 @@ def update_post_content(post: model.Post, content: Optional[bytes]) -> None:
|
|||
post.signature = generate_post_signature(post, content)
|
||||
|
||||
post.file_size = len(content)
|
||||
|
||||
post.canvas_width = None
|
||||
post.canvas_height = None
|
||||
post.date_taken = None
|
||||
post.camera = None
|
||||
|
||||
try:
|
||||
if post.type == model.Post.TYPE_IMAGE:
|
||||
media = metadata._open_image(content)
|
||||
elif post.type == model.Post.TYPE_VIDEO:
|
||||
media = metadata._run_ffprobe(content)
|
||||
media = images.Image(content)
|
||||
elif post.type in (model.Post.TYPE_ANIMATION, model.Post.TYPE_VIDEO):
|
||||
media = images.Video(content)
|
||||
except Exception as ex:
|
||||
logger.exception(ex)
|
||||
if not config.config["allow_broken_uploads"]:
|
||||
raise InvalidPostContentError("Unable to process image metadata")
|
||||
else:
|
||||
post.canvas_width = None
|
||||
post.canvas_height = None
|
||||
else:
|
||||
if post.type == model.Post.TYPE_IMAGE:
|
||||
dimensions = metadata.resolve_real_image_dimensions(media)
|
||||
(post.canvas_width, post.canvas_height) = dimensions
|
||||
post.date_taken = metadata.resolve_image_date_taken(media)
|
||||
post.camera = metadata.resolve_image_camera(media)
|
||||
elif post.type == model.Post.TYPE_VIDEO:
|
||||
dimensions = metadata.resolve_video_dimensions(media)
|
||||
(post.canvas_width, post.canvas_height) = dimensions
|
||||
post.date_taken = metadata.resolve_video_date_taken(media)
|
||||
post.camera = metadata.resolve_video_camera(media)
|
||||
if not media.width or not media.height:
|
||||
if not config.config["allow_broken_uploads"]:
|
||||
raise InvalidPostContentError(
|
||||
"Invalid image dimensions returned during processing"
|
||||
)
|
||||
|
||||
if (post.canvas_width is not None and post.canvas_width <= 0) or (
|
||||
post.canvas_height is not None and post.canvas_height <= 0
|
||||
):
|
||||
if not config.config["allow_broken_uploads"]:
|
||||
raise InvalidPostContentError(
|
||||
"Invalid image dimensions returned during processing"
|
||||
)
|
||||
else:
|
||||
post.canvas_width = None
|
||||
post.canvas_height = None
|
||||
post.canvas_width = media.width
|
||||
post.canvas_height = media.height
|
||||
post.date_taken = media.date_taken
|
||||
post.camera = media.camera
|
||||
|
||||
setattr(post, "__content", content)
|
||||
|
||||
|
@ -711,12 +700,17 @@ def generate_post_thumbnail(post: model.Post) -> None:
|
|||
content = files.get(get_post_content_path(post))
|
||||
try:
|
||||
assert content
|
||||
image = images.Image(content)
|
||||
image.resize_fill(
|
||||
if post.type == model.Post.TYPE_IMAGE:
|
||||
media = images.Image(content)
|
||||
elif post.type == model.Post.TYPE_VIDEO:
|
||||
media = images.Video(content)
|
||||
|
||||
thumb = media.to_thumbnail(
|
||||
int(config.config["thumbnails"]["post_width"]),
|
||||
int(config.config["thumbnails"]["post_height"]),
|
||||
)
|
||||
files.save(get_post_thumbnail_path(post), image.to_jpeg())
|
||||
|
||||
files.save(get_post_thumbnail_path(post), thumb)
|
||||
except errors.ProcessingError:
|
||||
files.save(get_post_thumbnail_path(post), EMPTY_PIXEL)
|
||||
|
||||
|
|
Reference in a new issue