This commit is contained in:
amras0000 2024-11-22 05:22:35 +00:00 committed by GitHub
commit 1a21698bec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 155 additions and 36 deletions

View file

@ -13,6 +13,7 @@ RUN apk --no-cache add \
libheif-dev \
libavif \
libavif-dev \
exiftool \
ffmpeg \
# from requirements.txt:
py3-yaml \

View file

@ -7,7 +7,7 @@ from typing import Any, Callable, List, Optional, Set, Tuple
import HeifImagePlugin
import numpy as np
import pillow_avif
from PIL import Image
from PIL import Image, ImageOps
from szurubooru import config, errors
@ -40,7 +40,7 @@ NpMatrix = np.ndarray
def _preprocess_image(content: bytes) -> NpMatrix:
try:
img = Image.open(BytesIO(content))
img = ImageOps.exif_transpose(Image.open(BytesIO(content)))
return np.asarray(img.convert("L"), dtype=np.uint8)
except (IOError, ValueError):
raise errors.ProcessingError(

View file

@ -5,7 +5,8 @@ import re
import shlex
import subprocess
from io import BytesIO
from typing import List
from typing import List, Optional
import datetime
import HeifImagePlugin
import pillow_avif
@ -16,6 +17,29 @@ from szurubooru.func import mime, util
logger = logging.getLogger(__name__)
# Refer to:
# https://exiftool.org/TagNames/EXIF.html
# https://ffmpeg.org/ffmpeg-filters.html#transpose-1
# https://www.impulseadventure.com/photo/images/orient_flag.gif
ORIENTATION_FILTER = {
"Horizontal (normal)": "null",
"Mirror Horizontal": "transpose=clock_flip,transpose=cclock",
"Rotate 180": "transpose=clock,transpose=clock",
"Mirror vertical": "transpose=clock_flip,transpose=clock",
"Mirror horizontal and rotate 270 CW": "transpose=cclock_flip,transpose=clock,transpose=clock",
"Rotate 90 CW": "transpose=clock",
"Mirror horizontal and rotate 90 CW": "transpose=clock_flip,transpose=clock,transpose=clock",
"Rotate 270 CW": "transpose=cclock",
}
ORTHOGONAL_ORIENTATIONS = (
"Mirror horizontal and rotate 270 CW",
"Rotate 90 CW",
"Mirror horizontal and rotate 90 CW",
"Rotate 270 CW",
)
def convert_heif_to_png(content: bytes) -> bytes:
img = PILImage.open(BytesIO(content))
@ -31,27 +55,72 @@ class Image:
@property
def width(self) -> int:
return self.info["streams"][0]["width"]
if self._is_orthogonal():
return self.info["ImageHeight"]
return self.info["ImageWidth"]
@property
def height(self) -> int:
return self.info["streams"][0]["height"]
if self._is_orthogonal():
return self.info["ImageWidth"]
return self.info["ImageHeight"]
@property
def frames(self) -> int:
return self.info["streams"][0]["nb_read_frames"]
def duration(self) -> Optional[datetime.timedelta]:
try:
duration_data = self.info["Duration"]
except KeyError:
return None
time_formats = [
"%H:%M:%S",
"%H:%M:%S.%f",
"%M:%S",
"%M:%S.%f",
"%S.%f s",
]
for time_format in time_formats:
try:
duration = datetime.datetime.strptime(
duration_data, time_format).time()
return datetime.timedelta(
hours=duration.hour,
minutes=duration.minute,
seconds = duration.second,
microseconds=duration.microsecond)
except ValueError:
pass
logger.warning("Unexpected time format(duration=%r)", duration_data)
return None
def _orientation_filter(self) -> str:
# This filter should be omitted in ffmpeg>=6.0,
# where it is automatically applied.
try:
return ORIENTATION_FILTER[self.info["Orientation"]]
except KeyError:
return "null"
def _is_orthogonal(self) -> bool:
try:
return self.info["Orientation"] in ORTHOGONAL_ORIENTATIONS
except KeyError:
return False
def resize_fill(self, width: int, height: int) -> None:
width_greater = self.width > self.height
width, height = (-1, height) if width_greater else (width, -1)
filters = "{orientation},scale='{width}:{height}'".format(
orientation=self._orientation_filter(), width=width, height=height)
cli = [
"-i",
"{path}",
"-f",
"image2",
"-filter:v",
"scale='{width}:{height}'".format(width=width, height=height),
filters,
"-map",
"0:v:0",
"-vframes",
@ -60,15 +129,13 @@ class Image:
"png",
"-",
]
if (
"duration" in self.info["format"]
and self.info["format"]["format_name"] != "swf"
):
duration = float(self.info["format"]["duration"])
if duration > 3:
duration = self.duration
if duration is not None and self.info["FileType"] != "SWF":
total_seconds = duration.total_seconds()
if total_seconds > 3:
cli = [
"-ss",
"%d" % math.floor(duration * 0.3),
"%d" % math.floor(total_seconds * 0.3),
] + cli
content = self._execute(cli, ignore_error_if_data=True)
if not content:
@ -83,6 +150,8 @@ class Image:
"{path}",
"-f",
"image2",
"-filter:v",
self._orientation_filter(),
"-map",
"0:v:0",
"-vframes",
@ -105,7 +174,7 @@ class Image:
"-f",
"image2",
"-filter_complex",
"overlay",
"overlay," + self._orientation_filter(),
"-map",
"0:v:0",
"-vframes",
@ -117,6 +186,7 @@ class Image:
)
def to_webm(self) -> bytes:
filters = self._orientation_filter()
with util.create_temp_file_path(suffix=".log") as phase_log_path:
# Pass 1
self._execute(
@ -127,6 +197,8 @@ class Image:
"1",
"-passlogfile",
phase_log_path,
"-filter:v",
filters,
"-vcodec",
"libvpx-vp9",
"-crf",
@ -151,6 +223,8 @@ class Image:
"2",
"-passlogfile",
phase_log_path,
"-filter:v",
filters,
"-vcodec",
"libvpx-vp9",
"-crf",
@ -179,6 +253,10 @@ class Image:
height = self.height - 1
altered_dimensions = True
filters = self._orientation_filter()
if altered_dimensions:
filters += ",scale='%d:%d'" % (width, height)
args = [
"-i",
"{path}",
@ -198,11 +276,10 @@ class Image:
"aac",
"-f",
"mp4",
"-filter:v",
filters,
]
if altered_dimensions:
args += ["-filter:v", "scale='%d:%d'" % (width, height)]
self._execute(args + ["-y", mp4_temp_path])
with open(mp4_temp_path, "rb") as mp4_temp:
@ -274,8 +351,10 @@ class Image:
with util.create_temp_file(suffix="." + extension) as handle:
handle.write(self.content)
handle.flush()
cli = [program, "-loglevel", "32" if get_logs else "24"] + cli
cli = [part.format(path=handle.name) for part in cli]
if program in ("ffmpeg", "ffprobe"):
cli = ["-loglevel", "32" if get_logs else "24"] + cli
cli = [program] + cli
proc = subprocess.Popen(
cli,
stdout=subprocess.PIPE,
@ -285,7 +364,7 @@ class Image:
out, err = proc.communicate()
if proc.returncode != 0:
logger.warning(
"Failed to execute ffmpeg command (cli=%r, err=%r)",
"Failed to execute command (cli=%r, err=%r)",
" ".join(shlex.quote(arg) for arg in cli),
err,
)
@ -298,25 +377,25 @@ class Image:
return err if get_logs else out
def _reload_info(self) -> None:
self.info = json.loads(
exiftool_data = json.loads(
self._execute(
[
"-i",
"{path}",
"-of",
"json",
"-select_streams",
"v",
"-show_format",
"-show_streams",
"-json",
],
program="ffprobe",
program="exiftool",
).decode("utf-8")
)
assert "format" in self.info
assert "streams" in self.info
if len(self.info["streams"]) < 1:
logger.warning("The video contains no video streams.")
if len(exiftool_data) != 1:
logger.warning("Unexpected output from exiftool")
self.info = exiftool_data[0]
if "Error" in self.info:
raise errors.ProcessingError(
"The video contains no video streams."
)
"Error in metadata:" + str(self.info["Error"]))
if "Warning" in self.info:
raise errors.ProcessingError(
"Warning in metadata:" + str(self.info["Warning"]))

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

View file

@ -773,6 +773,45 @@ def test_update_post_content_convert_heif_to_png_when_processing(
assert os.path.exists(generated_path)
def test_update_post_content_with_exif_orientation(
tmpdir, config_injector, read_asset, post_factory):
# exif.jpg is a copy of jpeg.jpg,
# rotated counter-clockwise by 90 degrees,
# and assigned the EXIF Orientation tag "Rotate 90 CW"
config_injector(
{
"data_dir": str(tmpdir.mkdir("data")),
"thumbnails": {
"post_width": 300,
"post_height": 300,
},
"secret": "test",
"allow_broken_uploads": False,
}
)
post = post_factory(id=1)
db.session.add(post)
posts.update_post_content(post, read_asset("exif.jpg"))
thumbnail_path = (
"{}/data/generated-thumbnails/1_244c8840887984c4.jpg".format(tmpdir))
db.session.flush()
assert post.canvas_width == 100
assert post.canvas_height == 75
with open(thumbnail_path, "rb") as handle:
thumbnail = images.Image(handle.read())
assert thumbnail.width == 400
assert thumbnail.height == 300
search_result = posts.search_by_image(read_asset("jpeg.jpg"))
assert len(search_result) == 1
search_distance, search_post = search_result[0]
assert search_post.post_id == post.post_id
assert abs(search_distance) < 0.15
def test_update_post_tags(tag_factory):
post = model.Post()
with patch("szurubooru.func.tags.get_or_create_tags_by_names"):