From 7b066379c1ffffe963eec7d120ac0f4c6e693e00 Mon Sep 17 00:00:00 2001 From: "Antonio J. Delgado" Date: Wed, 26 Mar 2025 18:07:56 +0200 Subject: [PATCH] Initial script --- backup_imap/backup_imap.py | 205 ++++++++++++++++++++++++++++++++++++- 1 file changed, 203 insertions(+), 2 deletions(-) diff --git a/backup_imap/backup_imap.py b/backup_imap/backup_imap.py index 9942003..63e8911 100644 --- a/backup_imap/backup_imap.py +++ b/backup_imap/backup_imap.py @@ -9,8 +9,13 @@ import sys import os import json import time +import re import logging from logging.handlers import SysLogHandler +import imaplib +import email +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText import click import click_config_file @@ -46,6 +51,153 @@ class BackupImap: "last_update": 0, } self.data = self._read_cached_data() + if not os.path.exists(self.config['destination_folder']): + os.mkdir(self.config['destination_folder']) + + self.imap = None + self._connect_imap( + imap_server=self.config['imap_server'], + imap_port=self.config['imap_port'], + ssl=self.config['use_ssl'], + imap_user=self.config['imap_user'], + imap_password=self.config['imap_password'], + ) + self._process_mailbox(self.config['mailbox']) + + def _process_mailbox(self, mailbox): + self._log.debug( + "Seleting mailbox '%s'", + mailbox + ) + if mailbox == '': + real_mailbox = 'INBOX' + else: + real_mailbox = mailbox + self.imap.select(mailbox=f"\"{real_mailbox}\"", readonly=False) + self._log.debug( + "Searching for all messages in '%s'...", + mailbox + ) + mailbox_path = os.path.join( + self.config['destination_folder'], + mailbox + ) + if not os.path.exists(os.path.dirname(mailbox_path)): + os.mkdir(os.path.dirname(mailbox_path)) + if not os.path.exists(mailbox_path): + os.mkdir(mailbox_path) + search_response, search_data = self.imap.search('UTF-8', 'UNDELETED') + if search_response == 'OK': + all_msgs_uids = search_data[0].split() + self._log.debug( + "Found %s messages", + len(all_msgs_uids) + ) + msg_counter = 1 + for message_uid in all_msgs_uids: + self._log.debug( + "Processing message '%s' (%s/%s)", + message_uid.decode(), + msg_counter, + len(all_msgs_uids) + ) + msg_counter += 1 + fetch_response, fetch_data = self.imap.fetch(message_uid, "(RFC822)") + if fetch_response == 'OK': + subject = f'__no_subject__{message_uid}' + data = fetch_data[0][1] + subject_match = re.search(rb'Subject: (.*)\r\n', data) + if subject_match: + subject = subject_match.group(1).decode().replace(os.path.sep, '_') + message_path = os.path.join( + mailbox_path, + subject + ) + original_subject = subject + counter = 1 + while os.path.exists(message_path): + subject = f"{original_subject}_{counter}" + message_path = os.path.join( + mailbox_path, + subject + ) + counter += 1 + try: + with open(message_path, 'wb') as file_pointer: + file_pointer.write(data) + except OSError as error: + if error.errno == 36: # File name too long + message_path = os.path.join( + mailbox_path, + f"message_uid_{message_uid.decode()}" + ) + with open(message_path, 'wb') as file_pointer: + file_pointer.write(data) + else: + self._log.error( + "Error writing email '%s'. %s", + message_path, + error + ) + self._log.debug("Searching for all messages in mailbox '%s'...", mailbox) + list_response, list_data = self.imap.list(f"\"{mailbox}\"", '*') + if list_response == 'OK': + for subdir in list_data: + sub_mailbox = self._parse_mailbox(subdir) + if sub_mailbox != mailbox: + self._process_mailbox(self._parse_mailbox(subdir)) + + def _parse_mailbox(self, data): + result = data + match = re.match(rb'\((.*)\) "(.*)" (.*)', data) + if match: + result = match.group(3).replace(b'"', b'').decode() + return result + + def _connect_imap(self, imap_server, imap_port, ssl, imap_user, imap_password): + '''Create connection object to the IMAP server''' + self._log.debug( + 'Connecting to server %s:%s...', + imap_server, + imap_port, + ) + if ssl: + try: + self.imap = imaplib.IMAP4_SSL(imap_server, imap_port) + except imaplib.IMAP4.error as error: + self._log.error( + "Error connecting securely to IMAP server '%s'. %s", + imap_server, + error, + ) + sys.exit(1) + else: + try: + self.imap = imaplib.IMAP4(imap_server, imap_port) + except imaplib.IMAP4.error as error: + self._log.error( + "Error connecting to IMAP server '%s'. %s", + imap_server, + error, + ) + sys.exit(2) + try: + self._log.debug('Authenticating as user %s...', imap_user) + result, data = self.imap.login(imap_user, imap_password) + if result != 'OK': + self._log.error( + "Error login into IMAP server. %s", + data + ) + sys.exit(3) + except imaplib.IMAP4.error as error: + self._log.error( + "Error while login as '%s'. %s'", + imap_user, + error, + ) + self.imap.close() + sys.exit(4) def close(self): '''Close class and save data''' @@ -130,15 +282,64 @@ class BackupImap: @click.option( '--log-file', '-l', - default=f"{LOG_FOLDER}/__project_code_name__.log", + default=f"{LOG_FOLDER}/backup_imap.log", help="File to store all debug messages." ) @click.option( '--cache-file', '-f', - default=f"{CACHE_FOLDER}/__project_code_name__.json", + default=f"{CACHE_FOLDER}/backup_imap.json", help='Cache file to store data from each run', ) +@click.option( + '--imap-server', + '-s', + required=True, + help='Hostname or IP of the IMAP server' +) +@click.option( + '--imap-port', + '-p', + default=993, + help='IMAP port to contact the server' +) +@click.option( + '--use-ssl', + '-S', + is_flag=True, + default=True, + help='Use SSL to contact the IMAP server' +) +@click.option( + '--imap-user', + '-u', + required=True, + help='User to connect to IMAP server' +) +@click.option( + '--imap-password', + '-P', + required=True, + help='User password to conect to IMAP server' +) +@click.option( + '--mailbox', + '-m', + default='INBOX', + help='Mailbox to backup' +) +@click.option( + '--destination-folder', + '-F', + required=True, + help='Folder to save the messages and folders' +) +@click.option( + '--batch-size', + '-b', + default=1000, + help='Maximun number of messages to fetch per request' +) @click.option( '--max-cache-age', '-a',