from PIL import Image from disk_cache import DiskCache from io import BytesIO from mimetypes import types_map as mimetypes from zipfile import ZipFile from zlib import crc32 import os.path import sqlite3 import werkzeug.exceptions as exceptions mimetypes['.webp'] = 'image/webp' if os.environ.get('DISK_CACHE', '1') == '0': disk_cache = DiskCache(enabled=False) else: disk_cache = DiskCache() thumbnail_cache = {} # https://docs.python.org/3.7/library/sqlite3.html#sqlite3.Connection.row_factory def dict_factory(cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d def generate_thumbnail(filepath): if filepath not in thumbnail_cache: image = Image.open(filepath) image.thumbnail((512, 512)) thumbnail = BytesIO() image.save(thumbnail, 'webp') thumbnail.seek(0) data = thumbnail.read() etag = str(crc32(data)) thumbnail_cache[filepath] = { 'data_raw': data, 'etag': etag, 'mimetype': 'image/webp' } thumbnail = thumbnail_cache[filepath] thumbnail['data'] = BytesIO() thumbnail['data'].write(thumbnail['data_raw']) thumbnail['data'].seek(0) return thumbnail class CalibreDB: def __init__(self, path='metadata.db', enable_webp=True, webp_quality=80, webp_method=0, webp_size=2048): self.database_path = f'file:{path}?mode=ro' self.webp = enable_webp if self.webp is True: # lossy: 0-100 (used as quality) # lossless: 101-201 (101 subtracted and used as quality) if webp_quality > 100: webp_lossless = True webp_quality = webp_quality - 101 else: webp_lossless = False self.webp_config = { 'quality': webp_quality, 'method': webp_method, 'lossless': webp_lossless, 'size': webp_size } def create_cursor(self): conn = sqlite3.connect(self.database_path, uri=True) conn.row_factory = dict_factory return conn.cursor() def get_series_list(self): cursor = self.create_cursor() series = cursor.execute(''' select series.id as id, series.name as name, count(*) as volumes from books, books_series_link, data, series where books_series_link.series = series.id and books_series_link.book = books.id and books.id = data.book and data.format = \'CBZ\' group by series.name having min(books.series_index) ''') return series def get_series_cover(self, series_id): cursor = self.create_cursor() first_volume = cursor.execute(''' select books.id from books, books_series_link, series where books_series_link.book = books.id and books_series_link.series = series.id and series.id = ? group by series.name having min(books.series_index) ''', (str(series_id),)).fetchone() if first_volume is None: raise exceptions.NotFound() return self.get_volume_cover(first_volume['id']) def get_series_cover_thumbnail(self, series_id): return generate_thumbnail(self.get_series_cover(series_id)) def get_series_volumes(self, series_id): cursor = self.create_cursor() title = cursor.execute(''' select series.name from series where series.id = ? ''', (str(series_id),)).fetchone()['name'] volumes = cursor.execute(''' select books.id, books.title, books.series_index as "index" from books, books_series_link, series where books_series_link.book = books.id and books_series_link.series = series.id and series.id = ? order by books.series_index ''', (str(series_id),)).fetchall() return { 'title': title, 'volumes': volumes } def get_volume_cover(self, volume_id): cursor = self.create_cursor() volume = cursor.execute(''' select books.has_cover as has_cover, books.path as path from books where books.id = ? order by books.series_index ''', (str(volume_id),)).fetchone() if volume['has_cover']: return volume['path'] + '/cover.jpg' else: raise exceptions.NotFound() def get_volume_cover_thumbnail(self, volume_id): return generate_thumbnail(self.get_volume_cover(volume_id)) def get_volume_filepath(self, volume_id): cursor = self.create_cursor() location = cursor.execute(''' select books.path as path, data.name as filename, lower(data.format) as extension from books, data where data.book = books.id and books.id = ? ''', (str(volume_id),)).fetchone() if location is None: raise exceptions.NotFound() return location['path'] + '/' + location['filename'] + '.' + location['extension'] def get_volume_info(self, volume_id): cursor = self.create_cursor() volume_info = cursor.execute(''' select books.title, books_series_link.series from books, books_series_link where books_series_link.book = books.id and books.id = ? ''', (str(volume_id),)).fetchone() volume_info['pages'] = self.get_volume_page_number(volume_id) return volume_info def get_volume_page_number(self, volume_id): path = self.get_volume_filepath(volume_id) with ZipFile(path, 'r') as volume: return len(volume.filelist) def get_volume_page(self, volume_id, page_number): if page_number < 1: raise exceptions.NotFound() path = self.get_volume_filepath(volume_id) with ZipFile(path, 'r') as volume: try: zip_info = volume.filelist[page_number - 1] except IndexError: raise exceptions.NotFound() return None page_filename = zip_info.filename mimetype = mimetypes[os.path.splitext(page_filename)[1]] if self.webp is True and mimetype != 'image/webp': mimetype = 'image/webp' try: page_data = disk_cache.get(f'{volume_id}-{page_number}') except FileNotFoundError: with volume.open(page_filename) as orig_page_data: image = Image.open(orig_page_data) image.thumbnail(tuple([self.webp_config['size']] * 2)) page_data = BytesIO() image.save( page_data, format='webp', save_all=True, # append_images=[image], # https://github.com/python-pillow/Pillow/issues/4042 quality=self.webp_config['quality'], method=self.webp_config['method'], lossless=self.webp_config['lossless'] ) page_data.seek(0) disk_cache.set(f'{volume_id}-{page_number}', page_data) page_data.seek(0) else: page_data = BytesIO() page_data.write(volume.read(page_filename)) page_data.seek(0) return { 'data': page_data, 'mimetype': mimetype, 'etag': str(zip_info.CRC) }