backup_imap/backup_imap/backup_imap.py
2025-03-28 10:37:20 +02:00

446 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""Backup an IMAP account into a folder"""
import email
import email.header
import errno
import hashlib
import imaplib
import json
import logging
from logging.handlers import SysLogHandler
import os
import re
import sys
import time

import click
import click_config_file
# Home directory of the current user: $HOME, then %USERPROFILE%, then '/'
# when neither variable is set (e.g. when started by a bare service manager).
HOME_FOLDER = os.environ.get('HOME', os.environ.get('USERPROFILE', '/'))

# Default base folders for the cache file and the log file. Without a usable
# home directory the system-wide locations are used instead.
CACHE_FOLDER, LOG_FOLDER = (
    ('/var/cache', '/var/log/')
    if HOME_FOLDER == '/'
    else (f"{HOME_FOLDER}/.local/", f"{HOME_FOLDER}/log/")
)
class BackupImap:
    """Backup an IMAP account into a folder.

    Creating an instance runs the whole backup: it sets up logging, loads the
    cache of already saved messages, connects and logs in to the IMAP server
    and then copies every message of the configured mailbox (and of the
    mailboxes the server lists under it) into ``destination_folder``.

    Expected keyword arguments (stored unchanged in ``self.config``):
    ``imap_server``, ``imap_port``, ``use_ssl``, ``imap_user``,
    ``imap_password``, ``mailbox``, ``destination_folder``, ``cache_file``,
    ``max_cache_age`` and optionally ``log_file`` and ``debug_level``.

    Call :meth:`close` when done to persist the cache and log out.
    """

    def __init__(self, **kwargs):
        self.config = kwargs
        if 'log_file' not in kwargs or kwargs['log_file'] is None:
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get('USERPROFILE', os.getcwd())
                ),
                'log',
                'backup_imap.log'
            )
        self._init_log()
        self._default_data = {
            "last_update": 0,
            "backedup_messages": [],
        }
        self.data = self._read_cached_data()
        # makedirs also copes with a destination whose parents are missing.
        os.makedirs(self.config['destination_folder'], exist_ok=True)
        self.imap = None
        # imaplib refuses response lines longer than _MAXLINE. Only ever raise
        # that limit: recent Python versions already default to more than
        # 100000 and lowering it would make big SEARCH responses fail.
        imaplib._MAXLINE = max(getattr(imaplib, '_MAXLINE', 0), 100000)
        self._connect_imap(
            imap_server=self.config['imap_server'],
            imap_port=self.config['imap_port'],
            ssl=self.config['use_ssl'],
            imap_user=self.config['imap_user'],
            imap_password=self.config['imap_password'],
        )
        self._process_mailbox(self.config['mailbox'])

    def _process_mailbox(self, mailbox):
        """Back up `mailbox` and every mailbox the server lists under it.

        The ``*`` wildcard of LIST already crosses hierarchy levels, so the
        listing is walked once instead of recursing into every child (which
        would process deep mailboxes several times). Children are handled
        before `mailbox` itself.
        """
        self._log.debug("Searching for all mailboxes in mailbox '%s'...", mailbox)
        list_response, list_data = self.imap.list(f"\"{mailbox}\"", '*')
        self._log.debug(
            "Mailboxes: %s",
            list_data
        )
        sub_mailboxes = []
        if list_response == 'OK':
            for entry in list_data:
                if not entry:
                    continue
                sub_mailbox = self._parse_mailbox(entry)
                if not isinstance(sub_mailbox, str):
                    self._log.warning(
                        "Skipping mailbox entry that could not be parsed: %s",
                        entry
                    )
                    continue
                if sub_mailbox != mailbox and sub_mailbox not in sub_mailboxes:
                    sub_mailboxes.append(sub_mailbox)
        for sub_mailbox in sub_mailboxes:
            self._backup_mailbox(sub_mailbox)
        self._backup_mailbox(mailbox)

    def _backup_mailbox(self, mailbox):
        """Save every not yet backed up message of one mailbox to disk."""
        self._log.debug(
            "Seleting mailbox '%s'",
            mailbox
        )
        real_mailbox = 'INBOX' if mailbox == '' else mailbox
        # Read-only: a backup must not alter flags on the server.
        select_response, select_data = self.imap.select(
            mailbox=f"\"{real_mailbox}\"",
            readonly=True
        )
        if select_response != 'OK':
            # Typically a \Noselect container folder; searching it would raise.
            self._log.warning(
                "Unable to select mailbox '%s', skipping it. %s",
                mailbox,
                select_data
            )
            return
        self._log.debug(
            "Searching for all messages in '%s'...",
            mailbox
        )
        mailbox_path = os.path.join(
            self.config['destination_folder'],
            mailbox
        )
        os.makedirs(mailbox_path, exist_ok=True)
        # One window at a time because big mailboxes will take all memory due
        # to Python IMAP library memory handling.
        for criteria in self._search_criteria(time.gmtime().tm_year):
            search_response, search_data = self.imap.search('UTF-8', criteria)
            if search_response != 'OK':
                self._log.warning(
                    "Search %s failed in mailbox '%s'. %s",
                    criteria,
                    mailbox,
                    search_data
                )
                continue
            message_numbers = (search_data[0] or b'').split() if search_data else []
            total = len(message_numbers)
            self._log.debug(
                "Found %s messages",
                total
            )
            for position, message_number in enumerate(message_numbers, start=1):
                self._log.debug(
                    "Processing message %s (%s/%s)",
                    message_number.decode(),
                    position,
                    total
                )
                self._store_message(message_number, mailbox, mailbox_path)

    @staticmethod
    def _search_criteria(current_year, years_back=10):
        """Return IMAP SEARCH criteria that split a mailbox into date windows.

        The windows do not overlap and together cover every message: one for
        everything older than ``current_year - years_back``, one per full year
        after that, and an open ended one for `current_year`.
        """
        first_year = current_year - years_back
        criteria = [f'(BEFORE "1-Jan-{first_year}")']
        criteria.extend(
            f'(SINCE "1-Jan-{year}" BEFORE "1-Jan-{year + 1}")'
            for year in range(first_year, current_year)
        )
        criteria.append(f'(SINCE "1-Jan-{current_year}")')
        return criteria

    def _store_message(self, message_number, mailbox, mailbox_path):
        """Fetch one message and write it to `mailbox_path` unless cached.

        `message_number` is the message sequence number (bytes) as returned
        by SEARCH for the currently selected mailbox.
        """
        number = message_number.decode()
        # BODY.PEEK[] returns the full message without setting \Seen.
        fetch_response, fetch_data = self.imap.fetch(message_number, "(BODY.PEEK[])")
        payload = next(
            (part for part in (fetch_data or []) if isinstance(part, tuple)),
            None
        )
        if fetch_response != 'OK' or payload is None:
            self._log.warning(
                "Unable to fetch message %s from mailbox '%s'. %s",
                number,
                mailbox,
                fetch_data
            )
            return
        data = payload[1]
        message_id = ''.join(self._get_mail_header('Message-ID', data))
        if not message_id.strip():
            # Without a Message-ID every such message would share the same
            # (empty) cache key and only the first one would be saved.
            message_id = f"sha256:{hashlib.sha256(data).hexdigest()}"
        if self._backedup_message(message_id, mailbox):
            self._log.debug(
                "Skipping already backed up message"
            )
            return
        subject = self._safe_file_name(
            ''.join(self._get_mail_header('Subject', data)),
            f'__no_subject__{number}'
        )
        written_path = self._write_message(
            mailbox_path,
            subject,
            f"message_uid_{number}",
            data
        )
        if written_path is None:
            return
        self.data['backedup_messages'].append(
            {
                "message_id": message_id,
                "mailbox": mailbox
            }
        )
        # Saved after every message so an interrupted run can resume.
        self._save_cached_data(self.data)

    @staticmethod
    def _safe_file_name(subject, fallback):
        """Turn a subject into a single file name, or `fallback` if empty."""
        unsafe = {os.path.sep, '\r', '\n', ':', '\0'}
        if os.path.altsep:
            unsafe.add(os.path.altsep)
        name = subject.translate({ord(char): '_' for char in unsafe})
        return name if name.strip() else fallback

    @staticmethod
    def _unused_path(folder, name):
        """Return a path in `folder` based on `name` that does not exist yet."""
        candidate = os.path.join(folder, name)
        counter = 1
        while os.path.exists(candidate):
            candidate = os.path.join(folder, f"{name}_{counter}")
            counter += 1
        return candidate

    def _write_message(self, folder, name, fallback_name, data):
        """Write `data` to a new file in `folder`.

        `name` is tried first; when the file system rejects it as too long,
        `fallback_name` is used. Existing files are never overwritten.
        Returns the path written, or None when writing failed (the error is
        logged).
        """
        for candidate in (name, fallback_name):
            message_path = self._unused_path(folder, candidate)
            try:
                with open(message_path, 'wb') as file_pointer:
                    file_pointer.write(data)
                return message_path
            except OSError as error:
                name_too_long = error.errno == errno.ENAMETOOLONG
                if name_too_long and candidate != fallback_name:
                    continue
                self._log.error(
                    "Error writing email '%s'. %s",
                    message_path,
                    error
                )
                return None
        return None

    def _backedup_message(self, message_id, mailbox):
        """Tell whether the cache lists `message_id` for `mailbox`."""
        return any(
            message.get('message_id') == message_id
            and message.get('mailbox') == mailbox
            for message in self.data['backedup_messages']
        )

    def _get_mail_header(self, header, data):
        """Return the decoded chunks of `header` from the raw message `data`.

        The result is a list of strings, one per chunk reported by
        ``email.header.decode_header``; join them to get the full value.
        Decoding stops at the first chunk that cannot be decoded.
        """
        message = email.message_from_bytes(data)
        result = []
        for raw_value, charset in email.header.decode_header(message.get(header, "")):
            if isinstance(raw_value, str):
                result.append(raw_value)
                continue
            decoded = self._decode_header_bytes(raw_value, charset)
            if decoded is None:
                self._log.error(
                    "Error decoding header data. Data: %s. Declared charset: %s",
                    raw_value,
                    charset
                )
                break
            result.append(decoded)
        return result

    @staticmethod
    def _decode_header_bytes(raw_value, charset):
        """Decode header bytes, or return None when no codec fits.

        The charset declared in the header is tried first, then UTF-8 and
        finally Windows-1252. Unknown charset names are skipped.
        """
        codecs_to_try = ([charset] if charset else []) + ['utf-8', 'Windows-1252']
        for codec in codecs_to_try:
            try:
                return raw_value.decode(codec)
            except (UnicodeDecodeError, LookupError):
                continue
        return None

    def _parse_mailbox(self, data):
        """Extract the mailbox name from one LIST response entry.

        Returns the name as ``str`` with double quotes removed. Entries that
        are not bytes or do not look like ``(flags) "delimiter" name`` are
        returned unchanged.
        """
        if not isinstance(data, (bytes, bytearray)):
            return data
        match = re.match(rb'\((.*)\) "(.*)" (.*)', data)
        if match:
            return match.group(3).replace(b'"', b'').decode()
        return data

    def _connect_imap(self, imap_server, imap_port, ssl, imap_user, imap_password):
        '''Create connection object to the IMAP server and log in.

        Exits the process with status 1 (SSL connection failed), 2 (plain
        connection failed), 3 or 4 (login failed).
        '''
        self._log.debug(
            'Connecting to server %s:%s...',
            imap_server,
            imap_port,
        )
        # Network level failures (DNS, refused connection, TLS) are OSError
        # subclasses, not imaplib errors, so both are handled.
        connection_errors = (imaplib.IMAP4.error, OSError)
        if ssl:
            try:
                self.imap = imaplib.IMAP4_SSL(imap_server, imap_port)
            except connection_errors as error:
                self._log.error(
                    "Error connecting securely to IMAP server '%s'. %s",
                    imap_server,
                    error,
                )
                sys.exit(1)
        else:
            try:
                self.imap = imaplib.IMAP4(imap_server, imap_port)
            except connection_errors as error:
                self._log.error(
                    "Error connecting to IMAP server '%s'. %s",
                    imap_server,
                    error,
                )
                sys.exit(2)
        try:
            self._log.debug('Authenticating as user %s...', imap_user)
            result, data = self.imap.login(imap_user, imap_password)
        except imaplib.IMAP4.error as error:
            self._log.error(
                "Error while login as '%s'. %s'",
                imap_user,
                error,
            )
            # CLOSE is only legal with a selected mailbox; LOGOUT is valid in
            # every state and ends the session cleanly.
            try:
                self.imap.logout()
            except connection_errors:
                pass
            sys.exit(4)
        if result != 'OK':
            self._log.error(
                "Error login into IMAP server. %s",
                data
            )
            sys.exit(3)

    def close(self):
        '''Save the cache and log out of the IMAP server.'''
        self._save_cached_data(self.data)
        imap = getattr(self, 'imap', None)
        if imap is not None:
            try:
                imap.logout()
            except (imaplib.IMAP4.error, OSError) as error:
                self._log.debug("Error logging out of IMAP server. %s", error)
            self.imap = None

    def _read_cached_data(self):
        """Load the cache file, falling back to empty default data.

        The defaults are used when the file is missing, is not valid JSON,
        does not hold a JSON object, or is older than ``max_cache_age``.
        """
        cache_path = self.config['cache_file']
        if not os.path.exists(cache_path):
            self._log.debug(
                "Cache file '%s' doesn't exist, initializing cache data.",
                cache_path
            )
            return self._default_data
        with open(cache_path, 'r', encoding='utf-8') as cache_file:
            try:
                cached_data = json.load(cache_file)
            except json.decoder.JSONDecodeError:
                self._log.debug(
                    "Cache file '%s' doesn't contain valid JSON, initializing cache data.",
                    cache_path
                )
                return self._default_data
        if not isinstance(cached_data, dict):
            self._log.debug(
                "Cache file '%s' doesn't contain a JSON object, initializing cache data.",
                cache_path
            )
            return self._default_data
        now = time.time()
        if (
            'last_update' in cached_data and
            cached_data['last_update'] + self.config['max_cache_age'] < now
        ):
            self._log.debug(
                "Data in cache file '%s' is too old (%s + %s < %s), initializing cache data.",
                cache_path,
                cached_data['last_update'],
                self.config['max_cache_age'],
                now
            )
            return self._default_data
        # The rest of the class indexes this key unconditionally.
        cached_data.setdefault('backedup_messages', [])
        return cached_data

    def _save_cached_data(self, data):
        """Stamp `data` with the current time and write it to the cache file."""
        data['last_update'] = time.time()
        cache_folder = os.path.dirname(self.config['cache_file'])
        if cache_folder:
            os.makedirs(cache_folder, exist_ok=True)
        with open(self.config['cache_file'], 'w', encoding='utf-8') as cache_file:
            json.dump(data, cache_file, indent=2)
        self._log.debug(
            "Saved cached data in '%s'",
            self.config['cache_file']
        )

    def _init_log(self):
        ''' Initialize log object

        Everything is sent to syslog and to the log file at DEBUG level; the
        standard output only shows messages from ``debug_level`` upwards.
        '''
        self._log = logging.getLogger("backup_imap")
        self._log.setLevel(logging.DEBUG)
        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)
        streamhandler = logging.StreamHandler(sys.stdout)
        # Level names are upper case; normalise so 'info' or None also work.
        streamhandler.setLevel(
            logging.getLevelName(
                str(self.config.get("debug_level") or 'INFO').upper()
            )
        )
        self._log.addHandler(streamhandler)
        log_file = self.config.get('log_file')
        if not log_file:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_file = os.path.join(home_folder, "log", "backup_imap.log")
        log_folder = os.path.dirname(log_file)
        if log_folder:
            # A bare file name has no folder part and needs nothing created.
            os.makedirs(log_folder, exist_ok=True)
        # RotatingFileHandler never rotates while backupCount is 0, so keep
        # one previous file to make maxBytes effective.
        filehandler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=102400000, backupCount=1
        )
        # create formatter
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
@click.command()
@click.option(
    "--debug-level",
    "-d",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option(
    '--log-file',
    '-l',
    # os.path.join avoids the doubled separator LOG_FOLDER's trailing slash
    # would otherwise produce.
    default=os.path.join(LOG_FOLDER, "backup_imap.log"),
    help="File to store all debug messages."
)
@click.option(
    '--imap-server',
    '-s',
    required=True,
    help='Hostname or IP of the IMAP server'
)
@click.option(
    '--imap-port',
    '-p',
    default=993,
    help='IMAP port to contact the server'
)
@click.option(
    # A plain flag that defaults to True can never be switched off; the
    # on/off pair keeps --use-ssl and -S working and adds --no-use-ssl.
    '--use-ssl/--no-use-ssl',
    '-S',
    default=True,
    help='Use SSL to contact the IMAP server'
)
@click.option(
    '--imap-user',
    '-u',
    required=True,
    help='User to connect to IMAP server'
)
@click.option(
    '--imap-password',
    '-P',
    required=True,
    help='User password to conect to IMAP server'
)
@click.option(
    '--mailbox',
    '-m',
    default='INBOX',
    help='Mailbox to backup'
)
@click.option(
    '--destination-folder',
    '-F',
    required=True,
    help='Folder to save the messages and folders'
)
@click.option(
    '--cache-file',
    '-f',
    default=os.path.join(CACHE_FOLDER, "backup_imap.json"),
    help='Cache file to store data from each run',
)
@click.option(
    '--max-cache-age',
    '-a',
    default=60*60*24*7,
    help='Max age in seconds for the cache'
)
# @click.option("--dummy","-n", is_flag=True,
#     help="Don't do anything, just show what would be done.")
@click_config_file.configuration_option()
def __main__(**kwargs):
    """Backup an IMAP account into a folder."""
    # The constructor performs the whole backup; close() then persists the
    # cache of backed up messages.
    obj = BackupImap(**kwargs)
    obj.close()


if __name__ == "__main__":
    __main__()