server: add post camera attr and functionality

- [overview] add camera attribute to posts. parsing is done in
  `func/metadata.py` and takes any of the available tags corresponding
  to "make" and "model" properties and concatenates them into a string
- [server] improved `func/metadata.py`:
  - added camera resolve functions for photos and videos
  - moved ffmpeg subprocess and exif image opening to separate function
  - optionally reuse existing collection of extracted tags in any of the
    functions
  - iterative approach to checking for tags' existence as opposed to
    imperative
  - (somewhat) better error handling
- [server] created alembic migration in `adb2acef2492_add_camera.py`
  - not only adds columns, but also scans files and updates their camera
    string
- [server] added camera attribute functionality and improved error
  handling in `func/posts.py`
- [server] add camera attribute to `model/post.py`
This commit is contained in:
skybldev 2021-12-02 21:27:26 -05:00
parent a48c792322
commit efe217344d
4 changed files with 203 additions and 41 deletions

View file

@ -2,54 +2,139 @@ import json
import logging import logging
from datetime import datetime from datetime import datetime
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from typing import Optional from typing import Optional, Union
from exif import Image from exif import Image
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def resolve_image_date_taken(content: bytes) -> Optional[datetime]: BASE_FFMPEG_COMMAND = [
try:
img = Image(content)
except Exception:
logger.warning("Error reading image with exif library!")
return None
if img.has_exif:
if "datetime" in img.list_all():
resolved = img.datetime
elif "datetime_original" in img.list_all():
resolved = img.datetime_original
else:
return None
return datetime.strptime(resolved, "%Y:%m:%d %H:%M:%S")
else:
return None
def resolve_video_date_taken(content: bytes) -> Optional[datetime]:
proc = Popen(
[
"ffprobe", "ffprobe",
"-loglevel", "-loglevel",
"8", "8",
"-print_format", "-print_format",
"json", "json",
"-show_format", "-show_format",
"-", ]
],
def _open_image(content: bytes) -> Image:
tags = Image(content)
if not tags.has_exif or not tags.list_all():
raise Exception
return tags
def _run_ffmpeg(content: Union[bytes, str]) -> Image:
if isinstance(content, bytes):
proc = Popen(
BASE_FFMPEG_COMMAND + ["-"],
stdin=PIPE, stdin=PIPE,
stdout=PIPE, stdout=PIPE,
stderr=PIPE, stderr=PIPE,
) )
output = proc.communicate(input=content)[0] output = proc.communicate(input=content)[0]
json_output = json.loads(output) else:
proc = Popen(
BASE_FFMPEG_COMMAND + [content],
stdout=PIPE,
stderr=PIPE,
)
output = proc.communicate()[0]
return json.loads(output)["format"]["tags"]
def resolve_image_date_taken(
content: Union[bytes, Image]
) -> Optional[datetime]:
try: try:
creation_time = json_output["format"]["tags"]["creation_time"] if isinstance(content, Image):
return datetime.fromisoformat(creation_time.rstrip("Z")) tags = content
else:
tags = _open_image(content)
resolved = None
for option in ("datetime", "datetimme_original"):
if option in tags.list_all():
resolved = tags[option]
break
if not resolved:
raise Exception
except Exception: except Exception:
return None return None
else:
return datetime.strptime(resolved, "%Y:%m:%d %H:%M:%S")
def resolve_video_date_taken(
content: Union[bytes, str, dict]
) -> Optional[datetime]:
try:
if isinstance(content, dict):
tags = content
else:
tags = _run_ffmpeg(content)
creation_time = tags["creation_time"]
except Exception:
return None
else:
return datetime.fromisoformat(creation_time.rstrip("Z"))
def resolve_image_camera(content: Union[bytes, Image]) -> Optional[str]:
try:
if isinstance(content, Image):
tags = content
else:
tags = _open_image(content)
camera_string = []
for option in ("make", "model"):
if option in tags.list_all():
camera_string.append(tags[option])
if not camera_string:
raise Exception
except Exception:
return None
else:
return " ".join(camera_string)
def resolve_video_camera(content: Union[bytes, str, dict]) -> Optional[str]:
try:
if isinstance(content, dict):
tags = content
else:
tags = _run_ffmpeg(content)
# List of tuples where only one value can be valid
option_tuples = (
("manufacturer", "com.android.manufacturer"),
("model", "com.android.model"),
)
camera_string = []
for option_tuple in option_tuples:
for option in option_tuple:
if option in tags:
camera_string.append(tags[option])
break
if not camera_string:
raise Exception
except Exception:
return None
else:
return " ".join(camera_string)

View file

@ -201,6 +201,7 @@ class PostSerializer(serialization.BaseSerializer):
"comments": self.serialize_comments, "comments": self.serialize_comments,
"pools": self.serialize_pools, "pools": self.serialize_pools,
"dateTaken": self.serialize_date_taken, "dateTaken": self.serialize_date_taken,
"camera": self.serialize_camera,
} }
def serialize_id(self) -> Any: def serialize_id(self) -> Any:
@ -349,6 +350,9 @@ class PostSerializer(serialization.BaseSerializer):
def serialize_date_taken(self) -> Any: def serialize_date_taken(self) -> Any:
return self.post.date_taken return self.post.date_taken
def serialize_camera(self) -> Any:
return self.post.camera
def serialize_post( def serialize_post(
post: Optional[model.Post], auth_user: model.User, options: List[str] = [] post: Optional[model.Post], auth_user: model.User, options: List[str] = []
@ -674,12 +678,25 @@ def update_post_content(post: model.Post, content: Optional[bytes]) -> None:
post.canvas_height = None post.canvas_height = None
setattr(post, "__content", content) setattr(post, "__content", content)
if post.type == model.Post.TYPE_IMAGE:
post.date_taken = metadata.resolve_image_date_taken(content)
elif post.type == model.Post.TYPE_VIDEO:
post.date_taken = metadata.resolve_video_date_taken(content)
else:
post.date_taken = None post.date_taken = None
post.camera = None
if post.type == model.Post.TYPE_IMAGE:
try:
image_tags = metadata._open_image(content)
except Exception:
pass
post.date_taken = metadata.resolve_image_date_taken(image_tags)
post.camera = metadata.resolve_image_camera(image_tags)
elif post.type == model.Post.TYPE_VIDEO:
try:
video_tags = metadata._run_ffmpeg(content)
except Exception:
pass
post.date_taken = metadata.resolve_video_date_taken(video_tags)
post.camera = metadata.resolve_video_camera(video_tags)
def update_post_thumbnail( def update_post_thumbnail(

View file

@ -0,0 +1,59 @@
"""
add_camera
Revision ID: adb2acef2492
Created at: 2021-12-01 13:06:14.285699
"""
import sqlalchemy as sa
from alembic import op
from szurubooru.func import files, metadata
revision = "adb2acef2492"
down_revision = "57aafb3bf9f0"
branch_labels = None
depends_on = None
def upgrade():
conn = op.get_bind()
op.add_column("post", sa.Column("camera", sa.Text(), nullable=True))
posts = sa.Table(
"post",
sa.MetaData(),
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("camera", sa.Text, nullable=True),
)
for file in list(files.scan("posts")):
filename = file.name
fullpath = files._get_full_path("posts/" + filename)
post_ext = filename.split(".")[1]
if post_ext in ["jpg", "jpeg", "png", "heif", "heic"]:
with open(fullpath, "rb") as img:
camera_string = metadata.resolve_image_camera(img)
elif post_ext in ["webm", "mp4", "avif"]:
camera_string = metadata.resolve_video_camera(
files._get_full_path(fullpath)
)
else:
continue
post_id = int(filename.split("_")[0])
conn.execute(
posts.update()
.where(posts.c.id == post_id)
.values(camera=camera_string)
)
op.alter_column("post", "camera", nullable=True)
def downgrade():
op.drop_column("post", "camera")

View file

@ -225,6 +225,7 @@ class Post(Base):
# EXIF things # EXIF things
date_taken = sa.Column("date_taken", sa.DateTime, nullable=True) date_taken = sa.Column("date_taken", sa.DateTime, nullable=True)
camera = sa.Column("camera", sa.Text, nullable=True)
# foreign tables # foreign tables
user = sa.orm.relationship("User") user = sa.orm.relationship("User")