"""Read-only access to CBZ series, volumes and pages in a Calibre metadata database."""

from contextlib import closing
from io import BytesIO
from zipfile import ZipFile
import os.path
import sqlite3

from PIL import Image
import werkzeug.exceptions as exceptions


# https://docs.python.org/3.7/library/sqlite3.html#sqlite3.Connection.row_factory
def dict_factory(cursor, row):
    """Convert a result row into a ``{column_name: value}`` dict.

    Intended to be assigned to ``sqlite3.Connection.row_factory``.
    """
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


def generate_thumbnail(filepath):
    """Return an in-memory JPEG thumbnail of the image at *filepath*.

    The image is shrunk to fit within 512x512 pixels (``Image.thumbnail``
    preserves the aspect ratio). The returned ``BytesIO`` is rewound to
    position 0 so it can be read or streamed directly.
    """
    # The context manager releases the underlying file handle, which the
    # bare ``Image.open`` call would otherwise leave open.
    with Image.open(filepath) as image:
        image.thumbnail((512, 512))
        thumbnail = BytesIO()
        image.save(thumbnail, format='jpeg')
    thumbnail.seek(0)
    return thumbnail


class CalibreDB:
    """Query helper around a Calibre ``metadata.db`` opened in read-only mode.

    Lookups for ids that do not exist raise ``werkzeug.exceptions.NotFound``.
    """

    def __init__(self, path='metadata.db'):
        """Remember the database location as a read-only SQLite URI."""
        self.database_path = f'file:{path}?mode=ro'

    def _connect(self):
        """Open a new connection whose rows are returned as dicts."""
        conn = sqlite3.connect(self.database_path, uri=True)
        conn.row_factory = dict_factory
        return conn

    def _fetch_one(self, query, parameters=()):
        """Run *query* and return its first row (or ``None``), closing the connection."""
        with closing(self._connect()) as conn:
            return conn.execute(query, parameters).fetchone()

    def _fetch_all(self, query, parameters=()):
        """Run *query* and return all rows as a list, closing the connection."""
        with closing(self._connect()) as conn:
            return conn.execute(query, parameters).fetchall()

    def create_cursor(self):
        """Return a cursor on a fresh connection.

        The connection is not closed here; it stays open for as long as the
        returned cursor references it.
        """
        return self._connect().cursor()

    def get_series_list(self):
        """Return a cursor over series that have CBZ volumes.

        Each row is a dict with ``id``, ``name`` and ``volumes`` (row count
        per series). The live cursor is returned as-is, so rows are fetched
        as the caller iterates and the connection stays open meanwhile.
        """
        cursor = self.create_cursor()
        series = cursor.execute('''
            select
                series.id as id,
                series.name as name,
                count(*) as volumes
            from books, books_series_link, data, series
            where books_series_link.series = series.id
            and books_series_link.book = books.id
            and books.id = data.book
            and data.format = \'CBZ\'
            group by series.name
            having min(books.series_index)
        ''')
        return series

    def get_series_cover(self, series_id):
        """Return the cover path of the volume selected for *series_id*.

        Raises:
            NotFound: if the query yields no volume, or that volume has no
                cover.
        """
        first_volume = self._fetch_one('''
            select books.id
            from books, books_series_link, series
            where books_series_link.book = books.id
            and books_series_link.series = series.id
            and series.id = ?
            group by series.name
            having min(books.series_index)
        ''', (str(series_id),))
        if first_volume is None:
            raise exceptions.NotFound()
        return self.get_volume_cover(first_volume['id'])

    def get_series_cover_thumbnail(self, series_id):
        """Return a JPEG thumbnail (``BytesIO``) of the series cover."""
        return generate_thumbnail(self.get_series_cover(series_id))

    def get_series_volumes(self, series_id):
        """Return ``{'title': <series name>, 'volumes': [<rows>]}``.

        Volume rows carry ``id``, ``title`` and ``index`` and are ordered by
        ``books.series_index``.

        Raises:
            NotFound: if no series has id *series_id*.
        """
        series = self._fetch_one('''
            select series.name
            from series
            where series.id = ?
        ''', (str(series_id),))
        # An unknown id yields no row; report it as 404 rather than letting
        # the subscript below fail with a TypeError.
        if series is None:
            raise exceptions.NotFound()
        volumes = self._fetch_all('''
            select
                books.id,
                books.title,
                books.series_index as "index"
            from books, books_series_link, series
            where books_series_link.book = books.id
            and books_series_link.series = series.id
            and series.id = ?
            order by books.series_index
        ''', (str(series_id),))
        return {
            'title': series['name'],
            'volumes': volumes
        }

    def get_volume_cover(self, volume_id):
        """Return ``<books.path>/cover.jpg`` for *volume_id*.

        Raises:
            NotFound: if the volume does not exist or has no cover.
        """
        volume = self._fetch_one('''
            select
                books.has_cover as has_cover,
                books.path as path
            from books
            where books.id = ?
            order by books.series_index
        ''', (str(volume_id),))
        # A missing row and a falsy has_cover both mean "no cover to serve".
        if volume is None or not volume['has_cover']:
            raise exceptions.NotFound()
        return volume['path'] + '/cover.jpg'

    def get_volume_cover_thumbnail(self, volume_id):
        """Return a JPEG thumbnail (``BytesIO``) of the volume cover."""
        return generate_thumbnail(self.get_volume_cover(volume_id))

    def get_volume_filepath(self, volume_id):
        """Return ``<path>/<name>.<lower-cased format>`` for *volume_id*.

        Raises:
            NotFound: if the volume has no matching row in ``data``.
        """
        location = self._fetch_one('''
            select
                books.path as path,
                data.name as filename,
                lower(data.format) as extension
            from books, data
            where data.book = books.id
            and books.id = ?
        ''', (str(volume_id),))
        if location is None:
            raise exceptions.NotFound()
        return location['path'] + '/' + location['filename'] + '.' + location['extension']

    def get_volume_info(self, volume_id):
        """Return a dict with the volume's ``title``, ``series`` id and ``pages``.

        Raises:
            NotFound: if the volume does not exist, is not linked to a
                series, or has no data file.
        """
        volume_info = self._fetch_one('''
            select
                books.title,
                books_series_link.series
            from books, books_series_link
            where books_series_link.book = books.id
            and books.id = ?
        ''', (str(volume_id),))
        # Without this guard the item assignment below fails on None.
        if volume_info is None:
            raise exceptions.NotFound()
        volume_info['pages'] = self.get_volume_page_number(volume_id)
        return volume_info

    def get_volume_page_number(self, volume_id):
        """Return the number of entries in the volume's zip archive."""
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, 'r') as volume:
            return len(volume.filelist)

    def get_volume_page(self, volume_id, page_number):
        """Return page *page_number* (1-based) of the volume.

        Returns:
            A dict with ``data`` (``BytesIO`` positioned at 0 holding the
            archive entry's bytes) and ``extension`` (the entry's file
            extension including the leading dot).

        Raises:
            NotFound: if *page_number* is below 1 or past the last entry, or
                the volume has no data file.
        """
        # Reject values below 1 up front: a zero or negative index would
        # otherwise silently wrap around to the end of the archive.
        if page_number < 1:
            raise exceptions.NotFound()
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, 'r') as volume:
            try:
                page_filename = volume.filelist[page_number - 1].filename
            except IndexError:
                raise exceptions.NotFound() from None
            page_data = BytesIO(volume.read(page_filename))
        return {
            'data': page_data,
            'extension': os.path.splitext(page_filename)[1]
        }