#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
#
|
|
# This script is licensed under GNU GPL version 2.0 or above
|
|
# (c) 2024 Antonio J. Delgado
|
|
"""Get YouTube videos from a series of channel's feeds"""
|
|
|
|
import sys
|
|
import os
|
|
import re
|
|
import json
|
|
from yaml import dump
|
|
try:
|
|
from yaml import CDumper as Dumper
|
|
except ImportError:
|
|
from yaml import Dumper
|
|
import logging
|
|
from logging.handlers import SysLogHandler
|
|
import click
|
|
import click_config_file
|
|
import requests
|
|
import feedparser
|
|
import yt_dlp
|
|
|
|
|
|
class GetYoutubeVideos:
    '''Get YouTube videos from a series of channels.

    On construction the object loads the database of already-downloaded
    video IDs, walks every configured channel RSS feed, downloads the new
    videos with yt-dlp and finally logs a YAML summary of the run.
    '''

    def __init__(self, **kwargs):
        """Initialize configuration, logging and state, then run the download.

        Keyword arguments come straight from the click options declared in
        __main__ (channels, downloaded_database, download_dir, proxy,
        channel_limit, total_limit, max_length, ...).
        """
        # (unit name, seconds per unit) pairs, largest first, used by
        # _human_time_duration.
        self.time_duration_units = (
            ('week', 60 * 60 * 24 * 7),
            ('day', 60 * 60 * 24),
            ('hour', 60 * 60),
            ('min', 60),
            ('sec', 1),
        )
        self.config = kwargs
        if 'log_file' not in kwargs or kwargs['log_file'] is None:
            # Default log file: $HOME/log/... (fall back to %USERPROFILE%
            # on Windows, then to the current working directory).
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get(
                        'USERPROFILE',
                        os.getcwd()
                    )
                ),
                'log',
                'get_youtube_videos.log'
            )
        self._init_log()
        # Counters reported (as YAML) at the end of the run.
        self.summary = {
            'config': self.config,
            'entries_count': 0,
            'skipped_videos': 0,
            'downloaded_videos': 0,
            'videos_with_error': 0,
            'possible_proxy_errors': 0,
            'possible_ban_errors': 0,
            'age_limit_errors': 0,
        }
        self.selected_proxy = ''
        self.proxy_index = 0
        if os.path.exists(self.config['downloaded_database']):
            with open(self.config['downloaded_database'], 'r', encoding='utf-8') as db_file:
                # Filter out empty lines: the previous code kept the
                # trailing '' from split('\n') and re-wrote it on every
                # save, so blank lines accumulated in the database file.
                self.downloaded_items = [
                    line for line in db_file.read().split('\n') if line
                ]
        else:
            self.downloaded_items = []
        self.session = requests.Session()
        self._process_channels()
        self._log.info(
            dump(
                {
                    "Summary": self.summary
                },
                Dumper=Dumper
            )
        )

    def _change_proxy(self, video_id):
        """Rotate to the next configured proxy and retry fetching video info.

        Returns the video info dict from the retry, or None when the proxy
        list is exhausted (proxy rotation state is then reset).
        """
        if len(self.config['proxy']) > 0:
            # NOTE: the previous condition was `< len(proxy) - 1`, which
            # meant the last configured proxy was never tried.
            if self.proxy_index < len(self.config['proxy']):
                self.selected_proxy = self.config['proxy'][self.proxy_index]
                self.proxy_index += 1
                self._log.warning(
                    "Got an error fetching video information. Setting proxy to '%s'...",
                    self.selected_proxy
                )
                # Recursive retry with the newly selected proxy; recursion
                # depth is bounded by the number of configured proxies.
                return self._get_video_info(video_id)
        # No proxies configured, or all of them tried: reset rotation state.
        self.proxy_index = 0
        self.selected_proxy = ''
        return None

    def _get_video_info(self, video_id):
        """Fetch video metadata with yt-dlp without downloading the media.

        Returns the sanitized info dict, or None when the video must be
        skipped; error counters in self.summary are updated accordingly.
        """
        uri = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {
            'logger': self._log,
            'progress_hooks': [self._yt_progress_hook],
        }
        if self.selected_proxy != '':
            ydl_opts['proxy'] = self.selected_proxy
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                raw_video_info = ydl.extract_info(uri, download=False)
            except yt_dlp.utils.DownloadError as error:
                message = f"{error}"
                if 'The uploader has not made this video available in your country' in message:
                    # Geo-restricted: try again through another proxy.
                    # Only count the error when every proxy failed, for
                    # consistency with the other retry branches below.
                    result = self._change_proxy(video_id)
                    if not result:
                        self.summary['videos_with_error'] += 1
                        return None
                    return result
                if 'HTTP Error 403: Forbidden' in message:
                    result = self._change_proxy(video_id)
                    if not result:
                        self.summary['videos_with_error'] += 1
                        self.summary['possible_proxy_errors'] += 1
                        return None
                    return result
                if 'not a bot' in message:
                    # YouTube bot detection, possibly an IP ban.
                    # TODO: also detect age-limit errors here?
                    result = self._change_proxy(video_id)
                    if not result:
                        self.summary['videos_with_error'] += 1
                        self.summary['possible_ban_errors'] += 1
                        return None
                    return result
                if 'Failed to extract any player response' in message:
                    self._log.error(
                        "Possible error connecting to proxy '%s'",
                        self.selected_proxy
                    )
                    result = self._change_proxy(video_id)
                    if not result:
                        self.summary['videos_with_error'] += 1
                        self.summary['possible_proxy_errors'] += 1
                        return None
                    return result
                # Any other download error (e.g. upcoming live events):
                # skip the video.
                self.summary['skipped_videos'] += 1
                self._log.debug(
                    "Skipping video. Error: %s",
                    error
                )
                return None
            video_info = ydl.sanitize_info(raw_video_info)
        return video_info

    def _process_channels(self):
        """Walk every configured channel feed and download the new videos."""
        self.total_count = 0
        self.channels_count = 0
        self.summary['total_channels'] = len(self.config['channels'])
        for channel in self.config['channels']:
            self.channels_count += 1
            self._log.debug(
                "Processing channel %s/%s '%s'...",
                self.channels_count,
                len(self.config['channels']),
                channel
            )
            feed = feedparser.parse(
                f"https://www.youtube.com/feeds/videos.xml?channel_id={channel}"
            )
            self.channel_count = 0  # videos downloaded from this channel
            self.entries_count = 0  # feed entries seen for this channel
            for entry in feed['entries']:
                self.entries_count += 1
                self._log.debug(
                    "Processing video entry %s/%s...",
                    self.entries_count,
                    len(feed['entries'])
                )
                match = re.search(r'v=([0-9a-zA-Z-_]{11})', entry['link'])
                if match:
                    video_id = match.group(1)
                    if video_id not in self.downloaded_items:
                        video_info = self._get_video_info(video_id)
                        if not video_info:
                            # No info available (error, or proxies
                            # exhausted): stop processing this channel.
                            break
                        info_filename = os.path.join(
                            self.config['download_dir'],
                            f"{video_id}.json"
                        )
                        self._log.debug(
                            "Writting information in to file '%s'",
                            info_filename
                        )
                        with open(info_filename, 'w', encoding='utf-8') as info_file:
                            json.dump(video_info, info_file, indent=2)
                        if self.config['skip_live_videos'] and video_info['live_status'] == 'is_live':
                            self._log.debug(
                                "Skipping video '%s' as it's a live video",
                                video_info.get('title', '?')
                            )
                            self.summary['skipped_videos'] += 1
                            # Was an inline copy of _save_downloaded_items;
                            # call the helper for consistency.
                            self._save_downloaded_items(video_id)
                            break
                        if video_info['was_live']:
                            self._log.debug(
                                "Skipping video '%s' as it was a live video",
                                video_info.get('title', '?')
                            )
                            self.summary['skipped_videos'] += 1
                            self._save_downloaded_items(video_id)
                            break
                        if ('duration' in video_info and
                                video_info['duration'] > self.config['max_length']):
                            self._log.debug(
                                "Skipping video '%s' as it was larger than %s",
                                video_info.get('title', '?'),
                                self._human_time_duration(self.config['max_length'])
                            )
                            self.summary['skipped_videos'] += 1
                            self._save_downloaded_items(video_id)
                            break
                        if 'duration' not in video_info:
                            self._log.debug(
                                "Skipping video '%s' as there is no video duration",
                                video_info.get('title', '?')
                            )
                            self.summary['skipped_videos'] += 1
                            self._save_downloaded_items(video_id)
                            break
                        self._log.info(
                            "Downloading. Filename: '%s'. Video ID: '%s'. Duration: %s. Counts: %s/%s - %s/%s)",
                            video_info.get('title', '?'),
                            video_id,
                            self._human_time_duration(video_info.get('duration', '-1')),
                            self.total_count + 1,
                            self.config['total_limit'],
                            self.channel_count + 1,
                            self.config['channel_limit']
                        )
                        if self.config['channels_folder'] and 'channel' in video_info:
                            download_dir = os.path.join(
                                self.config['download_dir'],
                                video_info['channel']
                            )
                        else:
                            download_dir = self.config['download_dir']
                        ydl_opts = {
                            'paths': {
                                'temp': '/tmp',
                                'home': download_dir
                            },
                            'writesubtitles': True,
                            'writeautomaticsub': True,
                            'writeannotations': True,
                            'write_all_thumbnails': False,
                            'writethumbnail': True,
                            'writeinfojson': True,
                            'subtitlesformat': 'srt',
                            'subtitleslangs': self.config['subtitle_langs'],
                            'allow_multiple_audio_streams': True,
                            'noprogress': True,
                            'merge_output_format': 'mkv',
                            'format': 'bestvideo+bestaudio[ext=m4a]/best',
                        }
                        if self.selected_proxy != '':
                            ydl_opts['proxy'] = self.selected_proxy
                        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                            try:
                                uri = f"https://www.youtube.com/watch?v={video_id}"
                                return_code = ydl.download(uri)
                                # NOTE(review): _process_download may also
                                # fire from the 'finished' progress hook,
                                # so counters can be bumped twice — confirm
                                # intended before changing.
                                self._process_download(
                                    {
                                        "return_code": return_code,
                                        'info_dict': {
                                            'id': video_id,
                                        },
                                        'filename': video_info.get('title', '?'),
                                        'video_info': video_info,
                                    }
                                )
                            except yt_dlp.utils.DownloadError as error:
                                self._log.error(
                                    "Error getting video. %s",
                                    error
                                )
                                self.summary['videos_with_error'] += 1
                                break
                        self.summary['downloaded_videos'] += 1
                    else:
                        self._log.debug(
                            "Video with ID '%s' has been already downloaded",
                            video_id
                        )
                        self.summary['skipped_videos'] += 1
                else:
                    self._log.error(
                        "Error! Video ID not found in URI '%s'",
                        entry['link']
                    )
                    self.summary['videos_with_error'] += 1
            # Accumulate this channel's entry count (entries_count is
            # reset for every channel).
            self.summary['entries_count'] += self.entries_count
        self.summary['processed_channels'] = self.channels_count
        self.summary['total_count'] = self.total_count

    def _process_download(self, data):
        """Record a finished download: bump counters, enforce limits and
        persist the download information next to the video.

        :param data: yt-dlp progress-hook style dict with at least
            'filename' and 'info_dict' keys.
        """
        self.total_count += 1
        self.channel_count += 1
        # Use >= so an overshoot (e.g. the hook and the explicit call both
        # firing) still triggers the limit; == could be stepped over.
        if self.total_count >= self.config['total_limit']:
            self._log.info(
                "Limit (%s) reached for videos from all channels",
                self.config['total_limit'],
            )
            sys.exit(0)
        if self.channel_count >= self.config['channel_limit']:
            self._log.info(
                "Limit (%s) reached for videos for this channel '%s'",
                self.config['channel_limit'],
                data['info_dict'].get('channel', '?')
            )
            # NOTE(review): the per-channel loop is not stopped from here;
            # only logged. Confirm whether it should abort the channel.
        info_filename = os.path.join(
            self.config['download_dir'],
            f"{os.path.basename(data['filename'])}.download_info.json"
        )
        self._log.debug(
            "Writting download information in to file '%s'",
            info_filename
        )
        with open(info_filename, 'w', encoding='utf-8') as info_file:
            json.dump(data, info_file, indent=2)
        if 'id' in data['info_dict']:
            self._save_downloaded_items(data['info_dict']['id'])

    def _yt_progress_hook(self, data):
        """yt-dlp progress hook: record the download once it finishes."""
        if data['status'] == 'finished':
            self._process_download(data)
        elif data['status'] == 'downloading':
            # Intermediate progress updates are ignored.
            pass
        else:
            self._log.debug(
                "Progress hook got data['status']=%s instead of 'finished'",
                data['status']
            )

    def _save_downloaded_items(self, video_id):
        """Append video_id to the in-memory list and rewrite the database file."""
        self.downloaded_items.append(video_id)
        with open(self.config['downloaded_database'], 'w', encoding='utf-8') as db_file:
            for item in self.downloaded_items:
                db_file.write(f"{item}\n")

    def _human_time_duration(self, seconds):
        '''Return time duration in a human readable formated string.

        0 is treated as "no limit" and rendered as 'inf'.
        '''
        if seconds == 0:
            return 'inf'
        parts = []
        for unit, div in self.time_duration_units:
            amount, seconds = divmod(int(seconds), div)
            if amount > 0:
                parts.append(f'{amount} {unit}{"" if amount == 1 else "s"}')
        # Sub-second (or negative-rounded-to-zero) durations produce no
        # parts; return an explicit value instead of an empty string.
        return ', '.join(parts) if parts else '0 secs'

    def _init_log(self):
        ''' Initialize log object with syslog, stdout and rotating-file handlers '''
        self._log = logging.getLogger("get_youtube_videos")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(
            logging.getLevelName(self.config.get("debug_level", 'INFO'))
        )
        self._log.addHandler(streamhandler)

        if 'log_file' in self.config:
            log_file = self.config['log_file']
        else:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_folder = os.path.join(home_folder, "log")
            log_file = os.path.join(log_folder, "get_youtube_videos.log")

        log_dir = os.path.dirname(log_file)
        if log_dir:
            # makedirs creates missing parent directories too; os.mkdir
            # failed when more than one path component was missing.
            os.makedirs(log_dir, exist_ok=True)

        filehandler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=102400000
        )
        # create formatter
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
|
@click.command()
@click.option(
    "--debug-level",
    "-d",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
#               help="Don't do anything, just show what would be done.")
@click.option(
    '--proxy', '-p',
    multiple=True,
    help='URLs for proxies to use in case of errors.'
)
@click.option(
    # Short flag changed from '-d', which collided with --debug-level.
    '--downloaded-database', '-b',
    default=f"{os.environ.get('HOME', os.environ.get('USERPROFILE', ''))}/.config/downloaded_youtube_videos",
    help='File to store the IDs of downloaded videos'
)
@click.option(
    '--download-dir', '-f',
    default=f"{os.environ.get('HOME', os.environ.get('USERPROFILE', ''))}/downloaded_youtube_videos",
    help='Folder to store the downloaded videos'
)
@click.option(
    '--channels', '-c',
    multiple=True,
    required=True,
    help='YouTube channels IDs to look up'
)
@click.option(
    # Short flag changed from '-l', which collided with --log-file.
    '--channel-limit', '-k',
    default=5,
    type=int,
    help='Maximum number of videos to download from a channel'
)
@click.option(
    '--total-limit', '-L',
    default=5,
    type=int,
    help='Maximum number of videos to download in total'
)
@click.option(
    # Short flag changed from '-d', which collided with --debug-level.
    '--max-length', '-m',
    default=5400,
    type=int,
    help='Maximum duration of videos to download in seconds'
)
@click.option(
    '--subtitle-langs', '-s',
    multiple=True,
    default=['en.*'],
    help='''List of languages of the subtitles to download (can be regex).
    The list may contain "all" to refer to all the available
    subtitles. The language can be prefixed with a "-" to
    exclude it from the requested languages, e.g. ['all', '-live_chat'].
    And you can use wildcards like en.*'''
)
@click.option(
    # Short flag changed from '-c', which collided with --channels.
    '--channels-folder', '-F',
    is_flag=True,
    default=False,
    help='Create folders per channel and save videos inside their channel folder'
)
@click.option(
    '--skip-live-videos', '-S',
    is_flag=True,
    default=False,
    help='Skip live videos'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
    """Command-line entry point: download new videos from the given channels."""
    return GetYoutubeVideos(**kwargs)
|
# Script entry point: delegate to the click-decorated command.
if __name__ == "__main__":
    __main__()