Unify output to json

This commit is contained in:
Antonio J. Delgado 2024-11-11 16:40:15 +02:00
parent 7272d3dce8
commit 8335783e89

View file

@ -101,17 +101,32 @@ class NextcloudHandler:
self.host = params.get('host') or os.environ.get('NEXTCLOUD_HOST') self.host = params.get('host') or os.environ.get('NEXTCLOUD_HOST')
if self.host is None: if self.host is None:
self._log.error('Unable to continue. No Nextcloud Host is given.') self.error(
{
"action": "check_params",
"message": 'Unable to continue. No Nextcloud Host is given.',
}
)
sys.exit(1) sys.exit(1)
self.user = params.get('user') or os.environ.get('NEXTCLOUD_USER') self.user = params.get('user') or os.environ.get('NEXTCLOUD_USER')
if self.user is None: if self.user is None:
self._log.error('Unable to continue. No Nextcloud User is given.') self.error(
{
"action": "check_params",
"message": 'Unable to continue. No Nextcloud User is given.'
}
)
sys.exit(2) sys.exit(2)
self.token = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN') self.token = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN')
if self.token is None: if self.token is None:
self._log.error('Unable to continue. No Nextcloud Token is given.') self.error(
{
"action": "check_params",
"message": 'Unable to continue. No Nextcloud Token is given.'
}
)
sys.exit(3) sys.exit(3)
self.e2e_password = False self.e2e_password = False
@ -153,6 +168,30 @@ class NextcloudHandler:
) )
self.keychain = json.loads(decrypt(text, key)) self.keychain = json.loads(decrypt(text, key))
def debug(self, obj):
'''Show debug information'''
self._log.debug(
json.dumps(obj, indent=2)
)
def warning(self, obj):
'''Show warning information'''
self._log.warning(
json.dumps(obj, indent=2)
)
def info(self, obj):
'''Show information'''
self._log.info(
json.dumps(obj, indent=2)
)
def error(self, obj):
'''Show error information'''
self._log.error(
json.dumps(obj, indent=2)
)
def _init_log(self, params): def _init_log(self, params):
''' Initialize log object ''' ''' Initialize log object '''
self._log = logging.getLogger("nc_password_client") self._log = logging.getLogger("nc_password_client")
@ -212,22 +251,33 @@ class NextcloudHandler:
) )
if r.status_code == 200: if r.status_code == 200:
return r return r.json()
elif r.status_code == 404: elif r.status_code == 404:
self._log.error( self.error(
'Path %s does not exist', {
path "action": "get",
"message": 'Path does not exist',
"object": path
}
) )
sys.exit(3) sys.exit(3)
else: else:
self._log.error(r.status_code) self.error(
{
"action": "get",
"message": f'Server return error {r.status_code}',
}
)
sys.exit(4) sys.exit(4)
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "get",
error "message": f"Timeout ({self.timeout} sec) error doing GET request. %s",
"error": error
}
) )
return None
def propfind(self, path): def propfind(self, path):
'''Do a PROPFIND request''' '''Do a PROPFIND request'''
@ -295,15 +345,20 @@ class NextcloudHandler:
elif r.status_code == 404: elif r.status_code == 404:
return {} return {}
else: else:
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "propfind",
"message": f"Nextcloud instance returned status code: {r.status_code}",
"path": path,
}
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "propfind",
error "message": f"Timeout ({self.timeout} sec) error doing GET request.",
"error": error
}
) )
def put(self, path, src=None): def put(self, path, src=None):
@ -328,15 +383,19 @@ class NextcloudHandler:
if r.status_code in [200, 201, 204]: if r.status_code in [200, 201, 204]:
return r, True return r, True
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "put",
"message": f"Nextcloud instance returned status code {r.status_code}",
}
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "put",
error "message": f"Timeout ({self.timeout} sec) error doing GET request",
"error": error
}
) )
def delete(self, path): def delete(self, path):
@ -352,15 +411,20 @@ class NextcloudHandler:
elif r.status_code == 404: elif r.status_code == 404:
return r, False return r, False
else: else:
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "delete",
"message": f"Nextcloud instance returned status code {r.status_code}",
"path": path,
}
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "delete",
error "message": f"Timeout ({self.timeout} sec) error doing GET request",
"error": error
}
) )
def talk(self, message, channel): def talk(self, message, channel):
@ -383,27 +447,26 @@ class NextcloudHandler:
if r.status_code == 201: if r.status_code == 201:
return r, True return r, True
else: else:
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "talk",
"message": f"Nextcloud instance returned status code {r.status_code}",
"talk_message": message,
"channel": channel
}
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "talk",
error "message": f"Timeout ({self.timeout} sec) error doing talk request",
"error": error
}
) )
def request_passwords_session(self): def request_passwords_session(self):
'''Request a Passwords API session''' '''Request a Passwords API session'''
r = self.get("index.php/apps/passwords/api/1.0/session/request") return self.get("index.php/apps/passwords/api/1.0/session/request")
if r.status_code == 200:
return r.json()
else:
self._log.error(
"Nextcloud instance returned status code: %s",
r.status_code
)
def open_passwords_session(self, password_hash): def open_passwords_session(self, password_hash):
'''Open a Passwords API session''' '''Open a Passwords API session'''
@ -422,43 +485,25 @@ class NextcloudHandler:
self.x_api_session = r.headers.get('X-API-SESSION') self.x_api_session = r.headers.get('X-API-SESSION')
return r.json() return r.json()
else: else:
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "open_password_session",
"message": f"Nextcloud instance returned status code {r.status_code}",
"password_hash": password_hash
}
) )
def close_passwords_session(self): def close_passwords_session(self):
'''Close Passwords API session''' '''Close Passwords API session'''
r = self.get("index.php/apps/passwords/api/1.0/session/close") return self.get("index.php/apps/passwords/api/1.0/session/close")
if r.status_code == 200:
return r.json()
else:
self._log.error(
"Nextcloud instance returned status code: %s",
r.status_code
)
def list_passwords(self): def list_passwords(self):
'''List all passwords''' '''List all passwords'''
r = self.get("index.php/apps/passwords/api/1.0/password/list") return self.get("index.php/apps/passwords/api/1.0/password/list")
if r.status_code == 200:
return r.json()
else:
self._log.error(
"Nextcloud instance returned status code: %s",
r.status_code
)
def list_passwords_folders(self): def list_passwords_folders(self):
'''List passwords folders''' '''List passwords folders'''
r = self.get("index.php/apps/passwords/api/1.0/folder/list") return self.get("index.php/apps/passwords/api/1.0/folder/list")
if r.status_code == 200:
return r.json()
else:
self._log.error(
"Nextcloud instance returned status code: %s",
r.status_code
)
def delete_passwords_folder(self, name): def delete_passwords_folder(self, name):
'''Delete a passwords folder''' '''Delete a passwords folder'''
@ -467,10 +512,11 @@ class NextcloudHandler:
for folder in all_folders: for folder in all_folders:
if folder['label'] == name: if folder['label'] == name:
folder_id = folder['id'] folder_id = folder['id']
self._log.debug( self.debug({
"Found folder with id '%s' to delete", "action": "delete_passwords_folder",
folder['id'] "message": "found folder to delete",
) "object": folder['id']
})
if folder_id: if folder_id:
post_obj = { post_obj = {
'id': folder_id 'id': folder_id
@ -486,13 +532,20 @@ class NextcloudHandler:
if r.status_code == 201: if r.status_code == 201:
return r.json() return r.json()
else: else:
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "delete_passwords_folder",
"message": f"Nextcloud instance returned status code {r.status_code}",
"folder_name": name
}
) )
else: else:
self._log.error( self.error(
"Fodler '{name}' not found." {
"action": "delete_passwords_folder",
"message": "Folder not found",
"folder_name": name
}
) )
def exists_passwords_folder(self, name): def exists_passwords_folder(self, name):
@ -521,14 +574,20 @@ r.status_code
if r.status_code == 201: if r.status_code == 201:
return r.json() return r.json()
else: else:
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "create_passwords_folder",
"message": f"Nextcloud instance returned status code {r.status_code}",
"folder_name": name
}
) )
else: else:
self._log.warning( self.warning(
"Password folder '%s' already exists", {
name "action": "create_passwords_folder",
"message": "Passwords folder already exists",
"folder_name": name
}
) )
def get_passwords_folder(self, name): def get_passwords_folder(self, name):
@ -567,17 +626,13 @@ r.status_code
def fetch_generated_password(self): def fetch_generated_password(self):
'''Fetch a generated password''' '''Fetch a generated password'''
r = self.get('index.php/apps/passwords/api/1.0/service/password') return self.get('index.php/apps/passwords/api/1.0/service/password').get('password')
if r.status_code == 200:
return [r.json().get('password')]
else:
self._log.error(
"Nextcloud instance returned status code: %s",
r.status_code
)
def exists_password(self, obj): def exists_password(self, obj):
'''Test if a password exist with the same name''' '''Test if a password exist with the same name'''
self.debug(
{ "action": "check_exists_password", "object": obj }
)
for password in self.list_passwords(): for password in self.list_passwords():
if self.is_same_password(obj, password): if self.is_same_password(obj, password):
return True return True
@ -587,13 +642,15 @@ r.status_code
'''Create/add a password''' '''Create/add a password'''
if 'folder' in post_obj: if 'folder' in post_obj:
post_obj['folder'] = self.get_folder_id(post_obj['folder']) post_obj['folder'] = self.get_folder_id(post_obj['folder'])
if not 'username' in post_obj:
post_obj['username'] = ''
if not 'url' in post_obj:
post_obj['url'] = ''
if not self.exists_password(post_obj): if not self.exists_password(post_obj):
try: try:
safer_obj = post_obj safer_obj = dict(post_obj, **{ 'password': '***' })
safer_obj['password'] = '***' self.debug(
self._log.debug( { "action": "create_password", "object": safer_obj }
"Creating password: %s",
json.dumps(safer_obj, indent=2)
) )
r = requests.post( r = requests.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create', f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create',
@ -605,21 +662,31 @@ r.status_code
if r.status_code == 201: if r.status_code == 201:
return r.json() return r.json()
self._log.error(r.json()) self.error(r.json())
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "create_password",
"message": f"Nextcloud instance returned status code: {r.status_code}",
"object": post_obj
}
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "create_password",
error "message": f"Timeout ({self.timeout} sec) error doing GET request",
"object": post_obj,
"error": error
}
) )
else: else:
self._log.warning( self.warning(
"Password with name '%s' already exists", {
post_obj['label'] "action": "create_password",
"message": "Password with that name/label already exists",
"object": safer_obj,
"label": post_obj['label']
}
) )
def delete_password(self, post_obj): def delete_password(self, post_obj):
@ -635,15 +702,19 @@ r.status_code
if r.status_code == 200: if r.status_code == 200:
return r.json() return r.json()
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "delete_password",
"message": f"Nextcloud instance returned status code {r.status_code}",
}
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "delete_password",
error "message": f"Timeout ({self.timeout} sec) error doing GET request",
"error": error,
}
) )
@ -660,15 +731,21 @@ r.status_code
if r.status_code == 200: if r.status_code == 200:
return r.json() return r.json()
self._log.error( self.error(
"Nextcloud instance returned status code: %s", {
r.status_code "action": "update_password",
"message": f"Nextcloud instance returned status code {r.status_code}",
"object": post_obj,
}
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
self._log.error( self.error(
"Timeout (%s sec) error doing GET request. %s", {
self.timeout, "action": "update_password",
error "message": f"Timeout ({self.timeout} sec) error doing GET request",
"object": post_obj,
"error": error,
}
) )
def is_same_key(self, key, dict1, dict2): def is_same_key(self, key, dict1, dict2):
@ -681,13 +758,23 @@ r.status_code
def is_same_password(self, obj1, obj2): def is_same_password(self, obj1, obj2):
'''Test if two password objects are the same or similar''' '''Test if two password objects are the same or similar'''
if obj1 == obj2: if obj1 == obj2:
self.debug(
{"action": "notify_exact_match", "object": { "obj1": obj1, "obj2": obj2 } }
)
return True return True
if obj1['label'] == obj2['label']:
self.debug(
{ "action": "notify_name_match", "object": { "obj1": obj1, "obj2": obj2 } }
)
if ( if (
self.is_same_key('username', obj1, obj2) and self.is_same_key('username', obj1, obj2) and
self.is_same_key('password', obj1, obj2) and self.is_same_key('password', obj1, obj2) and
self.is_same_key('url', obj1, obj2) and self.is_same_key('url', obj1, obj2) and
self.is_same_key('folder', obj1, obj2) self.is_same_key('folder', obj1, obj2)
): ):
self.debug(
{ "action": "notify_match", "object": { "obj1": obj1, "obj2": obj2 } }
)
return True return True
return False return False
@ -727,48 +814,80 @@ class NcPasswordClient:
} }
self.nc = NextcloudHandler(params) self.nc = NextcloudHandler(params)
def debug(self, obj):
'''Show debug information'''
self._log.debug(
json.dumps(obj, indent=2)
)
def warning(self, obj):
'''Show warning information'''
self._log.warning(
json.dumps(obj, indent=2)
)
def info(self, obj):
'''Show information'''
self.info(
json.dumps(obj, indent=2)
)
def error(self, obj):
'''Show error information'''
self._log.error(
json.dumps(obj, indent=2)
)
def get_password(self, name): def get_password(self, name):
'''Get a single password''' '''Get a single password'''
print(json.dumps(self.nc.get_password(name), indent=2)) self.info(self.nc.get_password(name))
def list_passwords(self): def list_passwords(self):
'''List all passwords''' '''List all passwords'''
print(json.dumps(self.nc.list_passwords(), indent=2)) self.info(self.nc.list_passwords())
def list_passwords_folders(self): def list_passwords_folders(self):
'''List all password folders''' '''List all password folders'''
print(json.dumps(self.nc.list_passwords_folders(), indent=2)) self.info(self.nc.list_passwords_folders())
def create_password(self, obj): def create_password(self, obj):
'''Create a password with an object''' '''Create a password with an object'''
self._log.debug( self.debug(
"Creating password %s", { "action": "create_password", "object": obj }
obj )
self.debug(
{
"action": "created_password",
"object": self.nc.create_password(obj)
}
) )
print(json.dumps(self.nc.create_password(obj), indent=2))
def delete_password(self, name): def delete_password(self, name):
'''Delete a password''' '''Delete a password'''
for password in self.nc.list_passwords(): for password in self.nc.list_passwords():
if password['label'] == name: if password['label'] == name:
self._log.debug( self.debug(
"Deleting password:%s", { "action": "delete_password", "object": password }
json.dumps(password) )
self.debug(
{ "action": "deleted_password", "object": self.nc.delete_password(password)}
) )
print(json.dumps(self.nc.delete_password(password), indent=2))
return True return True
self._log.warning( self.warning(
"Password with name '%s' doesn't exist", {
name "action": "delete_password",
"message": "Password with that name doesn't exist",
"object": name
}
) )
def create_passwords_folder(self, name): def create_passwords_folder(self, name):
'''Create a passwords folder''' '''Create a passwords folder'''
print(json.dumps(self.nc.create_passwords_folder(name), indent=2)) self.info(self.nc.create_passwords_folder(name))
def delete_passwords_folder(self, name): def delete_passwords_folder(self, name):
'''Delete a passwords folder''' '''Delete a passwords folder'''
print(json.dumps(self.nc.delete_passwords_folder(name), indent=2)) self.info(self.nc.delete_passwords_folder(name))
def migrate_pass(self, limit=-1): def migrate_pass(self, limit=-1):
'''Migrate password store to Nextcloud pass''' '''Migrate password store to Nextcloud pass'''
@ -794,8 +913,12 @@ class NcPasswordClient:
field = line field = line
value = '' value = ''
if field != '' and value != '': if field != '' and value != '':
if field == 'login':
field = 'username'
obj[field] = value obj[field] = value
print(json.dumps(self.nc.create_password(obj), indent=2)) self.debug(
{ "action": "created_password", "object": self.nc.create_password(obj) }
)
count += 1 count += 1
def _init_log(self): def _init_log(self):