#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""Backup an IMAP account into a folder"""

import errno
import sys
import os
import json
import time
import re
import logging
from logging.handlers import SysLogHandler
import imaplib
import email
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import click
import click_config_file

HOME_FOLDER = os.environ.get('HOME', os.environ.get('USERPROFILE', '/'))
if HOME_FOLDER == '/':
    CACHE_FOLDER = '/var/cache'
    LOG_FOLDER = '/var/log/'
else:
    CACHE_FOLDER = f"{HOME_FOLDER}/.local/"
    LOG_FOLDER = f"{HOME_FOLDER}/log/"

# First line of the Subject header (case-insensitive, CRLF or LF endings).
SUBJECT_PATTERN = re.compile(
    rb'^Subject:[ \t]*(.*?)\r?$', re.IGNORECASE | re.MULTILINE
)
# Blank line separating the header section from the body.
HEADER_END_PATTERN = re.compile(rb'\r?\n\r?\n')
# One LIST response line: (flags) "delimiter"|NIL name
MAILBOX_LIST_PATTERN = re.compile(rb'\((.*)\) (?:"(.*)"|NIL) (.*)')


class BackupImap:
    """Backup an IMAP account into a folder.

    Creating an instance runs the whole backup: it sets up logging,
    creates the destination folder, connects and logs in to the IMAP
    server and saves the messages of the configured mailbox and of every
    mailbox listed beneath it. Call :meth:`close` afterwards to log out.

    The keyword arguments are stored as ``self.config``. The keys read
    are ``log_file``, ``debug_level``, ``destination_folder``,
    ``imap_server``, ``imap_port``, ``use_ssl``, ``imap_user``,
    ``imap_password`` and ``mailbox``.
    """

    def __init__(self, **kwargs):
        self.config = kwargs
        self.imap = None
        if self.config.get('log_file') is None:
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get('USERPROFILE', os.getcwd())
                ),
                'log',
                'backup_imap.log'
            )
        self._init_log()

        # makedirs also creates missing parents and tolerates an
        # already existing folder.
        os.makedirs(self.config['destination_folder'], exist_ok=True)

        self._connect_imap(
            imap_server=self.config['imap_server'],
            imap_port=self.config['imap_port'],
            ssl=self.config['use_ssl'],
            imap_user=self.config['imap_user'],
            imap_password=self.config['imap_password'],
        )
        try:
            self._process_mailbox(self.config['mailbox'])
        except BaseException:
            # The caller never receives the instance when __init__
            # raises, so the session has to be released here.
            self.close()
            raise

    def _process_mailbox(self, mailbox):
        """Back up ``mailbox`` and every mailbox listed beneath it.

        The server is asked once for everything matching ``mailbox``
        with the ``*`` wildcard. That listing already contains all
        descendants, so each mailbox is backed up exactly once instead
        of recursing into every entry.
        """
        processed = set()
        for name in self._list_sub_mailboxes(mailbox) + [mailbox]:
            # An empty name is backed up as INBOX; do not save INBOX a
            # second time when the listing already returned it.
            real_name = name if name != '' else 'INBOX'
            if real_name in processed:
                continue
            processed.add(real_name)
            self._backup_mailbox(name)

    def _list_sub_mailboxes(self, mailbox):
        """Return the listed mailbox names other than ``mailbox`` itself.

        Names are returned in server order without duplicates. Entries
        that cannot be parsed into a name are logged and skipped.
        """
        self._log.debug("Searching for all mailboxes in mailbox '%s'...", mailbox)
        list_response, list_data = self.imap.list(f"\"{mailbox}\"", '*')
        self._log.debug("Mailboxes: %s", list_data)
        names = []
        if list_response != 'OK':
            return names
        for entry in list_data:
            if not entry:
                continue
            name = self._parse_mailbox(entry)
            if not isinstance(name, str):
                self._log.warning(
                    "Ignoring unparsable mailbox entry: %s", entry
                )
                continue
            if name != mailbox and name not in names:
                names.append(name)
        return names

    def _backup_mailbox(self, mailbox):
        """Save every undeleted message of one mailbox to disk.

        The mailbox is opened read-only and messages are fetched with
        ``BODY.PEEK[]`` so that the backup does not change any message
        flag on the server.
        """
        destination = os.path.abspath(self.config['destination_folder'])
        mailbox_path = os.path.join(self.config['destination_folder'], mailbox)
        # Mailbox names come from the server: refuse one that would
        # resolve outside the destination folder.
        if os.path.commonpath(
            [destination, os.path.abspath(mailbox_path)]
        ) != destination:
            self._log.error(
                "Skipping mailbox '%s', its path is outside '%s'",
                mailbox,
                destination
            )
            return

        self._log.debug("Seleting mailbox '%s'", mailbox)
        real_mailbox = mailbox if mailbox != '' else 'INBOX'
        select_response, select_data = self.imap.select(
            mailbox=f"\"{real_mailbox}\"",
            readonly=True
        )
        if select_response != 'OK':
            # Without a selected mailbox the SEARCH below is not
            # allowed, so skip this mailbox.
            self._log.warning(
                "Skipping mailbox '%s', it can't be selected. %s",
                mailbox,
                select_data
            )
            return

        self._log.debug("Searching for all messages in '%s'...", mailbox)
        os.makedirs(mailbox_path, exist_ok=True)
        search_response, search_data = self.imap.search('UTF-8', 'UNDELETED')
        if search_response != 'OK':
            self._log.error(
                "Error searching messages in '%s'. %s",
                mailbox,
                search_data
            )
            return

        # search() yields message sequence numbers as one
        # space-separated bytes string (possibly empty or None).
        message_numbers = (search_data[0] or b'').split()
        total = len(message_numbers)
        self._log.debug("Found %s messages", total)
        for position, raw_number in enumerate(message_numbers, start=1):
            message_number = raw_number.decode()
            self._log.debug(
                "Processing message '%s' (%s/%s)",
                message_number,
                position,
                total
            )
            fetch_response, fetch_data = self.imap.fetch(
                raw_number, "(BODY.PEEK[])"
            )
            if (
                fetch_response != 'OK'
                or not fetch_data
                or not isinstance(fetch_data[0], tuple)
            ):
                self._log.error(
                    "Error fetching message '%s'. %s",
                    message_number,
                    fetch_data
                )
                continue
            self._save_message(mailbox_path, message_number, fetch_data[0][1])

    def _save_message(self, mailbox_path, message_number, data):
        """Write one raw message into ``mailbox_path``.

        The file is named after the message subject; a numeric suffix is
        appended while a file with that name already exists. When the
        operating system rejects the name as too long the message is
        stored as ``message_uid_<number>`` instead. Write errors are
        logged and do not stop the backup.
        """
        subject = self._subject_to_filename(data)
        if not subject:
            subject = f'__no_subject__{message_number}'
        message_path = os.path.join(mailbox_path, subject)
        counter = 1
        while os.path.exists(message_path):
            message_path = os.path.join(mailbox_path, f"{subject}_{counter}")
            counter += 1

        try:
            with open(message_path, 'wb') as file_pointer:
                file_pointer.write(data)
            return
        except OSError as error:
            if error.errno != errno.ENAMETOOLONG:
                self._log.error(
                    "Error writing email '%s'. %s",
                    message_path,
                    error
                )
                return

        # File name too long: fall back to a short, number based name.
        message_path = os.path.join(
            mailbox_path,
            f"message_uid_{message_number}"
        )
        try:
            with open(message_path, 'wb') as file_pointer:
                file_pointer.write(data)
        except OSError as error:
            self._log.error(
                "Error writing email '%s'. %s",
                message_path,
                error
            )

    @staticmethod
    def _subject_to_filename(data):
        """Return the message subject made safe for use as a file name.

        Only the header section of ``data`` (raw message bytes) is
        inspected and only the first line of the Subject header is used.
        Path separators and NUL characters are replaced by ``_``. An
        empty string is returned when there is no usable subject.
        """
        headers = HEADER_END_PATTERN.split(data, maxsplit=1)[0]
        match = SUBJECT_PATTERN.search(headers)
        if match is None:
            return ''
        # Undecodable bytes are replaced rather than aborting the backup.
        subject = match.group(1).decode(errors='replace')
        for forbidden in (os.path.sep, os.path.altsep, '\x00'):
            if forbidden:
                subject = subject.replace(forbidden, '_')
        return subject.strip()

    def _parse_mailbox(self, data):
        """Extract the mailbox name from one LIST response entry.

        Returns the name as ``str`` with double quotes removed. When
        ``data`` is not bytes or does not look like a LIST line it is
        returned unchanged.
        """
        if not isinstance(data, bytes):
            return data
        match = MAILBOX_LIST_PATTERN.match(data)
        if match is None:
            return data
        return match.group(3).replace(b'"', b'').decode(errors='replace')

    def _connect_imap(self, imap_server, imap_port, ssl, imap_user, imap_password):
        '''Create connection object to the IMAP server and log in.

        Exits the process with status 1 (secure connection failed),
        2 (plain connection failed), 3 (login answered something other
        than OK) or 4 (login raised an IMAP error).
        '''
        self._log.debug(
            'Connecting to server %s:%s...',
            imap_server,
            imap_port,
        )
        # Socket and TLS failures are raised as OSError subclasses, not
        # as imaplib.IMAP4.error, so both have to be caught.
        connection_errors = (imaplib.IMAP4.error, OSError)
        if ssl:
            try:
                self.imap = imaplib.IMAP4_SSL(imap_server, imap_port)
            except connection_errors as error:
                self._log.error(
                    "Error connecting securely to IMAP server '%s'. %s",
                    imap_server,
                    error,
                )
                sys.exit(1)
        else:
            try:
                self.imap = imaplib.IMAP4(imap_server, imap_port)
            except connection_errors as error:
                self._log.error(
                    "Error connecting to IMAP server '%s'. %s",
                    imap_server,
                    error,
                )
                sys.exit(2)

        self._log.debug('Authenticating as user %s...', imap_user)
        try:
            result, data = self.imap.login(imap_user, imap_password)
        except imaplib.IMAP4.error as error:
            self._log.error(
                "Error while login as '%s'. %s'",
                imap_user,
                error,
            )
            # logout() is valid in any state; close() is only allowed
            # once a mailbox is selected.
            self.close()
            sys.exit(4)
        if result != 'OK':
            self._log.error(
                "Error login into IMAP server. %s",
                data
            )
            self.close()
            sys.exit(3)

    def close(self):
        '''Log out from the IMAP server. Safe to call more than once.'''
        if self.imap is None:
            return
        try:
            self.imap.logout()
        except (imaplib.IMAP4.error, OSError) as error:
            self._log.debug("Error closing the IMAP connection. %s", error)
        finally:
            self.imap = None

    def _init_log(self):
        '''Initialize log object.

        Attaches a syslog handler, a stdout handler at the configured
        ``debug_level`` and a rotating file handler writing to
        ``log_file``.
        '''
        self._log = logging.getLogger("backup_imap")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        # Level names are upper case; accept any casing from callers.
        level_name = str(self.config.get("debug_level") or 'INFO').upper()
        streamhandler.setLevel(logging.getLevelName(level_name))
        self._log.addHandler(streamhandler)

        log_file = self.config.get('log_file')
        if not log_file:
            home_folder = os.environ.get(
                'HOME',
                os.environ.get('USERPROFILE', '')
            )
            log_file = os.path.join(home_folder, "log", "backup_imap.log")
        log_folder = os.path.dirname(log_file)
        # A bare file name has no folder part to create.
        if log_folder:
            os.makedirs(log_folder, exist_ok=True)

        # Rollover only happens when both maxBytes and backupCount are
        # non-zero, so a backup count is required for rotation to work.
        filehandler = logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=102400000,
            backupCount=3
        )
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True


@click.command()
@click.option(
    "--debug-level", "-d",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option(
    '--log-file', '-l',
    default=os.path.join(LOG_FOLDER, 'backup_imap.log'),
    help="File to store all debug messages."
)
@click.option(
    '--imap-server', '-s',
    required=True,
    help='Hostname or IP of the IMAP server'
)
@click.option(
    '--imap-port', '-p',
    default=993,
    help='IMAP port to contact the server'
)
# On/off pair so that SSL, enabled by default, can be switched off
# explicitly with --no-use-ssl.
@click.option(
    '--use-ssl/--no-use-ssl', '-S',
    default=True,
    help='Use SSL to contact the IMAP server'
)
@click.option(
    '--imap-user', '-u',
    required=True,
    help='User to connect to IMAP server'
)
@click.option(
    '--imap-password', '-P',
    required=True,
    help='User password to conect to IMAP server'
)
@click.option(
    '--mailbox', '-m',
    default='INBOX',
    help='Mailbox to backup'
)
@click.option(
    '--destination-folder', '-F',
    required=True,
    help='Folder to save the messages and folders'
)
# @click.option("--dummy","-n", is_flag=True,
#     help="Don't do anything, just show what would be done.")
@click_config_file.configuration_option()
def __main__(**kwargs):
    """Backup an IMAP account into a folder."""
    obj = BackupImap(**kwargs)
    obj.close()


if __name__ == "__main__":
    __main__()