#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
"""Get PeerTube videos from a series of channel's feeds"""

import sys
import os
import json
import logging
from logging.handlers import SysLogHandler
import shutil
import subprocess
import click
import click_config_file
import requests
from transmission_rpc import Client
import feedparser

# Seconds to wait for a server to answer before giving up on a request.
REQUEST_TIMEOUT = 60
# Size in bytes of the pieces used when streaming a download to disk.
DOWNLOAD_CHUNK_SIZE = 1024 * 1024


class GetPeertubeVideos:
    '''Get PeerTube videos from a series of URIs containing the JSON feed'''

    def __init__(self, **kwargs):
        '''
        Store the configuration, prepare logging, the downloaded items
        database, the Transmission client and the HTTP session, and then
        process every configured URI.

        The keys read from kwargs are: log_file, debug_level, uris,
        downloaded_database, download_dir, tr_host, tr_port, tr_user,
        tr_password and tr_path.
        '''
        self.config = kwargs
        if 'log_file' not in kwargs or kwargs['log_file'] is None:
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get(
                        'USERPROFILE',
                        os.getcwd()
                    )
                ),
                'log',
                'get_peertube_videos.log'
            )
        self._init_log()

        self.downloaded_items = []
        self._get_downloaded_items()

        self.trans = Client(
            host=self.config['tr_host'],
            port=self.config['tr_port'],
            username=self.config['tr_user'],
            password=self.config['tr_password'],
            path=self.config['tr_path'],
        )
        self.session = requests.Session()
        self._process_uris()

    def _process_uris(self):
        '''
        Fetch every configured feed and process each of its items.

        URIs containing '.json' are fetched with the HTTP session and
        decoded as JSON; URIs containing '.rss' are parsed with feedparser.
        A feed that can't be fetched or decoded is logged and skipped so
        the remaining feeds are still processed.
        '''
        for uri in self.config['uris']:
            if '.json' in uri:
                try:
                    result = self.session.get(uri, timeout=REQUEST_TIMEOUT)
                    result.raise_for_status()
                    json_data = result.json()
                except (requests.RequestException, ValueError) as error:
                    self._log.error(
                        "Error fetching feed '%s'. %s",
                        uri,
                        error
                    )
                    continue
                items = []
                if isinstance(json_data, dict):
                    items = json_data.get('items', [])
                for item in items:
                    self._process_item(item)
            elif '.rss' in uri:
                feed = feedparser.parse(uri)
                for entry in feed['entries']:
                    self._process_item(entry)
            else:
                self._log.warning(
                    "URI '%s' doesn't look like a JSON or RSS feed, skipping.",
                    uri
                )

    def _process_item(self, item):
        '''
        Add the torrent or download the video of a single feed item.

        The biggest attachment of the item is selected. If its URL
        contains '.torrent' it is sent to Transmission, if its type
        contains 'video' it is downloaded to the download folder.

        Returns False when there is nothing usable to download or the
        download failed, None otherwise.
        '''
        self._log.debug(
            "Processing item: '%s' (%s)",
            item['title'],
            item['id']
        )
        # The database stores one ID per line as text, compare as text too.
        if str(item['id']) in self.downloaded_items:
            self._log.debug(
                "Item already downloaded, skipping."
            )
            return None

        if 'attachments' in item:
            attachments = item['attachments']
            size_field = 'size_in_bytes'
        elif 'media_content' in item:
            attachments = item['media_content']
            size_field = 'filesize'
        else:
            self._log.error(
                "Unrecognize item in feed. %s",
                # default=str keeps this error path from raising on values
                # that json can't serialize.
                json.dumps(item, indent=2, default=str)
            )
            return None

        selected = self._select_biggest_attachment(attachments, size_field)
        if 'url' not in selected:
            self._log.error(
                "No attachments with size bigger than 0. No torrent to add."
            )
            return False

        try:
            if '.torrent' in selected['url']:
                self._add_torrent(item, selected)
            elif 'video' in (selected.get('type') or ''):
                self._download_video(item, selected)
            else:
                self._log.warning(
                    "Unsupported attachment '%s', skipping.",
                    selected['url']
                )
        except (requests.RequestException, OSError) as error:
            # The item is not marked as downloaded, so it will be retried
            # the next time the feed is processed.
            self._log.error(
                "Error processing item '%s'. %s",
                item['title'],
                error
            )
            return False
        return None

    def _select_biggest_attachment(self, attachments, size_field):
        '''
        Return the attachment with the biggest size in size_field.

        Attachments without a usable size are logged and ignored. When no
        attachment has a size bigger than 0 the returned dictionary only
        contains size_field (and so has no 'url' key).
        '''
        selected = {
            size_field: 0,
        }
        self._log.debug(
            "%s attachments (videos) for this item",
            len(attachments)
        )
        for attachment in attachments:
            if size_field not in attachment:
                self._log.error(
                    "Attachment without file size. %s",
                    attachment
                )
                continue
            try:
                size = int(attachment[size_field])
            except (TypeError, ValueError):
                self._log.error(
                    "Attachment with an invalid file size. %s",
                    attachment
                )
                continue
            if size > int(selected[size_field]):
                selected = attachment
        return selected

    @staticmethod
    def _item_labels(item):
        '''
        Return the tags of an item as a list of strings.

        Tags can be plain values or dictionaries with a 'term' key; an item
        without tags gives an empty list.
        '''
        labels = []
        for tag in item.get('tags') or []:
            if isinstance(tag, dict):
                tag = tag.get('term')
            if tag:
                labels.append(str(tag))
        return labels

    def _mark_downloaded(self, item_id):
        '''Remember an item ID as downloaded and save the database.'''
        self.downloaded_items.append(str(item_id))
        self._write_downloaded_items()

    def _add_torrent(self, item, selected):
        '''Download the torrent file of an item and add it to Transmission.'''
        self._log.info(
            "Adding torrent '%s' for video '%s'...",
            selected['url'],
            # Not every attachment carries its own title.
            selected.get('title', item['title'])
        )
        result_torrent = self.session.get(
            selected['url'],
            timeout=REQUEST_TIMEOUT
        )
        # Don't hand an HTTP error page to Transmission as if it was a torrent.
        result_torrent.raise_for_status()
        torrent_bytes = result_torrent.content
        self._log.debug(
            "Torrent file downloaded with %s bytes of data",
            len(torrent_bytes)
        )
        result_add = self.trans.add_torrent(
            torrent_bytes,
            download_dir=self.config['download_dir'],
            labels=self._item_labels(item)
        )
        self._log.debug(
            "Torrent added to Transmission with result: %s",
            result_add
        )
        self._mark_downloaded(item['id'])

    def _download_to_file(self, url, file_name):
        '''
        Stream the content of a URL into a file.

        The data is written to '<file_name>.part' and only moved to
        file_name once the download finished, so a failed download never
        leaves a truncated file with the final name.
        '''
        partial_file_name = f"{file_name}.part"
        try:
            with self.session.get(
                url,
                stream=True,
                timeout=REQUEST_TIMEOUT
            ) as response:
                response.raise_for_status()
                with open(partial_file_name, mode='wb') as partial_file:
                    for chunk in response.iter_content(
                        chunk_size=DOWNLOAD_CHUNK_SIZE
                    ):
                        partial_file.write(chunk)
        except (requests.RequestException, OSError):
            if os.path.exists(partial_file_name):
                os.remove(partial_file_name)
            raise
        os.replace(partial_file_name, file_name)

    def _download_video(self, item, selected):
        '''
        Download the video of an item, and its preview thumbnail if any.

        The item is marked as downloaded as soon as the video is saved, a
        problem with the thumbnail doesn't cause a new download of the video.
        '''
        file_extension = selected['type'].replace('video/', '.')
        file_name = os.path.normpath(
            f"{item['title']}{file_extension}"
        ).replace('\\', '-').replace('/', '-').replace(':', '.').replace('...', '')
        os.makedirs(self.config['download_dir'], exist_ok=True)
        full_file_name = os.path.join(self.config['download_dir'], file_name)
        self._log.info(
            "Downloading video '%s' as '%s'...",
            selected['url'],
            full_file_name
        )
        self._log.debug(
            'Saving video to file...'
        )
        self._download_to_file(selected['url'], full_file_name)
        self._mark_downloaded(item['id'])

        # The last thumbnail with 'preview' in its URL is the one used.
        sel_thumb = None
        for thumb in item.get('media_thumbnail') or []:
            if 'preview' in thumb.get('url', ''):
                sel_thumb = thumb
        if sel_thumb is None:
            return

        # Extension taken from the URL without its query string.
        # NOTE(review): falls back to '.jpg' when the URL has no extension,
        # confirm this is a sensible default for the feeds in use.
        thumb_extension = os.path.splitext(
            sel_thumb['url'].split('?', 1)[0]
        )[1] or '.jpg'
        # Only swap the trailing video extension, never a match in the
        # middle of the title.
        if file_extension and file_name.endswith(file_extension):
            base_name = file_name[:-len(file_extension)]
        else:
            base_name = file_name
        thumb_file_name = f"{base_name}{thumb_extension}"
        full_thumb_file_name = os.path.join(
            self.config['download_dir'],
            thumb_file_name
        )
        self._log.debug(
            'Saving thumbnail to file...'
        )
        self._download_to_file(sel_thumb['url'], full_thumb_file_name)
        self._add_thumbnail_to_video(full_file_name, full_thumb_file_name)

    def _add_thumbnail_to_video(self, video_file, thumb_file):
        '''
        Embed thumb_file as attached picture of video_file using ffmpeg.

        ffmpeg writes to a temporary file next to the video. The original
        video is only replaced when ffmpeg succeeds; on any failure the
        video is left untouched and the error is logged.
        '''
        self._log.debug(
            'Looking for ffmpeg tool...'
        )
        ffmpeg = shutil.which('ffmpeg')
        if ffmpeg is None:
            self._log.error(
                "Error finding ffmpeg, thumbnail won't be added to video."
            )
            return None
        self._log.debug(
            "Adding thumbnail '%s' to file '%s'...",
            thumb_file,
            video_file
        )
        # Keep the video extension at the end of the temporary name, ffmpeg
        # picks the output format from it.
        video_root, video_extension = os.path.splitext(video_file)
        output_file = f"{video_root}_with_thumb{video_extension}"
        result = subprocess.run(
            [
                ffmpeg,
                # Overwrite a leftover temporary file instead of prompting.
                '-y',
                '-i', video_file,
                '-i', thumb_file,
                '-map', '1',
                '-map', '0',
                '-c', 'copy',
                '-disposition:0', 'attached_pic',
                output_file
            ],
            capture_output=True,
            check=False
        )
        if result.returncode != 0:
            self._log.error(
                "Error %s adding thumnail. Output: %s. Errors: %s",
                result.returncode,
                result.stdout,
                result.stderr
            )
            if os.path.exists(output_file):
                os.remove(output_file)
            return None
        os.replace(output_file, video_file)
        return None

    def _write_downloaded_items(self):
        '''Write the downloaded item IDs to the database, one per line.'''
        database = self.config['downloaded_database']
        database_folder = os.path.dirname(database)
        if database_folder:
            os.makedirs(database_folder, exist_ok=True)
        with open(database, 'w', encoding='utf-8') as db_file:
            db_file.writelines(
                f"{download_item}\n"
                for download_item in self.downloaded_items
            )

    def _get_downloaded_items(self):
        '''
        Load the downloaded item IDs from the database file, creating the
        file when it doesn't exist yet.
        '''
        if os.path.exists(self.config['downloaded_database']):
            self._log.debug(
                "Reading already downloaded items from '%s'...",
                self.config['downloaded_database']
            )
            with open(
                self.config['downloaded_database'],
                'r',
                encoding='utf-8'
            ) as db_file:
                # Skip empty lines, otherwise they would be written back as
                # blank entries every time the database is saved.
                self.downloaded_items = [
                    line.rstrip('\n')
                    for line in db_file
                    if line.rstrip('\n')
                ]
        else:
            self._log.debug(
                "Initializing downloaded items database '%s'...",
                self.config['downloaded_database']
            )
            self._write_downloaded_items()

    def _init_log(self):
        ''' Initialize log object '''
        self._log = logging.getLogger("get_peertube_videos")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(
            # Level names are upper case, accept any case in the setting.
            logging.getLevelName(
                str(self.config.get("debug_level") or 'INFO').upper()
            )
        )
        self._log.addHandler(streamhandler)

        log_file = self.config.get('log_file')
        if not log_file:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_folder = os.path.join(home_folder, "log")
            log_file = os.path.join(log_folder, "get_peertube_videos.log")

        # A bare file name has an empty dirname, nothing to create then.
        log_folder = os.path.dirname(log_file)
        if log_folder:
            os.makedirs(log_folder, exist_ok=True)

        filehandler = logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=102400000
        )
        # create formatter
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True


# NOTE: '-d', '-u' and '-p' used to be declared twice. The later declaration
# won, so they kept working only for --downloaded-database, --tr-user and
# --tr-password. Those keep their letter; the options that lost it get an
# unused one (-D, -U, -r).
@click.command()
@click.option(
    "--debug-level",
    "-D",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
@click.option(
    '--uris', '-U',
    multiple=True,
    required=True,
    help='PeerTube channels URI to look up'
)
@click.option(
    '--downloaded-database', '-d',
    default=f"{os.environ.get('HOME', os.environ.get('USERPROFILE', ''))}/.config/downloaded_youtube_videos",
    help='File to store the IDs of downloaded videos'
)
@click.option(
    '--download-dir', '-f',
    default=f"{os.environ.get('HOME', os.environ.get('USERPROFILE', ''))}/downloaded_youtube_videos",
    help='Folder to store the downloaded videos'
)
@click.option(
    '--tr-host', '-H',
    default='localhost',
    help='Transmission daemon host'
)
@click.option(
    '--tr-port', '-r',
    default=12345,
    help='Transmission daemon RPC port'
)
@click.option(
    '--tr-user', '-u',
    default='transmission',
    help='Transmission daemon user name'
)
@click.option(
    '--tr-password', '-p',
    default='',
    help='Transmission daemon user password'
)
@click.option(
    '--tr-path', '-P',
    default='/transmission/',
    help='Transmission daemon RPC path'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
    '''Command line entry point, runs the downloader with the given options.'''
    return GetPeertubeVideos(**kwargs)


if __name__ == "__main__":
    __main__()