mastodon_email_bridge/mastodon_email_bridge/mastodon_email_bridge.py
2025-02-17 09:54:19 +02:00

445 lines
16 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
"""Redirect the Mastodon home timeline to email"""
import json
import time
import sys
import os
import logging
from logging.handlers import SysLogHandler
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import sqlite3
import importlib
import click
import click_config_file
import requests
from jinja2 import Environment, select_autoescape, FileSystemLoader
class MastodonEmailBridge:
'''CLass to redirect the Mastodon home timeline to email'''
def __init__(self, **kwargs):
self.config = kwargs
if 'log_file' not in kwargs or kwargs['log_file'] is None:
self.config['log_file'] = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'log',
'mastodon_email_bridge.log'
)
if 'sent_items_file' not in kwargs or kwargs['sent_items_file'] is None:
self.config['sent_items_file'] = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'.mastodon_email_bridge_sent_items.db'
)
self._init_log()
self._get_sent_posts()
if 'templates_folder' in self.config:
templates_folder=self.config['templates_folder']
else:
templates_folder = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'.config',
'mastodon_email_bridge',
'templates'
)
if not os.path.exists(templates_folder):
os.mkdir(templates_folder)
self.j2env = Environment(
#loader=PackageLoader("mastodon_email_bridge"),
loader=FileSystemLoader(templates_folder, followlinks=True),
autoescape=select_autoescape()
)
self.translate_session = requests.Session()
headers = {
'accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
}
self.translate_session.headers.update(headers)
self.session = requests.Session()
self.session.headers.update({'Authorization': f"Bearer {self.config['token']}"})
count=1
self._log.debug(
"Getting URL 'https://%s/api/v1/timelines/home?limit=%s'",
self.config['server'],
self.config['limit_per_request']
)
next_url = f"https://{self.config['server']}/api/v1/timelines/home?limit={self.config['limit_per_request']}"
next_url = self.process_url(next_url)
while next_url != "" and (count < self.config['limit'] or self.config['limit'] == 0):
count = count + 1
self._log.debug("Waiting %s seconds for next request...", self.config['wait'])
time.sleep(self.config['wait'])
self._log.debug("Getting URL '%s'", next_url)
next_url = self.process_url(next_url)
def _get_sent_posts(self):
self.sent_items = []
self._log.debug(
"Reading sent items from database..."
)
self.sqlite = sqlite3.connect(self.config['sent_items_file'])
self.sqlite.row_factory = sqlite3.Row
cur = self.sqlite.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS sent_items(id PRIMARY KEY, date)")
self.sqlite.commit()
res = cur.execute("SELECT id FROM sent_items")
rows = res.fetchall()
for row in rows:
self._log.debug(
row
)
if row[0] in self.sent_items:
self._log.warning("Duplicate id in database '%s'", row[0])
# else:
# self._log.debug("Found sent item with id '%s' (%s)", row[0], type(row[0]))
self.sent_items.append(row[0])
self._log.debug(
"Got %s sent items from database",
len(self.sent_items)
)
return True
def process_url(self, url):
'''Process a home endpoint URL'''
next_url = ''
result = self.session.get(url)
# for header in result.headers:
# self._log.debug("%s = %s", header, result.headers[header])
if ('X-RateLimit-Remaining' in result.headers
and int(result.headers['X-RateLimit-Remaining']) < 10):
self._log.warning("X-RateLimit-Reset: %s", result.headers['X-RateLimit-Reset'])
try:
reset_time = time.mktime(
time.strptime(
result.headers['X-RateLimit-Reset'],
'%Y-%m-%dT%H:%M:%S.%fZ'
)
)
self._log.warning('Waiting until that time to try again.')
time.sleep(reset_time - time.time() + 2)
except Exception as error:
self._log.error(
"Error trying to convert given reset time '%s'. %s",
result.headers['X-RateLimit-Reset'],
error
)
return url
for data in result.json():
data['meb_reply_to'] = []
if data['in_reply_to_id']:
self._log.debug(
"This post is a reply to '%s', fetching it",
data['in_reply_to_id']
)
data['meb_reply_to'].append(self.get_post(data['in_reply_to_id']))
if data['reblog'] and data['reblog']['in_reply_to_id']:
self._log.debug(
"This post is a reblog of a reply to '%s', fetching it",
data['reblog']['in_reply_to_id']
)
data['meb_reply_to'].append(self.get_post(data['reblog']['in_reply_to_id']))
data = self._translate_data(data)
if int(data['id']) not in self.sent_items:
self.send_mail(data)
else:
self._log.debug("Skipping post %s that was sent in the past", data['id'])
if 'Link' in result.headers:
for link in result.headers['Link'].split(', '):
slink = link.split('; ')
if slink[1] == 'rel="next"':
next_url = slink[0].replace('<', '').replace('>', '')
return next_url
def _translate_data(self, data):
if self.config['libretranslate_url'] is None or self.config['libretranslate_url'] == '':
self._log.debug(
"Not translating data because no LibreTranslate URL was specified"
)
return data
new_data = data
if 'language' not in data:
source_language = 'auto'
else:
source_language = data['language']
if source_language in self.config['not_translate_language']:
return data
fields_to_translate = [
'spoiler',
'content'
]
for field in fields_to_translate:
if field in data:
new_data[f"translated_{field}"] = self._translate(
data[field],
source_language=source_language
)
return new_data
def get_post(self, post_id):
'''Get a single post'''
post = {}
self._log.debug(
"Getting post URL 'https://%s/api/v1/statuses/%s'...",
self.config['server'],
post_id
)
result = self.session.get(f"https://{self.config['server']}/api/v1/statuses/{post_id}")
post = result.json()
return post
def send_mail(self, data):
'''Send an email with the post composed'''
sender = self._str_template(self.config['sender'], data)
recipient = self._str_template(self.config['recipient'], data)
msg = MIMEMultipart('alternative')
msg['Subject'] = self._str_template(self.config['subjet_template'], data)
msg['From'] = sender
msg['Date'] = time.strftime('%a, %d %b %Y %H:%M:%S %z')
msg['To'] = recipient
html_template = self.j2env.get_template("new_post.html.j2")
html_source = html_template.render(
imp0rt = importlib.import_module,
data=data,
json_raw=json.dumps(data, indent=2)
)
txt_template = self.j2env.get_template("new_post.txt.j2")
txt_source = txt_template.render(
imp0rt = importlib.import_module,
data=data,
json_raw=json.dumps(data, indent=2)
)
if 'sent_folder' not in self.config:
sent_folder = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'.mastodon_email_bridge_sent_items'
)
else:
sent_folder = self.config['sent_folder']
if not os.path.exists(sent_folder):
os.mkdir(sent_folder)
sent_file = os.path.join(sent_folder, data['id'] + '.html')
with open(sent_file, 'a', encoding="utf-8") as sent_item_file:
sent_item_file.write(html_source)
part1 = MIMEText(txt_source, 'plain')
part2 = MIMEText(html_source, 'html')
msg.attach(part1)
msg.attach(part2)
conn=smtplib.SMTP_SSL(self.config['mail_server'],self.config['mail_server_port'])
if self.config['mail_user'] is not None:
conn.login(self.config['mail_user'], self.config['mail_pass'])
self._log.debug("Sending email for post with id '%s'...", data['id'])
conn.sendmail(sender, recipient, msg.as_string())
conn.quit()
self._log.debug("Adding entry to database...")
cur = self.sqlite.cursor()
cur.execute(f"INSERT INTO sent_items (id, date) VALUES ({data['id']}, {time.time()})")
self.sqlite.commit()
self.sent_items.append(data['id'])
return True
def _str_template(self, template_string, data):
template = self.j2env.from_string(template_string)
result = template.render(data=data)
return result
def _translate(self, text, source_language='auto', destination_language=None):
if destination_language is None:
destination_language = self.config['destination_language']
data = {
"q": text,
"source": source_language,
"target": destination_language,
"api_key": self.config['libretranslate_token'],
"format": "text",
}
response = self.translate_session.post(
url=f"{self.config['libretranslate_url']}/",
data=data,
)
translation = response.json()['translatedText']
return translation
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("mastodon_email_bridge")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "mastodon_email_bridge.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
@click.command()
@click.option(
"--debug-level",
"-d",
default="INFO",
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
case_sensitive=False,
),
help='Set the debug level for the standard output.'
)
@click.option('--token', '-t', required=True, help='Mastodon token with read access.')
@click.option(
'--server',
'-s',
default='eu.mastodon.green',
help='Mastodon server full qualified name.'
)
@click.option('--limit', '-L', default=0, help='Mastodon token with read access.')
@click.option('--limit-per-request', '-R', default=40, help='Mastodon token with read access.')
@click.option(
'--wait',
'-w',
default=60,
help='Seconds to wait between requests to avoid rate limits.'
)
@click.option(
'--recipient',
'-r',
required=True,
help='Recipient email to get the posts. This can be a Jinja2 template.'
)
@click.option(
'--sender',
'-S',
default='mastodon_email_bridge@example.org',
help='Sender email thant send the posts. This can be a Jinja2 template.'
)
@click.option(
'--sent-items-file',
'-f',
default=None,
help='File to store the IDs of post already sent by email.'
)
@click.option('--mail-server', '-m', default='localhost', help='SMTP Mail server to send emails.')
@click.option(
'--mail-user',
'-u',
default=None,
help='Username for SMTP Mail server to send emails.'
)
@click.option(
'--mail-pass',
'-P',
default=None,
help='User password for SMTP Mail server to send emails.'
)
@click.option(
'--mail-server-port',
'-p',
default=465,
help='SMTP Mail server port to send emails.'
)
@click.option(
'--subjet-template',
'-t',
default='{{ data["account"]["display_name"] }} ({{ data["account"]["username"] }}) {% if data["in_reply_to_id"] %}replied {% else %}posted{% endif %}{% for tag in data["tags"] %} #{{ tag["name"] }}{% endfor %}',
help='Jinja2 template for the subject of the emails.'
)
@click.option(
'--sent-folder',
'-F',
help='Folder to store generated HTML files to be sent.'
)
@click.option(
'--templates-folder',
'-T',
help='Folder with the templates to generate HTML and text files to be sent.'
)
@click.option(
'--libretranslate-url',
'-U',
help='LibreTranslate instance URL to use like: https://translate.example.org/translate'
)
@click.option(
'--libretranslate-token',
'-o',
default='',
help='LibreTranslate token to use'
)
@click.option(
'--not-translate-language',
'-n',
multiple=True,
help='Languages that you don\'t want to translate with LibreTranslate'
)
@click.option(
'--destination-language',
'-D',
default='en',
help='Language destination for the translations'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
# help="Don't do anything, just show what would be done.")
@click_config_file.configuration_option()
def __main__(**kwargs):
return MastodonEmailBridge(**kwargs)
if __name__ == "__main__":
__main__()