From 8335783e89ce187395f17032efd37e5124b51c51 Mon Sep 17 00:00:00 2001 From: "Antonio J. Delgado" Date: Mon, 11 Nov 2024 16:40:15 +0200 Subject: [PATCH] Unify output to json --- nc_password_client/nc_password_client.py | 419 +++++++++++++++-------- 1 file changed, 271 insertions(+), 148 deletions(-) diff --git a/nc_password_client/nc_password_client.py b/nc_password_client/nc_password_client.py index 471d1e2..fa9b1b8 100755 --- a/nc_password_client/nc_password_client.py +++ b/nc_password_client/nc_password_client.py @@ -101,17 +101,32 @@ class NextcloudHandler: self.host = params.get('host') or os.environ.get('NEXTCLOUD_HOST') if self.host is None: - self._log.error('Unable to continue. No Nextcloud Host is given.') + self.error( + { + "action": "check_params", + "message": 'Unable to continue. No Nextcloud Host is given.', + } + ) sys.exit(1) self.user = params.get('user') or os.environ.get('NEXTCLOUD_USER') if self.user is None: - self._log.error('Unable to continue. No Nextcloud User is given.') + self.error( + { + "action": "check_params", + "message": 'Unable to continue. No Nextcloud User is given.' + } + ) sys.exit(2) self.token = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN') if self.token is None: - self._log.error('Unable to continue. No Nextcloud Token is given.') + self.error( + { + "action": "check_params", + "message": 'Unable to continue. No Nextcloud Token is given.' + } + ) sys.exit(3) self.e2e_password = False @@ -153,6 +168,30 @@ class NextcloudHandler: ) self.keychain = json.loads(decrypt(text, key)) + def debug(self, obj): + '''Show debug information''' + self._log.debug( + json.dumps(obj, indent=2) + ) + + def warning(self, obj): + '''Show warning information''' + self._log.warning( + json.dumps(obj, indent=2) + ) + + def info(self, obj): + '''Show information''' + self._log.info( + json.dumps(obj, indent=2) + ) + + def error(self, obj): + '''Show error information''' + self._log.error( + json.dumps(obj, indent=2) + ) + def _init_log(self, params): ''' Initialize log object ''' self._log = logging.getLogger("nc_password_client") @@ -212,22 +251,33 @@ class NextcloudHandler: ) if r.status_code == 200: - return r + return r.json() elif r.status_code == 404: - self._log.error( - 'Path %s does not exist', - path - ) + self.error( + { + "action": "get", + "message": 'Path does not exist', + "object": path + } + ) sys.exit(3) else: - self._log.error(r.status_code) + self.error( + { + "action": "get", + "message": f'Server return error {r.status_code}', + } + ) sys.exit(4) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "get", + "message": f"Timeout ({self.timeout} sec) error doing GET request. %s", + "error": error + } ) + return None def propfind(self, path): '''Do a PROPFIND request''' @@ -295,15 +345,20 @@ class NextcloudHandler: elif r.status_code == 404: return {} else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error( + { + "action": "propfind", + "message": f"Nextcloud instance returned status code: {r.status_code}", + "path": path, + } ) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "propfind", + "message": f"Timeout ({self.timeout} sec) error doing GET request.", + "error": error + } ) def put(self, path, src=None): @@ -328,15 +383,19 @@ class NextcloudHandler: if r.status_code in [200, 201, 204]: return r, True - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error( + { + "action": "put", + "message": f"Nextcloud instance returned status code {r.status_code}", + } ) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "put", + "message": f"Timeout ({self.timeout} sec) error doing GET request", + "error": error + } ) def delete(self, path): @@ -352,15 +411,20 @@ class NextcloudHandler: elif r.status_code == 404: return r, False else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error( + { + "action": "delete", + "message": f"Nextcloud instance returned status code {r.status_code}", + "path": path, + } ) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "delete", + "message": f"Timeout ({self.timeout} sec) error doing GET request", + "error": error + } ) def talk(self, message, channel): @@ -383,27 +447,26 @@ class NextcloudHandler: if r.status_code == 201: return r, True else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error( + { + "action": "talk", + "message": f"Nextcloud instance returned status code {r.status_code}", + "talk_message": message, + "channel": channel + } ) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "talk", + "message": f"Timeout ({self.timeout} sec) error doing talk request", + "error": error + } ) def request_passwords_session(self): '''Request a Passwords API session''' - r = self.get("index.php/apps/passwords/api/1.0/session/request") - if r.status_code == 200: - return r.json() - else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code - ) + return self.get("index.php/apps/passwords/api/1.0/session/request") def open_passwords_session(self, password_hash): '''Open a Passwords API session''' @@ -422,43 +485,25 @@ class NextcloudHandler: self.x_api_session = r.headers.get('X-API-SESSION') return r.json() else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error( + { + "action": "open_password_session", + "message": f"Nextcloud instance returned status code {r.status_code}", + "password_hash": password_hash + } ) def close_passwords_session(self): '''Close Passwords API session''' - r = self.get("index.php/apps/passwords/api/1.0/session/close") - if r.status_code == 200: - return r.json() - else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code - ) + return self.get("index.php/apps/passwords/api/1.0/session/close") def list_passwords(self): '''List all passwords''' - r = self.get("index.php/apps/passwords/api/1.0/password/list") - if r.status_code == 200: - return r.json() - else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code - ) + return self.get("index.php/apps/passwords/api/1.0/password/list") def list_passwords_folders(self): '''List passwords folders''' - r = self.get("index.php/apps/passwords/api/1.0/folder/list") - if r.status_code == 200: - return r.json() - else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code - ) + return self.get("index.php/apps/passwords/api/1.0/folder/list") def delete_passwords_folder(self, name): '''Delete a passwords folder''' @@ -467,10 +512,11 @@ class NextcloudHandler: for folder in all_folders: if folder['label'] == name: folder_id = folder['id'] - self._log.debug( - "Found folder with id '%s' to delete", - folder['id'] - ) + self.debug({ + "action": "delete_passwords_folder", + "message": "found folder to delete", + "object": folder['id'] + }) if folder_id: post_obj = { 'id': folder_id @@ -486,13 +532,20 @@ class NextcloudHandler: if r.status_code == 201: return r.json() else: - self._log.error( - "Nextcloud instance returned status code: %s", -r.status_code -) + self.error( + { + "action": "delete_passwords_folder", + "message": f"Nextcloud instance returned status code {r.status_code}", + "folder_name": name + } + ) else: - self._log.error( - "Fodler '{name}' not found." + self.error( + { + "action": "delete_passwords_folder", + "message": "Folder not found", + "folder_name": name + } ) def exists_passwords_folder(self, name): @@ -521,14 +574,20 @@ r.status_code if r.status_code == 201: return r.json() else: - self._log.error( - "Nextcloud instance returned status code: %s", -r.status_code -) + self.error( + { + "action": "create_passwords_folder", + "message": f"Nextcloud instance returned status code {r.status_code}", + "folder_name": name + } + ) else: - self._log.warning( - "Password folder '%s' already exists", - name + self.warning( + { + "action": "create_passwords_folder", + "message": "Passwords folder already exists", + "folder_name": name + } ) def get_passwords_folder(self, name): @@ -567,17 +626,13 @@ r.status_code def fetch_generated_password(self): '''Fetch a generated password''' - r = self.get('index.php/apps/passwords/api/1.0/service/password') - if r.status_code == 200: - return [r.json().get('password')] - else: - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code - ) + return self.get('index.php/apps/passwords/api/1.0/service/password').get('password') def exists_password(self, obj): '''Test if a password exist with the same name''' + self.debug( + { "action": "check_exists_password", "object": obj } + ) for password in self.list_passwords(): if self.is_same_password(obj, password): return True @@ -587,13 +642,15 @@ r.status_code '''Create/add a password''' if 'folder' in post_obj: post_obj['folder'] = self.get_folder_id(post_obj['folder']) + if not 'username' in post_obj: + post_obj['username'] = '' + if not 'url' in post_obj: + post_obj['url'] = '' if not self.exists_password(post_obj): try: - safer_obj = post_obj - safer_obj['password'] = '***' - self._log.debug( - "Creating password: %s", - json.dumps(safer_obj, indent=2) + safer_obj = dict(post_obj, **{ 'password': '***' }) + self.debug( + { "action": "create_password", "object": safer_obj } ) r = requests.post( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create', @@ -605,21 +662,31 @@ r.status_code if r.status_code == 201: return r.json() - self._log.error(r.json()) - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error(r.json()) + self.error( + { + "action": "create_password", + "message": f"Nextcloud instance returned status code: {r.status_code}", + "object": post_obj + } ) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "create_password", + "message": f"Timeout ({self.timeout} sec) error doing GET request", + "object": post_obj, + "error": error + } ) else: - self._log.warning( - "Password with name '%s' already exists", - post_obj['label'] + self.warning( + { + "action": "create_password", + "message": "Password with that name/label already exists", + "object": safer_obj, + "label": post_obj['label'] + } ) def delete_password(self, post_obj): @@ -635,15 +702,19 @@ r.status_code if r.status_code == 200: return r.json() - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error( + { + "action": "delete_password", + "message": f"Nextcloud instance returned status code {r.status_code}", + } ) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "delete_password", + "message": f"Timeout ({self.timeout} sec) error doing GET request", + "error": error, + } ) @@ -660,15 +731,21 @@ r.status_code if r.status_code == 200: return r.json() - self._log.error( - "Nextcloud instance returned status code: %s", - r.status_code + self.error( + { + "action": "update_password", + "message": f"Nextcloud instance returned status code {r.status_code}", + "object": post_obj, + } ) except requests.exceptions.ReadTimeout as error: - self._log.error( - "Timeout (%s sec) error doing GET request. %s", - self.timeout, - error + self.error( + { + "action": "update_password", + "message": f"Timeout ({self.timeout} sec) error doing GET request", + "object": post_obj, + "error": error, + } ) def is_same_key(self, key, dict1, dict2): @@ -681,13 +758,23 @@ r.status_code def is_same_password(self, obj1, obj2): '''Test if two password objects are the same or similar''' if obj1 == obj2: + self.debug( + {"action": "notify_exact_match", "object": { "obj1": obj1, "obj2": obj2 } } + ) return True + if obj1['label'] == obj2['label']: + self.debug( + { "action": "notify_name_match", "object": { "obj1": obj1, "obj2": obj2 } } + ) if ( self.is_same_key('username', obj1, obj2) and self.is_same_key('password', obj1, obj2) and self.is_same_key('url', obj1, obj2) and self.is_same_key('folder', obj1, obj2) ): + self.debug( + { "action": "notify_match", "object": { "obj1": obj1, "obj2": obj2 } } + ) return True return False @@ -727,48 +814,80 @@ class NcPasswordClient: } self.nc = NextcloudHandler(params) + def debug(self, obj): + '''Show debug information''' + self._log.debug( + json.dumps(obj, indent=2) + ) + + def warning(self, obj): + '''Show warning information''' + self._log.warning( + json.dumps(obj, indent=2) + ) + + def info(self, obj): + '''Show information''' + self.info( + json.dumps(obj, indent=2) + ) + + def error(self, obj): + '''Show error information''' + self._log.error( + json.dumps(obj, indent=2) + ) + def get_password(self, name): '''Get a single password''' - print(json.dumps(self.nc.get_password(name), indent=2)) + self.info(self.nc.get_password(name)) def list_passwords(self): '''List all passwords''' - print(json.dumps(self.nc.list_passwords(), indent=2)) + self.info(self.nc.list_passwords()) def list_passwords_folders(self): '''List all password folders''' - print(json.dumps(self.nc.list_passwords_folders(), indent=2)) + self.info(self.nc.list_passwords_folders()) def create_password(self, obj): '''Create a password with an object''' - self._log.debug( - "Creating password %s", - obj + self.debug( + { "action": "create_password", "object": obj } + ) + self.debug( + { + "action": "created_password", + "object": self.nc.create_password(obj) + } ) - print(json.dumps(self.nc.create_password(obj), indent=2)) def delete_password(self, name): '''Delete a password''' for password in self.nc.list_passwords(): if password['label'] == name: - self._log.debug( - "Deleting password:%s", - json.dumps(password) + self.debug( + { "action": "delete_password", "object": password } + ) + self.debug( + { "action": "deleted_password", "object": self.nc.delete_password(password)} ) - print(json.dumps(self.nc.delete_password(password), indent=2)) return True - self._log.warning( - "Password with name '%s' doesn't exist", - name + self.warning( + { + "action": "delete_password", + "message": "Password with that name doesn't exist", + "object": name + } ) def create_passwords_folder(self, name): '''Create a passwords folder''' - print(json.dumps(self.nc.create_passwords_folder(name), indent=2)) + self.info(self.nc.create_passwords_folder(name)) def delete_passwords_folder(self, name): '''Delete a passwords folder''' - print(json.dumps(self.nc.delete_passwords_folder(name), indent=2)) + self.info(self.nc.delete_passwords_folder(name)) def migrate_pass(self, limit=-1): '''Migrate password store to Nextcloud pass''' @@ -794,8 +913,12 @@ class NcPasswordClient: field = line value = '' if field != '' and value != '': + if field == 'login': + field = 'username' obj[field] = value - print(json.dumps(self.nc.create_password(obj), indent=2)) + self.debug( + { "action": "created_password", "object": self.nc.create_password(obj) } + ) count += 1 def _init_log(self):