"""Read-only access to a Calibre library of CBZ volumes.

Provides series/volume metadata lookups against Calibre's ``metadata.db``
and serves pages and cover thumbnails, re-encoded as WebP.
"""

from PIL import Image
from disk_cache import DiskCache
from io import BytesIO
from mimetypes import types_map as mimetypes
from zipfile import ZipFile
from zlib import crc32
import os.path  # also binds the top-level ``os`` name used for ``os.environ``
import sqlite3
import werkzeug.exceptions as exceptions

DEFAULT_WEBP_QUALITY = 80
DEFAULT_WEBP_METHOD = 0
DEFAULT_WEBP_SIZE = 1908  # width of FullHD monitor without scroll bar in full screen

# The stdlib table may not know WebP; register it so lookups by extension work.
mimetypes[".webp"] = "image/webp"

# Setting DISK_CACHE=0 in the environment disables the on-disk page cache.
if os.environ.get("DISK_CACHE", "1") == "0":
    disk_cache = DiskCache(enabled=False)
else:
    disk_cache = DiskCache()

# In-memory thumbnail cache keyed by source file path.
# NOTE: unbounded; entries are never evicted.
thumbnail_cache = {}


# https://docs.python.org/3.7/library/sqlite3.html#sqlite3.Connection.row_factory
def dict_factory(cursor, row):
    """Row factory that maps each column name to its value in ``row``."""
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


def filter_zip_filelist(filelist):
    """Return the entries of ``filelist`` that are not directories."""
    return [file for file in filelist if not file.is_dir()]


class BaseDB:
    """Shared WebP conversion and caching helpers for library backends."""

    def __init__(self, webp_quality, webp_method, webp_size):
        """Store the WebP encoder settings.

        Args:
            webp_quality: 0-100 selects lossy encoding with that quality;
                101-201 selects lossless encoding with ``quality - 101``.
            webp_method: passed through as the encoder ``method``.
            webp_size: maximum width and height of generated pages.
        """
        # lossy: 0-100 (used as quality)
        # lossless: 101-201 (101 subtracted and used as quality)
        webp_lossless = webp_quality > 100
        if webp_lossless:
            webp_quality = webp_quality - 101

        self.webp_config = {
            "quality": webp_quality,
            "method": webp_method,
            "lossless": webp_lossless,
            "size": webp_size,
        }

    def _generate_webp(self, fp, max_size=None):
        """Encode the image at ``fp`` as WebP, shrunk to fit ``max_size``.

        Args:
            fp: anything ``Image.open`` accepts (callers pass both paths and
                file objects).
            max_size: ``(width, height)`` bound; defaults to a square of the
                configured ``size``.

        Returns:
            A ``BytesIO`` positioned at the start of the encoded image.
        """
        if max_size is None:
            size = self.webp_config["size"]
            max_size = (size, size)

        image = Image.open(fp)
        image.thumbnail(max_size)

        image_buffer = BytesIO()
        image.save(
            image_buffer,
            format="webp",
            save_all=True,
            append_images=[
                image
            ],  # https://github.com/python-pillow/Pillow/issues/4042
            quality=self.webp_config["quality"],
            method=self.webp_config["method"],
            lossless=self.webp_config["lossless"],
        )
        image_buffer.seek(0)
        return image_buffer

    def _generate_thumbnail(self, filepath):
        """Return a 512x512-bounded WebP thumbnail of the image at ``filepath``.

        The encoded bytes are cached in ``thumbnail_cache`` per path.

        Returns:
            A dict with keys ``data_raw`` (bytes), ``etag`` (CRC32 of the
            bytes as a string), ``mimetype`` and ``buffer`` (a fresh
            ``BytesIO`` positioned at the start).
        """
        if filepath not in thumbnail_cache:
            thumbnail_buffer = self._generate_webp(filepath, max_size=(512, 512))
            data = thumbnail_buffer.read()
            thumbnail_cache[filepath] = {
                "data_raw": data,
                "etag": str(crc32(data)),
                "mimetype": "image/webp",
            }

        cached = thumbnail_cache[filepath]
        # Hand out a per-call copy with its own buffer: storing the buffer in
        # the cached dict would make every caller share one stream position.
        return dict(cached, buffer=BytesIO(cached["data_raw"]))

    def _generate_page(self, page_buffer, volume, page):
        """Convert a page to WebP, store it in the disk cache and return it.

        The cache key is ``"{volume}-{page}"``. The returned buffer is
        rewound to the start after being handed to the cache.
        """
        page_buffer = self._generate_webp(page_buffer)
        disk_cache.set(f"{volume}-{page}", page_buffer)
        page_buffer.seek(0)
        return page_buffer


class CalibreDB(BaseDB):
    """Backend reading series and CBZ volumes from a Calibre ``metadata.db``."""

    def __init__(
        self,
        path="metadata.db",
        webp_quality=DEFAULT_WEBP_QUALITY,
        webp_method=DEFAULT_WEBP_METHOD,
        webp_size=DEFAULT_WEBP_SIZE,
    ):
        """Configure WebP settings and the read-only database URI for ``path``."""
        super().__init__(
            webp_quality=webp_quality, webp_method=webp_method, webp_size=webp_size
        )
        # mode=ro: the library database is only ever read.
        self.database_path = f"file:{path}?mode=ro"

    def create_cursor(self):
        """Open a new connection and return a cursor yielding rows as dicts."""
        conn = sqlite3.connect(self.database_path, uri=True)
        conn.row_factory = dict_factory
        return conn.cursor()

    def get_series_list(self):
        """Return a cursor over series having CBZ data (id, name, volumes)."""
        cursor = self.create_cursor()
        # NOTE(review): ``having min(books.series_index)`` is evaluated as a
        # boolean, so a series whose lowest index is 0 would presumably be
        # filtered out - confirm against SQLite semantics and real data.
        series = cursor.execute(
            """
            select
                series.id as id,
                series.name as name,
                count(*) as volumes
            from books, books_series_link, data, series
            where books_series_link.series = series.id
                and books_series_link.book = books.id
                and books.id = data.book
                and data.format = \'CBZ\'
            group by series.name
            having min(books.series_index)
            """
        )
        return series

    def get_series_cover(self, series_id):
        """Return the cover path of the volume selected for ``series_id``.

        Raises:
            werkzeug.exceptions.NotFound: if the query yields no volume, or
                the selected volume has no cover.
        """
        cursor = self.create_cursor()
        # NOTE(review): presumably relies on SQLite picking ``books.id`` from
        # the row holding the minimum series_index - confirm.
        first_volume = cursor.execute(
            """
            select books.id
            from books, books_series_link, series
            where books_series_link.book = books.id
                and books_series_link.series = series.id
                and series.id = ?
            group by series.name
            having min(books.series_index)
            """,
            (str(series_id),),
        ).fetchone()

        if first_volume is None:
            raise exceptions.NotFound()

        return self.get_volume_cover(first_volume["id"])

    def get_series_cover_thumbnail(self, series_id):
        """Return the thumbnail dict for the cover of ``series_id``."""
        return self._generate_thumbnail(self.get_series_cover(series_id))

    def get_series_volumes(self, series_id):
        """Return ``{"title": ..., "volumes": [...]}`` for ``series_id``.

        Volumes are dicts with ``id``, ``title`` and ``index``, ordered by
        series index.

        Raises:
            werkzeug.exceptions.NotFound: if no series has this id.
        """
        cursor = self.create_cursor()
        series = cursor.execute(
            """
            select series.name
            from series
            where series.id = ?
            """,
            (str(series_id),),
        ).fetchone()

        # fetchone() returns None for an unknown id; report that as 404
        # instead of failing on the subscript.
        if series is None:
            raise exceptions.NotFound()
        title = series["name"]

        volumes = cursor.execute(
            """
            select
                books.id,
                books.title,
                books.series_index as "index"
            from books, books_series_link, series
            where books_series_link.book = books.id
                and books_series_link.series = series.id
                and series.id = ?
            order by books.series_index
            """,
            (str(series_id),),
        ).fetchall()

        return {"title": title, "volumes": volumes}

    def get_volume_cover(self, volume_id):
        """Return ``<book path>/cover.jpg`` for ``volume_id``.

        Raises:
            werkzeug.exceptions.NotFound: if the volume does not exist or is
                not flagged as having a cover.
        """
        cursor = self.create_cursor()
        volume = cursor.execute(
            """
            select
                books.has_cover as has_cover,
                books.path as path
            from books
            where books.id = ?
            order by books.series_index
            """,
            (str(volume_id),),
        ).fetchone()

        # A missing row and a row without a cover are both a 404.
        if volume is None or not volume["has_cover"]:
            raise exceptions.NotFound()

        return volume["path"] + "/cover.jpg"

    def get_volume_cover_thumbnail(self, volume_id):
        """Return the thumbnail dict for the cover of ``volume_id``."""
        return self._generate_thumbnail(self.get_volume_cover(volume_id))

    def get_volume_filepath(self, volume_id):
        """Return ``<path>/<name>.<lowercased format>`` for ``volume_id``.

        The query does not filter on format; the first matching ``data`` row
        is used.

        Raises:
            werkzeug.exceptions.NotFound: if no data row matches the volume.
        """
        cursor = self.create_cursor()
        location = cursor.execute(
            """
            select
                books.path as path,
                data.name as filename,
                lower(data.format) as extension
            from books, data
            where data.book = books.id
                and books.id = ?
            """,
            (str(volume_id),),
        ).fetchone()

        if location is None:
            raise exceptions.NotFound()

        return (
            location["path"] + "/" + location["filename"] + "." + location["extension"]
        )

    def get_volume_info(self, volume_id):
        """Return ``title``, ``series`` and ``pages`` for ``volume_id``.

        Raises:
            werkzeug.exceptions.NotFound: if the volume does not exist (or is
                not linked to a series), or has no data file.
        """
        cursor = self.create_cursor()
        volume_info = cursor.execute(
            """
            select
                books.title,
                books_series_link.series
            from books, books_series_link
            where books_series_link.book = books.id
                and books.id = ?
            """,
            (str(volume_id),),
        ).fetchone()

        # fetchone() returns None when nothing matches; report that as 404
        # instead of failing on the item assignment below.
        if volume_info is None:
            raise exceptions.NotFound()

        volume_info["pages"] = self.get_volume_page_number(volume_id)
        return volume_info

    def get_volume_page_number(self, volume_id):
        """Return the number of non-directory entries in the volume archive."""
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, "r") as volume:
            filelist = filter_zip_filelist(volume.filelist)
        return len(filelist)

    def get_volume_page(self, volume_id, page_number, original=False):
        """Return one page of a volume.

        Args:
            volume_id: id of the book in the Calibre database.
            page_number: 1-based index into the archive's non-directory
                entries.
            original: when ``True`` return the stored file unchanged;
                otherwise return a WebP rendition (served from the disk
                cache when present, generated and cached when not).

        Returns:
            A dict with ``buffer`` (file-like object), ``mimetype`` and
            ``etag`` (the archive entry's CRC as a string).

        Raises:
            werkzeug.exceptions.NotFound: if ``page_number`` is below 1 or
                past the last page, or the volume has no data file.
        """
        if page_number < 1:
            raise exceptions.NotFound()

        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, "r") as volume:
            filelist = filter_zip_filelist(volume.filelist)
            try:
                zip_info = filelist[page_number - 1]
            except IndexError:
                raise exceptions.NotFound()

            page_filename = zip_info.filename

            if original is True:
                # Look the extension up case-insensitively and fall back to a
                # generic type so unusual file names do not raise KeyError.
                extension = os.path.splitext(page_filename)[1].lower()
                mimetype = mimetypes.get(extension, "application/octet-stream")
                page_buffer = BytesIO(volume.read(page_filename))
            else:
                mimetype = "image/webp"
                try:
                    page_buffer = disk_cache.get(f"{volume_id}-{page_number}")
                except FileNotFoundError:
                    # Cache miss: convert the page now and store it.
                    with volume.open(page_filename) as orig_page_buffer:
                        page_buffer = self._generate_page(
                            orig_page_buffer, volume_id, page_number
                        )

            return {
                "buffer": page_buffer,
                "mimetype": mimetype,
                "etag": str(zip_info.CRC),
            }