Initial script

This commit is contained in:
Antonio J. Delgado 2025-03-26 18:07:56 +02:00
parent 0c88de50f3
commit 7b066379c1

View file

@ -9,8 +9,13 @@ import sys
import os
import json
import time
import re
import logging
from logging.handlers import SysLogHandler
import imaplib
import email
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import click
import click_config_file
@ -46,6 +51,153 @@ class BackupImap:
"last_update": 0,
}
self.data = self._read_cached_data()
if not os.path.exists(self.config['destination_folder']):
os.mkdir(self.config['destination_folder'])
self.imap = None
self._connect_imap(
imap_server=self.config['imap_server'],
imap_port=self.config['imap_port'],
ssl=self.config['use_ssl'],
imap_user=self.config['imap_user'],
imap_password=self.config['imap_password'],
)
self._process_mailbox(self.config['mailbox'])
def _process_mailbox(self, mailbox):
self._log.debug(
"Seleting mailbox '%s'",
mailbox
)
if mailbox == '':
real_mailbox = 'INBOX'
else:
real_mailbox = mailbox
self.imap.select(mailbox=f"\"{real_mailbox}\"", readonly=False)
self._log.debug(
"Searching for all messages in '%s'...",
mailbox
)
mailbox_path = os.path.join(
self.config['destination_folder'],
mailbox
)
if not os.path.exists(os.path.dirname(mailbox_path)):
os.mkdir(os.path.dirname(mailbox_path))
if not os.path.exists(mailbox_path):
os.mkdir(mailbox_path)
search_response, search_data = self.imap.search('UTF-8', 'UNDELETED')
if search_response == 'OK':
all_msgs_uids = search_data[0].split()
self._log.debug(
"Found %s messages",
len(all_msgs_uids)
)
msg_counter = 1
for message_uid in all_msgs_uids:
self._log.debug(
"Processing message '%s' (%s/%s)",
message_uid.decode(),
msg_counter,
len(all_msgs_uids)
)
msg_counter += 1
fetch_response, fetch_data = self.imap.fetch(message_uid, "(RFC822)")
if fetch_response == 'OK':
subject = f'__no_subject__{message_uid}'
data = fetch_data[0][1]
subject_match = re.search(rb'Subject: (.*)\r\n', data)
if subject_match:
subject = subject_match.group(1).decode().replace(os.path.sep, '_')
message_path = os.path.join(
mailbox_path,
subject
)
original_subject = subject
counter = 1
while os.path.exists(message_path):
subject = f"{original_subject}_{counter}"
message_path = os.path.join(
mailbox_path,
subject
)
counter += 1
try:
with open(message_path, 'wb') as file_pointer:
file_pointer.write(data)
except OSError as error:
if error.errno == 36: # File name too long
message_path = os.path.join(
mailbox_path,
f"message_uid_{message_uid.decode()}"
)
with open(message_path, 'wb') as file_pointer:
file_pointer.write(data)
else:
self._log.error(
"Error writing email '%s'. %s",
message_path,
error
)
self._log.debug("Searching for all messages in mailbox '%s'...", mailbox)
list_response, list_data = self.imap.list(f"\"{mailbox}\"", '*')
if list_response == 'OK':
for subdir in list_data:
sub_mailbox = self._parse_mailbox(subdir)
if sub_mailbox != mailbox:
self._process_mailbox(self._parse_mailbox(subdir))
def _parse_mailbox(self, data):
result = data
match = re.match(rb'\((.*)\) "(.*)" (.*)', data)
if match:
result = match.group(3).replace(b'"', b'').decode()
return result
def _connect_imap(self, imap_server, imap_port, ssl, imap_user, imap_password):
'''Create connection object to the IMAP server'''
self._log.debug(
'Connecting to server %s:%s...',
imap_server,
imap_port,
)
if ssl:
try:
self.imap = imaplib.IMAP4_SSL(imap_server, imap_port)
except imaplib.IMAP4.error as error:
self._log.error(
"Error connecting securely to IMAP server '%s'. %s",
imap_server,
error,
)
sys.exit(1)
else:
try:
self.imap = imaplib.IMAP4(imap_server, imap_port)
except imaplib.IMAP4.error as error:
self._log.error(
"Error connecting to IMAP server '%s'. %s",
imap_server,
error,
)
sys.exit(2)
try:
self._log.debug('Authenticating as user %s...', imap_user)
result, data = self.imap.login(imap_user, imap_password)
if result != 'OK':
self._log.error(
"Error login into IMAP server. %s",
data
)
sys.exit(3)
except imaplib.IMAP4.error as error:
self._log.error(
"Error while login as '%s'. %s'",
imap_user,
error,
)
self.imap.close()
sys.exit(4)
def close(self):
'''Close class and save data'''
@ -130,15 +282,64 @@ class BackupImap:
@click.option(
'--log-file',
'-l',
default=f"{LOG_FOLDER}/__project_code_name__.log",
default=f"{LOG_FOLDER}/backup_imap.log",
help="File to store all debug messages."
)
@click.option(
'--cache-file',
'-f',
default=f"{CACHE_FOLDER}/__project_code_name__.json",
default=f"{CACHE_FOLDER}/backup_imap.json",
help='Cache file to store data from each run',
)
@click.option(
'--imap-server',
'-s',
required=True,
help='Hostname or IP of the IMAP server'
)
@click.option(
'--imap-port',
'-p',
default=993,
help='IMAP port to contact the server'
)
@click.option(
'--use-ssl',
'-S',
is_flag=True,
default=True,
help='Use SSL to contact the IMAP server'
)
@click.option(
'--imap-user',
'-u',
required=True,
help='User to connect to IMAP server'
)
@click.option(
'--imap-password',
'-P',
required=True,
help='User password to conect to IMAP server'
)
@click.option(
'--mailbox',
'-m',
default='INBOX',
help='Mailbox to backup'
)
@click.option(
'--destination-folder',
'-F',
required=True,
help='Folder to save the messages and folders'
)
@click.option(
'--batch-size',
'-b',
default=1000,
help='Maximun number of messages to fetch per request'
)
@click.option(
'--max-cache-age',
'-a',