Improve test for duplicates, make safer output, enable update during migration

This commit is contained in:
Antonio J. Delgado 2024-11-16 13:22:15 +02:00
parent 7c1fb5c9c8
commit a5c08ab235

View file

@ -190,29 +190,44 @@ class NextcloudHandler:
)
self.keychain = json.loads(decrypt(text, key))
def _safer_obj(self, obj, fields=None):
if fields is None:
fields = ['password', 'token', 'secret', 'pass', 'passwd', 'hash']
if isinstance(obj, dict):
for key in obj.keys():
if isinstance(obj[key], dict) or isinstance(obj[key], list):
obj[key] = self._safer_obj(obj[key], fields=fields)
else:
if key in fields:
obj[key] = '***'
if isinstance(obj, list):
for item in obj:
item = self._safer_obj(item, fields=fields)
return obj
def debug(self, obj):
'''Show debug information'''
self._log.debug(
json.dumps(obj, indent=2)
json.dumps(self._safer_obj(obj), indent=2)
)
def warning(self, obj):
'''Show warning information'''
self._log.warning(
json.dumps(obj, indent=2)
json.dumps(self._safer_obj(obj), indent=2)
)
def info(self, obj):
'''Show information'''
self._log.info(
json.dumps(obj, indent=2)
json.dumps(self._safer_obj(obj), indent=2)
)
def error(self, obj):
'''Show error information'''
try:
self._log.error(
json.dumps(obj, indent=2)
json.dumps(self._safer_obj(obj), indent=2)
)
except TypeError as error:
self._log.error(
@ -253,7 +268,7 @@ class NextcloudHandler:
timeout=self.timeout
)
if r.status_code == 200:
if r.status_code < 299:
self.debug(
{"action": "get", "status_code": r.status_code, "size": len(r.content)}
)
@ -489,7 +504,7 @@ class NextcloudHandler:
auth=(self.user, self.token),
verify=self.ssl, timeout=self.timeout
)
if r.status_code == 200:
if r.status_code < 299:
self.x_api_session = r.headers.get('X-API-SESSION')
return r.json()
else:
@ -677,13 +692,15 @@ class NextcloudHandler:
)
return False
def create_password(self, post_obj):
def create_password(self, post_obj, update):
'''Create/add a password'''
if 'folder' in post_obj and post_obj['folder']:
if post_obj.get('folder', '') != '':
folder_id = self.get_folder_id(post_obj['folder'])
if not folder_id:
folder_id = self.create_passwords_folder(post_obj['folder'])['id']
post_obj['folder'] = folder_id
else:
post_obj.pop('folder', None)
if not 'username' in post_obj:
post_obj['username'] = ''
if not 'url' in post_obj:
@ -726,14 +743,17 @@ class NextcloudHandler:
}
)
else:
self.warning(
{
"action": "create_password",
"message": "Password with that name/label already exists",
"object": safer_obj,
"label": post_obj['label']
}
)
if update:
return self.update_password(post_obj)
else:
self.warning(
{
"action": "create_password",
"message": "Password with that name/label already exists",
"object": safer_obj,
"label": post_obj['label']
}
)
return False
def delete_password(self, post_obj):
@ -747,7 +767,7 @@ class NextcloudHandler:
verify=self.ssl, timeout=self.timeout
)
if r.status_code == 200:
if r.status_code < 299:
return r.json()
self.error(
{
@ -768,6 +788,7 @@ class NextcloudHandler:
def update_password(self, post_obj):
'''Update a password'''
safer_obj = dict(post_obj, **{ 'password': '***' })
try:
r = self.session.patch(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update',
@ -777,13 +798,17 @@ class NextcloudHandler:
verify=self.ssl, timeout=self.timeout
)
if r.status_code == 200:
if r.status_code < 299:
return r.json()
self.error(
{
"action": "update_password",
"message": f"Nextcloud instance returned status code {r.status_code}",
"object": post_obj,
#"content": r.content,
"json": r.json(),
"headers": r.headers,
# "request_headers": r.request.headers,
"object": safer_obj,
}
)
except requests.exceptions.ReadTimeout as error:
@ -791,7 +816,7 @@ class NextcloudHandler:
{
"action": "update_password",
"message": f"Timeout ({self.timeout} sec) error doing GET request",
"object": post_obj,
"object": safer_obj,
"error": error,
}
)
@ -829,16 +854,14 @@ class NextcloudHandler:
# }
# }
# )
if (
self.is_same_key('username', obj1, obj2) and
self.is_same_key('password', obj1, obj2) and
self.is_same_key('url', obj1, obj2) and
self.is_same_key('folder', obj1, obj2)
):
self.debug(
{ "action": "notify_match", "object": { "obj1": safer_obj1, "obj2": safer_obj2 } }
)
return True
if self.is_same_key('username', obj1, obj2):
if self.is_same_key('password', obj1, obj2):
if self.is_same_key('url', obj1, obj2):
#if self.is_same_key('folder', obj1, obj2):
self.debug(
{ "action": "notify_match", "object": { "obj1": safer_obj1, "obj2": safer_obj2 } }
)
return True
return False
def get_folder_id(self, folder_name):
@ -881,29 +904,51 @@ class NcPasswordClient:
}
self.nc = NextcloudHandler(params)
def _safer_obj(self, obj, fields=None):
if fields is None:
fields = ['password', 'token', 'secret', 'pass', 'passwd', 'hash']
if isinstance(obj, dict):
for key in obj.keys():
if isinstance(obj[key], dict) or isinstance(obj[key], list):
obj[key] = self._safer_obj(obj[key], fields=fields)
else:
if key in fields:
obj[key] = '***'
if isinstance(obj, list):
for item in obj:
item = self._safer_obj(item, fields=fields)
return obj
def debug(self, obj):
'''Show debug information'''
self._log.debug(
json.dumps(obj, indent=2)
json.dumps(self._safer_obj(obj), indent=2)
)
def warning(self, obj):
'''Show warning information'''
self._log.warning(
json.dumps(obj, indent=2)
json.dumps(self._safer_obj(obj), indent=2)
)
def info(self, obj):
'''Show information'''
self._log.info(
json.dumps(obj, indent=2)
json.dumps(self._safer_obj(obj), indent=2)
)
def error(self, obj):
'''Show error information'''
self._log.error(
json.dumps(obj, indent=2)
)
try:
self._log.error(
json.dumps(self._safer_obj(obj), indent=2)
)
except TypeError as error:
self._log.error(
'Additional error showing the error from %s. %s',
obj,
error
)
def get_password(self, name, details):
'''Get a single password'''
@ -917,7 +962,7 @@ class NcPasswordClient:
'''List all password folders'''
self.info(self.nc.list_passwords_folders())
def create_password(self, obj):
def create_password(self, obj, update):
'''Create a password with an object'''
safer_obj = dict(obj, **{ 'password': '***' })
self.debug(
@ -926,7 +971,7 @@ class NcPasswordClient:
self.debug(
{
"action": "created_password",
"object": self.nc.create_password(obj)
"object": self.nc.create_password(obj, update)
}
)
@ -959,7 +1004,7 @@ class NcPasswordClient:
'''Delete a passwords folder'''
self.info(self.nc.delete_passwords_folder(name))
def migrate_pass(self, limit=-1):
def migrate_pass(self, limit=-1, update=False):
'''Migrate password store to Nextcloud pass'''
store = passpy.store.Store()
count = 0
@ -974,8 +1019,8 @@ class NcPasswordClient:
raw_values = store.get_key(item).split('\n')
obj['password'] = raw_values[0]
raw_values.pop(0)
custom_fields = {}
for line in raw_values:
custom_fields = {}
if ': ' in line:
split_line = line.split(': ')
field = split_line[0]
@ -999,12 +1044,21 @@ class NcPasswordClient:
custom_fields[field].append(value)
obj[field] = value
obj['customFields'] = json.dumps(custom_fields)
new_password = self.nc.create_password(obj)
new_password = self.nc.create_password(obj, update=update)
if new_password:
self.debug(
{ "action": "created_password", "object": new_password }
)
count += 1
self._log.debug(
{
"action": "migrate_pass",
"message": "Migrate Pass summary",
"limi": limit,
"update": update,
"migrated_passwords": count
}
)
def delete_all_passwords(self, yes_i_am_sure):
'''DANGEROUS! Delete ALL passwords from your Nextcloud Password instance'''
@ -1032,15 +1086,37 @@ class NcPasswordClient:
self.nc.delete_password(item)
return True
def remove_duplicates(self):
def remove_duplicates(self, limit):
'''Remove duplicate passwords'''
checked_passwords = []
for item in self.nc.list_passwords():
count = 0
if limit == 0:
return True
passwords = self.nc.list_passwords()
if passwords is None:
return False
for item in passwords:
for checked in checked_passwords:
if self.nc.is_same_password(checked, item):
self.nc.delete_password(item)
self._log.debug(
{
"action": "remove_duplicate",
"object": self.nc.delete_password(item)
}
)
count += 1
break
checked_passwords.append(item)
if limit != -1 and count >= limit:
break
self._log.debug(
{
"action": "remove_duplicates",
"message": "Remove duplicates summary",
"checked_password": len(checked_passwords),
"removed_duplicates": count
}
)
return True
def _init_log(self):
@ -1141,11 +1217,17 @@ def ls(ctx):
@cli.command()
@click.option('--obj', '-o', required=True, help='JSON object for a password')
@click.option(
'--update', '-u',
is_flag=True,
default=False,
help='Update exdisting passwords with the data from Password Store'
)
@click_config_file.configuration_option()
@click.pass_context
def create_password(ctx, obj):
def create_password(ctx, obj, update):
'''Create a password'''
ctx.obj['NcPasswordClient'].create_password(json.loads(obj))
ctx.obj['NcPasswordClient'].create_password(json.loads(obj), update)
@cli.command()
@click.option('--name', '-n', required=True, help='Name of the password to delete')
@ -1164,11 +1246,16 @@ def create_passwords_folder(ctx, name):
ctx.obj['NcPasswordClient'].create_passwords_folder(name)
@cli.command()
@click.option(
'--limit', '-l',
default=-1,
help='Maximun number of passwords to remove. -1 for unlimited.'
)
@click_config_file.configuration_option()
@click.pass_context
def remove_duplicates(ctx):
def remove_duplicates(ctx, limit):
'''Remove duplicate passwords'''
ctx.obj['NcPasswordClient'].remove_duplicates()
ctx.obj['NcPasswordClient'].remove_duplicates(limit)
@cli.command()
@click_config_file.configuration_option()
@ -1191,11 +1278,17 @@ def delete_passwords_folder(ctx, name):
default=-1,
help='Maximun number of passwords to migrate. -1 for unlimited.'
)
@click.option(
'--update', '-u',
is_flag=True,
default=False,
help='Update exdisting passwords with the data from Password Store'
)
@click_config_file.configuration_option()
@click.pass_context
def migrate_pass(ctx, limit):
def migrate_pass(ctx, limit, update):
'''Migrate Password Store passwords to Nextcloud Passwords'''
ctx.obj['NcPasswordClient'].migrate_pass(limit)
ctx.obj['NcPasswordClient'].migrate_pass(limit, update)
@cli.command()
@click.option(