Add initial script

This commit is contained in:
Antonio J. Delgado 2024-09-04 13:40:46 +03:00
parent 8216520fe6
commit 6178df97b6
2 changed files with 281 additions and 3 deletions

View file

@ -9,11 +9,16 @@ import sys
import os
import logging
from logging.handlers import SysLogHandler
import imaplib
import email
from signal import signal, SIGINT
import json
import re
import click
import click_config_file
class ImapFilter:
'''IMAP filter tool'''
def __init__(self, **kwargs):
self.config = kwargs
@ -30,6 +35,241 @@ class ImapFilter:
'imap_filter.log'
)
self._init_log()
signal(SIGINT, self._signal_handler)
self._convert_filters()
if self.config['filters_file']:
self._read_filters_file()
if len(self.config['filter']) == 0:
self._log.error(
"You must indicate either a filter or a filters-file. Use --help to see more details."
)
sys.exit(1)
self.connect_imap()
self._process_filters()
def _read_filters_file(self):
with open(self.config['filters_file'], 'r', encoding='UTF-8') as filters_file:
new_filters = json.load(filters_file)
for new_filter in new_filters:
self.config['filter'].append(new_filter)
def _process_filters(self):
for mfilter in self.config['filter']:
self.matches = 0
self._log.debug("Processing filter '%s'...", mfilter)
self.imap.select(mailbox=mfilter['mailbox'], readonly=False)
self._log.debug("Searching for all messages in mailbox '%s'...", mfilter['mailbox'])
typ, data = self.imap.search('UTF-8', 'ALL')
if typ != 'OK':
self._log.error('Error, server replied: %s', data)
return False
all_msgs_uids = data[0].split()
total_msgs = len(all_msgs_uids)
self._log.debug(
"Processing %s messages in mailbox '%s'...",
total_msgs,
mfilter['mailbox']
)
msg_count = 0
for message_id in all_msgs_uids:
msg_count += 1
self._log.debug(
"Fetching message %s of %s...",
msg_count,
total_msgs
)
try:
typ, data = self.imap.fetch(message_id, '(RFC822)')
if typ != 'OK':
self._log.error('Error, server replied: %s', data)
return False
typ, unseen_data = self.imap.store(message_id, '-FLAGS', '\\Seen')
if typ != 'OK':
self._log.error('Error, server replied: %s', unseen_data)
return False
self._process_message(message_id, data[0], mfilter)
except imaplib.IMAP4.error as error:
self._log.error(
"Error fetching message. %s",
error
)
self._log.debug(
"A total of %s matches for this filter",
self.matches
)
self.imap.expunge()
def _process_message(self, message_id, data, mfilter):
'''Process a mail message'''
if isinstance(data[1], int):
self._log.warning(
"Response part is integer %s in data '%s'. Try again.",
data[1],
data[0]
)
return False
part = data[1].decode('utf-8')
message = email.message_from_string(part)
decoded_field = email.header.decode_header(message.get(mfilter['field'], ""))
if isinstance(decoded_field[0][0], str):
field_data = decoded_field[0][0]
else:
field_data = decoded_field[0][0].decode()
match = re.match(mfilter['regexp'], field_data)
if match:
self._log.info(
"Field '%s' => '%s', matches '%s'",
mfilter['field'],
field_data,
mfilter['regexp']
)
self.matches += 1
if self.config['dummy']:
self._log.info('Doing nothing (dummy run)')
else:
self._do_filter(message_id, mfilter)
def _do_filter(self, message_id, mfilter):
if f"_action_{mfilter['action']}" in dir(self):
function = getattr(self, f"_action_{mfilter['action']}")
result = function(message_id, mfilter)
self._log.debug(
"Result: %s",
result
)
def _action_move(self, message_id, mfilter):
self._log.debug(
"Moving message '%s' to '%s'...",
message_id,
mfilter['destination']
)
if self._action_copy(message_id, mfilter):
self._action_delete(message_id, None)
def _create_mailbox(self, mailbox):
self._log.debug(
"Creating mailbox '%s'...",
mailbox
)
typ, data = self.imap.create(mailbox)
if typ != 'OK':
self._log.error(
'Error creating mailbox %s, server replied: %s',
mailbox,
data
)
return False
return True
def _action_copy(self, message_id, mfilter):
self._log.debug(
"Copying message '%s' to '%s'...",
message_id,
mfilter['destination']
)
typ, data = self.imap.copy(message_id, mfilter['destination'])
if typ != 'OK':
if b'[TRYCREATE]' in data[0]:
if self._create_mailbox(mfilter['destination']):
typ, data = self.imap.copy(message_id, mfilter['destination'])
if typ != 'OK':
self._log.error(
'Error copying message to %s, server replied: %s',
mfilter['destination'],
data
)
return False
else:
self._log.error(
'Error copying message to %s, server replied: %s',
mfilter['destination'],
data
)
return False
return True
def _action_delete(self, message_id, mfilter):
self._log.debug(
"Deleting message '%s'...",
message_id
)
typ, data = self.imap.store(message_id, '+FLAGS', '\\Deleted')
if typ != 'OK':
self._log.error(
'Error marking message %s as seen, server replied: %s',
message_id,
data
)
return False
return True
def _action_mark_seen(self, message_id, mfilter):
self._log.debug(
"Marking as seen message '%s'...",
message_id
)
typ, seen_data = self.imap.store(message_id, '+FLAGS', '\\Seen')
if typ != 'OK':
self._log.error(
'Error marking message %s as seen, server replied: %s',
message_id,
seen_data
)
return False
return True
def _convert_filters(self):
new_filter = []
for old_filter in self.config['filter']:
new_filter.append(json.loads(old_filter))
self.config['filter'] = new_filter
def connect_imap(self):
'''Create connection object to the IMAP server'''
self._log.debug(
'Connecting to server %s:%s...',
self.config['imap_server'],
self.config['imap_port']
)
if self.config['ssl']:
try:
self.imap = imaplib.IMAP4_SSL(self.config['imap_server'], self.config['imap_port'])
except Exception as error:
self._log.error(
"Error connecting securely to IMAP server '%s'. %s",
self.config['imap_server'],
error
)
sys.exit(1)
else:
try:
self.imap = imaplib.IMAP4(self.config['imap_server'], self.config['imap_port'])
except Exception as error:
self._log.error(
"Error connecting to IMAP server '%s'. %s",
self.config['imap_server'],
error
)
sys.exit(2)
try:
self._log.debug('Authenticating as user %s...', self.config['imap_user'])
self.imap.login(self.config['imap_user'], self.config['imap_password'])
except Exception as error:
self._log.error("Error while login as '%s'. %s'", self.config['imap_user'], error)
self.imap.close()
sys.exit(3)
def _signal_handler(self, signal_received, frame):
# Handle any cleanup here
self._log.info(
'SIGINT or CTRL-C (%s %s) detected. Exiting gracefully',
signal_received,
frame
)
self.imap.close()
self.imap.logout()
sys.exit(0)
def _init_log(self):
''' Initialize log object '''
@ -83,8 +323,36 @@ class ImapFilter:
help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# @click.option("--dummy","-n", is_flag=True,
# help="Don't do anything, just show what would be done.")
@click.option(
"--dummy","-n", is_flag=True,
help="Don't do anything, just show what would be done."
)
@click.option('--imap-server', '-s', default='localhost', help='IMAP server')
@click.option(
'--imap-port', '-p', default=993,
type=click.IntRange(1, 65535), help='IMAP server port'
)
@click.option(
'--imap-user', '-u', required=True,
help='User name to use for the connection to the IMAP server'
)
@click.option(
'--imap-password', '-P', required=True,
help='Password to connect to the IMAP server. Warning! Use a configuration file to avoid revelaing your passwords.'
)
@click.option(
'--ssl', '-S', default=True,
help='Whether to use a secure connection or not.'
)
@click.option(
'--filter', '-f', required=False,
multiple=True,
help='Filter rule.'
)
@click.option(
'--filters-file', '-F', required=False,
help='JSON file containing a list of dictionaries with the filter rules.'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
return ImapFilter(**kwargs)

10
test.json Normal file
View file

@ -0,0 +1,10 @@
[
{
"name": "python",
"mailbox": "Feeds/Mastodon/Test",
"field": "Subject",
"regexp": "^(?=.*nvidia.*|.*Nvidia.*|.*ansible.*|.*Ansible.*|.*ubuntu.*|.*Ubuntu.*|.*blender.*|.*Blender.*|.*technology.*|.*Technology.*|.*msdos.*|.*dosbox.*|.*python.*|.*Python.*|.*devops.*|.*DevOps.*|.*forgejo.*|.*Forgejo.*|.*smartphone.*|.*Smartphone.*|.*SmartPhone.*|.*Android.*|.*android.*|.*github.*|.*Github.*|.*gitlab.*|.*Gitlab.*|.*#programming.*|.*TechCrunch.*|.*researchbuzz.*|.*ripencc.*|.*FCAI.*|.*TechDesk.*|.*#selfhosting.*|.*#selfhosted.*|.*#ai.*|.*#deepfake.*|.*#chatgpt.*|.*#tietotekniikka.*|.*#videogames.*|.*#software.*|.*#retrogaming.*|.*#web.*|.*#gaming.*|.*#pcgaming.*|.*#gamedev.*|.*#fairphone.*|.*#ebike.*|.*#windows.*|.*#speedrun.*|.*#cloud.*|.*#euhosted.*|.*#python.*|.*#steamdeck.*|.*#indiegame.*|.*#webdev.*|.*#rustlang.*|.*#valve.*|.*#intel.*|.*#dns.*|.*#digitaljustice.*|.*#tv.*)",
"action": "move",
"destination": "Feeds/Mastodon/Test/tech"
}
]