"""Read-only access to a Calibre library of CBZ comics with WebP conversion.

Pages and cover thumbnails are re-encoded to WebP on demand. Converted pages
are stored through ``disk_cache``; thumbnails are kept in the in-process
``thumbnail_cache`` dictionary.
"""
from PIL import Image
from disk_cache import DiskCache
from io import BytesIO
from mimetypes import types_map as mimetypes
from zipfile import ZipFile
from zlib import crc32
import os.path  # also binds the top-level ``os`` name used for os.environ
import sqlite3
import werkzeug.exceptions as exceptions

DEFAULT_WEBP_QUALITY = 80
DEFAULT_WEBP_METHOD = 0
DEFAULT_WEBP_SIZE = 1906  # width of FullHD monitor without scroll bar

# Make sure '.webp' resolves even when the platform's MIME table lacks it.
mimetypes['.webp'] = 'image/webp'

# Setting DISK_CACHE=0 in the environment disables the on-disk page cache.
if os.environ.get('DISK_CACHE', '1') == '0':
    disk_cache = DiskCache(enabled=False)
else:
    disk_cache = DiskCache()

# Maps cover file path -> {'data_raw', 'etag', 'mimetype'}.
# NOTE: never evicted, so it grows with the number of distinct covers served.
thumbnail_cache = {}


# https://docs.python.org/3.7/library/sqlite3.html#sqlite3.Connection.row_factory
def dict_factory(cursor, row):
    """Row factory turning a sqlite3 row into a ``{column_name: value}`` dict."""
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


class BaseDB:
    """Shared WebP conversion helpers for library backends."""

    def __init__(self, webp_quality, webp_method, webp_size):
        """Store the WebP encoder settings.

        ``webp_quality`` selects both mode and quality:
        lossy: 0-100 (used as quality)
        lossless: 101-201 (101 subtracted and used as quality)
        ``webp_method`` is passed to the encoder as ``method`` and
        ``webp_size`` is the maximum edge length of a converted page.
        """
        webp_lossless = webp_quality > 100
        if webp_lossless:
            webp_quality = webp_quality - 101

        self.webp_config = {
            'quality': webp_quality,
            'method': webp_method,
            'lossless': webp_lossless,
            'size': webp_size
        }

    def _generate_webp(self, fp, max_size=None):
        """Re-encode the image at ``fp`` as WebP.

        ``fp`` is whatever ``PIL.Image.open`` accepts (a path or a binary
        file object). ``max_size`` is a ``(width, height)`` bound handed to
        ``Image.thumbnail``; it defaults to a square of the configured size.
        Returns a ``BytesIO`` positioned at the start of the encoded data.
        """
        if max_size is None:
            edge = self.webp_config['size']
            max_size = (edge, edge)

        image_buffer = BytesIO()
        # The context manager releases the file handle Pillow opens itself
        # when ``fp`` is a path; previously it was left to the GC.
        with Image.open(fp) as image:
            image.thumbnail(max_size)
            # ``append_images=[image]`` is deliberately not passed, see
            # https://github.com/python-pillow/Pillow/issues/4042
            image.save(
                image_buffer,
                format='webp',
                save_all=True,
                quality=self.webp_config['quality'],
                method=self.webp_config['method'],
                lossless=self.webp_config['lossless']
            )
        image_buffer.seek(0)
        return image_buffer

    def _generate_thumbnail(self, filepath):
        """Return a WebP thumbnail (at most 512x512) for the image file.

        The encoded bytes are cached per ``filepath`` in ``thumbnail_cache``.
        The result is a new dict on every call with the keys ``data_raw``,
        ``etag``, ``mimetype`` and ``buffer`` (a fresh ``BytesIO`` at
        position 0), so callers never share or overwrite each other's buffer.
        """
        cached = thumbnail_cache.get(filepath)
        if cached is None:
            data = self._generate_webp(filepath, max_size=(512, 512)).read()
            cached = {
                'data_raw': data,
                'etag': str(crc32(data)),
                'mimetype': 'image/webp'
            }
            thumbnail_cache[filepath] = cached

        thumbnail = dict(cached)
        thumbnail['buffer'] = BytesIO(cached['data_raw'])
        return thumbnail

    def _generate_page(self, page_buffer, volume, page):
        """Convert a page to WebP, store it in the disk cache and return it.

        The cache key is ``'<volume>-<page>'``. The returned buffer is
        rewound because storing it may have advanced its position.
        """
        webp_buffer = self._generate_webp(page_buffer)
        disk_cache.set(f'{volume}-{page}', webp_buffer)
        webp_buffer.seek(0)
        return webp_buffer


class CalibreDB(BaseDB):
    """Queries a Calibre ``metadata.db`` (opened read-only) for CBZ volumes."""

    def __init__(self, path='metadata.db', webp_quality=DEFAULT_WEBP_QUALITY,
                 webp_method=DEFAULT_WEBP_METHOD, webp_size=DEFAULT_WEBP_SIZE):
        """Remember the database location and the WebP settings."""
        super().__init__(
            webp_quality=webp_quality,
            webp_method=webp_method,
            webp_size=webp_size
        )
        # URI form so that SQLite opens the file in read-only mode.
        self.database_path = f'file:{path}?mode=ro'

    def create_cursor(self):
        """Open a new connection and return a cursor yielding dict rows."""
        conn = sqlite3.connect(self.database_path, uri=True)
        conn.row_factory = dict_factory
        return conn.cursor()

    def get_series_list(self):
        """Return a cursor over series that have CBZ data.

        Each row is a dict with the keys ``id``, ``name`` and ``volumes``.
        The cursor is returned as is, so rows are fetched as it is iterated.
        """
        cursor = self.create_cursor()
        series = cursor.execute('''
            select
                series.id as id,
                series.name as name,
                count(*) as volumes
            from books, books_series_link, data, series
            where books_series_link.series = series.id
                and books_series_link.book = books.id
                and books.id = data.book
                and data.format = \'CBZ\'
            group by series.name
            having min(books.series_index)
        ''')
        return series

    def get_series_cover(self, series_id):
        """Return the cover path of the volume selected for the series.

        Raises ``NotFound`` when the query yields no volume or the volume
        has no cover.
        """
        cursor = self.create_cursor()
        first_volume = cursor.execute('''
            select books.id
            from books, books_series_link, series
            where books_series_link.book = books.id
                and books_series_link.series = series.id
                and series.id = ?
            group by series.name
            having min(books.series_index)
        ''', (str(series_id),)).fetchone()

        if first_volume is None:
            raise exceptions.NotFound()

        return self.get_volume_cover(first_volume['id'])

    def get_series_cover_thumbnail(self, series_id):
        """Return the thumbnail dict for the series cover."""
        return self._generate_thumbnail(self.get_series_cover(series_id))

    def get_series_volumes(self, series_id):
        """Return ``{'title': ..., 'volumes': [...]}`` for a series.

        ``volumes`` is a list of dicts with the keys ``id``, ``title`` and
        ``index``, ordered by series index. Raises ``NotFound`` when the
        series does not exist.
        """
        cursor = self.create_cursor()
        series = cursor.execute('''
            select series.name
            from series
            where series.id = ?
        ''', (str(series_id),)).fetchone()

        # An unknown id used to fail with TypeError on ``None['name']``.
        if series is None:
            raise exceptions.NotFound()

        volumes = cursor.execute('''
            select
                books.id,
                books.title,
                books.series_index as "index"
            from books, books_series_link, series
            where books_series_link.book = books.id
                and books_series_link.series = series.id
                and series.id = ?
            order by books.series_index
        ''', (str(series_id),)).fetchall()

        return {
            'title': series['name'],
            'volumes': volumes
        }

    def get_volume_cover(self, volume_id):
        """Return ``<book path>/cover.jpg`` for a volume.

        Raises ``NotFound`` when the volume does not exist or is flagged as
        having no cover.
        """
        cursor = self.create_cursor()
        volume = cursor.execute('''
            select
                books.has_cover as has_cover,
                books.path as path
            from books
            where books.id = ?
            order by books.series_index
        ''', (str(volume_id),)).fetchone()

        # ``volume`` is None for an unknown id; treat it like a missing cover.
        if volume is None or not volume['has_cover']:
            raise exceptions.NotFound()

        return volume['path'] + '/cover.jpg'

    def get_volume_cover_thumbnail(self, volume_id):
        """Return the thumbnail dict for the volume cover."""
        return self._generate_thumbnail(self.get_volume_cover(volume_id))

    def get_volume_filepath(self, volume_id):
        """Return ``<path>/<name>.<lowercased format>`` of the volume's file.

        Raises ``NotFound`` when no data row exists for the volume.
        """
        cursor = self.create_cursor()
        location = cursor.execute('''
            select
                books.path as path,
                data.name as filename,
                lower(data.format) as extension
            from books, data
            where data.book = books.id
                and books.id = ?
        ''', (str(volume_id),)).fetchone()

        if location is None:
            raise exceptions.NotFound()

        return (location['path'] + '/' + location['filename']
                + '.' + location['extension'])

    def get_volume_info(self, volume_id):
        """Return a dict with the keys ``title``, ``series`` and ``pages``.

        Raises ``NotFound`` when the volume does not exist or is not linked
        to a series.
        """
        cursor = self.create_cursor()
        volume_info = cursor.execute('''
            select
                books.title,
                books_series_link.series
            from books, books_series_link
            where books_series_link.book = books.id
                and books.id = ?
        ''', (str(volume_id),)).fetchone()

        # Without this check a missing row failed with TypeError below.
        if volume_info is None:
            raise exceptions.NotFound()

        volume_info['pages'] = self.get_volume_page_number(volume_id)
        return volume_info

    def get_volume_page_number(self, volume_id):
        """Return the number of entries in the volume's archive."""
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, 'r') as volume:
            return len(volume.filelist)

    def get_volume_page(self, volume_id, page_number, original=False):
        """Return one page of a volume.

        ``page_number`` is 1-based and indexes the archive's entry list.
        With ``original=True`` the stored file is returned untouched;
        otherwise a WebP rendition is served from the disk cache and
        generated on a cache miss.

        Returns a dict with the keys ``buffer`` (a binary stream),
        ``mimetype`` and ``etag`` (the CRC of the archive entry).
        Raises ``NotFound`` for a page number outside the archive.
        """
        if page_number < 1:
            raise exceptions.NotFound()

        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, 'r') as volume:
            try:
                zip_info = volume.filelist[page_number - 1]
            except IndexError:
                # The IndexError adds nothing to the HTTP error; drop it.
                raise exceptions.NotFound() from None

            page_filename = zip_info.filename

            if original is True:
                # types_map keys are lowercase; fall back to a generic type
                # instead of failing with KeyError on unknown extensions.
                extension = os.path.splitext(page_filename)[1].lower()
                mimetype = mimetypes.get(extension, 'application/octet-stream')
                page_buffer = BytesIO(volume.read(page_filename))
            else:
                mimetype = 'image/webp'
                try:
                    page_buffer = disk_cache.get(f'{volume_id}-{page_number}')
                except FileNotFoundError:
                    # Cache miss: convert the page and store it.
                    with volume.open(page_filename) as orig_page_buffer:
                        page_buffer = self._generate_page(
                            orig_page_buffer, volume_id, page_number)

        return {
            'buffer': page_buffer,
            'mimetype': mimetype,
            'etag': str(zip_info.CRC)
        }