Add banned hostnames

This commit is contained in:
Antonio J. Delgado 2024-01-19 20:08:07 +02:00
parent ce4609068d
commit f614cbdceb

View file

@ -12,6 +12,7 @@ from logging.handlers import SysLogHandler
import sqlite3 import sqlite3
import time import time
import re import re
import json
import click import click
import click_config_file import click_config_file
import requests import requests
@ -63,13 +64,21 @@ class DiscoverMastodonServers:
timeout=10 timeout=10
) )
if result.status_code < 400: if result.status_code < 400:
if 'application/json' in result.headers['Content-Type']: if 'Content-Type' in result.headers:
data = result.json() if 'application/json' in result.headers['Content-Type']:
if 'error' not in data: data = result.json()
return data if 'error' not in data:
return data
else:
self._log.debug(
"Server '%s' didn't reply with JSON data.", server
)
else: else:
self._log.debug( self._log.debug(
"Server '%s' didn't reply with JSON data.", server "Server '%s' didn't return Content-Type header. Headers: '%s'. Content returned: '%s'",
server,
json.dumps(result.headers, indent=2),
result.content
) )
else: else:
self._log.debug( self._log.debug(
@ -95,70 +104,103 @@ class DiscoverMastodonServers:
) )
return data return data
def get_instance_info(self, server):
    '''Get all server information.

    Asks the server for its instance description and then walks its
    profile directory page by page until no more entries come back.

    Args:
        server: Host name handed to ``self.get_path``.

    Returns:
        dict: ``'instance'`` holds the reply of ``/api/v1/instance`` (the
        key is only present when that call returned something truthy).
        ``'directory'`` is always present and holds the concatenated
        directory pages, or an empty list when nothing could be fetched.
    '''
    result = {}
    instance = self.get_path(server, '/api/v1/instance')
    if instance:
        result['instance'] = instance
    directory = []
    offset = 0
    while True:
        page = self.get_path(
            server,
            f"/api/v1/directory?limit=80&offset={offset}"
        )
        # A falsy reply (failed request or empty page) ends the walk; the
        # previous loop condition either stopped after the first page,
        # never terminated on an empty list, or crashed on len(None).
        # Anything that is not a list cannot be a directory page either.
        if not page or not isinstance(page, list):
            break
        directory.extend(page)
        # Advance by what was actually received so that a server capping
        # "limit" below 80 does not make us skip entries.
        offset += len(page)
    result['directory'] = directory
    return result
def test_banned_server(self, server):
'''Check if a server name match agains any banned regular expressions'''
for banned in self.config['regexp_banned_host']:
match = re.search(banned, server)
if match:
self._log.debug(
"Regexp '%s' match server '%s'",
banned,
server
)
return True
return False
def discover(self):
    '''Discover new servers.

    Walks every entry of ``self.servers``. Entries that are banned
    (``self.test_banned_server``) or already flagged private are only
    remembered as known. For the others, every peer they report is
    recorded, and every host seen in their public timeline that is not
    yet known is probed and recorded as public or private.

    Returns:
        int: Number of servers counted as new during this run.
    '''
    known_hosts = []
    discovered = 0
    for entry in self.servers.items():
        host, details = entry
        known_hosts.append(host)
        if self.test_banned_server(host) or details['private']:
            continue

        self._log.debug("Fetching peers of the server '%s'", host)
        peers = self.get_path(host, 'api/v1/instance/peers')
        for peer in peers or ():
            discovered += 1
            self._log.debug(
                "Adding new server '%s'",
                peer
            )
            known_hosts.append(peer)
            self.write_record(
                (peer,
                    {
                        "name": peer,
                        "last_update": time.time(),
                        "private": False
                    }
                )
            )

        self._log.debug("Fetching public timeline in server '%s'", host)
        timeline = self.get_timeline(host)
        if not timeline:
            # No readable public timeline: flag the server as private.
            details['private'] = True
            self.write_record(entry)
            continue

        for item in timeline:
            if 'uri' not in item:
                # Item in public timeline doesn't have a URI
                self._log.debug(
                    "Item don't have URI. %s",
                    item
                )
                continue
            host_match = re.match(r'https?://([^/]*)/', item['uri'])
            if not host_match:
                continue
            candidate = host_match.group(1)
            if candidate in known_hosts:
                continue
            # Probe the candidate: a readable timeline means it is public.
            probe = self.get_timeline(candidate)
            if probe:
                discovered += 1
                self._log.debug(
                    "Adding new server '%s'",
                    candidate
                )
                known_hosts.append(candidate)
                private = False
            else:
                private = True
            self.write_record(
                (candidate,
                    {
                        "name": candidate,
                        "last_update": time.time(),
                        "private": private
                    }
                )
            )
    return discovered
def write_record(self, record, table='servers'): def write_record(self, record, table='servers'):
@ -279,6 +321,10 @@ class DiscoverMastodonServers:
'--database-file', '-d', default='mastodon-servers.db', '--database-file', '-d', default='mastodon-servers.db',
help='File with the database of results.' help='File with the database of results.'
) )
@click.option(
'--regexp-banned-host', '-r', multiple=True,
help='Regular expression for banned host names.'
)
@click_config_file.configuration_option() @click_config_file.configuration_option()
def __main__(**kwargs): def __main__(**kwargs):
return DiscoverMastodonServers(**kwargs) return DiscoverMastodonServers(**kwargs)