#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""Backup an IMAP account into a folder"""

import sys
import os
import json
import time
import re
import copy
import errno
import hashlib
import logging
from logging.handlers import SysLogHandler
import imaplib
import email
import email.header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import click
import click_config_file

# Base folder used to derive the default cache and log locations. When neither
# HOME nor USERPROFILE is set, system-wide folders are used instead.
HOME_FOLDER = os.environ.get('HOME', os.environ.get('USERPROFILE', '/'))
if HOME_FOLDER == '/':
    CACHE_FOLDER = '/var/cache'
    LOG_FOLDER = '/var/log/'
else:
    CACHE_FOLDER = f"{HOME_FOLDER}/.local/"
    LOG_FOLDER = f"{HOME_FOLDER}/log/"


class BackupImap:
    """Backup an IMAP account into a folder.

    Instantiating the class performs the whole backup: it sets up logging,
    loads the cache of already saved messages, connects and logs in to the
    IMAP server and walks the configured mailbox and its sub-mailboxes,
    writing every undeleted message to ``destination_folder``. Call
    :meth:`close` afterwards to persist the cache and log out.

    Configuration keys read from ``kwargs``: ``log_file``, ``debug_level``,
    ``destination_folder``, ``imap_server``, ``imap_port``, ``use_ssl``,
    ``imap_user``, ``imap_password``, ``mailbox``, ``cache_file`` and
    ``max_cache_age``.
    """

    def __init__(self, **kwargs):
        self.config = kwargs
        if kwargs.get('log_file') is None:
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get('USERPROFILE', os.getcwd())
                ),
                'log',
                'backup_imap.log'
            )
        self._init_log()

        self._default_data = {
            "last_update": 0,
            "backedup_messages": [],
        }
        # Lazily built set of (message_id, mailbox) pairs, see
        # _backedup_message().
        self._backedup_index = None
        # Mailboxes already handled during this run, see _process_mailbox().
        self._visited_mailboxes = set()
        self.data = self._read_cached_data()

        os.makedirs(self.config['destination_folder'], exist_ok=True)

        self.imap = None
        # Only ever raise imaplib's line limit: recent Python versions ship a
        # default larger than 100000 and assigning blindly would lower it.
        imaplib._MAXLINE = max(getattr(imaplib, '_MAXLINE', 0), 100000)
        self._connect_imap(
            imap_server=self.config['imap_server'],
            imap_port=self.config['imap_port'],
            ssl=self.config['use_ssl'],
            imap_user=self.config['imap_user'],
            imap_password=self.config['imap_password'],
        )
        try:
            self._process_mailbox(self.config['mailbox'])
        except BaseException:
            # The caller never gets an instance to call close() on when the
            # constructor fails, so persist what was backed up so far. This
            # prevents the next run from writing those messages again.
            self._save_cached_data(self.data)
            raise

    def _process_mailbox(self, mailbox):
        """Back up `mailbox` and, first, every mailbox listed under it.

        Messages are read without modifying their flags: the mailbox is
        selected read-only and the body is fetched with BODY.PEEK[].
        """
        visited = getattr(self, '_visited_mailboxes', None)
        if visited is None:
            visited = self._visited_mailboxes = set()
        if mailbox in visited:
            # A nested mailbox is returned by the LIST of each of its
            # ancestors; handle it only once.
            return
        visited.add(mailbox)

        self._log.debug("Searching for all mailboxes in mailbox '%s'...", mailbox)
        list_response, list_data = self.imap.list(f"\"{mailbox}\"", '*')
        self._log.debug(
            "Mailboxes: %s",
            list_data
        )
        if list_response == 'OK':
            for subdir in list_data:
                if not subdir:
                    continue
                sub_mailbox = self._parse_mailbox(subdir)
                if not isinstance(sub_mailbox, str):
                    self._log.warning(
                        "Skipping mailbox entry that could not be parsed: %s",
                        subdir
                    )
                    continue
                if sub_mailbox != mailbox:
                    self._process_mailbox(sub_mailbox)

        self._log.debug(
            "Seleting mailbox '%s'",
            mailbox
        )
        real_mailbox = 'INBOX' if mailbox == '' else mailbox
        select_response, select_data = self.imap.select(
            mailbox=f"\"{real_mailbox}\"",
            readonly=True
        )
        if select_response != 'OK':
            self._log.warning(
                "Mailbox '%s' can't be selected, skipping it. %s",
                mailbox,
                select_data
            )
            return

        self._log.debug(
            "Searching for all messages in '%s'...",
            mailbox
        )
        # Strip leading separators so that os.path.join() can't discard the
        # destination folder for a mailbox name that looks absolute.
        mailbox_path = os.path.join(
            self.config['destination_folder'],
            mailbox.lstrip('/\\')
        )
        os.makedirs(mailbox_path, exist_ok=True)

        search_response, search_data = self.imap.search('UTF-8', 'UNDELETED')
        if search_response != 'OK':
            self._log.warning(
                "Error searching for messages in '%s'. %s",
                mailbox,
                search_data
            )
            return
        all_msgs_uids = (search_data[0] or b'').split() if search_data else []
        total = len(all_msgs_uids)
        self._log.debug(
            "Found %s messages",
            total
        )
        for msg_counter, message_uid in enumerate(all_msgs_uids, start=1):
            self._log.debug(
                "Processing message '%s' (%s/%s)",
                message_uid.decode(),
                msg_counter,
                total
            )
            self._backup_message(mailbox, mailbox_path, message_uid)

    def _backup_message(self, mailbox, mailbox_path, message_uid):
        """Fetch one message of the selected mailbox and write it to disk.

        `message_uid` is the identifier (bytes) returned by the SEARCH
        command. Nothing is written when the message is already recorded in
        the cache for `mailbox`.
        """
        uid_text = message_uid.decode()
        fetch_response, fetch_data = self.imap.fetch(message_uid, "(BODY.PEEK[])")
        if fetch_response != 'OK':
            self._log.warning(
                "Error fetching message '%s'. %s",
                uid_text,
                fetch_data
            )
            return
        # The literal with the message is the second item of the first tuple
        # in the response; other items are plain bytes.
        data = next(
            (
                part[1] for part in fetch_data
                if isinstance(part, tuple) and len(part) > 1
            ),
            None
        )
        if not isinstance(data, (bytes, bytearray)):
            self._log.warning(
                "No content returned for message '%s'",
                uid_text
            )
            return

        id_parts = self._get_mail_header('Message-ID', data)
        message_id = id_parts[-1] if id_parts else ''
        if not message_id:
            # Without a Message-ID every such message would share the same
            # cache key and only the first would be saved; use the content.
            message_id = f"sha256:{hashlib.sha256(data).hexdigest()}"
        if self._backedup_message(message_id, mailbox):
            return

        subject = self._safe_filename(
            ''.join(self._get_mail_header('Subject', data))
        )
        if not subject:
            subject = f'__no_subject__{uid_text}'
        fallback = f"message_uid_{uid_text}"

        for candidate in (subject, fallback):
            message_path = self._unique_path(mailbox_path, candidate)
            try:
                with open(message_path, 'wb') as file_pointer:
                    file_pointer.write(data)
            except OSError as error:
                if error.errno == errno.ENAMETOOLONG and candidate != fallback:
                    # Subject too long for a file name: retry with the short
                    # fallback name.
                    continue
                self._log.error(
                    "Error writing email '%s'. %s",
                    message_path,
                    error
                )
                return
            self._record_backup(message_id, mailbox)
            return

    @staticmethod
    def _safe_filename(name):
        """Return `name` made usable as a single file name, or '' if empty.

        Path separators and control characters are replaced with '_' so the
        result can't point outside the folder it is joined to.
        """
        cleaned = re.sub(r'[\x00-\x1f/\\]+', '_', name).strip()
        if cleaned in ('', '.', '..'):
            return ''
        return cleaned

    @staticmethod
    def _unique_path(folder, name):
        """Return a path in `folder` for `name` that doesn't exist yet.

        A numeric suffix (`_1`, `_2`, ...) is appended while the path exists.
        """
        candidate = os.path.join(folder, name)
        counter = 1
        while os.path.exists(candidate):
            candidate = os.path.join(folder, f"{name}_{counter}")
            counter += 1
        return candidate

    def _record_backup(self, message_id, mailbox):
        """Remember that `message_id` of `mailbox` has been written to disk."""
        self.data['backedup_messages'].append({
            "message_id": message_id,
            "mailbox": mailbox})
        index = getattr(self, '_backedup_index', None)
        if index is not None:
            index.add((message_id, mailbox))

    def _backedup_message(self, message_id, mailbox):
        """Return True if `message_id` of `mailbox` is recorded in the cache."""
        index = getattr(self, '_backedup_index', None)
        if index is None:
            # Built once so that each lookup doesn't scan the whole list.
            index = {
                (message.get('message_id'), message.get('mailbox'))
                for message in self.data['backedup_messages']
            }
            self._backedup_index = index
        return (message_id, mailbox) in index

    def _get_mail_header(self, header, data):
        """Return the decoded fragments of `header` in the message `data`.

        `data` is the raw message, as bytes or str. The result is a list of
        str, one item per fragment returned by email.header.decode_header().
        A missing header yields `['']`. Decoding stops at the first fragment
        that can't be decoded, after logging an error.
        """
        if isinstance(data, (bytes, bytearray)):
            message = email.message_from_bytes(bytes(data))
        else:
            message = email.message_from_string(data)
        decoded_header = email.header.decode_header(message.get(header, ""))
        result = []
        for fragment, charset in decoded_header:
            if isinstance(fragment, str):
                result.append(fragment)
                continue
            # Try the charset declared in the header first, then the two
            # encodings the script has always fallen back to.
            text = None
            last_error = None
            for codec in filter(None, (charset, 'utf-8', 'Windows-1252')):
                try:
                    text = fragment.decode(codec)
                    break
                except (UnicodeDecodeError, LookupError) as error:
                    last_error = error
            if text is None:
                self._log.error(
                    "Error decoding header data as UTF-8. Data: %s. Error: %s",
                    fragment,
                    last_error
                )
                break
            result.append(text)
        return result

    def _parse_mailbox(self, data):
        """Extract the mailbox name from one item of a LIST response.

        Returns the name as str with double quotes removed, or `data`
        unchanged when it doesn't have the expected
        `(flags) "delimiter" name` form.
        """
        if not isinstance(data, (bytes, bytearray)):
            return data
        result = data
        match = re.match(rb'\((.*)\) "(.*)" (.*)', data)
        if match:
            result = match.group(3).replace(b'"', b'').decode()
        return result

    def _connect_imap(self, imap_server, imap_port, ssl, imap_user, imap_password):
        '''Create connection object to the IMAP server and log in.

        Exits the process with code 1 (SSL) or 2 (plain) when the connection
        fails and with code 3 or 4 when the login fails.
        '''
        self._log.debug(
            'Connecting to server %s:%s...',
            imap_server,
            imap_port,
        )
        # Connection problems (refused, DNS, TLS) surface as OSError
        # subclasses, not as imaplib errors, so both are handled.
        if ssl:
            try:
                self.imap = imaplib.IMAP4_SSL(imap_server, imap_port)
            except (imaplib.IMAP4.error, OSError) as error:
                self._log.error(
                    "Error connecting securely to IMAP server '%s'. %s",
                    imap_server,
                    error,
                )
                sys.exit(1)
        else:
            try:
                self.imap = imaplib.IMAP4(imap_server, imap_port)
            except (imaplib.IMAP4.error, OSError) as error:
                self._log.error(
                    "Error connecting to IMAP server '%s'. %s",
                    imap_server,
                    error,
                )
                sys.exit(2)
        try:
            self._log.debug('Authenticating as user %s...', imap_user)
            result, data = self.imap.login(imap_user, imap_password)
            if result != 'OK':
                self._log.error(
                    "Error login into IMAP server. %s",
                    data
                )
                sys.exit(3)
        except imaplib.IMAP4.error as error:
            self._log.error(
                "Error while login as '%s'. %s'",
                imap_user,
                error,
            )
            # CLOSE is only valid with a selected mailbox; LOGOUT is the
            # command allowed before authentication.
            self._disconnect()
            sys.exit(4)

    def _disconnect(self):
        """Log out from the IMAP server, ignoring errors while doing so."""
        imap = getattr(self, 'imap', None)
        if imap is None:
            return
        try:
            imap.logout()
        except (imaplib.IMAP4.error, OSError) as error:
            self._log.debug(
                "Error closing the IMAP connection. %s",
                error
            )
        self.imap = None

    def close(self):
        '''Close class and save data'''
        self._save_cached_data(self.data)
        self._disconnect()

    def _read_cached_data(self):
        """Load the cache file, or return fresh default data.

        Default data is returned when the file is missing, isn't valid JSON,
        doesn't hold a JSON object, or its `last_update` is older than
        `max_cache_age` seconds.
        """
        cache_path = self.config['cache_file']
        if not os.path.exists(cache_path):
            return copy.deepcopy(self._default_data)
        try:
            with open(cache_path, 'r', encoding='utf-8') as cache_file:
                cached_data = json.load(cache_file)
        except json.decoder.JSONDecodeError:
            return copy.deepcopy(self._default_data)
        if not isinstance(cached_data, dict):
            return copy.deepcopy(self._default_data)
        last_update = cached_data.get('last_update')
        if (
            isinstance(last_update, (int, float)) and
            last_update + self.config['max_cache_age'] < time.time()
        ):
            # The cache has expired.
            return copy.deepcopy(self._default_data)
        cached_data.setdefault('backedup_messages', [])
        return cached_data

    def _save_cached_data(self, data):
        """Write `data` to the cache file, stamping it with the current time."""
        data['last_update'] = time.time()
        cache_folder = os.path.dirname(self.config['cache_file'])
        if cache_folder:
            os.makedirs(cache_folder, exist_ok=True)
        with open(self.config['cache_file'], 'w', encoding='utf-8') as cache_file:
            json.dump(data, cache_file, indent=2)
        self._log.debug(
            "Saved cached data in '%s'",
            self.config['cache_file']
        )

    def _init_log(self):
        ''' Initialize log object

        Messages go to syslog and to a rotating log file at DEBUG level and
        to the standard output at the configured `debug_level`.
        '''
        self._log = logging.getLogger("backup_imap")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(
            logging.getLevelName(
                str(self.config.get("debug_level") or 'INFO').upper()
            )
        )
        self._log.addHandler(streamhandler)

        log_file = self.config.get('log_file')
        if not log_file:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_folder = os.path.join(home_folder, "log")
            log_file = os.path.join(log_folder, "backup_imap.log")

        log_folder = os.path.dirname(log_file)
        if log_folder:
            os.makedirs(log_folder, exist_ok=True)

        # A backupCount of zero disables rollover, making maxBytes useless.
        filehandler = logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=102400000,
            backupCount=5
        )
        # create formatter
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True


@click.command()
@click.option(
    "--debug-level",
    "-d",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option(
    '--log-file', '-l',
    default=f"{LOG_FOLDER}/backup_imap.log",
    help="File to store all debug messages."
)
@click.option(
    '--imap-server', '-s',
    required=True,
    help='Hostname or IP of the IMAP server'
)
@click.option(
    '--imap-port', '-p',
    default=993,
    help='IMAP port to contact the server'
)
# The "--no-ssl" form makes it possible to turn off the flag, which defaults
# to enabled, from the command line.
@click.option(
    '--use-ssl/--no-ssl', '-S',
    is_flag=True,
    default=True,
    help='Use SSL to contact the IMAP server'
)
@click.option(
    '--imap-user', '-u',
    required=True,
    help='User to connect to IMAP server'
)
@click.option(
    '--imap-password', '-P',
    required=True,
    help='User password to conect to IMAP server'
)
@click.option(
    '--mailbox', '-m',
    default='INBOX',
    help='Mailbox to backup'
)
@click.option(
    '--destination-folder', '-F',
    required=True,
    help='Folder to save the messages and folders'
)
@click.option(
    '--cache-file', '-f',
    default=f"{CACHE_FOLDER}/__project_code_name__.json",
    help='Cache file to store data from each run',
)
@click.option(
    '--max-cache-age', '-a',
    default=60*60*24*7,
    help='Max age in seconds for the cache'
)
# @click.option("--dummy","-n", is_flag=True,
#     help="Don't do anything, just show what would be done.")
@click_config_file.configuration_option()
def __main__(**kwargs):
    # Command line entry point: run the backup, then save the cache.
    # (Kept as a comment: click would show a docstring in the --help output.)
    obj = BackupImap(**kwargs)
    obj.close()


if __name__ == "__main__":
    __main__()