This repository has been archived on 2020-11-22. You can view files and clone it, but cannot push or open issues or pull requests.
mangareader/backend.py
Simon Bruder 79a348acc9
All checks were successful
continuous-integration/drone/push Build is passing
Add disk cache
2019-08-07 20:54:55 +00:00

261 lines
7.8 KiB
Python

from PIL import Image
from disk_cache import DiskCache
from io import BytesIO
from mimetypes import types_map as mimetypes
from zipfile import ZipFile
from zlib import crc32
import os.path
import sqlite3
import webp
import werkzeug.exceptions as exceptions
# Make sure the extension -> MIME type table maps '.webp', since pages and
# thumbnails are served as WebP.
mimetypes['.webp'] = 'image/webp'

# The disk cache is used unless the environment variable DISK_CACHE is
# set to '0'.
# NOTE(review): `os` is only in scope here as a side effect of
# `import os.path`; an explicit `import os` would make this clearer.
if os.environ.get('DISK_CACHE', '1') == '0':
    disk_cache = DiskCache(enabled=False)
else:
    disk_cache = DiskCache()

# In-memory cache of generated thumbnails, keyed by the source file path.
thumbnail_cache = {}
# https://docs.python.org/3.7/library/sqlite3.html#sqlite3.Connection.row_factory
def dict_factory(cursor, row):
    """Convert a result row into a dict keyed by column name.

    Intended to be assigned to ``sqlite3.Connection.row_factory``.

    :param cursor: cursor that produced the row; only ``cursor.description``
        is read, and the first item of each entry is used as the key.
    :param row: sequence of column values for one result row.
    :return: dict mapping each column name to the value at the same position.
    """
    return {
        column[0]: value
        for column, value in zip(cursor.description, row)
    }
def generate_thumbnail(filepath):
    """Return a WebP thumbnail (at most 512x512) of the image at *filepath*.

    The encoded bytes are kept in the module-level ``thumbnail_cache``, keyed
    by *filepath*, so every image is only scaled and encoded once.

    :param filepath: path of the source image, passed to ``Image.open``.
    :return: a new dict per call with the keys ``'data_raw'`` (encoded
        bytes), ``'etag'`` (CRC32 of those bytes as a decimal string),
        ``'mimetype'`` (``'image/webp'``) and ``'data'`` (a fresh ``BytesIO``
        positioned at the start of the encoded bytes).
    """
    cached = thumbnail_cache.get(filepath)
    if cached is None:
        # Close the source file as soon as the thumbnail has been encoded.
        with Image.open(filepath) as image:
            image.thumbnail((512, 512))
            buffer = BytesIO()
            image.save(buffer, 'webp')
        data = buffer.getvalue()
        cached = {
            'data_raw': data,
            'etag': str(crc32(data)),
            'mimetype': 'image/webp',
        }
        thumbnail_cache[filepath] = cached
    # Hand out a copy with its own stream instead of mutating the cached
    # entry, so two callers never end up sharing one read position.
    return {**cached, 'data': BytesIO(cached['data_raw'])}
class CalibreDB:
    """Read-only access to series, volumes and pages of a book library.

    Metadata is read from an SQLite database (tables ``books``, ``series``,
    ``books_series_link`` and ``data``); pages are read from the archive
    file of each volume, which is opened with ``zipfile.ZipFile``.

    NOTE(review): all file paths returned or opened here are built from the
    ``books.path`` column without a base directory, so they are presumably
    relative to the working directory -- confirm against the caller.
    NOTE(review): the connections opened by ``create_cursor`` are never
    closed explicitly.
    """

    def __init__(self, path='metadata.db', enable_webp=True, webp_quality=80, webp_size=2048):
        """Store the database location and the WebP conversion settings.

        :param path: path of the SQLite database file.
        :param enable_webp: pages are converted to WebP only if this is
            exactly ``True``.
        :param webp_quality: encoder quality; the special value ``101``
            selects lossless encoding (with quality 100).
        :param webp_size: maximum width and height of converted pages.
        """
        # URI form so that the database is opened read-only.
        self.database_path = f'file:{path}?mode=ro'
        self.webp = enable_webp
        self.webp_size = webp_size
        if self.webp is True:
            lossless = webp_quality == 101
            if lossless:
                webp_quality = 100
            self.webp_config = webp.WebPConfig.new(
                preset=webp.WebPPreset.DRAWING,
                quality=webp_quality,
                lossless=lossless
            )

    def create_cursor(self):
        """Open a new connection and return a cursor yielding dict rows."""
        conn = sqlite3.connect(self.database_path, uri=True)
        conn.row_factory = dict_factory
        return conn.cursor()

    def get_series_list(self):
        """Return a cursor over all series that have volumes in CBZ format.

        Each row has the keys ``id``, ``name`` and ``volumes`` (row count of
        the group).

        NOTE(review): ``having min(books.series_index)`` is evaluated as a
        truth value, so a group whose lowest index is 0 looks like it would
        be filtered out -- confirm this is intended.
        """
        cursor = self.create_cursor()
        series = cursor.execute('''
            select
                series.id as id,
                series.name as name,
                count(*) as volumes
            from
                books,
                books_series_link,
                data,
                series
            where
                books_series_link.series = series.id and
                books_series_link.book = books.id and
                books.id = data.book and
                data.format = \'CBZ\'
            group by series.name
            having min(books.series_index)
            ''')
        return series

    def get_series_cover(self, series_id):
        """Return the cover path of the volume selected for a series.

        :raises werkzeug.exceptions.NotFound: if the query yields no volume
            or the selected volume has no cover.
        """
        cursor = self.create_cursor()
        first_volume = cursor.execute('''
            select
                books.id
            from
                books,
                books_series_link,
                series
            where
                books_series_link.book = books.id and
                books_series_link.series = series.id and
                series.id = ?
            group by series.name
            having min(books.series_index)
            ''', (str(series_id),)).fetchone()
        if first_volume is None:
            raise exceptions.NotFound()
        return self.get_volume_cover(first_volume['id'])

    def get_series_cover_thumbnail(self, series_id):
        """Return the thumbnail dict for the cover of a series."""
        return generate_thumbnail(self.get_series_cover(series_id))

    def get_series_volumes(self, series_id):
        """Return the name of a series and its volumes ordered by index.

        :return: dict with ``'title'`` (series name) and ``'volumes'`` (list
            of rows with the keys ``id``, ``title`` and ``index``).
        :raises werkzeug.exceptions.NotFound: if no series has this id.
        """
        cursor = self.create_cursor()
        series = cursor.execute('''
            select
                series.name
            from
                series
            where
                series.id = ?
            ''', (str(series_id),)).fetchone()
        # An unknown id must yield a 404 rather than a TypeError.
        if series is None:
            raise exceptions.NotFound()
        volumes = cursor.execute('''
            select
                books.id,
                books.title,
                books.series_index as "index"
            from
                books,
                books_series_link,
                series
            where
                books_series_link.book = books.id and
                books_series_link.series = series.id and
                series.id = ?
            order by books.series_index
            ''', (str(series_id),)).fetchall()
        return {
            'title': series['name'],
            'volumes': volumes
        }

    def get_volume_cover(self, volume_id):
        """Return the path of the cover image of a volume.

        :raises werkzeug.exceptions.NotFound: if the volume does not exist
            or is flagged as having no cover.
        """
        cursor = self.create_cursor()
        volume = cursor.execute('''
            select
                books.has_cover as has_cover,
                books.path as path
            from
                books
            where
                books.id = ?
            order by books.series_index
            ''', (str(volume_id),)).fetchone()
        if volume is None or not volume['has_cover']:
            raise exceptions.NotFound()
        return volume['path'] + '/cover.jpg'

    def get_volume_cover_thumbnail(self, volume_id):
        """Return the thumbnail dict for the cover of a volume."""
        return generate_thumbnail(self.get_volume_cover(volume_id))

    def get_volume_filepath(self, volume_id):
        """Return the path of the data file belonging to a volume.

        The path is ``<books.path>/<data.name>.<lower-cased data.format>``
        of the first matching row.

        :raises werkzeug.exceptions.NotFound: if the volume has no data row.
        """
        cursor = self.create_cursor()
        location = cursor.execute('''
            select
                books.path as path,
                data.name as filename,
                lower(data.format) as extension
            from
                books,
                data
            where
                data.book = books.id and
                books.id = ?
            ''', (str(volume_id),)).fetchone()
        if location is None:
            raise exceptions.NotFound()
        return location['path'] + '/' + location['filename'] + '.' + location['extension']

    def get_volume_info(self, volume_id):
        """Return title, series id and page count of a volume.

        :return: dict with the keys ``title``, ``series`` and ``pages``.
        :raises werkzeug.exceptions.NotFound: if the volume does not exist,
            is not linked to a series, or has no data file.
        """
        cursor = self.create_cursor()
        volume_info = cursor.execute('''
            select
                books.title,
                books_series_link.series
            from
                books,
                books_series_link
            where
                books_series_link.book = books.id and
                books.id = ?
            ''', (str(volume_id),)).fetchone()
        if volume_info is None:
            raise exceptions.NotFound()
        volume_info['pages'] = self.get_volume_page_number(volume_id)
        return volume_info

    def get_volume_page_number(self, volume_id):
        """Return the number of entries in the archive of a volume."""
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, 'r') as volume:
            return len(volume.filelist)

    def get_volume_page(self, volume_id, page_number):
        """Return one page of a volume, converted to WebP if enabled.

        :param volume_id: id of the volume.
        :param page_number: 1-based position of the entry in the archive.
        :return: dict with ``'data'`` (stream holding the image),
            ``'mimetype'`` and ``'etag'`` (CRC of the archive entry as a
            decimal string).
        :raises werkzeug.exceptions.NotFound: if the page number is out of
            range or the entry has an extension with no known MIME type.
        """
        if page_number < 1:
            raise exceptions.NotFound()
        path = self.get_volume_filepath(volume_id)
        with ZipFile(path, 'r') as volume:
            try:
                zip_info = volume.filelist[page_number - 1]
            except IndexError:
                raise exceptions.NotFound() from None
            page_filename = zip_info.filename

            # The MIME table uses lower-case keys; archives often contain
            # upper-case extensions such as '.JPG'.
            extension = os.path.splitext(page_filename)[1].lower()
            try:
                mimetype = mimetypes[extension]
            except KeyError:
                raise exceptions.NotFound() from None

            if self.webp is True and mimetype != 'image/webp':
                mimetype = 'image/webp'
                cache_key = f'{volume_id}-{page_number}'
                try:
                    page_data = disk_cache.get(cache_key)
                except FileNotFoundError:
                    # Not cached yet: scale, convert and encode the page.
                    with volume.open(page_filename) as orig_page_data:
                        image = Image.open(orig_page_data)
                        image.thumbnail((self.webp_size, self.webp_size))
                        image = image.convert('RGB')
                        image = webp.WebPPicture.from_pil(image)
                        page_data = BytesIO(image.encode(self.webp_config).buffer())
                    disk_cache.set(cache_key, page_data)
                    # Presumably the cache write consumes the stream, so
                    # rewind it for the caller.
                    page_data.seek(0)
            else:
                # Serve the archive entry unchanged.
                page_data = BytesIO(volume.read(page_filename))
        return {
            'data': page_data,
            'mimetype': mimetype,
            'etag': str(zip_info.CRC)
        }