get_youtube_videos/get_youtube_videos/get_youtube_videos.py

451 lines
18 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
"""Get YouTube videos from a series of channel's feeds"""
import sys
import os
import re
import json
from yaml import dump
try:
from yaml import CDumper as Dumper
except ImportError:
from yaml import Dumper
import logging
from logging.handlers import SysLogHandler
import click
import click_config_file
import requests
import feedparser
import yt_dlp
class GetYoutubeVideos:
'''Get YouTube videos from a series of channels'''
def __init__(self, **kwargs):
self.time_duration_units = (
('week', 60*60*24*7),
('day', 60*60*24),
('hour', 60*60),
('min', 60),
('sec', 1)
)
self.config = kwargs
if 'log_file' not in kwargs or kwargs['log_file'] is None:
self.config['log_file'] = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'log',
'get_youtube_videos.log'
)
self._init_log()
self.summary = {
'config': self.config,
'entries_count': 0,
'skipped_videos': 0,
'downloaded_videos': 0,
'videos_with_error': 0,
'possible_proxy_errors': 0,
'possible_ban_errors': 0,
'age_limit_errors': 0,
}
self.selected_proxy = ''
self.proxy_index = 0
if os.path.exists(self.config['downloaded_database']):
with open(self.config['downloaded_database'], 'r', encoding='utf-8') as db_file:
self.downloaded_items = db_file.read().split('\n')
else:
self.downloaded_items = []
self.session = requests.Session()
self._process_channels()
self._log.info(
dump(
{
"Summary": self.summary
},
Dumper=Dumper
)
)
def _get_video_info(self, video_id):
uri=f"https://www.youtube.com/watch?v={video_id}"
ydl_opts = {
'logger': self._log,
'progress_hooks': [self._yt_progress_hook],
}
if self.selected_proxy != '':
ydl_opts['proxy'] = self.selected_proxy
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
raw_video_info = ydl.extract_info(uri, download=False)
except yt_dlp.utils.DownloadError as error:
self._log.debug(
"Skipping video. Error: %s",
error
)
if 'The uploader has not made this video available in your country' in f"{error}":
self.summary['videos_with_error'] += 1
elif 'HTTP Error 403: Forbidden' in f"{error}":
self.summary['videos_with_error'] += 1
self.summary['possible_proxy_errors'] += 1
if len(self.config['proxy']) > 0:
if self.proxy_index<len(self.config['proxy']):
self.selected_proxy = self.config['proxy'][self.proxy_index]
self.proxy_index += 1
return self._get_video_info(video_id)
self.proxy_index = 0
self.selected_proxy = ''
return None
elif 'not a bot' in f"{error}":
self.summary['videos_with_error'] += 1
self.summary['possible_ban_errors'] += 1
# elif age_limit_errors?
elif 'This live event will begin in a few moments' in f"{error}":
self.summary['skipped_videos'] += 1
else:
self.summary['skipped_videos'] += 1
return None
video_info = ydl.sanitize_info(raw_video_info)
return video_info
def _process_channels(self):
self.total_count = 0
self.channels_count = 0
self.summary['total_channels'] = len(self.config['channels'])
for channel in self.config['channels']:
self.channels_count += 1
self._log.debug(
"Processing channel %s/%s '%s'...",
self.channels_count,
len(self.config['channels']),
channel
)
feed = feedparser.parse(
f"https://www.youtube.com/feeds/videos.xml?channel_id={channel}"
)
self.channel_count = 0
self.entries_count = 0
for entry in feed['entries']:
self.entries_count += 1
self._log.debug(
"Processing video entry %s/%s...",
self.entries_count,
len(feed['entries'])
)
result=re.search('v=([0-9a-zA-Z-_]{11})',entry['link'])
if result:
video_id=result.group(1)
if video_id not in self.downloaded_items:
video_info = self._get_video_info(video_id)
info_filename = os.path.join(
self.config['download_dir'],
f"{video_id}.json"
)
self._log.debug(
"Writting information in to file '%s'",
info_filename
)
with open(info_filename, 'w', encoding='utf-8') as info_file:
json.dump(video_info, info_file, indent=2)
if self.config['skip_live_videos'] and video_info['live_status'] == 'is_live':
self._log.debug(
"Skipping video '%s' as it's a live video",
video_info.get('title', '?')
)
self.summary['skipped_videos'] += 1
self.downloaded_items.append(video_id)
with open(
self.config['downloaded_database'], 'w', encoding='utf-8'
) as db_file:
for item in self.downloaded_items:
db_file.write(f"{item}\n")
break
if video_info['was_live']:
self._log.debug(
"Skipping video '%s' as it was a live video",
video_info.get('title', '?')
)
self.summary['skipped_videos'] += 1
self._save_downloaded_items(video_id)
break
if ('duration' in video_info and
video_info['duration'] > self.config['max_length']):
self._log.debug(
"Skipping video '%s' as it was larger than %s",
video_info.get('title', '?'),
self._human_time_duration(self.config['max_length'])
)
self.summary['skipped_videos'] += 1
self._save_downloaded_items(video_id)
break
if 'duration' not in video_info:
self._log.debug(
"Skipping video '%s' as there is no video duration",
video_info.get('title', '?')
)
self.summary['skipped_videos'] += 1
self._save_downloaded_items(video_id)
break
self._log.info(
"Downloading. Filename: '%s'. Video ID: '%s'. Duration: %s. Counts: %s/%s - %s/%s)",
video_info.get('title', '?'),
video_id,
self._human_time_duration(video_info.get('duration', '-1')),
self.total_count+1,
self.config['total_limit'],
self.channel_count+1,
self.config['channel_limit']
)
if self.config['channels_folder'] and 'channel' in video_info:
download_dir = os.path.join(
self.config['download_dir'],
video_info['channel']
)
else:
download_dir = self.config['download_dir']
ydl_opts = {
'paths': {
'temp': '/tmp',
'home': download_dir
},
'writesubtitles': True,
'writeautomaticsub': True,
'writeannotations': True,
'write_all_thumbnails': False,
'writethumbnail': True,
'writeinfojson': True,
'subtitlesformat': 'srt',
'subtitleslangs': self.config['subtitle_langs'],
'allow_multiple_audio_streams': True,
'noprogress': True,
'merge_output_format': 'mkv',
'format': 'bestvideo+bestaudio[ext=m4a]/best',
}
if self.selected_proxy != '':
ydl_opts['proxy'] = self.selected_proxy
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
uri=f"https://www.youtube.com/watch?v={video_id}"
return_code = ydl.download(uri)
self._process_download(
{
"return_code": return_code,
'info_dict': {
'id': video_id,
},
'filename': video_info.get('title', '?'),
'video_info': video_info,
}
)
except yt_dlp.utils.DownloadError as error:
self._log.error(
"Error getting video. %s",
error
)
self.summary['videos_with_error'] += 1
break
self.summary['downloaded_videos'] += 1
else:
self._log.debug(
"Video with ID '%s' has been already downloaded",
video_id
)
self.summary['skipped_videos'] += 1
else:
self._log.error(
"Error! Video ID not found in URI '%s'",
entry['link']
)
self.summary['videos_with_error'] += 1
self.summary['entries_count'] += self.entries_count
self.summary['processed_channels'] = self.channels_count
self.summary['total_count'] = self.total_count
def _process_download(self, data):
self.total_count += 1
self.channel_count += 1
if self.total_count == self.config['total_limit']:
self._log.info(
"Limit (%s) reached for videos from all channels",
self.config['total_limit'],
)
sys.exit(0)
if self.channel_count == self.config['channel_limit']:
self._log.info(
"Limit (%s) reached for videos for this channel '%s'",
self.config['channel_limit'],
data['info_dict'].get('channel', '?')
)
# break
info_filename = os.path.join(self.config['download_dir'], f"{os.path.basename(data['filename'])}.download_info.json")
self._log.debug(
"Writting download information in to file '%s'",
info_filename
)
with open(info_filename, 'w', encoding='utf-8') as info_file:
json.dump(data, info_file, indent=2)
if 'id' in data['info_dict']:
self._save_downloaded_items(data['info_dict']['id'])
def _yt_progress_hook(self, data):
if data['status'] == 'finished':
self._process_download(data)
elif data['status'] == 'downloading':
downloading = True
else:
self._log.debug(
"Progress hook got data['status']=%s instead of 'finished'",
data['status']
)
def _save_downloaded_items(self, video_id):
self.downloaded_items.append(video_id)
with open(self.config['downloaded_database'], 'w', encoding='utf-8') as db_file:
for item in self.downloaded_items:
db_file.write(f"{item}\n")
def _human_time_duration(self, seconds):
'''Return time duration in a human readable formated string'''
if seconds == 0:
return 'inf'
parts = []
for unit, div in self.time_duration_units:
amount, seconds = divmod(int(seconds), div)
if amount > 0:
parts.append(f'{amount} {unit}{"" if amount == 1 else "s"}')
return ', '.join(parts)
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("get_youtube_videos")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "get_youtube_videos.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
@click.command()
@click.option(
"--debug-level",
"-d",
default="INFO",
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
case_sensitive=False,
),
help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
# help="Don't do anything, just show what would be done.")
@click.option(
'--proxy', '-p',
multiple=True,
help='URLs for proxies to use in case of errors.'
)
@click.option(
'--downloaded-database', '-d',
default=f"{os.environ.get('HOME', os.environ.get('USERPROFILE', ''))}/.config/downloaded_youtube_videos",
help='File to store the IDs of downloaded videos'
)
@click.option(
'--download-dir', '-f',
default=f"{os.environ.get('HOME', os.environ.get('USERPROFILE', ''))}/downloaded_youtube_videos",
help='Folder to store the downloaded videos'
)
@click.option(
'--channels', '-c',
multiple=True,
required=True,
help='YouTube channels IDs to look up'
)
@click.option(
'--channel-limit', '-l',
default=5,
type=int,
help='Maximun number of videos to download from a channel'
)
@click.option(
'--total-limit', '-L',
default=5,
type=int,
help='Maximun number of videos to download in total'
)
@click.option(
'--max-length', '-d',
default=5400,
type=int,
help='Maximun duration of videos to download in seconds'
)
@click.option(
'--subtitle-langs', '-s',
multiple=True,
default=['en.*'],
help='''List of languages of the subtitles to download (can be regex).
The list may contain "all" to refer to all the available
subtitles. The language can be prefixed with a "-" to
exclude it from the requested languages, e.g. ['all', '-live_chat'].
And you can use wildcards like en.*'''
)
@click.option(
'--channels-folder', '-c',
is_flag=True,
default=False,
help='Create folders per channel and save videos inside their channel folder'
)
@click.option(
'--skip-live-videos', '-S',
is_flag=True,
default=False,
help='Skip live videos'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
return GetYoutubeVideos(**kwargs)
if __name__ == "__main__":
__main__()