336 lines
12 KiB
Python
Executable file
336 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
#
|
|
# This script is licensed under GNU GPL version 2.0 or above
|
|
# (c) 2024 Antonio J. Delgado
|
|
"""Redirect the Mastodon home timeline to email"""
|
|
|
|
import json
|
|
import time
|
|
import sys
|
|
import os
|
|
import logging
|
|
from logging.handlers import SysLogHandler
|
|
import smtplib
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
import sqlite3
|
|
import click
|
|
import click_config_file
|
|
import requests
|
|
from jinja2 import Environment, select_autoescape, FileSystemLoader
|
|
import importlib
|
|
|
|
class MastodonEmailBridge:
|
|
'''CLass to redirect the Mastodon home timeline to email'''
|
|
|
|
def __init__(self, **kwargs):
|
|
self.config = kwargs
|
|
if 'log_file' not in kwargs or kwargs['log_file'] is None:
|
|
self.config['log_file'] = os.path.join(
|
|
os.environ.get(
|
|
'HOME',
|
|
os.environ.get(
|
|
'USERPROFILE',
|
|
os.getcwd()
|
|
)
|
|
),
|
|
'log',
|
|
'mastodon_email_bridge.log'
|
|
)
|
|
if 'sent_items_file' not in kwargs or kwargs['sent_items_file'] is None:
|
|
self.config['sent_items_file'] = os.path.join(
|
|
os.environ.get(
|
|
'HOME',
|
|
os.environ.get(
|
|
'USERPROFILE',
|
|
os.getcwd()
|
|
)
|
|
),
|
|
'.mastodon_email_bridge_sent_items.db'
|
|
)
|
|
self._init_log()
|
|
self._get_sent_posts()
|
|
templates_folder = os.path.join(
|
|
os.environ.get(
|
|
'HOME',
|
|
os.environ.get(
|
|
'USERPROFILE',
|
|
os.getcwd()
|
|
)
|
|
),
|
|
'.config',
|
|
'mastodon_email_bridge',
|
|
'templates'
|
|
)
|
|
if not os.path.exists(templates_folder):
|
|
os.mkdir(templates_folder)
|
|
|
|
self.j2env = Environment(
|
|
#loader=PackageLoader("mastodon_email_bridge"),
|
|
loader=FileSystemLoader(templates_folder, followlinks=True),
|
|
autoescape=select_autoescape()
|
|
)
|
|
self.session = requests.Session()
|
|
self.session.headers.update({'Authorization': f"Bearer {self.config['token']}"})
|
|
count=1
|
|
self._log.debug(
|
|
"Getting URL 'https://%s/api/v1/timelines/home?limit=%s'",
|
|
self.config['server'],
|
|
self.config['limit_per_request']
|
|
)
|
|
next_url = f"https://{self.config['server']}/api/v1/timelines/home?limit={self.config['limit_per_request']}"
|
|
next_url = self.process_url(next_url)
|
|
while next_url != "" and (count < self.config['limit'] or self.config['limit'] == 0):
|
|
count = count + 1
|
|
self._log.debug("Waiting %s seconds for next request...", self.config['wait'])
|
|
time.sleep(self.config['wait'])
|
|
self._log.debug("Getting URL '%s'", next_url)
|
|
next_url = self.process_url(next_url)
|
|
|
|
def _get_sent_posts(self):
|
|
self.sent_items = []
|
|
self.sqlite = sqlite3.connect(self.config['sent_items_file'])
|
|
self.sqlite.row_factory = sqlite3.Row
|
|
cur = self.sqlite.cursor()
|
|
cur.execute("CREATE TABLE IF NOT EXISTS sent_items(id PRIMARY KEY, date)")
|
|
self.sqlite.commit()
|
|
res = cur.execute("SELECT id FROM sent_items")
|
|
rows = res.fetchall()
|
|
for row in rows:
|
|
if row[0] in self.sent_items:
|
|
self._log.warning("Duplicate id in database '%s'", row[0])
|
|
else:
|
|
self._log.debug("Found sent item with id '%s' (%s)", row[0], type(row[0]))
|
|
self.sent_items.append(row[0])
|
|
return True
|
|
|
|
def process_url(self, url):
|
|
'''Process a home endpoint URL'''
|
|
next_url = ''
|
|
result = self.session.get(url)
|
|
# for header in result.headers:
|
|
# self._log.debug("%s = %s", header, result.headers[header])
|
|
if 'X-RateLimit-Remaining' in result.headers and int(result.headers['X-RateLimit-Remaining']) < 10:
|
|
self._log.warning("X-RateLimit-Reset: %s", result.headers['X-RateLimit-Reset'])
|
|
try:
|
|
reset_time = time.mktime(
|
|
time.strptime(
|
|
result.headers['X-RateLimit-Reset'],
|
|
'%Y-%m-%dT%H:%M:%S.%fZ'
|
|
)
|
|
)
|
|
self._log.warning('Waiting until that time to try again.')
|
|
time.sleep(reset_time - time.time() + 2)
|
|
except Exception as error:
|
|
self._log.error(
|
|
"Error trying to convert given reset time '%s'. %s",
|
|
result.headers['X-RateLimit-Reset'],
|
|
error
|
|
)
|
|
return url
|
|
for data in result.json():
|
|
data['meb_reply_to'] = []
|
|
if data['in_reply_to_id']:
|
|
self._log.debug(
|
|
"This post is a reply to '%s', fetching it",
|
|
data['in_reply_to_id']
|
|
)
|
|
data['meb_reply_to'].append(self.get_post(data['in_reply_to_id']))
|
|
if data['reblog'] and data['reblog']['in_reply_to_id']:
|
|
self._log.debug(
|
|
"This post is a reblog of a reply to '%s', fetching it",
|
|
data['reblog']['in_reply_to_id']
|
|
)
|
|
data['meb_reply_to'].append(self.get_post(data['reblog']['in_reply_to_id']))
|
|
if int(data['id']) not in self.sent_items:
|
|
self.send_mail(data)
|
|
else:
|
|
self._log.debug("Skipping post %s that was sent in the past", data['id'])
|
|
if 'Link' in result.headers:
|
|
for link in result.headers['Link'].split(', '):
|
|
slink = link.split('; ')
|
|
if slink[1] == 'rel="next"':
|
|
next_url = slink[0].replace('<', '').replace('>', '')
|
|
return next_url
|
|
|
|
def get_post(self, post_id):
|
|
'''Get a single post'''
|
|
post = {}
|
|
self._log.debug(
|
|
"Getting post URL 'https://%s/api/v1/statuses/%s'...",
|
|
self.config['server'],
|
|
post_id
|
|
)
|
|
result = self.session.get(f"https://{self.config['server']}/api/v1/statuses/{post_id}")
|
|
post = result.json()
|
|
return post
|
|
|
|
def send_mail(self, data):
|
|
'''Send an email with the post composed'''
|
|
sender = self._str_template(self.config['sender'], data)
|
|
recipient = self._str_template(self.config['recipient'], data)
|
|
msg = MIMEMultipart('alternative')
|
|
if data['in_reply_to_id']:
|
|
msg['Subject'] = f"FediReply from {data['account']['display_name']} ({data['account']['username']})"
|
|
else:
|
|
msg['Subject'] = f"FediPost from {data['account']['display_name']} ({data['account']['username']})"
|
|
msg['From'] = sender
|
|
msg['To'] = recipient
|
|
html_template = self.j2env.get_template("new_post.html.j2")
|
|
html_source = html_template.render(
|
|
imp0rt = importlib.import_module,
|
|
data=data,
|
|
json_raw=json.dumps(data, indent=2)
|
|
)
|
|
txt_template = self.j2env.get_template("new_post.txt.j2")
|
|
txt_source = txt_template.render(
|
|
imp0rt = importlib.import_module,
|
|
data=data,
|
|
json_raw=json.dumps(data, indent=2)
|
|
)
|
|
sent_folder = os.path.join(
|
|
os.environ.get(
|
|
'HOME',
|
|
os.environ.get(
|
|
'USERPROFILE',
|
|
os.getcwd()
|
|
)
|
|
),
|
|
'.mastodon_email_bridge_sent_items'
|
|
)
|
|
if not os.path.exists(sent_folder):
|
|
os.mkdir(sent_folder)
|
|
sent_file = os.path.join(sent_folder, data['id'] + '.html')
|
|
with open(sent_file, 'a', encoding="utf-8") as sent_item_file:
|
|
sent_item_file.write(html_source)
|
|
part1 = MIMEText(txt_source, 'plain')
|
|
part2 = MIMEText(html_source, 'html')
|
|
msg.attach(part1)
|
|
msg.attach(part2)
|
|
conn=smtplib.SMTP_SSL(self.config['mail_server'],self.config['mail_server_port'])
|
|
if self.config['mail_user'] is not None:
|
|
conn.login(self.config['mail_user'], self.config['mail_pass'])
|
|
self._log.debug("Sending email for post with id '%s'...", data['id'])
|
|
conn.sendmail(sender, recipient, msg.as_string())
|
|
conn.quit()
|
|
self._log.debug("Adding entry to database...")
|
|
cur = self.sqlite.cursor()
|
|
cur.execute(f"INSERT INTO sent_items (id, date) VALUES ({data['id']}, {time.time()})")
|
|
self.sqlite.commit()
|
|
self.sent_items.append(data['id'])
|
|
return True
|
|
|
|
def _str_template(self, template_string, data):
|
|
template = self.j2env.from_string(template_string)
|
|
result = template.render(data=data)
|
|
return result
|
|
|
|
def _init_log(self):
|
|
''' Initialize log object '''
|
|
self._log = logging.getLogger("mastodon_email_bridge")
|
|
self._log.setLevel(logging.DEBUG)
|
|
|
|
sysloghandler = SysLogHandler()
|
|
sysloghandler.setLevel(logging.DEBUG)
|
|
self._log.addHandler(sysloghandler)
|
|
|
|
streamhandler = logging.StreamHandler(sys.stdout)
|
|
streamhandler.setLevel(
|
|
logging.getLevelName(self.config.get("debug_level", 'INFO'))
|
|
)
|
|
self._log.addHandler(streamhandler)
|
|
|
|
if 'log_file' in self.config:
|
|
log_file = self.config['log_file']
|
|
else:
|
|
home_folder = os.environ.get(
|
|
'HOME', os.environ.get('USERPROFILE', '')
|
|
)
|
|
log_folder = os.path.join(home_folder, "log")
|
|
log_file = os.path.join(log_folder, "mastodon_email_bridge.log")
|
|
|
|
if not os.path.exists(os.path.dirname(log_file)):
|
|
os.mkdir(os.path.dirname(log_file))
|
|
|
|
filehandler = logging.handlers.RotatingFileHandler(
|
|
log_file, maxBytes=102400000
|
|
)
|
|
# create formatter
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
|
|
)
|
|
filehandler.setFormatter(formatter)
|
|
filehandler.setLevel(logging.DEBUG)
|
|
self._log.addHandler(filehandler)
|
|
return True
|
|
|
|
|
|
@click.command()
|
|
@click.option(
|
|
"--debug-level",
|
|
"-d",
|
|
default="INFO",
|
|
type=click.Choice(
|
|
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
|
|
case_sensitive=False,
|
|
),
|
|
help='Set the debug level for the standard output.'
|
|
)
|
|
@click.option('--token', '-t', required=True, help='Mastodon token with read access.')
|
|
@click.option(
|
|
'--server',
|
|
'-s',
|
|
default='eu.mastodon.green',
|
|
help='Mastodon server full qualified name.'
|
|
)
|
|
@click.option('--limit', '-L', default=0, help='Mastodon token with read access.')
|
|
@click.option('--limit-per-request', '-R', default=40, help='Mastodon token with read access.')
|
|
@click.option('--wait', '-w', default=60, help='Seconds to wait between requests to avoid rate limits.')
|
|
@click.option(
|
|
'--recipient',
|
|
'-r',
|
|
required=True,
|
|
help='Recipient email to get the posts. This can be a Jinja2 template.'
|
|
)
|
|
@click.option(
|
|
'--sender',
|
|
'-S',
|
|
default='mastodon_email_bridge@example.org',
|
|
help='Sender email thant send the posts. This can be a Jinja2 template.'
|
|
)
|
|
@click.option(
|
|
'--sent-items-file',
|
|
'-f',
|
|
default=None,
|
|
help='File to store the IDs of post already sent by email.'
|
|
)
|
|
@click.option('--mail-server', '-m', default='localhost', help='SMTP Mail server to send emails.')
|
|
@click.option(
|
|
'--mail-user',
|
|
'-u',
|
|
default=None,
|
|
help='Username for SMTP Mail server to send emails.'
|
|
)
|
|
@click.option(
|
|
'--mail-pass',
|
|
'-P',
|
|
default=None,
|
|
help='User password for SMTP Mail server to send emails.'
|
|
)
|
|
@click.option(
|
|
'--mail-server-port',
|
|
'-p',
|
|
default=465,
|
|
help='SMTP Mail server port to send emails.'
|
|
)
|
|
@click.option('--log-file', '-l', help="File to store all debug messages.")
|
|
# @click.option("--dummy","-n", is_flag=True,
|
|
# help="Don't do anything, just show what would be done.")
|
|
@click_config_file.configuration_option()
|
|
def __main__(**kwargs):
|
|
return MastodonEmailBridge(**kwargs)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
__main__()
|