From 09155d0f46a4ae8033b5cfdf0112f9fd0a36e5da Mon Sep 17 00:00:00 2001 From: "Antonio J. Delgado" Date: Sun, 10 Nov 2024 13:26:36 +0200 Subject: [PATCH] Add functions to call api --- nc_password_client/nc_password_client.py | 59 +++++++++++--- nc_password_client/nextcloud.py | 97 ++++++++++++++++++++++-- 2 files changed, 139 insertions(+), 17 deletions(-) diff --git a/nc_password_client/nc_password_client.py b/nc_password_client/nc_password_client.py index 44b93ad..cc955a1 100644 --- a/nc_password_client/nc_password_client.py +++ b/nc_password_client/nc_password_client.py @@ -5,6 +5,7 @@ # (c) 2024 Antonio J. Delgado """Nextcloud Password client""" +import json import sys import os import logging @@ -17,9 +18,10 @@ from nextcloud import NextcloudHandler class NcPasswordClient: '''Nextcloud Password Client''' - def __init__(self, **kwargs): - self.config = kwargs - if 'log_file' not in kwargs or kwargs['log_file'] is None: + def __init__(self, debug_level, log_file, host, user, api_token, cse_password): + self.config = {} + self.config['debug_level'] = debug_level + if log_file is None: self.config['log_file'] = os.path.join( os.environ.get( 'HOME', @@ -32,6 +34,21 @@ class NcPasswordClient: 'nc_password_client.log' ) self._init_log() + params = { + "host": host, + "user": user, + "api_token": api_token, + "cse_password": cse_password + } + self.nc = NextcloudHandler(params) + + def get_password(self, name): + '''Get a single password''' + print(json.dumps(self.nc.get_password(name), indent=2)) + + def list_passwords(self): + '''List all passwords''' + print(json.dumps(self.nc.list_passwords(), indent=2)) def _init_log(self): ''' Initialize log object ''' @@ -73,7 +90,7 @@ class NcPasswordClient: return True -@click.command() +@click.group() @click.option( "--debug-level", "-d", @@ -93,18 +110,40 @@ class NcPasswordClient: help='Nextcloud host name' ) @click.option( - '--username', '-u', + '--user', '-u', required=True, help='Nextcloud user name' ) @click.option( - '--password', '-p', + '--api-token', '-a', required=True, - help='Nextcloud user password or, better, application token' + help='Nextcloud user\'s application token' +) +@click.option( + '--cse-password', '-p', + help='Nextcloud user\'s end-to-end encryption password' ) @click_config_file.configuration_option() -def __main__(**kwargs): - return NcPasswordClient(**kwargs) +@click.pass_context +def cli(ctx, debug_level, log_file, host, user, api_token, cse_password): + '''Client function to pass context''' + ctx.ensure_object(dict) + ctx.obj['NcPasswordClient'] = NcPasswordClient(debug_level, log_file, host, user, api_token, cse_password) + +@cli.command() +@click.option('--name', '-n', required=True, help='Name of the password to show') +@click_config_file.configuration_option() +@click.pass_context +def show(ctx, name): + '''Show a single password''' + ctx.obj['NcPasswordClient'].get_password(name) + +@cli.command() +@click_config_file.configuration_option() +@click.pass_context +def list(ctx): + '''Show all password''' + ctx.obj['NcPasswordClient'].list_passwords() if __name__ == "__main__": - __main__() + cli(obj={}) diff --git a/nc_password_client/nextcloud.py b/nc_password_client/nextcloud.py index 8c1f8de..b7d40ac 100644 --- a/nc_password_client/nextcloud.py +++ b/nc_password_client/nextcloud.py @@ -1,12 +1,15 @@ #!/usr/bin/env python3 # -*- encoding: utf-8 -*- -# Copyright https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py +# Based on https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py """Nextcloud handler class""" from __future__ import absolute_import, division, print_function __metaclass__ = type import os +import sys import traceback +import logging +from logging.handlers import SysLogHandler from xml.dom import minidom import binascii import json @@ -46,14 +49,54 @@ dios_mio = """ class NextcloudErrorHandler: - def __init__(self, fail_json): - self.fail = fail_json + def __init__(self): + self.config = {} + self._init_log() + + def _init_log(self): + ''' Initialize log object ''' + self._log = logging.getLogger("nc_password_client") + self._log.setLevel(logging.DEBUG) + + sysloghandler = SysLogHandler() + sysloghandler.setLevel(logging.DEBUG) + self._log.addHandler(sysloghandler) + + streamhandler = logging.StreamHandler(sys.stdout) + streamhandler.setLevel( + logging.getLevelName(self.config.get("debug_level", 'INFO')) + ) + self._log.addHandler(streamhandler) + + if 'log_file' in self.config: + log_file = self.config['log_file'] + else: + home_folder = os.environ.get( + 'HOME', os.environ.get('USERPROFILE', '') + ) + log_folder = os.path.join(home_folder, "log") + log_file = os.path.join(log_folder, "nc_password_client.log") + + if not os.path.exists(os.path.dirname(log_file)): + os.mkdir(os.path.dirname(log_file)) + + filehandler = logging.handlers.RotatingFileHandler( + log_file, maxBytes=102400000 + ) + # create formatter + formatter = logging.Formatter( + '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' + ) + filehandler.setFormatter(formatter) + filehandler.setLevel(logging.DEBUG) + self._log.addHandler(filehandler) + return True def status_code_error(self, status): try: - self.fail(msg='Nextcloud returned with status code {SC}'.format(SC=status)) + self._log.error('Nextcloud returned with status code {SC}'.format(SC=status)) except Exception: - self.fail('Nextcloud returned with status code {SC}'.format(SC=status)) + self._log.error('Nextcloud returned with status code {SC}'.format(SC=status)) def parameter_spects(spec_arguments): @@ -81,8 +124,9 @@ def decrypt_item(item_key, item, key='label'): class NextcloudHandler: - def __init__(self, params, fail_json): - self.exit = NextcloudErrorHandler(fail_json) + def __init__(self, params): + self._init_log(params) + self.exit = NextcloudErrorHandler() self.HTTP = 'https' self.ssl = True if params.get('ssl_mode') == 'http': @@ -151,6 +195,45 @@ class NextcloudHandler: ) self.keychain = json.loads(decrypt(text, key)) + def _init_log(self, params): + ''' Initialize log object ''' + self._log = logging.getLogger("nc_password_client") + self._log.setLevel(logging.DEBUG) + + sysloghandler = SysLogHandler() + sysloghandler.setLevel(logging.DEBUG) + self._log.addHandler(sysloghandler) + + streamhandler = logging.StreamHandler(sys.stdout) + streamhandler.setLevel( + logging.getLevelName(params.get("debug_level", 'INFO')) + ) + self._log.addHandler(streamhandler) + + if 'log_file' in params: + log_file = params['log_file'] + else: + home_folder = os.environ.get( + 'HOME', os.environ.get('USERPROFILE', '') + ) + log_folder = os.path.join(home_folder, "log") + log_file = os.path.join(log_folder, "nc_password_client.log") + + if not os.path.exists(os.path.dirname(log_file)): + os.mkdir(os.path.dirname(log_file)) + + filehandler = logging.handlers.RotatingFileHandler( + log_file, maxBytes=102400000 + ) + # create formatter + formatter = logging.Formatter( + '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' + ) + filehandler.setFormatter(formatter) + filehandler.setLevel(logging.DEBUG) + self._log.addHandler(filehandler) + return True + def headers(self): headers = { 'Accept': 'application/json',