From 679a4300b77d3a120d17287ca6d6bf200a5434ba Mon Sep 17 00:00:00 2001 From: "Antonio J. Delgado" Date: Sun, 10 Nov 2024 18:38:23 +0200 Subject: [PATCH] Incorporte class to single file and add more API endpoints --- nc_password_client/__init__.py | 2 + nc_password_client/nc_password_client.py | 628 ++++++++++++++++++++++- nc_password_client/nextcloud.py | 498 ------------------ pyproject.toml | 2 +- requirements.txt | 4 +- setup.py | 4 + 6 files changed, 634 insertions(+), 504 deletions(-) delete mode 100644 nc_password_client/nextcloud.py diff --git a/nc_password_client/__init__.py b/nc_password_client/__init__.py index e69de29..56fafa5 100644 --- a/nc_password_client/__init__.py +++ b/nc_password_client/__init__.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- diff --git a/nc_password_client/nc_password_client.py b/nc_password_client/nc_password_client.py index 457a82a..e100e1f 100644 --- a/nc_password_client/nc_password_client.py +++ b/nc_password_client/nc_password_client.py @@ -3,17 +3,600 @@ # # This script is licensed under GNU GPL version 2.0 or above # (c) 2024 Antonio J. Delgado +# Based on https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py + """Nextcloud Password client""" +from __future__ import absolute_import, division, print_function +__metaclass__ = type +import traceback import json import sys import os import logging from logging.handlers import SysLogHandler +from xml.dom import minidom +import binascii import click import click_config_file -from nextcloud import NextcloudHandler +try: + import requests + HAS_REQUESTS_LIB = True +except ImportError: + HAS_REQUESTS_LIB = False + IMPORT_ERROR = traceback.format_exc() +try: + import pysodium + HAS_PYSODIUM_LIB = True +except ImportError: + HAS_PYSODIUM_LIB = False + IMPORT_ERROR = traceback.format_exc() + + +DIOS_MIO = """ + + + + + + + + + + + + + + + +""" + +class NextcloudErrorHandler: + '''Handle errors in Nextcloud''' + def __init__(self): + self.config = {} + self._init_log() + + def _init_log(self): + ''' Initialize log object ''' + self._log = logging.getLogger("nc_password_client") + self._log.setLevel(logging.DEBUG) + + sysloghandler = SysLogHandler() + sysloghandler.setLevel(logging.DEBUG) + self._log.addHandler(sysloghandler) + + streamhandler = logging.StreamHandler(sys.stdout) + streamhandler.setLevel( + logging.getLevelName(self.config.get("debug_level", 'INFO')) + ) + self._log.addHandler(streamhandler) + + if 'log_file' in self.config: + log_file = self.config['log_file'] + else: + home_folder = os.environ.get( + 'HOME', os.environ.get('USERPROFILE', '') + ) + log_folder = os.path.join(home_folder, "log") + log_file = os.path.join(log_folder, "nc_password_client.log") + + if not os.path.exists(os.path.dirname(log_file)): + os.mkdir(os.path.dirname(log_file)) + + filehandler = logging.handlers.RotatingFileHandler( + log_file, maxBytes=102400000 + ) + # create formatter + formatter = logging.Formatter( + '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' + ) + filehandler.setFormatter(formatter) + filehandler.setLevel(logging.DEBUG) + self._log.addHandler(filehandler) + return True + + def status_code_error(self, status): + '''Process status code error???''' + try: + self._log.error( + 'Nextcloud returned with status code %s', + status + ) + except Exception: + self._log.error( + 'Nextcloud returned with status code %s', + status + ) + + +def parameter_spects(spec_arguments): + '''Specification of parameters???''' + argument_spec = dict( + host=dict(required=False, type='str'), + api_token=dict(required=False, type='str', no_log=True, aliases=['access_token']), + user=dict(required=False, type='str'), + ssl_mode=dict(required=False, type='str', default='https') + ) + + return {**argument_spec, **spec_arguments} + + +def decrypt(encrypted, key): + '''Decrypt encrypted data''' + nonce = encrypted[0:pysodium.crypto_secretbox_NONCEBYTES] + ciphertext = encrypted[pysodium.crypto_secretbox_NONCEBYTES:] + return pysodium.crypto_secretbox_open(ciphertext, nonce, key) + + +def decrypt_item(item_key, item, key='label'): + '''Decrypt single item''' + if len(item[key]) == 0: + return "" + vals = binascii.unhexlify(item[key]) + return decrypt(vals, binascii.unhexlify(item_key)).decode() + + +class NextcloudHandler: + '''Handle Nextcloud Password API''' + def __init__(self, params): + self._init_log(params) + self.exit = NextcloudErrorHandler() + self.http = 'https' + self.ssl = True + if params.get('ssl_mode') == 'http': + self.http = 'http' + elif params.get('ssl_mode') == 'skip': + self.ssl = False + elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'http': + self.http = 'http' + elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'skip': + self.ssl = False + + self.details = params.get('details') or False + self.x_api_session = None + + self.host = params.get('host') or os.environ.get('NEXTCLOUD_HOST') + if self.host is None: + self.exit.status_code_error('Unable to continue. No Nextcloud Host is given.') + + self.user = params.get('user') or os.environ.get('NEXTCLOUD_USER') + if self.user is None: + self.exit.status_code_error('Unable to continue. No Nextcloud User is given.') + + self.token = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN') + if self.token is None: + self.exit.status_code_error('Unable to continue. No Nextcloud Token is given.') + + self.e2e_password = False + if params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD'): + # manage passwords client side encryption + self.e2e_password = True + password = params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD') + retval = self.request_passwords_session() + password_salt = binascii.unhexlify(retval['challenge']['salts'][0]) + generic_hash_key = binascii.unhexlify(retval['challenge']['salts'][1]) + password_hash_salt = binascii.unhexlify(retval['challenge']['salts'][2]) + + generic_hash = pysodium.crypto_generichash( + password.encode() + password_salt, + k=generic_hash_key, + outlen=pysodium.crypto_generichash_BYTES_MAX + ) + password_hash = pysodium.crypto_pwhash( + pysodium.crypto_box_SEEDBYTES, + generic_hash, + password_hash_salt, + pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE, + pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE, + pysodium.crypto_pwhash_ALG_ARGON2ID13 + ) + # self.cse['keys']['CSEv1r1'] + self.cse = self.open_passwords_session(password_hash.hex()) + # decrypt CSEv1r1 chain + vals = binascii.unhexlify(self.cse['keys']['CSEv1r1']) + salt = vals[0:pysodium.crypto_pwhash_SALTBYTES] + text = vals[pysodium.crypto_pwhash_SALTBYTES:] + key = pysodium.crypto_pwhash( + pysodium.crypto_box_SEEDBYTES, + password, + salt, + pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE, + pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE, + pysodium.crypto_pwhash_ALG_ARGON2ID13 + ) + self.keychain = json.loads(decrypt(text, key)) + + def _init_log(self, params): + ''' Initialize log object ''' + self._log = logging.getLogger("nc_password_client") + self._log.setLevel(logging.DEBUG) + + sysloghandler = SysLogHandler() + sysloghandler.setLevel(logging.DEBUG) + self._log.addHandler(sysloghandler) + + streamhandler = logging.StreamHandler(sys.stdout) + streamhandler.setLevel( + logging.getLevelName(params.get("debug_level", 'INFO')) + ) + self._log.addHandler(streamhandler) + + if 'log_file' in params: + log_file = params['log_file'] + else: + home_folder = os.environ.get( + 'HOME', os.environ.get('USERPROFILE', '') + ) + log_folder = os.path.join(home_folder, "log") + log_file = os.path.join(log_folder, "nc_password_client.log") + + if not os.path.exists(os.path.dirname(log_file)): + os.mkdir(os.path.dirname(log_file)) + + filehandler = logging.handlers.RotatingFileHandler( + log_file, maxBytes=102400000 + ) + # create formatter + formatter = logging.Formatter( + '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' + ) + filehandler.setFormatter(formatter) + filehandler.setLevel(logging.DEBUG) + self._log.addHandler(filehandler) + return True + + def headers(self): + '''Generate headers''' + headers = { + 'Accept': 'application/json', + 'OCS-APIRequest': 'true' + } + if self.x_api_session: + headers['X-API-SESSION'] = self.x_api_session + return headers + + def get(self, path): + '''Do a GET request''' + r = requests.get( + f'{self.http}://{self.host}/{path}', + auth=(self.user, self.token), verify=self.ssl, headers=self.headers() + ) + + if r.status_code == 200: + return r + elif r.status_code == 404: + self.exit.status_code_error(f'File {path} does not exist') + else: + self.exit.status_code_error(r.status_code) + + def propfind(self, path): + '''Do a PROPFIND request''' + s = requests.Session() + s.auth = (self.user, self.token) + r = s.request( + method='PROPFIND', + url=f'{self.http}://{self.host}/{path}', + headers={'Depth': '0'}, + data=DIOS_MIO, + verify=self.ssl + ) + + if r.status_code == 207: + dom = minidom.parseString(r.text.encode('ascii', 'xmlcharrefreplace')) + try: + return { + 'last_modified': dom.getElementsByTagName( + 'd:getlastmodified' + )[0].firstChild.data, + 'content_type': dom.getElementsByTagName( + 'd:getcontenttype' + )[0].firstChild.data, + 'file_id': int(dom.getElementsByTagName( + 'oc:fileid' + )[0].firstChild.data), + 'size': int(dom.getElementsByTagName( + 'oc:size' + )[0].firstChild.data), + 'favorite': int(dom.getElementsByTagName( + 'oc:favorite' + )[0].firstChild.data), + 'owner': dom.getElementsByTagName( + 'oc:owner-display-name' + )[0].firstChild.data, + 'href': dom.getElementsByTagName( + 'd:href' + )[0].firstChild.data + } + except Exception: + # I guess it's folder, because it has no content_type + return { + 'last_modified': dom.getElementsByTagName( + 'd:getlastmodified' + )[0].firstChild.data, + 'content_type': 'inode/directory', + 'file_id': dom.getElementsByTagName( + 'oc:fileid' + )[0].firstChild.data, + 'size': dom.getElementsByTagName( + 'oc:size' + )[0].firstChild.data, + 'favorite': dom.getElementsByTagName( + 'oc:favorite' + )[0].firstChild.data, + 'owner': dom.getElementsByTagName( + 'oc:owner-display-name' + )[0].firstChild.data, + 'href': dom.getElementsByTagName( + 'd:href' + )[0].firstChild.data + } + + elif r.status_code == 404: + return {} + + else: + self.exit.status_code_error(r.status_code) + + def put(self, path, src=None): + '''Do a PUT request''' + if src: + r = requests.put( + f'{self.http}://{self.host}/{path}', + data=open(src, 'rb'), auth=(self.user, self.token), verify=self.ssl + ) + else: + r = requests.put( + f'{self.http}://{self.host}/{path}', + headers=self.headers, auth=(self.user, self.token), verify=self.ssl + ) + + if r.status_code in [200, 201, 204]: + return r, True + else: + self.exit.status_code_error(r.status_code) + + def delete(self, path): + '''Do a DELETE request''' + r = requests.delete( + f'{self.http}://{self.host}/{path}', + auth=(self.user, self.token), verify=self.ssl + ) + + if r.status_code in [200, 204]: + return r, True + elif r.status_code == 404: + return r, False + else: + self.exit.status_code_error(r.status_code) + + def talk(self, message, channel): + '''Post in Talk/Chat''' + body = { + 'message': message, + 'replyTo': 0 + } + + spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat" + + r = requests.post( + f'{self.http}://{self.host}/{spreed_v1_path}/{channel}', + data=body, + headers=self.headers(), + auth=(self.user, self.token), + verify=self.ssl + ) + + if r.status_code == 201: + return r, True + else: + self.exit.status_code_error(r.status_code) + + def request_passwords_session(self): + '''Request a Passwords API session''' + r = self.get("index.php/apps/passwords/api/1.0/session/request") + if r.status_code == 200: + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def open_passwords_session(self, password_hash): + '''Open a Passwords API session''' + post_obj = { + 'challenge': password_hash + } + + r = requests.post( + f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/session/open', + data=post_obj, + headers=self.headers(), + auth=(self.user, self.token), + verify=self.ssl + ) + if r.status_code == 200: + self.x_api_session = r.headers.get('X-API-SESSION') + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def close_passwords_session(self): + '''Close Passwords API session''' + r = self.get("index.php/apps/passwords/api/1.0/session/close") + if r.status_code == 200: + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def list_passwords(self): + '''List all passwords''' + r = self.get("index.php/apps/passwords/api/1.0/password/list") + if r.status_code == 200: + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def list_passwords_folders(self): + '''List passwords folders''' + r = self.get("index.php/apps/passwords/api/1.0/folder/list") + if r.status_code == 200: + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def delete_passwords_folder(self, name): + '''Delete a passwords folder''' + all_folders = self.list_passwords_folders() + folder_id = None + for folder in all_folders: + if folder['label'] == name: + folder_id = folder['id'] + self._log.debug( + "Found folder with id '%s' to delete", + folder['id'] + ) + if folder_id: + post_obj = { + 'id': folder_id + } + r = requests.delete( + f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/delete', + data=post_obj, + headers=self.headers(), + auth=(self.user, self.token), + verify=self.ssl + ) + if r.status_code == 201: + return r.json() + else: + self.exit.status_code_error(r.status_code) + else: + self._log.error( + "Fodler '{name}' not found." + ) + + def exists_passwords_folder(self, name): + '''Test if a passwords folder exists''' + for folder in self.list_passwords_folders(): + if folder.get('label') == name: + return True + return False + + def create_passwords_folder(self, name): + '''Create passwords folder''' + if not self.exists_passwords_folder(name): + post_obj = { + 'label': name + } + + r = requests.post( + f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/create', + data=post_obj, + headers=self.headers(), + auth=(self.user, self.token), + verify=self.ssl + ) + + if r.status_code == 201: + return r.json() + else: + self.exit.status_code_error(r.status_code) + else: + self._log.warning( + "Password folder '%s' already exists", + name + ) + + def get_passwords_folder(self, name): + '''Get Passwords folder''' + for folder in self.list_passwords_folders(): + if folder.get('label') == name: + return folder.get('id') + return None + + def get_password(self, name): + '''Get a password''' + r = self.list_passwords() + ret = [] + for item in r: + if item['cseType'] == 'CSEv1r1' and self.e2e_password: + item_key = self.keychain['keys'].get(item['cseKey']) + if item_key: + if decrypt_item(item_key, item, 'label') == name: + if self.details: + item['password'] = decrypt_item(item_key, item, 'password') + item['url'] = decrypt_item(item_key, item, 'url') + item['username'] = decrypt_item(item_key, item, 'username') + item['label'] = decrypt_item(item_key, item, 'label') + item['notes'] = decrypt_item(item_key, item, 'notes') + item['customFields'] = decrypt_item(item_key, item, 'customFields') + ret.append(item) + else: + ret.append(decrypt_item(item_key, item, 'password')) + else: + if item['label'] == name: + if self.details: + ret.append(item) + else: + ret.append(item['password']) + return ret + + def fetch_generated_password(self): + '''Fetch a generated password''' + r = self.get('index.php/apps/passwords/api/1.0/service/password') + if r.status_code == 200: + return [r.json().get('password')] + else: + self.exit.status_code_error(r.status_code) + + def create_password(self, post_obj): + '''Create/add a password''' + r = requests.post( + f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create', + data=post_obj, + headers=self.headers(), + auth=(self.user, self.token), + verify=self.ssl + ) + + if r.status_code == 201: + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def delete_password(self, post_obj): + '''Delete a password''' + r = requests.delete( + f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete', + data=post_obj, + headers=self.headers(), + auth=(self.user, self.token), + verify=self.ssl + ) + + if r.status_code == 200: + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def update_password(self, post_obj): + '''Update a password''' + r = requests.patch( + f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update', + data=post_obj, + headers=self.headers(), + auth=(self.user, self.token), + verify=self.ssl + ) + + if r.status_code == 200: + return r.json() + else: + self.exit.status_code_error(r.status_code) + + def user(self): + '''Get user''' + return self.user class NcPasswordClient: '''Nextcloud Password Client''' @@ -50,10 +633,22 @@ class NcPasswordClient: '''List all passwords''' print(json.dumps(self.nc.list_passwords(), indent=2)) + def list_passwords_folders(self): + '''List all password folders''' + print(json.dumps(self.nc.list_passwords_folders(), indent=2)) + def create_password(self, obj): '''Create a password with an object''' print(json.dumps(self.nc.create_password(obj), indent=2)) + def create_passwords_folder(self, name): + '''Create a passwords folder''' + print(json.dumps(self.nc.create_passwords_folder(name), indent=2)) + + def delete_passwords_folder(self, name): + '''Delete a passwords folder''' + print(json.dumps(self.nc.delete_passwords_folder(name), indent=2)) + def _init_log(self): ''' Initialize log object ''' self._log = logging.getLogger("nc_password_client") @@ -132,7 +727,9 @@ class NcPasswordClient: def cli(ctx, debug_level, log_file, host, user, api_token, cse_password): '''Client function to pass context''' ctx.ensure_object(dict) - ctx.obj['NcPasswordClient'] = NcPasswordClient(debug_level, log_file, host, user, api_token, cse_password) + ctx.obj['NcPasswordClient'] = NcPasswordClient( + debug_level, log_file, host, user, api_token, cse_password + ) @cli.command() @click.option('--name', '-n', required=True, help='Name of the password to show') @@ -145,7 +742,7 @@ def show(ctx, name): @cli.command() @click_config_file.configuration_option() @click.pass_context -def list(ctx): +def ls(ctx): '''Show all password''' ctx.obj['NcPasswordClient'].list_passwords() @@ -154,8 +751,31 @@ def list(ctx): @click_config_file.configuration_option() @click.pass_context def create_password(ctx, obj): - '''Show all password''' + '''Create a password''' ctx.obj['NcPasswordClient'].create_password(json.loads(obj)) +@cli.command() +@click.option('--name', '-n', required=True, help='Name of the passwords folder to create') +@click_config_file.configuration_option() +@click.pass_context +def create_passwords_folder(ctx, name): + '''Create a password folder''' + ctx.obj['NcPasswordClient'].create_passwords_folder(name) + +@cli.command() +@click_config_file.configuration_option() +@click.pass_context +def list_passwords_folders(ctx): + '''List all password folders''' + ctx.obj['NcPasswordClient'].list_passwords_folders() + +@cli.command() +@click.option('--name', '-n', required=True, help='Name of the passwords folder to delete') +@click_config_file.configuration_option() +@click.pass_context +def delete_passwords_folder(ctx, name): + '''Delete a password folder''' + ctx.obj['NcPasswordClient'].delete_passwords_folder(name) + if __name__ == "__main__": cli(obj={}) diff --git a/nc_password_client/nextcloud.py b/nc_password_client/nextcloud.py deleted file mode 100644 index b7d40ac..0000000 --- a/nc_password_client/nextcloud.py +++ /dev/null @@ -1,498 +0,0 @@ -#!/usr/bin/env python3 -# -*- encoding: utf-8 -*- -# Based on https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py -"""Nextcloud handler class""" - -from __future__ import absolute_import, division, print_function -__metaclass__ = type -import os -import sys -import traceback -import logging -from logging.handlers import SysLogHandler -from xml.dom import minidom -import binascii -import json - -try: - import requests - HAS_REQUESTS_LIB = True -except ImportError: - HAS_REQUESTS_LIB = False - IMPORT_ERROR = traceback.format_exc() - -try: - import pysodium - HAS_PYSODIUM_LIB = True -except ImportError: - HAS_PYSODIUM_LIB = False - IMPORT_ERROR = traceback.format_exc() - - -dios_mio = """ - - - - - - - - - - - - - - - -""" - - -class NextcloudErrorHandler: - def __init__(self): - self.config = {} - self._init_log() - - def _init_log(self): - ''' Initialize log object ''' - self._log = logging.getLogger("nc_password_client") - self._log.setLevel(logging.DEBUG) - - sysloghandler = SysLogHandler() - sysloghandler.setLevel(logging.DEBUG) - self._log.addHandler(sysloghandler) - - streamhandler = logging.StreamHandler(sys.stdout) - streamhandler.setLevel( - logging.getLevelName(self.config.get("debug_level", 'INFO')) - ) - self._log.addHandler(streamhandler) - - if 'log_file' in self.config: - log_file = self.config['log_file'] - else: - home_folder = os.environ.get( - 'HOME', os.environ.get('USERPROFILE', '') - ) - log_folder = os.path.join(home_folder, "log") - log_file = os.path.join(log_folder, "nc_password_client.log") - - if not os.path.exists(os.path.dirname(log_file)): - os.mkdir(os.path.dirname(log_file)) - - filehandler = logging.handlers.RotatingFileHandler( - log_file, maxBytes=102400000 - ) - # create formatter - formatter = logging.Formatter( - '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' - ) - filehandler.setFormatter(formatter) - filehandler.setLevel(logging.DEBUG) - self._log.addHandler(filehandler) - return True - - def status_code_error(self, status): - try: - self._log.error('Nextcloud returned with status code {SC}'.format(SC=status)) - except Exception: - self._log.error('Nextcloud returned with status code {SC}'.format(SC=status)) - - -def parameter_spects(spec_arguments): - argument_spec = dict( - host=dict(required=False, type='str'), - api_token=dict(required=False, type='str', no_log=True, aliases=['access_token']), - user=dict(required=False, type='str'), - ssl_mode=dict(required=False, type='str', default='https') - ) - - return {**argument_spec, **spec_arguments} - - -def decrypt(encrypted, key): - nonce = encrypted[0:pysodium.crypto_secretbox_NONCEBYTES] - ciphertext = encrypted[pysodium.crypto_secretbox_NONCEBYTES:] - return pysodium.crypto_secretbox_open(ciphertext, nonce, key) - - -def decrypt_item(item_key, item, key='label'): - if len(item[key]) == 0: - return "" - vals = binascii.unhexlify(item[key]) - return decrypt(vals, binascii.unhexlify(item_key)).decode() - - -class NextcloudHandler: - def __init__(self, params): - self._init_log(params) - self.exit = NextcloudErrorHandler() - self.HTTP = 'https' - self.ssl = True - if params.get('ssl_mode') == 'http': - self.HTTP = 'http' - elif params.get('ssl_mode') == 'skip': - self.ssl = False - elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'http': - self.HTTP = 'http' - elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'skip': - self.ssl = False - - self.details = params.get('details') or False - self.x_api_session = None - - self.HOST = params.get('host') or os.environ.get('NEXTCLOUD_HOST') - if self.HOST is None: - self.exit.status_code_error('Unable to continue. No Nextcloud Host is given.') - - self.USER = params.get('user') or os.environ.get('NEXTCLOUD_USER') - if self.USER is None: - self.exit.status_code_error('Unable to continue. No Nextcloud User is given.') - - self.TOKEN = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN') - if self.TOKEN is None: - self.exit.status_code_error('Unable to continue. No Nextcloud Token is given.') - - self.E2E_PASSWORD = False - if params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD'): - """ - manage passwords client side encryption - """ - self.E2E_PASSWORD = True - password = params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD') - retval = self.request_passwords_session() - passwordSalt = binascii.unhexlify(retval['challenge']['salts'][0]) - genericHashKey = binascii.unhexlify(retval['challenge']['salts'][1]) - passwordHashSalt = binascii.unhexlify(retval['challenge']['salts'][2]) - - input = password.encode() + passwordSalt - genericHash = pysodium.crypto_generichash( - input, - k=genericHashKey, - outlen=pysodium.crypto_generichash_BYTES_MAX - ) - passwordHash = pysodium.crypto_pwhash( - pysodium.crypto_box_SEEDBYTES, - genericHash, - passwordHashSalt, - pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE, - pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE, - pysodium.crypto_pwhash_ALG_ARGON2ID13 - ) - # self.cse['keys']['CSEv1r1'] - self.cse = self.open_passwords_session(passwordHash.hex()) - # decrypt CSEv1r1 chain - vals = binascii.unhexlify(self.cse['keys']['CSEv1r1']) - salt = vals[0:pysodium.crypto_pwhash_SALTBYTES] - text = vals[pysodium.crypto_pwhash_SALTBYTES:] - key = pysodium.crypto_pwhash( - pysodium.crypto_box_SEEDBYTES, - password, - salt, - pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE, - pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE, - pysodium.crypto_pwhash_ALG_ARGON2ID13 - ) - self.keychain = json.loads(decrypt(text, key)) - - def _init_log(self, params): - ''' Initialize log object ''' - self._log = logging.getLogger("nc_password_client") - self._log.setLevel(logging.DEBUG) - - sysloghandler = SysLogHandler() - sysloghandler.setLevel(logging.DEBUG) - self._log.addHandler(sysloghandler) - - streamhandler = logging.StreamHandler(sys.stdout) - streamhandler.setLevel( - logging.getLevelName(params.get("debug_level", 'INFO')) - ) - self._log.addHandler(streamhandler) - - if 'log_file' in params: - log_file = params['log_file'] - else: - home_folder = os.environ.get( - 'HOME', os.environ.get('USERPROFILE', '') - ) - log_folder = os.path.join(home_folder, "log") - log_file = os.path.join(log_folder, "nc_password_client.log") - - if not os.path.exists(os.path.dirname(log_file)): - os.mkdir(os.path.dirname(log_file)) - - filehandler = logging.handlers.RotatingFileHandler( - log_file, maxBytes=102400000 - ) - # create formatter - formatter = logging.Formatter( - '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' - ) - filehandler.setFormatter(formatter) - filehandler.setLevel(logging.DEBUG) - self._log.addHandler(filehandler) - return True - - def headers(self): - headers = { - 'Accept': 'application/json', - 'OCS-APIRequest': 'true' - } - if self.x_api_session: - headers['X-API-SESSION'] = self.x_api_session - return headers - - def get(self, path): - r = requests.get( - '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path), - auth=(self.USER, self.TOKEN), verify=self.ssl, headers=self.headers() - ) - - if r.status_code == 200: - return r - elif r.status_code == 404: - self.exit.status_code_error('File {FILE} does not exist'.format(FILE=path)) - else: - self.exit.status_code_error(r.status_code) - - def propfind(self, path): - s = requests.Session() - s.auth = (self.USER, self.TOKEN) - r = s.request( - method='PROPFIND', - url='{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path), - headers={'Depth': '0'}, - data=dios_mio, - verify=self.ssl - ) - - if r.status_code == 207: - dom = minidom.parseString(r.text.encode('ascii', 'xmlcharrefreplace')) - try: - return { - 'last_modified': dom.getElementsByTagName('d:getlastmodified')[0].firstChild.data, - 'content_type': dom.getElementsByTagName('d:getcontenttype')[0].firstChild.data, - 'file_id': int(dom.getElementsByTagName('oc:fileid')[0].firstChild.data), - 'size': int(dom.getElementsByTagName('oc:size')[0].firstChild.data), - 'favorite': int(dom.getElementsByTagName('oc:favorite')[0].firstChild.data), - 'owner': dom.getElementsByTagName('oc:owner-display-name')[0].firstChild.data, - 'href': dom.getElementsByTagName('d:href')[0].firstChild.data - } - except Exception: - # I guess it's folder, because it has no content_type - return { - 'last_modified': dom.getElementsByTagName('d:getlastmodified')[0].firstChild.data, - 'content_type': 'inode/directory', - 'file_id': dom.getElementsByTagName('oc:fileid')[0].firstChild.data, - 'size': dom.getElementsByTagName('oc:size')[0].firstChild.data, - 'favorite': dom.getElementsByTagName('oc:favorite')[0].firstChild.data, - 'owner': dom.getElementsByTagName('oc:owner-display-name')[0].firstChild.data, - 'href': dom.getElementsByTagName('d:href')[0].firstChild.data - } - - elif r.status_code == 404: - return {} - - else: - self.exit.status_code_error(r.status_code) - - def put(self, path, src=None): - - if src: - r = requests.put( - '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path), - data=open(src, 'rb'), auth=(self.USER, self.TOKEN), verify=self.ssl - ) - else: - r = requests.put( - '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path), - headers=self.headers, auth=(self.USER, self.TOKEN), verify=self.ssl - ) - - if r.status_code in [200, 201, 204]: - return r, True - else: - self.exit.status_code_error(r.status_code) - - def delete(self, path): - r = requests.delete( - '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path), - auth=(self.USER, self.TOKEN), verify=self.ssl - ) - - if r.status_code in [200, 204]: - return r, True - elif r.status_code == 404: - return r, False - else: - self.exit.status_code_error(r.status_code) - - def talk(self, message, channel): - body = { - 'message': message, - 'replyTo': 0 - } - - spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat" - - r = requests.post( - '{HTTP}://{HOST}/{V1}/{CHANNEL}'.format(HTTP=self.HTTP, HOST=self.HOST, V1=spreed_v1_path, CHANNEL=channel), - data=body, - headers=self.headers(), - auth=(self.USER, self.TOKEN), - verify=self.ssl - ) - - if r.status_code == 201: - return r, True - else: - self.exit.status_code_error(r.status_code) - - def request_passwords_session(self): - r = self.get("index.php/apps/passwords/api/1.0/session/request") - if r.status_code == 200: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def open_passwords_session(self, passwordHash): - post_obj = { - 'challenge': passwordHash - } - - r = requests.post( - '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/session/open'.format(HTTP=self.HTTP, HOST=self.HOST), - data=post_obj, - headers=self.headers(), - auth=(self.USER, self.TOKEN), - verify=self.ssl - ) - if r.status_code == 200: - self.x_api_session = r.headers.get('X-API-SESSION') - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def close_passwords_session(self): - r = self.get("index.php/apps/passwords/api/1.0/session/close") - if r.status_code == 200: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def list_passwords(self): - r = self.get("index.php/apps/passwords/api/1.0/password/list") - if r.status_code == 200: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def list_passwords_folders(self): - r = self.get("index.php/apps/passwords/api/1.0/folder/list") - if r.status_code == 200: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def create_passwords_folder(self, name): - post_obj = { - 'label': name - } - - r = requests.post( - '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/folder/create'.format(HTTP=self.HTTP, HOST=self.HOST), - data=post_obj, - headers=self.headers(), - auth=(self.USER, self.TOKEN), - verify=self.ssl - ) - - if r.status_code == 201: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def get_passwords_folder(self, name): - for folder in self.list_passwords_folders(): - if folder.get('label') == name: - return folder.get('id') - return None - - def get_password(self, name): - r = self.list_passwords() - ret = [] - for item in r: - if item['cseType'] == 'CSEv1r1' and self.E2E_PASSWORD: - item_key = self.keychain['keys'].get(item['cseKey']) - if item_key: - if decrypt_item(item_key, item, 'label') == name: - if self.details: - item['password'] = decrypt_item(item_key, item, 'password') - item['url'] = decrypt_item(item_key, item, 'url') - item['username'] = decrypt_item(item_key, item, 'username') - item['label'] = decrypt_item(item_key, item, 'label') - item['notes'] = decrypt_item(item_key, item, 'notes') - item['customFields'] = decrypt_item(item_key, item, 'customFields') - ret.append(item) - else: - ret.append(decrypt_item(item_key, item, 'password')) - else: - if item['label'] == name: - if self.details: - ret.append(item) - else: - ret.append(item['password']) - return ret - - def fetch_generated_password(self): - r = self.get('index.php/apps/passwords/api/1.0/service/password') - if r.status_code == 200: - return [r.json().get('password')] - else: - self.exit.status_code_error(r.status_code) - - def create_password(self, post_obj): - r = requests.post( - '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/create'.format(HTTP=self.HTTP, HOST=self.HOST), - data=post_obj, - headers=self.headers(), - auth=(self.USER, self.TOKEN), - verify=self.ssl - ) - - if r.status_code == 201: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def delete_password(self, post_obj): - r = requests.delete( - '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/delete'.format(HTTP=self.HTTP, HOST=self.HOST), - data=post_obj, - headers=self.headers(), - auth=(self.USER, self.TOKEN), - verify=self.ssl - ) - - if r.status_code == 200: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def update_password(self, post_obj): - r = requests.patch( - '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/update'.format(HTTP=self.HTTP, HOST=self.HOST), - data=post_obj, - headers=self.headers(), - auth=(self.USER, self.TOKEN), - verify=self.ssl - ) - - if r.status_code == 200: - return r.json() - else: - self.exit.status_code_error(r.status_code) - - def user(self): - return self.USER diff --git a/pyproject.toml b/pyproject.toml index 6c659a0..737b280 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,4 +22,4 @@ dependencies = [ "click", "click_config_file", ] -requires-python = ">=3" \ No newline at end of file +requires-python = ">=3" diff --git a/requirements.txt b/requirements.txt index 66bf966..5751fe7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ click -click_config_file \ No newline at end of file +click_config_file +requests +pysodium diff --git a/setup.py b/setup.py index 85a0451..38ba080 100644 --- a/setup.py +++ b/setup.py @@ -16,8 +16,12 @@ setuptools.setup( author_email="ad@susurrando.com", url="https://repos.susurrando.com/adelgado/nc_password_client", description="Nextcloud Password client", + packages=setuptools.find_packages(), + zip_safe=False, + include_package_data=True, long_description="README.md", long_description_content_type="text/markdown", license="GPLv3", + platforms='any', # keywords=["my", "script", "does", "things"] )