#!/usr/bin/env python3
"""Encode every episode listed in the Opus RSS feed into all target formats.

The script parses ``public/formats/opus/rss.xml`` and, for each ``<item>``,
starts one thread per entry of ``common.FORMATS``.  Each thread runs ffmpeg
(unless the output is already newer than its inputs) and tags the result with
the episode metadata and poster image.  The exit status is non-zero when any
encoding job failed.
"""
import base64
import mimetypes
import os
import sys
import threading
import xml.etree.ElementTree as ET
from datetime import datetime
from subprocess import run
from urllib.parse import urlparse

from mutagen.flac import Picture
from mutagen.oggopus import OggOpus
from mutagen.oggvorbis import OggVorbis

import common

# ElementTree spells namespaced tags as "{namespace-uri}localname".
ITUNES_NS = "{http://www.itunes.com/dtds/podcast-1.0.dtd}"
ATOM_NS = "{http://www.w3.org/2005/Atom}"

# Formats whose cover art is written with mutagen after encoding instead of
# being passed to ffmpeg as a second input.
OGG_FORMATS = ("oga", "opus")


def _embed_ogg_poster(outfile, fmt, poster):
    """Store *poster* in *outfile* as a ``metadata_block_picture`` comment."""
    audio = OggVorbis(outfile) if fmt == "oga" else OggOpus(outfile)

    picture = Picture()
    with open(poster, "rb") as f:
        picture.data = f.read()
    # NOTE(review): 17 is "A bright coloured fish" in the ID3/FLAC picture
    # type table, 3 would be "Cover (front)" -- kept as is, confirm intent.
    picture.type = 17
    picture.desc = ""
    # The podcast-level poster path comes from the feed's image URL, so it is
    # not guaranteed to be a JPEG; fall back to JPEG when the type is unknown.
    picture.mime = mimetypes.guess_type(poster)[0] or "image/jpeg"
    # NOTE(review): dimensions and depth are fixed values, not read from the
    # image data -- assumes 500x500, 24-bit posters; confirm.
    picture.width = 500
    picture.height = 500
    picture.depth = 24

    audio["metadata_block_picture"] = [
        base64.b64encode(picture.write()).decode("ascii")
    ]
    audio.save()


def encode_episode(podcast, episode, format):
    """Encode one episode into one target format if its output is stale.

    The output is considered stale when it does not exist or when the
    original audio, the episode's ``md`` file or the poster is newer than it.
    Progress is printed as `` name`` when encoded and `` (name)`` when skipped.

    Args:
        podcast: Mapping providing ``title`` and ``link``.
        episode: Mapping providing ``file_base``, ``poster``, ``title``,
            ``contributors``, ``number``, ``date`` and ``summary``.
        format: ``(name, ffmpeg_options)`` pair, as produced by
            ``common.FORMATS.items()``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    fmt, options = format
    infile = common.path_to_episode(episode["file_base"], "original")
    outfile = common.path_to_episode(episode["file_base"], fmt)
    content_file = common.path_to_episode(episode["file_base"], "md")

    try:
        out_mtime = os.path.getmtime(outfile)
        changed = any(
            os.path.getmtime(dependency) > out_mtime
            for dependency in (infile, content_file, episode["poster"])
        )
    except FileNotFoundError:
        # A missing output (or input) always triggers an encoding attempt.
        changed = True

    if not changed:
        print(f" ({fmt})", end="", flush=True)
        return

    tags = {
        "TITLE": episode["title"],
        "ARTIST": ", ".join(episode["contributors"]),
        "ALBUM": podcast["title"],
        "TRACK": episode["number"],
        "GENRE": "podcast",
        "DATE": episode["date"].strftime("%Y-%m-%d"),
        "URL": podcast["link"],
        "COMMENT": episode["summary"],
    }

    command = ["ffmpeg", "-y", "-loglevel", "error", "-i", infile]
    if fmt not in OGG_FORMATS:
        # Non-Ogg formats get the poster as a second input, copied unchanged.
        command.extend(["-i", episode["poster"], "-c:v", "copy"])
    command.extend(options)
    for key, value in tags.items():
        command.extend(["-metadata", f"{key}={value}"])
    command.append(outfile)

    run(command, check=True)

    if fmt in OGG_FORMATS:
        _embed_ogg_poster(outfile, fmt, episode["poster"])

    print(f" {fmt}", end="", flush=True)


def _run_encode_job(podcast, episode, format, failures):
    """Thread target: run ``encode_episode`` and record a failure, if any.

    An exception raised inside a thread never reaches the main thread, so the
    failed job is appended to *failures* for the final exit status.  The
    exception is re-raised so the traceback is still printed by ``threading``.
    """
    try:
        encode_episode(podcast, episode, format)
    except Exception:
        failures.append((episode["file_base"], format[0]))
        raise


os.makedirs("static/episodes", exist_ok=True)

tree = ET.parse("public/formats/opus/rss.xml")
root = tree.getroot()
channel = root.find("channel")

podcast = {
    "title": channel.find("title").text,
    "link": channel.find("link").text,
    "poster": "static" + urlparse(channel.find("image").find("url").text).path,
}

# (file_base, format name) of every job that raised; filled by the workers.
failures = []

for item in channel.findall("item"):
    episode = {
        "title": item.find("title").text,
        "number": item.find(f"{ITUNES_NS}episode").text,
        "date": datetime.strptime(
            item.find("pubDate").text, "%a, %d %b %Y %H:%M:%S %z"
        ),
        "contributors": [
            contributor.find(f"{ATOM_NS}name").text
            for contributor in item.findall(f"{ATOM_NS}contributor")
        ],
        "summary": item.find(f"{ITUNES_NS}summary").text,
        "file_base": os.path.splitext(
            os.path.basename(item.find("enclosure").attrib["url"])
        )[0],
    }

    # Prefer an episode-specific poster, else fall back to the podcast's.
    episode_poster = f"content/{episode['file_base']}.jpg"
    if os.path.isfile(episode_poster):
        episode["poster"] = episode_poster
    else:
        episode["poster"] = podcast["poster"]

    print(episode["file_base"], end="", flush=True)

    # One thread per target format; all formats of an episode run in parallel
    # and are awaited before the next episode starts.
    threads = []
    for format_spec in common.FORMATS.items():
        thread = threading.Thread(
            target=_run_encode_job,
            args=(podcast, episode, format_spec, failures),
            daemon=True,
        )
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()

    print()

if failures:
    failed_jobs = ", ".join(f"{base} ({name})" for base, name in failures)
    sys.exit(f"encoding failed for: {failed_jobs}")