#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
"""Get YouTube videos from a series of channel's feeds"""

import sys
import os
import re
import time
import stat
import json
import logging
from logging.handlers import SysLogHandler
from yaml import dump
try:
    from yaml import CDumper as Dumper
except ImportError:
    from yaml import Dumper
import click
import click_config_file
import requests
import feedparser
import yt_dlp

HOME_FOLDER = os.environ.get('HOME', os.environ.get('USERPROFILE', '/'))
if HOME_FOLDER == '/':
    CACHE_FOLDER = '/var/cache'
    LOG_FOLDER = '/var/log/'
else:
    CACHE_FOLDER = f"{HOME_FOLDER}/.local/"
    LOG_FOLDER = f"{HOME_FOLDER}/log/"

# Home used to build the CLI defaults (empty, not '/', when no home is known).
USER_HOME = os.environ.get('HOME', os.environ.get('USERPROFILE', ''))
# Textfile-collector file read by the Prometheus node exporter.
METRICS_FILE = '/var/lib/prometheus/node-exporter/get_youtube_videos.prom'
# Format tried first, and the one used when it is not available.
PREFERRED_FORMAT = '248+ba'
FALLBACK_FORMAT = 'bestvideo+bestaudio[ext=m4a]/best'
VIDEO_ID_RE = re.compile(r'v=([0-9a-zA-Z_-]{11})')
SUBTITLE_ERROR_RE = re.compile(
    r"Unable to download video subtitles for '([^']+)'"
)
# Error fragments that justify retrying through another proxy, paired with
# the summary counter charged when no proxy could fetch the video.
# Order matters: the first fragment found in the error text wins.
PROXY_RETRY_ERRORS = (
    (
        'The uploader has not made this video available in your country',
        'georestricted_videos'
    ),
    ('HTTP Error 403: Forbidden', 'possible_proxy_errors'),
    ('not a bot', 'possible_ban_errors'),
    ('Failed to extract any player response', 'possible_proxy_errors'),
    ('HTTP Error 429: Too Many Requests', 'possible_proxy_errors'),
)


def _ensure_parent_dir(path):
    '''Create the folder that will contain *path* when it is missing'''
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def _limit_reached(count, limit):
    '''Tell if *count* reached *limit*; limits of 0 or less never trigger'''
    return limit > 0 and count >= limit


def _select_subtitle_langs(patterns, available):
    '''
    Expand the configured subtitle *patterns* into the *available* languages.

    Each pattern is "all", a language code or a regular expression that must
    match a whole language code; a leading "-" removes the matching languages
    selected so far. Patterns that are not valid regular expressions are
    compared literally.
    '''
    selected = []
    for pattern in patterns:
        exclude = pattern.startswith('-')
        if exclude:
            pattern = pattern[1:]
        if pattern == 'all':
            matches = list(available)
        else:
            try:
                regex = re.compile(pattern)
            except re.error:
                matches = [lang for lang in available if lang == pattern]
            else:
                matches = [lang for lang in available if regex.fullmatch(lang)]
        if exclude:
            selected = [lang for lang in selected if lang not in matches]
        else:
            selected.extend(lang for lang in matches if lang not in selected)
    return selected


class GetYoutubeVideos:
    '''
    Get YouTube videos from a series of channels.

    Creating an instance runs the whole job: it reads the cache and the
    database of downloaded videos, processes every configured channel, logs
    a summary and writes the metrics file. Call close() afterwards to
    persist the cache.
    '''

    def __init__(self, **kwargs):
        '''Run the download job using the CLI options given as keywords'''
        self.time_duration_units = (
            ('week', 60*60*24*7),
            ('day', 60*60*24),
            ('hour', 60*60),
            ('min', 60),
            ('sec', 1)
        )
        self.config = kwargs
        if 'log_file' not in kwargs or kwargs['log_file'] is None:
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get(
                        'USERPROFILE',
                        os.getcwd()
                    )
                ),
                'log',
                'get_youtube_videos.log'
            )
        self._init_log()
        self._default_data = self._new_cache()
        self.data = self._read_cached_data()
        self.summary = {
            'config': self.config,
            'entries_count': 0,
            'skipped_videos': 0,
            'downloaded_videos': 0,
            'videos_with_error': 0,
            'possible_proxy_errors': 0,
            'possible_ban_errors': 0,
            'age_limit_errors': 0,
            'georestricted_videos': 0,
            'downloaded_videos_titles': [],
        }
        self._init_proxies()
        self.proxy_index = 0
        if os.path.exists(self.config['downloaded_database']):
            with open(
                self.config['downloaded_database'], 'r', encoding='utf-8'
            ) as db_file:
                # Drop blank lines so the database does not grow with empty
                # entries every time it is rewritten.
                self.downloaded_items = [
                    line.strip()
                    for line in db_file.read().splitlines()
                    if line.strip()
                ]
        else:
            self.downloaded_items = []
        self.session = requests.Session()
        self._process_channels()
        self._log.info(
            dump(
                {
                    "Summary": self.summary
                },
                Dumper=Dumper
            )
        )
        self._save_metrics(self.summary)

    def close(self):
        '''Close class and save data'''
        self.session.close()
        self._save_cached_data(self.data)

    @staticmethod
    def _new_cache():
        '''Return a fresh, empty cache structure'''
        return {
            "last_update": 0,
            "proxies": {},
        }

    def _init_proxies(self):
        '''
        Register the configured proxies in the cache and select the one with
        fewer recorded failures ('' when no proxy is configured).
        '''
        proxies = self.data.get('proxies')
        if not isinstance(proxies, dict):
            proxies = self.data['proxies'] = {}
        for proxy in self.config['proxy']:
            entry = proxies.get(proxy)
            # Also repairs malformed entries coming from an edited cache
            if (
                not isinstance(entry, dict) or
                not isinstance(entry.get('failures'), int)
            ):
                proxies[proxy] = {'failures': 0}
        self.selected_proxy = self._least_failed_proxy() or ''

    def _least_failed_proxy(self, exclude=()):
        '''
        Return the configured proxy with fewer failures that is not in
        *exclude*, or None when there is no candidate. Ties go to the proxy
        configured first.
        '''
        candidates = [
            proxy for proxy in self.config['proxy'] if proxy not in exclude
        ]
        if not candidates:
            return None
        return min(
            candidates,
            key=lambda proxy: self.data['proxies'][proxy]['failures']
        )

    def _read_cached_data(self):
        '''
        Load the cache file.

        An empty cache is returned when the file is missing, unreadable as
        JSON, not a JSON object, older than max_cache_age seconds, or when
        max_cache_age is -1 (cache disabled).
        '''
        if not os.path.exists(self.config['cache_file']):
            return self._new_cache()
        try:
            with open(
                self.config['cache_file'], 'r', encoding='utf-8'
            ) as cache_file:
                cached_data = json.load(cache_file)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueError
            return self._new_cache()
        if not isinstance(cached_data, dict):
            return self._new_cache()
        max_age = self.config['max_cache_age']
        if max_age == -1:
            return self._new_cache()
        last_update = cached_data.get('last_update')
        if (
            isinstance(last_update, (int, float)) and
            last_update + max_age < time.time()
        ):
            # The cache expired
            return self._new_cache()
        return cached_data

    def _save_cached_data(self, data):
        '''Stamp *data* with the current time and write it to the cache file'''
        data['last_update'] = time.time()
        _ensure_parent_dir(self.config['cache_file'])
        with open(self.config['cache_file'], 'w', encoding='utf-8') as cache_file:
            json.dump(data, cache_file, indent=2)
        self._log.debug(
            "Saved cached data in '%s'",
            self.config['cache_file']
        )

    def _is_numeric(self, variable):
        '''Tell if *variable* is an int, float or complex (bool included)'''
        return isinstance(variable, (int, float, complex))

    def _save_metrics(self, data):
        '''
        Write the numeric and boolean values of *data* to the metrics file.

        A failure to write the file is logged and does not stop the run, so
        the cache can still be saved afterwards.
        '''
        lines = [
            '# HELP get_youtube_summary Get YouTube videos summary metrics.\n',
            '# TYPE get_youtube_summary counter\n',
        ]
        for key, value in data.items():
            # bool is a subclass of int, so it has to be tested first
            if isinstance(value, bool):
                lines.append(f"get_youtube_videos_{key} {int(value)}\n")
            elif self._is_numeric(value):
                lines.append(f"get_youtube_videos_{key} {value}\n")
        try:
            with open(METRICS_FILE, 'w', encoding='UTF-8') as metrics_file:
                metrics_file.writelines(lines)
            # Readable by everybody, but still writable by us on the next run
            os.chmod(
                METRICS_FILE,
                stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
            )
        except OSError as error:
            self._log.warning(
                "Unable to write metrics file '%s'. %s",
                METRICS_FILE,
                error
            )

    def _change_proxy(self, video_id, tried=None):
        '''
        Record a failure of the current proxy, switch to the least failed
        proxy not yet in *tried* and fetch the video information again.

        Returns the video information, or None when every configured proxy
        was already tried for this video (or no proxy is configured).
        '''
        tried = set() if tried is None else tried
        previous_proxy = self.selected_proxy
        tried.add(previous_proxy)
        if previous_proxy in self.data['proxies']:
            self.data['proxies'][previous_proxy]['failures'] += 1
        next_proxy = self._least_failed_proxy(exclude=tried)
        if next_proxy is None:
            self._log.debug(
                "No more proxies to try for video '%s'",
                video_id
            )
            return None
        self.selected_proxy = next_proxy
        self._log.warning(
            "Got an error fetching video information with proxy '%s'. Setting proxy to '%s'...",
            previous_proxy,
            self.selected_proxy
        )
        return self._get_video_info(video_id, tried)

    def _get_video_info(self, video_id, tried=None):
        '''
        Return the sanitized yt-dlp information of a video, or None.

        Errors listed in PROXY_RETRY_ERRORS are retried once with each
        configured proxy; *tried* holds the proxies already used for this
        video and is only passed by _change_proxy. Any other error makes
        the video count as skipped.
        '''
        first_attempt = tried is None
        tried = set() if tried is None else tried
        tried.add(self.selected_proxy)
        uri = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {
            'logger': self._log,
            'progress_hooks': [self._yt_progress_hook],
        }
        if self.selected_proxy != '':
            ydl_opts['proxy'] = self.selected_proxy
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                raw_video_info = ydl.extract_info(uri, download=False)
            except yt_dlp.utils.DownloadError as error:
                return self._handle_info_error(
                    video_id, f"{error}", tried, first_attempt
                )
            return ydl.sanitize_info(raw_video_info)

    def _handle_info_error(self, video_id, message, tried, first_attempt):
        '''
        Handle an error fetching video information: retry through another
        proxy when the error is a known one and update the summary.

        Counters are only updated by the first attempt of a video, so a
        video is charged once no matter how many proxies failed.
        '''
        counter = next(
            (name for marker, name in PROXY_RETRY_ERRORS if marker in message),
            None
        )
        if counter is None:
            self.summary['skipped_videos'] += 1
            self._log.debug(
                "Skipping video. Error: %s",
                message
            )
            return None
        georestricted = counter == 'georestricted_videos'
        if georestricted and first_attempt:
            # Counted even if another proxy manages to fetch the video
            self.summary['videos_with_error'] += 1
            self.summary['georestricted_videos'] += 1
        if 'Failed to extract any player response' in message:
            self._log.error(
                "Possible error connecting to proxy '%s'",
                self.selected_proxy
            )
        result = self._change_proxy(video_id, tried)
        if not result and first_attempt and not georestricted:
            self.summary['videos_with_error'] += 1
            self.summary[counter] += 1
        return result

    def _process_channels(self):
        '''
        Go through the feed of every configured channel downloading the
        videos that are not in the database, until a limit is reached.
        '''
        self.total_count = 0
        self.channels_count = 0
        self.summary['total_channels'] = len(self.config['channels'])
        for channel in self.config['channels']:
            if _limit_reached(self.total_count, self.config['total_limit']):
                break
            self.channels_count += 1
            self._log.debug(
                "Processing channel %s/%s '%s'...",
                self.channels_count,
                len(self.config['channels']),
                channel
            )
            feed = feedparser.parse(
                f"https://www.youtube.com/feeds/videos.xml?channel_id={channel}"
            )
            self.channel_count = 0
            self.entries_count = 0
            for entry in feed['entries']:
                if (
                    _limit_reached(self.total_count, self.config['total_limit']) or
                    _limit_reached(self.channel_count, self.config['channel_limit'])
                ):
                    break
                self.entries_count += 1
                self._log.debug(
                    "Processing video entry %s/%s...",
                    self.entries_count,
                    len(feed['entries'])
                )
                self._process_entry(entry)
            self.summary['entries_count'] += self.entries_count
        self.summary['processed_channels'] = self.channels_count
        self.summary['total_count'] = self.total_count

    def _process_entry(self, entry):
        '''Download the video of a feed entry unless it has to be skipped'''
        link = entry.get('link', '')
        id_match = VIDEO_ID_RE.search(link)
        if not id_match:
            self._log.error(
                "Error! Video ID not found in URI '%s'",
                link
            )
            self.summary['videos_with_error'] += 1
            return
        video_id = id_match.group(1)
        if video_id in self.downloaded_items:
            self._log.debug(
                "Video with ID '%s' has been already downloaded",
                video_id
            )
            self.summary['skipped_videos'] += 1
            return
        video_info = self._get_video_info(video_id)
        if not video_info:
            # Already accounted for in the summary by _get_video_info
            return
        downloadable_subtitles = _select_subtitle_langs(
            self.config['subtitle_langs'],
            video_info.get('subtitles') or {}
        )
        os.makedirs(self.config['download_dir'], exist_ok=True)
        info_filename = os.path.join(
            self.config['download_dir'],
            f"{video_id}.json"
        )
        self._log.debug(
            "Writting information in to file '%s'",
            info_filename
        )
        with open(info_filename, 'w', encoding='utf-8') as info_file:
            json.dump(video_info, info_file, indent=2)
        if self._should_skip(video_info):
            self.summary['skipped_videos'] += 1
            # Remember it, so it is not checked again in the next run
            self._save_downloaded_items(video_id)
            return
        self._log.info(
            "Downloading. Filename: '%s'. Video ID: '%s'. Duration: %s. Counts: %s/%s - %s/%s)",
            video_info.get('title', '?'),
            video_id,
            self._human_time_duration(video_info['duration']),
            self.total_count+1,
            self.config['total_limit'],
            self.channel_count+1,
            self.config['channel_limit']
        )
        self._download_video(video_id, video_info, downloadable_subtitles)

    def _should_skip(self, video_info):
        '''Log and tell if a video is live, was live, too long or unmeasured'''
        title = video_info.get('title', '?')
        if (
            self.config['skip_live_videos'] and
            video_info.get('live_status') == 'is_live'
        ):
            self._log.debug(
                "Skipping video '%s' as it's a live video",
                title
            )
            return True
        if video_info.get('was_live'):
            self._log.debug(
                "Skipping video '%s' as it was a live video",
                title
            )
            return True
        duration = video_info.get('duration')
        if duration is None:
            # Missing or null duration: it can't be compared with the limit
            self._log.debug(
                "Skipping video '%s' as there is no video duration",
                title
            )
            return True
        if duration > self.config['max_length']:
            self._log.debug(
                "Skipping video '%s' as it was larger than %s",
                title,
                self._human_time_duration(self.config['max_length'])
            )
            return True
        return False

    def _download_video(self, video_id, video_info, subtitle_langs):
        '''
        Download a video, relaxing the format or the subtitles when they are
        the cause of a failure. Returns True if the video was downloaded.
        '''
        if self.config['channels_folder'] and video_info.get('channel'):
            download_dir = os.path.join(
                self.config['download_dir'],
                video_info['channel']
            )
        else:
            download_dir = self.config['download_dir']
        ydl_opts = {
            'paths': {
                'temp': '/tmp',
                'home': download_dir
            },
            'writesubtitles': True,
            'writeautomaticsub': True,
            'writeannotations': True,
            'write_all_thumbnails': False,
            'writethumbnail': True,
            'writeinfojson': True,
            'subtitlesformat': 'srt',
            'subtitleslangs': list(subtitle_langs),
            'allow_multiple_audio_streams': True,
            'noprogress': True,
            'merge_output_format': 'mkv',
            'format': PREFERRED_FORMAT,
        }
        if self.selected_proxy != '':
            ydl_opts['proxy'] = self.selected_proxy
        uri = f"https://www.youtube.com/watch?v={video_id}"
        result = 'starting'
        while result not in ('downloaded', 'error'):
            self._log.debug(
                "Download status: %s",
                result
            )
            try:
                # A new YoutubeDL per attempt: options such as the format
                # are processed when the object is created.
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    return_code = ydl.download([uri])
            except yt_dlp.utils.DownloadError as error:
                result = self._relax_download_options(ydl_opts, f"{error}")
                if result == 'error':
                    self._log.error(
                        "Error getting video with proxy '%s'. %s",
                        self.selected_proxy,
                        error
                    )
                    self.summary['videos_with_error'] += 1
                continue
            self._process_download(
                {
                    "return_code": return_code,
                    'info_dict': {
                        'id': video_id,
                    },
                    'filename': video_info.get('title', '?'),
                    'video_info': video_info,
                }
            )
            self.summary['downloaded_videos_titles'].append(
                video_info.get('title', '?')
            )
            self.summary['downloaded_videos'] += 1
            result = 'downloaded'
        return result == 'downloaded'

    def _relax_download_options(self, ydl_opts, message):
        '''
        Adjust *ydl_opts* in place after a download error and return the new
        download status: a 'retrying (...)' text when a retry makes sense or
        'error' when it does not. Every retry removes an option, so the
        retries always come to an end.
        '''
        if 'Requested format is not available' in message:
            if ydl_opts['format'] == FALLBACK_FORMAT:
                return 'error'
            self._log.warning(
                "Requested format is not available, trying best format."
            )
            ydl_opts['format'] = FALLBACK_FORMAT
            return 'retrying (format error)'
        if 'Unable to download video subtitles for' in message:
            if not ydl_opts['writesubtitles']:
                return 'error'
            langs = ydl_opts['subtitleslangs']
            subtitle_match = SUBTITLE_ERROR_RE.search(message)
            if subtitle_match and subtitle_match.group(1) in langs:
                langs.remove(subtitle_match.group(1))
            else:
                self._log.error(
                    "Error finding subtitle that failed in error string"
                )
                langs.clear()
            if not langs:
                # Nothing left to ask for: get the video without subtitles
                ydl_opts['writesubtitles'] = False
                ydl_opts['writeautomaticsub'] = False
            return 'retrying (subtitles error)'
        return 'error'

    def _process_download(self, data):
        '''
        Account for a finished download: record its ID in the database,
        write the download information file and report reached limits.

        The limits are enforced by _process_channels, which stops asking for
        more videos; this method only logs them.
        '''
        self.total_count += 1
        self.channel_count += 1
        # Saved first, so the video is never downloaded twice even if
        # writing the information file fails.
        if 'id' in data['info_dict']:
            self._save_downloaded_items(data['info_dict']['id'])
        info_filename = os.path.join(
            self.config['download_dir'],
            f"{os.path.basename(data['filename'])}.download_info.json"
        )
        self._log.debug(
            "Writting download information in to file '%s'",
            info_filename
        )
        with open(info_filename, 'w', encoding='utf-8') as info_file:
            json.dump(data, info_file, indent=2)
        if _limit_reached(self.total_count, self.config['total_limit']):
            self._log.info(
                "Limit (%s) reached for videos from all channels",
                self.config['total_limit'],
            )
        if _limit_reached(self.channel_count, self.config['channel_limit']):
            self._log.info(
                "Limit (%s) reached for videos for this channel '%s'",
                self.config['channel_limit'],
                data['info_dict'].get(
                    'channel',
                    (data.get('video_info') or {}).get('channel', '?')
                )
            )

    def _yt_progress_hook(self, data):
        '''Progress hook for yt-dlp: process downloads when they finish'''
        if data['status'] == 'finished':
            self._process_download(data)
        elif data['status'] == 'downloading':
            self._log.debug(
                "Still downloading..."
            )
        else:
            self._log.debug(
                "Progress hook got data['status']=%s instead of 'finished'",
                data['status']
            )

    def _save_downloaded_items(self, video_id):
        '''Add *video_id* to the database of downloaded videos and save it'''
        self.downloaded_items.append(video_id)
        _ensure_parent_dir(self.config['downloaded_database'])
        with open(
            self.config['downloaded_database'], 'w', encoding='utf-8'
        ) as db_file:
            db_file.writelines(f"{item}\n" for item in self.downloaded_items)

    def _human_time_duration(self, seconds):
        '''Return a time duration as a human readable string ('inf' for 0)'''
        if seconds == 0:
            return 'inf'
        parts = []
        for unit, div in self.time_duration_units:
            amount, seconds = divmod(int(seconds), div)
            if amount > 0:
                parts.append(f'{amount} {unit}{"" if amount == 1 else "s"}')
        return ', '.join(parts)

    def _init_log(self):
        '''
        Initialize the log object with syslog, standard output and rotating
        file handlers. Always returns True.
        '''
        self._log = logging.getLogger("get_youtube_videos")
        self._log.setLevel(logging.DEBUG)
        if self._log.handlers:
            # The logger is shared by every instance in the process:
            # adding the handlers again would duplicate every message.
            return True

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(
            logging.getLevelName(
                str(self.config.get("debug_level") or 'INFO').upper()
            )
        )
        self._log.addHandler(streamhandler)

        if 'log_file' in self.config:
            log_file = self.config['log_file']
        else:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_folder = os.path.join(home_folder, "log")
            log_file = os.path.join(log_folder, "get_youtube_videos.log")

        _ensure_parent_dir(log_file)

        # Rollover only happens when both maxBytes and backupCount are set
        filehandler = logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=102400000,
            backupCount=5
        )
        # create formatter
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True


# NOTE: '-d' and '-S' used to be declared on two options each; click gave
# them to the option declared last ('--max-length' and '--skip-live-videos'),
# which keep them. The shadowed options get new short names.
@click.command()
@click.option(
    "--debug-level",
    "-v",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
#               help="Don't do anything, just show what would be done.")
@click.option(
    '--proxy', '-p',
    multiple=True,
    help='URLs for proxies to use in case of errors.'
)
@click.option(
    '--downloaded-database', '-D',
    default=f"{USER_HOME}/.config/downloaded_youtube_videos",
    help='File to store the IDs of downloaded videos'
)
@click.option(
    '--download-dir', '-f',
    default=f"{USER_HOME}/downloaded_youtube_videos",
    help='Folder to store the downloaded videos'
)
@click.option(
    '--channels', '-C',
    multiple=True,
    required=True,
    help='YouTube channels IDs to look up'
)
@click.option(
    '--channel-limit', '-r',
    default=5,
    type=int,
    help='Maximun number of videos to download from a channel'
)
@click.option(
    '--total-limit', '-L',
    default=5,
    type=int,
    help='Maximun number of videos to download in total'
)
@click.option(
    '--max-length', '-d',
    default=5400,
    type=int,
    help='Maximun duration of videos to download in seconds'
)
@click.option(
    '--subtitle-langs', '-s',
    multiple=True,
    default=['en.*'],
    help=(
        'List of languages of the subtitles to download (can be regex). '
        'The list may contain "all" to refer to all the available subtitles. '
        'The language can be prefixed with a "-" to exclude it from the '
        "requested languages, e.g. ['all', '-live_chat']. "
        'And you can use wildcards like en.*'
    )
)
@click.option(
    '--channels-folder', '-c',
    is_flag=True,
    default=False,
    help='Create folders per channel and save videos inside their channel folder'
)
@click.option(
    '--skip-live-videos', '-S',
    is_flag=True,
    default=False,
    help='Skip live videos'
)
@click.option(
    '--cache-file', '-F',
    default=os.path.join(CACHE_FOLDER, 'get_youtube_videos.json'),
    help='Cache file to store data from each run',
)
@click.option(
    '--max-cache-age', '-a',
    default=60*60*24*7,
    type=int,
    help='Max age in seconds for the cache'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
    '''Command line entry point: run the job and save the cache'''
    obj = GetYoutubeVideos(**kwargs)
    obj.close()


if __name__ == "__main__":
    __main__()