From a5c08ab23540ef13272e9d3390bdbe3d671cc824 Mon Sep 17 00:00:00 2001 From: "Antonio J. Delgado" Date: Sat, 16 Nov 2024 13:22:15 +0200 Subject: [PATCH] Improve test for duplicates, make safer output, enable update during migration --- nc_password_client/nc_password_client.py | 193 +++++++++++++++++------ 1 file changed, 143 insertions(+), 50 deletions(-) diff --git a/nc_password_client/nc_password_client.py b/nc_password_client/nc_password_client.py index 45df3b5..69a7625 100755 --- a/nc_password_client/nc_password_client.py +++ b/nc_password_client/nc_password_client.py @@ -190,29 +190,44 @@ class NextcloudHandler: ) self.keychain = json.loads(decrypt(text, key)) + def _safer_obj(self, obj, fields=None): + if fields is None: + fields = ['password', 'token', 'secret', 'pass', 'passwd', 'hash'] + if isinstance(obj, dict): + for key in obj.keys(): + if isinstance(obj[key], dict) or isinstance(obj[key], list): + obj[key] = self._safer_obj(obj[key], fields=fields) + else: + if key in fields: + obj[key] = '***' + if isinstance(obj, list): + for item in obj: + item = self._safer_obj(item, fields=fields) + return obj + def debug(self, obj): '''Show debug information''' self._log.debug( - json.dumps(obj, indent=2) + json.dumps(self._safer_obj(obj), indent=2) ) def warning(self, obj): '''Show warning information''' self._log.warning( - json.dumps(obj, indent=2) + json.dumps(self._safer_obj(obj), indent=2) ) def info(self, obj): '''Show information''' self._log.info( - json.dumps(obj, indent=2) + json.dumps(self._safer_obj(obj), indent=2) ) def error(self, obj): '''Show error information''' try: self._log.error( - json.dumps(obj, indent=2) + json.dumps(self._safer_obj(obj), indent=2) ) except TypeError as error: self._log.error( @@ -253,7 +268,7 @@ class NextcloudHandler: timeout=self.timeout ) - if r.status_code == 200: + if r.status_code < 299: self.debug( {"action": "get", "status_code": r.status_code, "size": len(r.content)} ) @@ -489,7 +504,7 @@ class NextcloudHandler: auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout ) - if r.status_code == 200: + if r.status_code < 299: self.x_api_session = r.headers.get('X-API-SESSION') return r.json() else: @@ -677,13 +692,15 @@ class NextcloudHandler: ) return False - def create_password(self, post_obj): + def create_password(self, post_obj, update): '''Create/add a password''' - if 'folder' in post_obj and post_obj['folder']: + if post_obj.get('folder', '') != '': folder_id = self.get_folder_id(post_obj['folder']) if not folder_id: folder_id = self.create_passwords_folder(post_obj['folder'])['id'] post_obj['folder'] = folder_id + else: + post_obj.pop('folder', None) if not 'username' in post_obj: post_obj['username'] = '' if not 'url' in post_obj: @@ -726,14 +743,17 @@ class NextcloudHandler: } ) else: - self.warning( - { - "action": "create_password", - "message": "Password with that name/label already exists", - "object": safer_obj, - "label": post_obj['label'] - } - ) + if update: + return self.update_password(post_obj) + else: + self.warning( + { + "action": "create_password", + "message": "Password with that name/label already exists", + "object": safer_obj, + "label": post_obj['label'] + } + ) return False def delete_password(self, post_obj): @@ -747,7 +767,7 @@ class NextcloudHandler: verify=self.ssl, timeout=self.timeout ) - if r.status_code == 200: + if r.status_code < 299: return r.json() self.error( { @@ -768,6 +788,7 @@ class NextcloudHandler: def update_password(self, post_obj): '''Update a password''' + safer_obj = dict(post_obj, **{ 'password': '***' }) try: r = self.session.patch( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update', @@ -777,13 +798,17 @@ class NextcloudHandler: verify=self.ssl, timeout=self.timeout ) - if r.status_code == 200: + if r.status_code < 299: return r.json() self.error( { "action": "update_password", "message": f"Nextcloud instance returned status code {r.status_code}", - "object": post_obj, + #"content": r.content, + "json": r.json(), + "headers": r.headers, + # "request_headers": r.request.headers, + "object": safer_obj, } ) except requests.exceptions.ReadTimeout as error: @@ -791,7 +816,7 @@ class NextcloudHandler: { "action": "update_password", "message": f"Timeout ({self.timeout} sec) error doing GET request", - "object": post_obj, + "object": safer_obj, "error": error, } ) @@ -829,16 +854,14 @@ class NextcloudHandler: # } # } # ) - if ( - self.is_same_key('username', obj1, obj2) and - self.is_same_key('password', obj1, obj2) and - self.is_same_key('url', obj1, obj2) and - self.is_same_key('folder', obj1, obj2) - ): - self.debug( - { "action": "notify_match", "object": { "obj1": safer_obj1, "obj2": safer_obj2 } } - ) - return True + if self.is_same_key('username', obj1, obj2): + if self.is_same_key('password', obj1, obj2): + if self.is_same_key('url', obj1, obj2): + #if self.is_same_key('folder', obj1, obj2): + self.debug( + { "action": "notify_match", "object": { "obj1": safer_obj1, "obj2": safer_obj2 } } + ) + return True return False def get_folder_id(self, folder_name): @@ -881,29 +904,51 @@ class NcPasswordClient: } self.nc = NextcloudHandler(params) + def _safer_obj(self, obj, fields=None): + if fields is None: + fields = ['password', 'token', 'secret', 'pass', 'passwd', 'hash'] + if isinstance(obj, dict): + for key in obj.keys(): + if isinstance(obj[key], dict) or isinstance(obj[key], list): + obj[key] = self._safer_obj(obj[key], fields=fields) + else: + if key in fields: + obj[key] = '***' + if isinstance(obj, list): + for item in obj: + item = self._safer_obj(item, fields=fields) + return obj + def debug(self, obj): '''Show debug information''' self._log.debug( - json.dumps(obj, indent=2) + json.dumps(self._safer_obj(obj), indent=2) ) def warning(self, obj): '''Show warning information''' self._log.warning( - json.dumps(obj, indent=2) + json.dumps(self._safer_obj(obj), indent=2) ) def info(self, obj): '''Show information''' self._log.info( - json.dumps(obj, indent=2) + json.dumps(self._safer_obj(obj), indent=2) ) def error(self, obj): '''Show error information''' - self._log.error( - json.dumps(obj, indent=2) - ) + try: + self._log.error( + json.dumps(self._safer_obj(obj), indent=2) + ) + except TypeError as error: + self._log.error( + 'Additional error showing the error from %s. %s', + obj, + error + ) def get_password(self, name, details): '''Get a single password''' @@ -917,7 +962,7 @@ class NcPasswordClient: '''List all password folders''' self.info(self.nc.list_passwords_folders()) - def create_password(self, obj): + def create_password(self, obj, update): '''Create a password with an object''' safer_obj = dict(obj, **{ 'password': '***' }) self.debug( @@ -926,7 +971,7 @@ class NcPasswordClient: self.debug( { "action": "created_password", - "object": self.nc.create_password(obj) + "object": self.nc.create_password(obj, update) } ) @@ -959,7 +1004,7 @@ class NcPasswordClient: '''Delete a passwords folder''' self.info(self.nc.delete_passwords_folder(name)) - def migrate_pass(self, limit=-1): + def migrate_pass(self, limit=-1, update=False): '''Migrate password store to Nextcloud pass''' store = passpy.store.Store() count = 0 @@ -974,8 +1019,8 @@ class NcPasswordClient: raw_values = store.get_key(item).split('\n') obj['password'] = raw_values[0] raw_values.pop(0) + custom_fields = {} for line in raw_values: - custom_fields = {} if ': ' in line: split_line = line.split(': ') field = split_line[0] @@ -999,12 +1044,21 @@ class NcPasswordClient: custom_fields[field].append(value) obj[field] = value obj['customFields'] = json.dumps(custom_fields) - new_password = self.nc.create_password(obj) + new_password = self.nc.create_password(obj, update=update) if new_password: self.debug( { "action": "created_password", "object": new_password } ) count += 1 + self._log.debug( + { + "action": "migrate_pass", + "message": "Migrate Pass summary", + "limi": limit, + "update": update, + "migrated_passwords": count + } + ) def delete_all_passwords(self, yes_i_am_sure): '''DANGEROUS! Delete ALL passwords from your Nextcloud Password instance''' @@ -1032,15 +1086,37 @@ class NcPasswordClient: self.nc.delete_password(item) return True - def remove_duplicates(self): + def remove_duplicates(self, limit): '''Remove duplicate passwords''' checked_passwords = [] - for item in self.nc.list_passwords(): + count = 0 + if limit == 0: + return True + passwords = self.nc.list_passwords() + if passwords is None: + return False + for item in passwords: for checked in checked_passwords: if self.nc.is_same_password(checked, item): - self.nc.delete_password(item) + self._log.debug( + { + "action": "remove_duplicate", + "object": self.nc.delete_password(item) + } + ) + count += 1 break checked_passwords.append(item) + if limit != -1 and count >= limit: + break + self._log.debug( + { + "action": "remove_duplicates", + "message": "Remove duplicates summary", + "checked_password": len(checked_passwords), + "removed_duplicates": count + } + ) return True def _init_log(self): @@ -1141,11 +1217,17 @@ def ls(ctx): @cli.command() @click.option('--obj', '-o', required=True, help='JSON object for a password') +@click.option( + '--update', '-u', + is_flag=True, + default=False, + help='Update exdisting passwords with the data from Password Store' +) @click_config_file.configuration_option() @click.pass_context -def create_password(ctx, obj): +def create_password(ctx, obj, update): '''Create a password''' - ctx.obj['NcPasswordClient'].create_password(json.loads(obj)) + ctx.obj['NcPasswordClient'].create_password(json.loads(obj), update) @cli.command() @click.option('--name', '-n', required=True, help='Name of the password to delete') @@ -1164,11 +1246,16 @@ def create_passwords_folder(ctx, name): ctx.obj['NcPasswordClient'].create_passwords_folder(name) @cli.command() +@click.option( + '--limit', '-l', + default=-1, + help='Maximun number of passwords to remove. -1 for unlimited.' +) @click_config_file.configuration_option() @click.pass_context -def remove_duplicates(ctx): +def remove_duplicates(ctx, limit): '''Remove duplicate passwords''' - ctx.obj['NcPasswordClient'].remove_duplicates() + ctx.obj['NcPasswordClient'].remove_duplicates(limit) @cli.command() @click_config_file.configuration_option() @@ -1191,11 +1278,17 @@ def delete_passwords_folder(ctx, name): default=-1, help='Maximun number of passwords to migrate. -1 for unlimited.' ) +@click.option( + '--update', '-u', + is_flag=True, + default=False, + help='Update exdisting passwords with the data from Password Store' +) @click_config_file.configuration_option() @click.pass_context -def migrate_pass(ctx, limit): +def migrate_pass(ctx, limit, update): '''Migrate Password Store passwords to Nextcloud Passwords''' - ctx.obj['NcPasswordClient'].migrate_pass(limit) + ctx.obj['NcPasswordClient'].migrate_pass(limit, update) @cli.command() @click.option(