smtpd_watcher/smtpd_watcher/smtpd_watcher.py
2025-01-31 16:53:18 +02:00

306 lines
9.4 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""SMTPd watcher for failed connections"""
import sys
import os
import logging
from logging.handlers import SysLogHandler
import subprocess
import json
import re
import click
import click_config_file
import mariadb
class SmtpdWatcher:
"""SMTPd watcher for failed connections"""
def __init__(self, **kwargs):
self.config = kwargs
if 'log_file' not in kwargs or kwargs['log_file'] is None:
self.config['log_file'] = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'log',
'smtpd_watcher.log'
)
self._init_log()
self.banned_ips = self._read_banned_ips()
self.mail_users = self._get_mail_user()
if self.config['mail_log_file'] == '-':
logfile = sys.stdin
else:
logfile = open(self.config['mail_log_file'], 'r', encoding='utf-8')
for line in logfile:
if not self._process_log_file(line):
sys.exit(1)
def _read_banned_ips(self):
ips = {}
result = subprocess.run(
['/usr/bin/fail2ban-client', 'get', 'postfix', 'banned'],
encoding='utf-8',
check=True,
capture_output=True,
)
# self._log.debug(
# "Args: %s. Stdout: %s. Return code: %s. Stderr: %s",
# result.args,
# result.stdout,
# result.returncode,
# result.stderr,
# )
ips['postfix'] = result.stdout.replace("'", '').replace(',', '').replace(']', '').replace('[', '').split(' ')
self._log.debug(
"Banned IPs in postfix jail: %s",
ips['postfix']
)
result = subprocess.run(
['/usr/bin/fail2ban-client', 'get', 'postfix-sasl', 'banned'],
encoding='utf-8',
check=True,
capture_output=True,
)
ips['postfix-sasl'] = result.stdout.replace("'", '').replace(',', '').replace(']', '').replace('[', '').split(' ')
self._log.debug(
"Banned IPs in postfix-sasl jail: %s",
ips['postfix-sasl']
)
result = subprocess.run(
['ufw', 'status', 'numbered'],
encoding='utf-8',
check=True,
capture_output=True,
)
ips['ufw'] = []
for line in result.stdout:
if 'DENY IN' in line:
split_line = line.split(' ')
ips['ufw'].append(split_line[4])
self._log.debug(
"Traffic denied to IPs in UFW: %s",
ips['ufw']
)
return ips
def _process_log_file(self, line):
ban = False
if 'authentication failure' in line:
ip_match = re.search(r'\[([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})\]', line)
if ip_match:
ip = ip_match.group(1)
else:
self._log.debug(
"Didn't find an IP in log file '%s'",
line
)
return False
target_user_match = re.search(r'sasl_username=([^ ]*)', line)
if target_user_match:
target_user = target_user_match.group(1)
if not self._check_mail_user(target_user):
ban = True
else:
self._log.debug(
"There is no SASL username field in log line, so banning IP '%s'. Log line: '%s'",
ip,
line
)
ban = True
if ban:
if ip not in self.banned_ips['postfix']:
self._fail2ban_ban_ip('postfix', ip)
else:
self._log.debug(
"IP '%s' already banned in 'postfix' jail",
ip
)
if ip not in self.banned_ips['postfix-sasl']:
self._fail2ban_ban_ip('postfix-sasl', ip)
else:
self._log.debug(
"IP '%s' already banned in 'postfix-sasl' jail",
ip
)
if ip not in self.banned_ips['ufw']:
self._ufw_deny_ip(ip)
self._log.debug(
"IP '%s' already dennied traffic in UFW",
ip
)
return True
def _ufw_deny_ip(self, ip):
result = subprocess.run(
['/usr/sbin/ufw', 'deny', 'from', ip],
encoding='utf-8',
check=True,
capture_output=True,
)
self._log.debug(
"Denying traffic from IP '%s' in UFW result: %s",
ip,
result.stdout
)
if result.returncode == 0:
return True
return False
def _fail2ban_ban_ip(self, jail, ip):
result = subprocess.run(
['/usr/bin/fail2ban-client', 'set', jail, 'banip', ip],
encoding='utf-8',
check=True,
capture_output=True,
)
self._log.debug(
"Adding ban to IP '%s' in jail '%s' result: %s",
ip,
jail,
result.stdout
)
if result.returncode == 0:
return True
return False
def _check_mail_user(self, user):
if user != '':
for mail_user in self.mail_users:
if user in mail_user:
self._log.debug(
"User '%s' match mail database user '%s'",
user,
mail_user
)
return mail_user
return False
def _get_mail_user(self):
self._log.debug(
"Getting all mail users from database..."
)
mail_users = []
try:
conn = mariadb.connect(
host=self.config['db_host'],
port=self.config['db_port'],
user=self.config['db_user'],
password=self.config['db_password']
)
cur = conn.cursor()
cur.execute(self.config['db_sql_query'])
for email in cur:
mail_users.append(email)
except mariadb.Error as error:
self._log.error(
"Error connecting to the database: %s",
error
)
sys.exit(1)
conn.close()
self._log.debug(
"Obtained mail users: %s",
mail_users
)
return mail_users
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("smtpd_watcher")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "smtpd_watcher.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
@click.command()
@click.option(
"--debug-level",
"-d",
default="INFO",
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
case_sensitive=False,
),
help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
# help="Don't do anything, just show what would be done.")
@click.option(
'--mail-log-file', '-m',
default='/var/log/mail.log', help='Mail log file to read'
)
@click.option(
'--db-host', '-H',
default='127.0.0.1',
help='MariaDB host name for mail database'
)
@click.option(
'--db-port', '-p',
default=3306,
help='MariaDB host port for mail database'
)
@click.option(
'--db-user', '-u',
default=os.environ['USER'],
help='MariaDB user name for mail database'
)
@click.option(
'--db-password', '-P',
default='',
help='MariaDB user password for mail database'
)
@click.option(
'--db-sql-query', '-q',
default='SELECT email FROM mail.users',
help='MariaDB SQL query to get all users\' emails'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
return SmtpdWatcher(**kwargs)
if __name__ == "__main__":
__main__()