322 lines
9.2 KiB
Python
322 lines
9.2 KiB
Python
from PIL import Image
|
|
from disk_cache import DiskCache
|
|
from io import BytesIO
|
|
from mimetypes import types_map as mimetypes
|
|
from zipfile import ZipFile
|
|
from zlib import crc32
|
|
import os.path
|
|
import sqlite3
|
|
import werkzeug.exceptions as exceptions
|
|
|
|
# WebP encoder defaults. A quality value above 100 selects lossless mode
# (101 is subtracted and the remainder used as quality — see BaseDB.__init__).
DEFAULT_WEBP_QUALITY = 80

# Pillow WebP encoding effort: 0 = fastest encode, 6 = best compression.
DEFAULT_WEBP_METHOD = 0

DEFAULT_WEBP_SIZE = 1908  # width of FullHD monitor without scroll bar in full screen


# Some platforms' mimetypes tables lack WebP; register it so extension
# lookups (used when serving original pages) always succeed.
mimetypes[".webp"] = "image/webp"
|
|
|
|
# The on-disk page cache can be switched off via DISK_CACHE=0; any other
# value (or an unset variable) keeps the DiskCache defaults.
disk_cache = (
    DiskCache(enabled=False)
    if os.environ.get("DISK_CACHE", "1") == "0"
    else DiskCache()
)

# In-memory cache of generated cover thumbnails, keyed by source file path.
thumbnail_cache = {}
|
|
|
|
|
|
def dict_factory(cursor, row):
    """sqlite3 row factory mapping column names to row values.

    https://docs.python.org/3.7/library/sqlite3.html#sqlite3.Connection.row_factory
    """
    return {column[0]: value for column, value in zip(cursor.description, row)}
|
|
|
|
|
|
def filter_zip_filelist(filelist):
    """Return only the real file entries of a ZIP listing, dropping directories."""
    files = []
    for entry in filelist:
        if not entry.is_dir():
            files.append(entry)
    return files
|
|
|
|
|
|
class BaseDB:
    """Shared WebP conversion and caching logic for concrete database backends."""

    def __init__(self, webp_quality, webp_method, webp_size):
        # The single quality knob also encodes the lossless flag:
        #   lossy:    0-100  (used directly as quality)
        #   lossless: 101-201 (101 subtracted, remainder used as quality)
        webp_lossless = webp_quality > 100
        if webp_lossless:
            webp_quality -= 101

        self.webp_config = {
            "quality": webp_quality,
            "method": webp_method,
            "lossless": webp_lossless,
            "size": webp_size,
        }

    def _generate_webp(self, fp, max_size=None):
        """Re-encode the image readable at *fp* as WebP, bounded by *max_size*.

        *max_size* is a ``(width, height)`` pair; when omitted, the configured
        square ``size`` bound is used. Returns a rewound BytesIO.
        """
        if max_size is None:
            bound = self.webp_config["size"]
            max_size = (bound, bound)

        img = Image.open(fp)
        img.thumbnail(max_size)

        out = BytesIO()
        img.save(
            out,
            format="webp",
            save_all=True,
            # https://github.com/python-pillow/Pillow/issues/4042
            append_images=[img],
            quality=self.webp_config["quality"],
            method=self.webp_config["method"],
            lossless=self.webp_config["lossless"],
        )
        out.seek(0)

        return out

    def _generate_thumbnail(self, filepath):
        """Return a dict with a 512x512-bounded WebP thumbnail for *filepath*.

        The raw bytes, etag, and mimetype are cached in the module-level
        ``thumbnail_cache``; a fresh readable ``buffer``  is attached on
        every call so each response gets its own stream position.
        """
        cached = thumbnail_cache.get(filepath)
        if cached is None:
            webp_buffer = self._generate_webp(filepath, max_size=(512, 512))
            raw = webp_buffer.read()
            cached = {
                "data_raw": raw,
                "etag": str(crc32(raw)),
                "mimetype": "image/webp",
            }
            thumbnail_cache[filepath] = cached

        stream = BytesIO()
        stream.write(cached["data_raw"])
        stream.seek(0)
        cached["buffer"] = stream

        return cached

    def _generate_page(self, page_buffer, volume, page):
        """Convert one page image to WebP, persist it to the disk cache, return it rewound."""
        converted = self._generate_webp(page_buffer)
        disk_cache.set(f"{volume}-{page}", converted)
        converted.seek(0)

        return converted
|
|
|
|
|
|
class CalibreDB(BaseDB):
    """Read-only access to a Calibre ``metadata.db`` plus CBZ page extraction.

    All lookups raise ``werkzeug.exceptions.NotFound`` when the requested
    series/volume/page does not exist, so callers can surface a 404 directly.
    """

    def __init__(
        self,
        path="metadata.db",
        webp_quality=DEFAULT_WEBP_QUALITY,
        webp_method=DEFAULT_WEBP_METHOD,
        webp_size=DEFAULT_WEBP_SIZE,
    ):
        super().__init__(
            webp_quality=webp_quality, webp_method=webp_method, webp_size=webp_size
        )

        # SQLite URI with mode=ro so the Calibre library is never mutated.
        self.database_path = f"file:{path}?mode=ro"

    def create_cursor(self):
        """Open a fresh read-only connection and return a dict-row cursor."""
        conn = sqlite3.connect(self.database_path, uri=True)
        conn.row_factory = dict_factory
        return conn.cursor()

    def get_series_list(self):
        """Return an iterable of {id, name, volumes} rows, one per CBZ series."""
        cursor = self.create_cursor()
        series = cursor.execute(
            """
            select
                series.id as id,
                series.name as name,
                count(*) as volumes
            from
                books,
                books_series_link,
                data,
                series
            where
                books_series_link.series = series.id and
                books_series_link.book = books.id and
                books.id = data.book and
                data.format = 'CBZ'
            group by series.name
            having min(books.series_index)
            """
        )

        return series

    def get_series_cover(self, series_id):
        """Return the cover image path of the first volume in the series.

        Raises NotFound when the series does not exist or has no volumes.
        """
        cursor = self.create_cursor()
        first_volume = cursor.execute(
            """
            select
                books.id
            from
                books,
                books_series_link,
                series
            where
                books_series_link.book = books.id and
                books_series_link.series = series.id and
                series.id = ?
            group by series.name
            having min(books.series_index)
            """,
            (str(series_id),),
        ).fetchone()

        if first_volume is None:
            raise exceptions.NotFound()

        return self.get_volume_cover(first_volume["id"])

    def get_series_cover_thumbnail(self, series_id):
        """Return a cached WebP thumbnail dict for the series' cover."""
        return self._generate_thumbnail(self.get_series_cover(series_id))

    def get_series_volumes(self, series_id):
        """Return {"title": series_name, "volumes": [{id, title, index}, ...]}."""
        cursor = self.create_cursor()
        title = cursor.execute(
            """
            select
                series.name
            from
                series
            where
                series.id = ?
            """,
            (str(series_id),),
        ).fetchone()["name"]
        volumes = cursor.execute(
            """
            select
                books.id,
                books.title,
                books.series_index as "index"
            from
                books,
                books_series_link,
                series
            where
                books_series_link.book = books.id and
                books_series_link.series = series.id and
                series.id = ?
            order by books.series_index
            """,
            (str(series_id),),
        ).fetchall()

        return {"title": title, "volumes": volumes}

    def get_volume_cover(self, volume_id):
        """Return the volume's cover.jpg path, or raise NotFound.

        NotFound is raised both for an unknown volume id and for a volume
        whose ``has_cover`` flag is unset.
        """
        cursor = self.create_cursor()
        volume = cursor.execute(
            """
            select
                books.has_cover as has_cover,
                books.path as path
            from
                books
            where
                books.id = ?
            order by books.series_index
            """,
            (str(volume_id),),
        ).fetchone()

        # Unknown volume id: previously this crashed with a TypeError on the
        # subscript below; surface the same 404 the sibling lookups raise.
        if volume is None:
            raise exceptions.NotFound()

        if volume["has_cover"]:
            return volume["path"] + "/cover.jpg"
        else:
            raise exceptions.NotFound()

    def get_volume_cover_thumbnail(self, volume_id):
        """Return a cached WebP thumbnail dict for the volume's cover."""
        return self._generate_thumbnail(self.get_volume_cover(volume_id))

    def get_volume_filepath(self, volume_id):
        """Return the CBZ archive path for *volume_id*, or raise NotFound."""
        cursor = self.create_cursor()
        location = cursor.execute(
            """
            select
                books.path as path,
                data.name as filename,
                lower(data.format) as extension
            from
                books,
                data
            where
                data.book = books.id and
                books.id = ?
            """,
            (str(volume_id),),
        ).fetchone()

        if location is None:
            raise exceptions.NotFound()

        return (
            location["path"] + "/" + location["filename"] + "." + location["extension"]
        )

    def get_volume_info(self, volume_id):
        """Return {"title": ..., "series": ..., "pages": ...} for a volume."""
        cursor = self.create_cursor()
        volume_info = cursor.execute(
            """
            select
                books.title,
                books_series_link.series
            from
                books,
                books_series_link
            where
                books_series_link.book = books.id and
                books.id = ?
            """,
            (str(volume_id),),
        ).fetchone()

        volume_info["pages"] = self.get_volume_page_number(volume_id)

        return volume_info

    def get_volume_page_number(self, volume_id):
        """Return the number of page images (non-directory entries) in the CBZ."""
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, "r") as volume:
            filelist = filter_zip_filelist(volume.filelist)
            return len(filelist)

    def get_volume_page(self, volume_id, page_number, original=False):
        """Return one page of a CBZ volume as {"buffer", "mimetype", "etag"}.

        Pages are 1-indexed. When *original* is True the stored image bytes
        are returned with their native mimetype; otherwise the page is served
        as WebP, using the disk cache and converting on a cache miss.

        Raises NotFound for an out-of-range page or unknown volume.
        """
        if page_number < 1:
            raise exceptions.NotFound()
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, "r") as volume:
            try:
                filelist = filter_zip_filelist(volume.filelist)
                zip_info = filelist[page_number - 1]
            except IndexError:
                # Page index past the end of the archive.
                # (An unreachable "return None" used to follow this raise.)
                raise exceptions.NotFound()

            page_filename = zip_info.filename

            if original is True:
                mimetype = mimetypes[os.path.splitext(page_filename)[1]]

                page_buffer = BytesIO()
                page_buffer.write(volume.read(page_filename))
                page_buffer.seek(0)
            else:
                mimetype = "image/webp"

                try:
                    page_buffer = disk_cache.get(f"{volume_id}-{page_number}")
                except FileNotFoundError:
                    # Cache miss: convert from the archive and populate the cache.
                    with volume.open(page_filename) as orig_page_buffer:
                        page_buffer = self._generate_page(
                            orig_page_buffer, volume_id, page_number
                        )

        return {
            "buffer": page_buffer,
            "mimetype": mimetype,
            "etag": str(zip_info.CRC),
        }
|