site/encode.py

168 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python3
from datetime import datetime
from mutagen.flac import Picture
from mutagen.oggopus import OggOpus
from mutagen.oggvorbis import OggVorbis
from subprocess import run
from urllib.parse import urlparse
import base64
import os
import threading
import xml.etree.ElementTree as ET
import common
# Characters that must be backslash-escaped inside an FFMETADATA key or value.
# A translation table rewrites the string in one pass, so the backslashes that
# are inserted as escapes can never be escaped a second time.
_FFMETADATA_ESCAPES = str.maketrans({char: "\\" + char for char in "\\=;#\n"})


def _escape_ffmetadata(value):
    """Return *value* with FFMETADATA special characters backslash-escaped."""
    return value.translate(_FFMETADATA_ESCAPES)


def _build_ffmpeg_chapters(chapters, duration):
    """Render *chapters* as an FFMETADATA1 document (UTF-8 encoded bytes).

    Each chapter ends where the following one starts; the last chapter ends
    at *duration*. Start times and *duration* are multiplied by 1000 and
    written with a 1/1000 timebase.
    """
    # End of chapter i is the start of chapter i + 1; the last one runs to
    # the end of the episode.
    ends = [following["start"] for following in chapters[1:]] + [duration]
    parts = [";FFMETADATA1\n\n"]
    for chapter, end in zip(chapters, ends):
        parts.append(
            "[CHAPTER]\n"
            "TIMEBASE=1/1000\n"
            f"START={int(chapter['start'] * 1000)}\n"
            f"END={int(end * 1000)}\n"
            f"title={_escape_ffmetadata(chapter['title'])}\n\n"
        )
    return "".join(parts).encode("utf-8")


def _tag_ogg_file(outfile, format_name, poster_path, chapters):
    """Embed the poster (and, for Vorbis, chapter marks) into an Ogg file.

    *format_name* is "oga" for Ogg Vorbis; anything else is opened as Ogg Opus.
    """
    audio = OggVorbis(outfile) if format_name == "oga" else OggOpus(outfile)

    # poster
    picture = Picture()
    with open(poster_path, "rb") as poster_file:
        picture.data = poster_file.read()
    # NOTE(review): in the FLAC/ID3 picture-type table 17 is "a bright
    # coloured fish" and 3 is "front cover" - confirm 17 is intentional.
    picture.type = 17
    picture.desc = ""
    # NOTE(review): MIME type and dimensions are hard-coded rather than read
    # from the image - assumes every poster is a 500x500 JPEG; confirm.
    picture.mime = "image/jpeg"
    picture.width = 500
    picture.height = 500
    picture.depth = 24
    audio["metadata_block_picture"] = [
        base64.b64encode(picture.write()).decode("ascii")
    ]

    # chapters for vorbis
    if format_name == "oga":
        for idx, chapter in enumerate(chapters):
            audio[f"CHAPTER{idx:03}"] = common.sexagesimal(chapter["start"])
            audio[f"CHAPTER{idx:03}NAME"] = chapter["title"]

    audio.save()


def encode_episode(podcast, episode, format):
    """Encode one episode into one output format, if its inputs changed.

    The output is rebuilt when the original audio, the episode's "md" file or
    the poster is newer than the existing output, or when any of those files
    (or the output itself) does not exist. Otherwise the format name is
    printed in parentheses and nothing is encoded.

    Args:
        podcast: mapping providing "title" and "link".
        episode: mapping providing "file_base", "title", "contributors",
            "number", "date", "summary" and "poster".
        format: ``(name, ffmpeg_options)`` pair.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    format_name, options = format
    file_base = episode["file_base"]
    infile = common.path_to_episode(file_base, "original")
    outfile = common.path_to_episode(file_base, format_name)
    content_file = common.path_to_episode(file_base, "md")

    try:
        out_mtime = os.path.getmtime(outfile)
        changed = any(
            os.path.getmtime(dependency) > out_mtime
            for dependency in (infile, content_file, episode["poster"])
        )
    except FileNotFoundError:
        # A missing output (or input) always forces a rebuild attempt.
        changed = True

    if not changed:
        print(f" ({format_name})", end="", flush=True)
        return

    tags = {
        "TITLE": episode["title"],
        "ARTIST": ", ".join(episode["contributors"]),
        "ALBUM": podcast["title"],
        "TRACK": episode["number"],
        "GENRE": "podcast",
        "DATE": datetime.strftime(episode["date"], "%Y-%m-%d"),
        "URL": podcast["link"],
        "COMMENT": episode["summary"],
    }

    chapters = list(common.get_chapters(file_base))
    duration = float(common.get_episode_info(file_base, "original")["duration"])
    ffmpeg_chapters = _build_ffmpeg_chapters(chapters, duration)

    # Ogg containers get their poster through mutagen afterwards; every other
    # format receives it as an additional ffmpeg input.
    is_ogg = format_name in ("oga", "opus")

    command = ["./vendor/ffmpeg", "-y", "-loglevel", "error"]
    command.extend(["-i", infile])
    if not is_ogg:
        command.extend(["-i", episode["poster"]])
    # The chapter metadata document is fed to ffmpeg on stdin.
    command.extend(["-i", "-"])
    command.extend(["-c:v", "copy"])
    # NOTE(review): input 1 is stdin only for Ogg formats; for the others it
    # is the poster and stdin is input 2 - presumably `options` overrides the
    # mapping there; verify against common.FORMATS.
    command.extend(["-map_metadata", "1"])
    command.extend(options)
    for key, value in tags.items():
        command.extend(["-metadata", f"{key}={value}"])
    command.append(outfile)

    run(command, input=ffmpeg_chapters, check=True)

    if is_ogg:
        _tag_ogg_file(outfile, format_name, episode["poster"], chapters)

    print(f" {format_name}", end="", flush=True)
# Make sure the episode output directory exists before any encoder runs.
os.makedirs("static/episodes", exist_ok=True)

# Podcast-level data is read from the <channel> element of the Opus RSS feed.
tree = ET.parse("public/formats/opus/rss.xml")
channel = tree.getroot().find("channel")

channel_image_url = channel.find("image").find("url").text
podcast = {
    "title": channel.find("title").text,
    "link": channel.find("link").text,
    # The image URL's path component is mapped into the local static tree.
    "poster": "static" + urlparse(channel_image_url).path,
}

for item in channel.findall("item"):
    # The enclosure file name without its extension identifies the episode.
    enclosure_url = item.find("enclosure").attrib["url"]
    file_base = os.path.splitext(os.path.basename(enclosure_url))[0]

    contributor_names = [
        contributor.find("{http://www.w3.org/2005/Atom}name").text
        for contributor in item.findall("{http://www.w3.org/2005/Atom}contributor")
    ]
    published = datetime.strptime(
        item.find("pubDate").text, "%a, %d %b %Y %H:%M:%S %z"
    )

    # Use the episode's own poster when one exists, else the podcast poster.
    own_poster = f"content/{file_base}.jpg"
    poster = own_poster if os.path.isfile(own_poster) else podcast["poster"]

    episode = {
        "title": item.find("title").text,
        "number": item.find(
            "{http://www.itunes.com/dtds/podcast-1.0.dtd}episode"
        ).text,
        "date": published,
        "contributors": contributor_names,
        "summary": item.find(
            "{http://www.itunes.com/dtds/podcast-1.0.dtd}summary"
        ).text,
        "file_base": file_base,
        "poster": poster,
    }

    print(episode["file_base"], end="", flush=True)

    # One worker thread per output format; wait for all of them before
    # finishing this episode's progress line.
    workers = [
        threading.Thread(
            target=encode_episode, args=(podcast, episode, fmt), daemon=True
        )
        for fmt in common.FORMATS.items()
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print()