pass_helper/pass_helper/pass_helper.py

276 lines
9 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""Pass Helper"""
import sys
import os
import json
import time
import logging
from logging.handlers import SysLogHandler
import click
import click_config_file
import passpy
# Home directory of the current user: $HOME if set, otherwise %USERPROFILE%,
# otherwise the filesystem root.
HOME_FOLDER = os.environ.get(
    'HOME',
    os.environ.get('USERPROFILE', '/'),
)

# Without a real home directory fall back to the system-wide locations;
# otherwise keep cache and logs under the user's home.
CACHE_FOLDER, LOG_FOLDER = (
    ('/var/cache', '/var/log/')
    if HOME_FOLDER == '/'
    else (f"{HOME_FOLDER}/.local/", f"{HOME_FOLDER}/log/")
)
class PassHelper:
    """Pass Helper.

    Loads the entries of a password store (through ``passpy``) into memory
    and offers maintenance operations on them, such as removing duplicates.
    A small JSON cache file is read on start-up and written by ``close()``.
    """

    def __init__(self, **kwargs):
        """Configure logging, load the cache and index the password store.

        The keyword arguments are kept as ``self.config``. Keys read by this
        class: ``log_file``, ``debug_level``, ``cache_file``,
        ``max_cache_age`` and ``dummy``.
        """
        self.config = kwargs
        if self.config.get('log_file') is None:
            # No log file given: default to <home>/log/pass_helper.log,
            # using the current directory when no home variable is set.
            self.config['log_file'] = os.path.join(
                os.environ.get(
                    'HOME',
                    os.environ.get(
                        'USERPROFILE',
                        os.getcwd()
                    )
                ),
                'log',
                'pass_helper.log'
            )
        self._init_log()
        self._default_data = {
            "last_update": 0,
        }
        self.data = self._read_cached_data()
        self.passwords = {}
        self.store = passpy.store.Store()
        self._read_store()

    def close(self):
        '''Close class and save data'''
        self._save_cached_data(self.data)

    def _read_cached_data(self):
        """Return the cached data, or a fresh copy of the defaults.

        The defaults are returned when the cache file is missing, is not
        valid JSON, does not hold a JSON object, or when its ``last_update``
        timestamp is older than ``max_cache_age`` seconds.
        """
        # Always hand out a copy so callers never mutate the defaults.
        defaults = dict(self._default_data)
        cache_path = self.config['cache_file']
        if not os.path.exists(cache_path):
            return defaults
        try:
            with open(cache_path, 'r', encoding='utf-8') as cache_file:
                cached_data = json.load(cache_file)
        except json.decoder.JSONDecodeError:
            return defaults
        if not isinstance(cached_data, dict):
            return defaults
        last_update = cached_data.get('last_update')
        if (
            isinstance(last_update, (int, float)) and
            last_update + self.config['max_cache_age'] < time.time()
        ):
            # The cache has expired: start over from the defaults.
            return defaults
        return cached_data

    def _save_cached_data(self, data):
        """Stamp ``data`` with the current time and write it to the cache."""
        data['last_update'] = time.time()
        cache_path = self.config['cache_file']
        cache_dir = os.path.dirname(cache_path)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        with open(cache_path, 'w', encoding='utf-8') as cache_file:
            json.dump(data, cache_file, indent=2)
        self._log.debug(
            "Saved cached data in '%s'",
            cache_path
        )

    def _init_log(self):
        ''' Initialize log object '''
        self._log = logging.getLogger("pass_helper")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        # Only the console honours the configured level; syslog and the log
        # file always receive everything from DEBUG upwards.
        level_name = str(self.config.get("debug_level") or 'INFO').upper()
        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(level_name)
        self._log.addHandler(streamhandler)

        log_file = self.config.get('log_file')
        if not log_file:
            home_folder = os.environ.get(
                'HOME', os.environ.get('USERPROFILE', '')
            )
            log_file = os.path.join(home_folder, "log", "pass_helper.log")
        log_folder = os.path.dirname(log_file)
        if log_folder:
            # makedirs also creates missing parents and tolerates an
            # already existing folder.
            os.makedirs(log_folder, exist_ok=True)

        # With backupCount=0 the handler never rolls the file over, so keep
        # one backup to make the size limit effective.
        filehandler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=102400000, backupCount=1
        )
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
        )
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True

    def _read_store(self):
        """Load every item returned by ``store.find('')`` into memory.

        The first line of an entry is its password. Each following
        ``field: value`` line becomes a field of the entry, with ``login``
        renamed to ``username`` and ``uri`` to ``url``. Lines without a
        value are ignored. The result is stored in ``self.passwords``,
        keyed by the path of the entry in the store.
        """
        self._log.debug(
            "Reading all passwords in the PasswordStore..."
        )
        self.passwords = {}
        for item in self.store.find(''):
            obj = {
                "label": os.path.basename(item),
                "folder": os.path.dirname(item),
                "fullpath": item,
            }
            password, *extra_lines = self.store.get_key(item).split('\n')
            obj['password'] = password
            custom_fields = {}
            for line in extra_lines:
                # Split on the first separator only, so values that contain
                # ': ' themselves are kept whole.
                name, separator, value = line.partition(': ')
                if not separator or value == '':
                    continue
                field = name.lower()
                if field == '':
                    field = 'notes'
                if field == 'login':
                    field = 'username'
                if field == 'uri':
                    field = 'url'
                    # URIs are lower-cased so they compare case-insensitively.
                    value = value.lower()
                if field not in custom_fields:
                    custom_fields[field] = value
                elif isinstance(custom_fields[field], str):
                    # Second occurrence of a field: collect values in a list.
                    custom_fields[field] = [custom_fields[field], value]
                else:
                    custom_fields[field].append(value)
                obj[field] = value
            obj['customFields'] = json.dumps(custom_fields)
            # Key by the store path itself: a custom field named 'fullpath'
            # could have overwritten obj['fullpath'].
            self.passwords[item] = obj

    def _are_same_password(self, password1, password2, compare_fields=None):
        """Tell whether two entries agree on all fields they have in common.

        Only fields present in both entries are compared; a field missing
        from either entry never makes them differ.
        ``compare_fields`` defaults to username, password and url.
        """
        if not compare_fields:
            compare_fields = ['username', 'password', 'url']
        for field in compare_fields:
            if (
                field in password1 and
                field in password2 and
                password1[field] != password2[field]
            ):
                return False
        return True

    def remove_duplicates(self, **kwargs):
        '''Remove duplicate passwords

        An entry is removed when it matches another entry that has not
        itself been removed, so the last copy of every group of duplicates
        is kept. With the ``dummy`` option set nothing is deleted from the
        store. The extra keyword arguments are accepted but not used.
        Returns the summary dictionary that is also written to the log.
        '''
        summary = {
            'removed_duplicates': 0,
            'processed_password': 0,
            'removed_passwords': [],
        }
        removed = set()
        entries = list(self.passwords.items())
        total_passwords = len(entries)
        for password, password_data in entries:
            summary['processed_password'] += 1
            self._log.debug(
                "Processing password %s de %s",
                summary['processed_password'],
                total_passwords
            )
            for password2, password2_data in entries:
                # Never compare against itself or against an entry that is
                # already gone, otherwise both copies would be deleted.
                if password2 == password or password2 in removed:
                    continue
                if not self._are_same_password(password_data, password2_data):
                    continue
                self._log.warning(
                    "Same passwords (deleting first one):\n'%s'\n'%s'",
                    password,
                    password2
                )
                removed.add(password)
                summary['removed_passwords'].append(password)
                summary['removed_duplicates'] += 1
                if self.config.get('dummy'):
                    self._log.info(
                        "Dummy (dry) run, not really deleting from PasswordStore"
                    )
                else:
                    self.store.remove_path(password)
                break
        for name in summary['removed_passwords']:
            self.passwords.pop(name)
        self._log.info(
            "Summary:\n%s",
            json.dumps(summary, indent=2)
        )
        self.close()
        return summary
# Command group shared by every sub-command. Its options are collected into
# the configuration of the PassHelper instance that sub-commands use.
@click.group()
@click.option(
    "--debug-level",
    "-d",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.'
)
@click.option(
    '--log-file',
    '-l',
    # os.path.join avoids a doubled separator when the folder constant
    # already ends with '/'.
    default=os.path.join(LOG_FOLDER, "pass_helper.log"),
    help="File to store all debug messages."
)
@click.option(
    '--cache-file',
    '-f',
    default=os.path.join(CACHE_FOLDER, "pass_helper.json"),
    help='Cache file to store data from each run',
)
@click.option(
    '--max-cache-age',
    '-a',
    default=60*60*24*7,  # one week, in seconds
    help='Max age in seconds for the cache'
)
@click.option(
    "--dummy",
    "-n",
    is_flag=True,
    help="Don't do anything, just show what would be done."
)
@click_config_file.configuration_option()
@click.pass_context
def cli(context, **kwargs):
    '''Initialize class'''
    # The docstring above is kept verbatim: click shows it as the help text.
    # Make sure the context object is a dict, then store in it both the raw
    # options and the PassHelper built from them, for sub-commands to use.
    context.ensure_object(dict)
    context.obj['config'] = kwargs
    context.obj['pass_helper'] = PassHelper(**kwargs)
@cli.command()
@click_config_file.configuration_option()
@click.pass_context
def remove_duplicates(context, **kwargs):
    '''Remove duplicate password'''
    # The group callback stored the PassHelper instance on the context
    # object; hand the command's own options over to it.
    helper = context.obj['pass_helper']
    helper.remove_duplicates(**kwargs)
# Script entry point: run the command group with an empty context object,
# which the group callback then fills in.
if __name__ == "__main__":
    cli(obj=dict())