discover-mastodon-servers/discover-mastodon-servers/discover_mastodon_servers.py
2024-01-19 17:42:15 +02:00

288 lines
10 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
"""Discover Mastodon servers by looking at public timelines"""
import sys
import os
import logging
from logging.handlers import SysLogHandler
import sqlite3
import time
import re
import click
import click_config_file
import requests
class DiscoverMastodonServers:
    '''Crawl Mastodon servers and record every server found in SQLite.

    Starting from the servers already stored in the database (or from the
    configured initial server when the database is empty), the crawler asks
    each public server for its peers and reads its public timeline to find
    the hosts of federated statuses. Crawling starts as soon as the object
    is constructed and stops when a full pass finds no new server.

    Configuration keys read from the keyword arguments:
        log_file        path of the rotating log file (optional)
        debug_level     level name for the standard output handler
        proxy           HTTPS proxy URL (optional)
        database_file   path of the SQLite database
        initial_server  host used as seed when the database is empty
    '''

    # Extracts the host part of a status URI such as
    # "https://example.org/users/a/statuses/1".
    _URI_HOST_RE = re.compile(r'https?://([^/]*)/')

    def __init__(self, **kwargs):
        '''Set up logging, HTTP session and database, then crawl.'''
        self.config = kwargs
        if self.config.get('log_file') is None:
            home_folder = os.environ.get(
                'HOME',
                os.environ.get('USERPROFILE', os.getcwd())
            )
            self.config['log_file'] = os.path.join(
                home_folder,
                'log',
                'discover-mastodon-servers.log'
            )
        self._init_log()

        self.session = requests.Session()
        proxy = self.config.get('proxy')
        if proxy:
            # Only register a proxy when one was actually configured.
            self.session.proxies.update({'https': proxy})

        self.conn = sqlite3.connect(self.config['database_file'])
        self.read_db()
        if not self.servers:
            self._log.debug("Adding initial server.")
            # Persist the seed too, otherwise it would be missing from the
            # database unless another server happens to list it as a peer.
            self._remember(self.config['initial_server'], private=False)

        new_servers_count = 1
        while new_servers_count > 0:
            new_servers_count = self.discover()

    def get_timeline(self, server):
        '''Get the data of a public timeline for a given server'''
        return self.get_path(server, 'api/v1/timelines/public')

    def get_path(self, server, endpoint):
        '''Get the decoded JSON data of an endpoint of a server.

        Returns the decoded JSON document, or None when the request fails,
        the server answers with an HTTP error, the reply is not valid JSON,
        or the JSON document is an error object (a dict with an 'error'
        key).
        '''
        try:
            result = self.session.get(
                f"https://{server}/{endpoint}",
                timeout=10
            )
        except requests.exceptions.ReadTimeout as error:
            self._log.warning(
                "Server '%s' didn't respond on time. %s",
                server,
                error
            )
            return None
        except requests.exceptions.SSLError as error:
            self._log.warning(
                "Server '%s' don't have a valid SSL certificate. %s",
                server,
                error
            )
            return None
        except requests.exceptions.ConnectionError as error:
            self._log.warning(
                "Server '%s' connection failed. %s",
                server,
                error
            )
            return None
        except requests.exceptions.RequestException as error:
            # Server names come from remote data, so malformed hosts,
            # redirect loops, etc. must not abort the whole crawl.
            self._log.warning(
                "Server '%s' request failed. %s",
                server,
                error
            )
            return None

        if result.status_code >= 400:
            self._log.debug(
                "Server '%s' returned error code %s.", server, result.status_code
            )
            return None
        # The header may be absent; treat that as "not JSON".
        if 'application/json' not in result.headers.get('Content-Type', ''):
            self._log.debug(
                "Server '%s' didn't reply with JSON data.", server
            )
            return None
        try:
            data = result.json()
        except ValueError as error:
            # JSON decoding errors are ValueError subclasses.
            self._log.debug(
                "Server '%s' replied with invalid JSON. %s", server, error
            )
            return None
        if isinstance(data, dict) and 'error' in data:
            self._log.debug(
                "Server '%s' replied with an error: %s", server, data['error']
            )
            return None
        return data

    def discover(self):
        '''Run one discovery pass over all known public servers.

        Every server found that was not known before is stored in
        self.servers and in the database, so the next pass crawls it too.

        Returns the number of new public servers found in this pass; the
        caller repeats passes until this is 0.
        '''
        known = set(self.servers)
        new_servers_count = 0
        # Iterate over a snapshot: self.servers grows during the pass.
        for name, info in list(self.servers.items()):
            if info['private']:
                continue

            self._log.debug("Fetching peers of the server '%s'", name)
            peers = self.get_path(name, 'api/v1/instance/peers')
            if isinstance(peers, list):
                for peer in peers:
                    if not isinstance(peer, str) or not peer or peer in known:
                        continue
                    new_servers_count += 1
                    self._log.debug("Adding new server '%s'", peer)
                    known.add(peer)
                    self._remember(peer, private=False)

            self._log.debug("Fetching public timeline in server '%s'", name)
            timeline = self.get_timeline(name)
            if not timeline:
                # No readable public timeline: flag the server so it is
                # skipped by later passes.
                info['private'] = True
                self.write_record((name, info))
                continue

            for item in timeline:
                uri = item.get('uri') if isinstance(item, dict) else None
                if not isinstance(uri, str):
                    # Item in public timeline don't have an URI
                    self._log.debug("Item don't have URI. %s", item)
                    continue
                match_server = self._URI_HOST_RE.match(uri)
                if not match_server:
                    continue
                host = match_server.group(1)
                if not host or host in known:
                    continue
                # Mark as known before probing so a host is probed only once
                # per pass, however many statuses it has in the timeline.
                known.add(host)
                is_public = bool(self.get_timeline(host))
                if is_public:
                    new_servers_count += 1
                    self._log.debug("Adding new server '%s'", host)
                self._remember(host, private=not is_public)
        return new_servers_count

    def _remember(self, name, private):
        '''Store a newly found server in memory and in the database.'''
        record = {
            "name": name,
            "last_update": time.time(),
            "private": private
        }
        self.servers[name] = record
        self.write_record((name, record))

    def write_record(self, record, table='servers'):
        '''Insert or update a server record in a table.

        record is a (name, fields) pair where fields provides the keys
        'last_update' and 'private'. table must be a plain identifier.

        Raises ValueError when table is not a valid identifier.
        '''
        # Table names cannot be bound as parameters, so validate and quote.
        if not isinstance(table, str) or not table.isidentifier():
            raise ValueError(f"Invalid table name: {table!r}")
        name, fields = record
        # Values are bound, never interpolated: server names come from
        # remote servers and may contain quotes or SQL.
        values = (fields['last_update'], int(bool(fields['private'])), name)
        cur = self.conn.cursor()
        try:
            cur.execute(f'SELECT 1 FROM "{table}" WHERE name = ?', (name,))
            if cur.fetchone() is not None:
                self._log.debug('Record exists, updating.')
                cur.execute(
                    f'UPDATE "{table}" SET last_update = ?, private = ? '
                    'WHERE name = ?',
                    values
                )
            else:
                self._log.debug('Record doesn\'t exist, inserting.')
                cur.execute(
                    f'INSERT INTO "{table}" (last_update, private, name) '
                    'VALUES (?, ?, ?)',
                    values
                )
            self._log.debug(
                "Written record '%s' with ID %s",
                name,
                cur.lastrowid
            )
        finally:
            cur.close()
        self.conn.commit()

    def read_db(self):
        '''Create the servers table if needed and load it in self.servers'''
        cur = self.conn.cursor()
        try:
            cur.execute("""CREATE TABLE IF NOT EXISTS servers(
                name TEXT PRIMARY KEY,
                last_update REAL,
                private INT
            )""")
            cur.execute("SELECT * FROM servers ORDER BY last_update DESC")
            self.servers = {
                row[0]: {
                    "name": row[0],
                    "last_update": row[1],
                    # Stored as 0/1; expose the same type as new records.
                    "private": bool(row[2])
                }
                for row in cur.fetchall()
            }
        finally:
            cur.close()
        self._log.debug("There are %s servers in the database.", len(self.servers))
        self.conn.commit()

    def _init_log(self):
        ''' Initialize log object

        Attaches a syslog handler, a standard output handler (at the
        configured debug level) and a rotating file handler to the logger.
        Returns True.
        '''
        self._log = logging.getLogger("discover-mastodon-servers")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(
            str(self.config.get("debug_level") or 'INFO').upper()
        )
        self._log.addHandler(streamhandler)

        log_file = self.config.get('log_file')
        if not log_file:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_file = os.path.join(
                home_folder, "log", "discover-mastodon-servers.log"
            )
        log_folder = os.path.dirname(log_file)
        # A bare file name has no folder part; there is nothing to create.
        if log_folder:
            os.makedirs(log_folder, exist_ok=True)
        # Rollover never happens while backupCount is 0, so keep a few
        # backups to make maxBytes effective.
        filehandler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=102400000, backupCount=3
        )
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
@click.command()
@click.option(
    "--debug-level",
    # "-d" was declared for both this option and --database-file; click maps
    # a duplicated short flag to the option declared last, so "-d" has always
    # meant --database-file. Give this option its own unambiguous flag.
    "-D",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
# help="Don't do anything, just show what would be done.")
@click.option(
    '--initial-server', '-i', default='mastodon.social',
    help='First Mastodon server to reach to read public timeline and discover others.'
)
@click.option('--proxy', '-p', help='Proxy URL to use.')
@click.option(
    '--database-file', '-d', default='mastodon-servers.db',
    help='File with the database of results.'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
    '''Command line entry point: build the crawler from the parsed options.

    Constructing DiscoverMastodonServers runs the whole discovery, so this
    returns only once the crawl has finished.
    '''
    return DiscoverMastodonServers(**kwargs)


if __name__ == "__main__":
    __main__()