Initial commit

This commit is contained in:
Antonio J. Delgado 2024-01-19 15:33:49 +02:00
commit 2b9065fb0f
9 changed files with 498 additions and 0 deletions

145
.gitignore vendored Normal file
View file

@ -0,0 +1,145 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Configuration files
*.conf
*.ini
# Database files
*.db

0
LICENSE Normal file
View file

23
README.md Normal file
View file

@ -0,0 +1,23 @@
# discover-mastodon-servers
## Requirements
## Installation
### Linux
```bash
sudo python3 setup.py install
```
### Windows (from PowerShell)
```powershell
& $(where.exe python).split()[0] setup.py install
```
## Usage
```bash
discover-mastodon-servers.py [--debug-level|-d CRITICAL|ERROR|WARNING|INFO|DEBUG|NOTSET] # Other parameters
```

View file

View file

@ -0,0 +1,270 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
"""Discover Mastodon servers by looking at public timelines"""
import sys
import os
import logging
from logging.handlers import SysLogHandler
import sqlite3
import time
import json
import re
import click
import click_config_file
import requests
class DiscoverMastodonServers:
    """Discover Mastodon servers by crawling public timelines.

    Instantiating the class runs the whole job: it sets up logging, opens
    the SQLite database, loads the servers already stored there (seeding
    the list with ``initial_server`` when the database is empty) and then
    calls :meth:`discover` repeatedly until a pass finds no new server.

    Recognised keyword arguments (all read from ``self.config``):

    * ``log_file`` -- path of the rotating log file; defaults to
      ``<home>/log/discover-mastodon-servers.log``.
    * ``debug_level`` -- level name used for the standard-output handler.
    * ``proxy`` -- optional HTTPS proxy URL.
    * ``database_file`` -- path of the SQLite database.
    * ``initial_server`` -- host name used when the database is empty.
    """

    # Endpoint requested on every server.
    _TIMELINE_PATH = "/api/v1/timelines/public"
    # Error text that marks a server as private.
    _AUTH_REQUIRED_ERROR = 'This method requires an authenticated user'
    # Extracts the host part of a status URI; compiled once, used per item.
    _SERVER_PATTERN = re.compile(r'https?://([^/]+)/')
    # Table names cannot be bound as SQL parameters, so they are validated.
    _TABLE_NAME_PATTERN = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')
    # Seconds to wait for each HTTP request.
    _REQUEST_TIMEOUT = 10
    # Marker returned by _read_json when a response carries no usable JSON.
    _NO_JSON = object()

    def __init__(self, **kwargs):
        self.config = kwargs
        if self.config.get('log_file') is None:
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get('USERPROFILE', os.getcwd())
                ),
                'log',
                'discover-mastodon-servers.log'
            )
        self._init_log()

        self.session = requests.Session()
        # Only configure a proxy when one was actually given.
        proxy = self.config.get('proxy')
        if proxy:
            self.session.proxies.update({'https': proxy})

        self.conn = sqlite3.connect(
            self.config.get('database_file', 'mastodon-servers.db')
        )
        self.read_db()
        if not self.servers:
            self._log.debug("Adding initial server.")
            initial_server = self.config.get('initial_server', 'mastodon.social')
            self.servers[initial_server] = {
                "name": initial_server,
                "last_update": time.time(),
                "private": False
            }

        # Keep crawling until a full pass discovers nothing new.
        new_servers_count = 1
        while new_servers_count > 0:
            new_servers_count = self.discover()

    def discover(self):
        """Run one discovery pass over every known, non-private server.

        The public timeline of each server is fetched and the host of every
        status URI is extracted. Hosts not seen before are probed and, when
        they answer with a status code below 400, stored in the database and
        added to ``self.servers`` so the next pass crawls them as well.

        Returns:
            int: number of servers added during this pass.
        """
        # Hosts already known or already probed during this pass; a set gives
        # O(1) membership tests and avoids probing the same host repeatedly.
        known_servers = set(self.servers)
        new_servers_count = 0
        # Iterate over a snapshot: _probe_new_server adds to self.servers.
        for name, info in list(self.servers.items()):
            if info['private']:
                continue
            self._log.debug("Fetching public timeline in server '%s'", name)
            result = self._get_timeline(name)
            if result is None or result.status_code >= 400:
                continue
            data = self._read_json(result)
            if data is self._NO_JSON:
                self._log.debug(
                    "Server '%s' didn't reply with JSON data.",
                    name
                )
                continue
            if self._requires_auth(data):
                info['private'] = True
                self.write_record((name, info))
                continue
            if not isinstance(data, list):
                # Any other non-list payload carries no statuses to inspect.
                continue
            for item in data:
                uri = item.get('uri') if isinstance(item, dict) else None
                if not isinstance(uri, str):
                    # Item in public timeline don't have an URI
                    self._log.debug("Item don't have URI. %s", item)
                    continue
                match_server = self._SERVER_PATTERN.match(uri)
                if not match_server:
                    continue
                new_server = match_server.group(1)
                if new_server in known_servers:
                    continue
                known_servers.add(new_server)
                if self._probe_new_server(new_server):
                    new_servers_count += 1
        return new_servers_count

    def _get_timeline(self, server_name):
        """Request the public timeline of a server.

        Returns the response object, or ``None`` when the request failed at
        the network level (the failure is logged as a warning).
        """
        try:
            return self.session.get(
                f"https://{server_name}{self._TIMELINE_PATH}",
                timeout=self._REQUEST_TIMEOUT
            )
        except requests.exceptions.Timeout:
            self._log.warning(
                "Server '%s' didn't respond on time.",
                server_name
            )
        except requests.exceptions.RequestException as error:
            # Connection, TLS or URL errors on one host must not stop the crawl.
            self._log.warning(
                "Request to server '%s' failed: %s",
                server_name,
                error
            )
        return None

    def _read_json(self, response):
        """Return the decoded JSON body of a response.

        Returns ``self._NO_JSON`` when the response is not declared as JSON
        or its body cannot be decoded.
        """
        if 'application/json' not in response.headers.get('Content-Type', ''):
            return self._NO_JSON
        try:
            return response.json()
        except ValueError:
            return self._NO_JSON

    def _requires_auth(self, data):
        """Tell whether a decoded payload is the 'authentication required' error."""
        return (
            isinstance(data, dict)
            and data.get('error') == self._AUTH_REQUIRED_ERROR
        )

    def _probe_new_server(self, new_server):
        """Probe a newly seen host and record it when it answers.

        The server is stored as private when it answers with the
        authentication error or with something that is not JSON.

        Returns:
            bool: ``True`` when the server was recorded, ``False`` when the
            request failed or returned a status code of 400 or above.
        """
        new_result = self._get_timeline(new_server)
        if new_result is None or new_result.status_code >= 400:
            return False
        self._log.debug("Adding new server '%s'", new_server)
        data = self._read_json(new_result)
        if data is self._NO_JSON:
            self._log.debug(
                "Server '%s' didn't reply with JSON encoded data",
                new_server
            )
            private = True
        else:
            private = self._requires_auth(data)
        record = {
            "name": new_server,
            "last_update": time.time(),
            "private": private
        }
        # Register in memory too, so the next pass crawls this server and
        # does not count it as new again.
        self.servers[new_server] = record
        self.write_record((new_server, record))
        return True

    def write_record(self, record, table='servers'):
        """Insert or update one server row.

        Args:
            record: ``(name, fields)`` pair where ``fields`` is a mapping
                with the keys ``last_update`` and ``private``.
            table: name of the table to write to.

        Raises:
            ValueError: if ``table`` is not a plain SQL identifier.
        """
        if not self._TABLE_NAME_PATTERN.fullmatch(table):
            raise ValueError(f"Invalid table name: {table!r}")
        name, fields = record
        last_update = fields['last_update']
        # The column is declared INT; store the flag as 0/1.
        private = int(bool(fields['private']))
        cur = self.conn.cursor()
        try:
            # Values are bound as parameters: server names come from remote
            # timelines and must never be interpolated into the SQL text.
            cur.execute(
                f"SELECT name FROM {table} WHERE name = ?",
                (name,)
            )
            if cur.fetchone() is not None:
                self._log.debug('Record exists, updating.')
                cur.execute(
                    f"UPDATE {table} SET last_update = ?, private = ? "
                    "WHERE name = ?",
                    (last_update, private, name)
                )
            else:
                self._log.debug('Record doesn\'t exist, inserting.')
                cur.execute(
                    f"INSERT INTO {table} (name, last_update, private) "
                    "VALUES (?, ?, ?)",
                    (name, last_update, private)
                )
            self._log.debug(
                "Written record '%s' with ID %s",
                name,
                cur.lastrowid
            )
        finally:
            cur.close()
        self.conn.commit()

    def read_db(self):
        """Create the ``servers`` table if needed and load it into ``self.servers``.

        ``self.servers`` maps each server name to a dict with the keys
        ``name``, ``last_update`` and ``private``, ordered by most recent
        ``last_update`` first.
        """
        cur = self.conn.cursor()
        try:
            cur.execute("""CREATE TABLE IF NOT EXISTS servers(
                name TEXT PRIMARY KEY,
                last_update REAL,
                private INT
            )""")
            rows = cur.execute(
                "SELECT name, last_update, private FROM servers "
                "ORDER BY last_update DESC"
            ).fetchall()
        finally:
            cur.close()
        self.servers = {
            name: {
                "name": name,
                "last_update": last_update,
                "private": bool(private)
            }
            for name, last_update, private in rows
        }
        self._log.debug(
            "There are %s servers in the database.",
            len(self.servers)
        )
        self.conn.commit()

    def _init_log(self):
        """Initialize the log object.

        Attaches three handlers to the ``discover-mastodon-servers`` logger:
        syslog (DEBUG), standard output (level from ``debug_level``) and a
        rotating file (DEBUG). Always returns ``True``.
        """
        self._log = logging.getLogger("discover-mastodon-servers")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        # Fall back to INFO when the level is missing or None.
        streamhandler.setLevel(
            str(self.config.get("debug_level") or 'INFO').upper()
        )
        self._log.addHandler(streamhandler)

        log_file = self.config.get('log_file')
        if not log_file:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_file = os.path.join(
                home_folder, "log", "discover-mastodon-servers.log"
            )
        log_folder = os.path.dirname(log_file)
        # A bare file name has no folder part; there is nothing to create then.
        if log_folder:
            os.makedirs(log_folder, exist_ok=True)
        # Rollover only happens when backupCount is non-zero.
        filehandler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=102400000, backupCount=3
        )
        # create formatter
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
@click.command()
@click.option(
    "--debug-level",
    "-d",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
#               help="Don't do anything, just show what would be done.")
@click.option(
    '--initial-server', '-i', default='mastodon.social',
    help='First Mastodon server to reach to read public timeline and discover others.'
)
@click.option('--proxy', '-p', help='Proxy URL to use.')
# The short flag is -f because -d is already taken by --debug-level; two
# options sharing one short flag leave it ambiguous.
@click.option(
    '--database-file', '-f', default='mastodon-servers.db',
    help='File with the database of results.'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
    """Discover Mastodon servers by looking at public timelines."""
    # The constructor runs the whole discovery; the options parsed by click
    # are passed through unchanged as keyword arguments.
    return DiscoverMastodonServers(**kwargs)


if __name__ == "__main__":
    __main__()

25
pyproject.toml Normal file
View file

@ -0,0 +1,25 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[project.urls]
Homepage = ""
[project]
name = "discover-mastodon-servers"
version = "0.0.1"
description = "Discover Mastodon servers by looking at public timelines"
readme = "README.md"
authors = [{ name = "Antonio J. Delgado", email = "" }]
license = { file = "LICENSE" }
classifiers = [
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
]
#keywords = ["vCard", "contacts", "duplicates"]
dependencies = [
"click",
"click_config_file",
"requests",
]
requires-python = ">=3"

3
requirements.txt Normal file
View file

@ -0,0 +1,3 @@
click
click_config_file
requests

9
setup.cfg Normal file
View file

@ -0,0 +1,9 @@
[metadata]
name = discover-mastodon-servers
version = 0.0.1
[options]
packages = discover-mastodon-servers
install_requires =
    click
    click_config_file
    requests
    importlib; python_version == "3.10"

23
setup.py Normal file
View file

@ -0,0 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Setup script for discover-mastodon-servers.

The package name and version are read from ``setup.cfg``; the long
description is the content of ``README.md``.
"""
import configparser
import pathlib

import setuptools

# Resolve data files relative to this script rather than the current
# working directory.
HERE = pathlib.Path(__file__).resolve().parent

config = configparser.ConfigParser()
config.read(HERE / 'setup.cfg', encoding='utf-8')

setuptools.setup(
    scripts=['discover-mastodon-servers/discover-mastodon-servers.py'],
    author="Antonio J. Delgado",
    version=config['metadata']['version'],
    name=config['metadata']['name'],
    author_email="",
    url="",
    description="Discover Mastodon servers by looking at public timelines",
    # Pass the README text itself; the bare file name would be published
    # verbatim as the project description.
    long_description=(HERE / 'README.md').read_text(encoding='utf-8'),
    long_description_content_type="text/markdown",
    license="GPLv3",
    # keywords=["my", "script", "does", "things"]
)