diff --git a/nc_password_client/__init__.py b/nc_password_client/__init__.py
index e69de29..56fafa5 100644
--- a/nc_password_client/__init__.py
+++ b/nc_password_client/__init__.py
@@ -0,0 +1,2 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
diff --git a/nc_password_client/nc_password_client.py b/nc_password_client/nc_password_client.py
index 457a82a..e100e1f 100644
--- a/nc_password_client/nc_password_client.py
+++ b/nc_password_client/nc_password_client.py
@@ -3,17 +3,600 @@
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
+# Based on https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py
+
"""Nextcloud Password client"""
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+import traceback
import json
import sys
import os
import logging
from logging.handlers import SysLogHandler
+from xml.dom import minidom
+import binascii
import click
import click_config_file
-from nextcloud import NextcloudHandler
+try:
+ import requests
+ HAS_REQUESTS_LIB = True
+except ImportError:
+ HAS_REQUESTS_LIB = False
+ IMPORT_ERROR = traceback.format_exc()
+try:
+ import pysodium
+ HAS_PYSODIUM_LIB = True
+except ImportError:
+ HAS_PYSODIUM_LIB = False
+ IMPORT_ERROR = traceback.format_exc()
+
+
+DIOS_MIO = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+class NextcloudErrorHandler:
+ '''Handle errors in Nextcloud'''
+ def __init__(self):
+ self.config = {}
+ self._init_log()
+
+ def _init_log(self):
+ ''' Initialize log object '''
+ self._log = logging.getLogger("nc_password_client")
+ self._log.setLevel(logging.DEBUG)
+
+ sysloghandler = SysLogHandler()
+ sysloghandler.setLevel(logging.DEBUG)
+ self._log.addHandler(sysloghandler)
+
+ streamhandler = logging.StreamHandler(sys.stdout)
+ streamhandler.setLevel(
+ logging.getLevelName(self.config.get("debug_level", 'INFO'))
+ )
+ self._log.addHandler(streamhandler)
+
+ if 'log_file' in self.config:
+ log_file = self.config['log_file']
+ else:
+ home_folder = os.environ.get(
+ 'HOME', os.environ.get('USERPROFILE', '')
+ )
+ log_folder = os.path.join(home_folder, "log")
+ log_file = os.path.join(log_folder, "nc_password_client.log")
+
+ if not os.path.exists(os.path.dirname(log_file)):
+ os.mkdir(os.path.dirname(log_file))
+
+ filehandler = logging.handlers.RotatingFileHandler(
+ log_file, maxBytes=102400000
+ )
+ # create formatter
+ formatter = logging.Formatter(
+ '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
+ )
+ filehandler.setFormatter(formatter)
+ filehandler.setLevel(logging.DEBUG)
+ self._log.addHandler(filehandler)
+ return True
+
+ def status_code_error(self, status):
+ '''Process status code error???'''
+ try:
+ self._log.error(
+ 'Nextcloud returned with status code %s',
+ status
+ )
+ except Exception:
+ self._log.error(
+ 'Nextcloud returned with status code %s',
+ status
+ )
+
+
+def parameter_spects(spec_arguments):
+ '''Specification of parameters???'''
+ argument_spec = dict(
+ host=dict(required=False, type='str'),
+ api_token=dict(required=False, type='str', no_log=True, aliases=['access_token']),
+ user=dict(required=False, type='str'),
+ ssl_mode=dict(required=False, type='str', default='https')
+ )
+
+ return {**argument_spec, **spec_arguments}
+
+
+def decrypt(encrypted, key):
+ '''Decrypt encrypted data'''
+ nonce = encrypted[0:pysodium.crypto_secretbox_NONCEBYTES]
+ ciphertext = encrypted[pysodium.crypto_secretbox_NONCEBYTES:]
+ return pysodium.crypto_secretbox_open(ciphertext, nonce, key)
+
+
+def decrypt_item(item_key, item, key='label'):
+ '''Decrypt single item'''
+ if len(item[key]) == 0:
+ return ""
+ vals = binascii.unhexlify(item[key])
+ return decrypt(vals, binascii.unhexlify(item_key)).decode()
+
+
+class NextcloudHandler:
+ '''Handle Nextcloud Password API'''
+ def __init__(self, params):
+ self._init_log(params)
+ self.exit = NextcloudErrorHandler()
+ self.http = 'https'
+ self.ssl = True
+ if params.get('ssl_mode') == 'http':
+ self.http = 'http'
+ elif params.get('ssl_mode') == 'skip':
+ self.ssl = False
+ elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'http':
+ self.http = 'http'
+ elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'skip':
+ self.ssl = False
+
+ self.details = params.get('details') or False
+ self.x_api_session = None
+
+ self.host = params.get('host') or os.environ.get('NEXTCLOUD_HOST')
+ if self.host is None:
+ self.exit.status_code_error('Unable to continue. No Nextcloud Host is given.')
+
+ self.user = params.get('user') or os.environ.get('NEXTCLOUD_USER')
+ if self.user is None:
+ self.exit.status_code_error('Unable to continue. No Nextcloud User is given.')
+
+ self.token = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN')
+ if self.token is None:
+ self.exit.status_code_error('Unable to continue. No Nextcloud Token is given.')
+
+ self.e2e_password = False
+ if params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD'):
+ # manage passwords client side encryption
+ self.e2e_password = True
+ password = params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD')
+ retval = self.request_passwords_session()
+ password_salt = binascii.unhexlify(retval['challenge']['salts'][0])
+ generic_hash_key = binascii.unhexlify(retval['challenge']['salts'][1])
+ password_hash_salt = binascii.unhexlify(retval['challenge']['salts'][2])
+
+ generic_hash = pysodium.crypto_generichash(
+ password.encode() + password_salt,
+ k=generic_hash_key,
+ outlen=pysodium.crypto_generichash_BYTES_MAX
+ )
+ password_hash = pysodium.crypto_pwhash(
+ pysodium.crypto_box_SEEDBYTES,
+ generic_hash,
+ password_hash_salt,
+ pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
+ pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
+ pysodium.crypto_pwhash_ALG_ARGON2ID13
+ )
+ # self.cse['keys']['CSEv1r1']
+ self.cse = self.open_passwords_session(password_hash.hex())
+ # decrypt CSEv1r1 chain
+ vals = binascii.unhexlify(self.cse['keys']['CSEv1r1'])
+ salt = vals[0:pysodium.crypto_pwhash_SALTBYTES]
+ text = vals[pysodium.crypto_pwhash_SALTBYTES:]
+ key = pysodium.crypto_pwhash(
+ pysodium.crypto_box_SEEDBYTES,
+ password,
+ salt,
+ pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
+ pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
+ pysodium.crypto_pwhash_ALG_ARGON2ID13
+ )
+ self.keychain = json.loads(decrypt(text, key))
+
+ def _init_log(self, params):
+ ''' Initialize log object '''
+ self._log = logging.getLogger("nc_password_client")
+ self._log.setLevel(logging.DEBUG)
+
+ sysloghandler = SysLogHandler()
+ sysloghandler.setLevel(logging.DEBUG)
+ self._log.addHandler(sysloghandler)
+
+ streamhandler = logging.StreamHandler(sys.stdout)
+ streamhandler.setLevel(
+ logging.getLevelName(params.get("debug_level", 'INFO'))
+ )
+ self._log.addHandler(streamhandler)
+
+ if 'log_file' in params:
+ log_file = params['log_file']
+ else:
+ home_folder = os.environ.get(
+ 'HOME', os.environ.get('USERPROFILE', '')
+ )
+ log_folder = os.path.join(home_folder, "log")
+ log_file = os.path.join(log_folder, "nc_password_client.log")
+
+ if not os.path.exists(os.path.dirname(log_file)):
+ os.mkdir(os.path.dirname(log_file))
+
+ filehandler = logging.handlers.RotatingFileHandler(
+ log_file, maxBytes=102400000
+ )
+ # create formatter
+ formatter = logging.Formatter(
+ '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
+ )
+ filehandler.setFormatter(formatter)
+ filehandler.setLevel(logging.DEBUG)
+ self._log.addHandler(filehandler)
+ return True
+
+ def headers(self):
+ '''Generate headers'''
+ headers = {
+ 'Accept': 'application/json',
+ 'OCS-APIRequest': 'true'
+ }
+ if self.x_api_session:
+ headers['X-API-SESSION'] = self.x_api_session
+ return headers
+
+ def get(self, path):
+ '''Do a GET request'''
+ r = requests.get(
+ f'{self.http}://{self.host}/{path}',
+ auth=(self.user, self.token), verify=self.ssl, headers=self.headers()
+ )
+
+ if r.status_code == 200:
+ return r
+ elif r.status_code == 404:
+ self.exit.status_code_error(f'File {path} does not exist')
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def propfind(self, path):
+ '''Do a PROPFIND request'''
+ s = requests.Session()
+ s.auth = (self.user, self.token)
+ r = s.request(
+ method='PROPFIND',
+ url=f'{self.http}://{self.host}/{path}',
+ headers={'Depth': '0'},
+ data=DIOS_MIO,
+ verify=self.ssl
+ )
+
+ if r.status_code == 207:
+ dom = minidom.parseString(r.text.encode('ascii', 'xmlcharrefreplace'))
+ try:
+ return {
+ 'last_modified': dom.getElementsByTagName(
+ 'd:getlastmodified'
+ )[0].firstChild.data,
+ 'content_type': dom.getElementsByTagName(
+ 'd:getcontenttype'
+ )[0].firstChild.data,
+ 'file_id': int(dom.getElementsByTagName(
+ 'oc:fileid'
+ )[0].firstChild.data),
+ 'size': int(dom.getElementsByTagName(
+ 'oc:size'
+ )[0].firstChild.data),
+ 'favorite': int(dom.getElementsByTagName(
+ 'oc:favorite'
+ )[0].firstChild.data),
+ 'owner': dom.getElementsByTagName(
+ 'oc:owner-display-name'
+ )[0].firstChild.data,
+ 'href': dom.getElementsByTagName(
+ 'd:href'
+ )[0].firstChild.data
+ }
+ except Exception:
+ # I guess it's folder, because it has no content_type
+ return {
+ 'last_modified': dom.getElementsByTagName(
+ 'd:getlastmodified'
+ )[0].firstChild.data,
+ 'content_type': 'inode/directory',
+ 'file_id': dom.getElementsByTagName(
+ 'oc:fileid'
+ )[0].firstChild.data,
+ 'size': dom.getElementsByTagName(
+ 'oc:size'
+ )[0].firstChild.data,
+ 'favorite': dom.getElementsByTagName(
+ 'oc:favorite'
+ )[0].firstChild.data,
+ 'owner': dom.getElementsByTagName(
+ 'oc:owner-display-name'
+ )[0].firstChild.data,
+ 'href': dom.getElementsByTagName(
+ 'd:href'
+ )[0].firstChild.data
+ }
+
+ elif r.status_code == 404:
+ return {}
+
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def put(self, path, src=None):
+ '''Do a PUT request'''
+ if src:
+ r = requests.put(
+ f'{self.http}://{self.host}/{path}',
+ data=open(src, 'rb'), auth=(self.user, self.token), verify=self.ssl
+ )
+ else:
+ r = requests.put(
+ f'{self.http}://{self.host}/{path}',
+ headers=self.headers, auth=(self.user, self.token), verify=self.ssl
+ )
+
+ if r.status_code in [200, 201, 204]:
+ return r, True
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def delete(self, path):
+ '''Do a DELETE request'''
+ r = requests.delete(
+ f'{self.http}://{self.host}/{path}',
+ auth=(self.user, self.token), verify=self.ssl
+ )
+
+ if r.status_code in [200, 204]:
+ return r, True
+ elif r.status_code == 404:
+ return r, False
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def talk(self, message, channel):
+ '''Post in Talk/Chat'''
+ body = {
+ 'message': message,
+ 'replyTo': 0
+ }
+
+ spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat"
+
+ r = requests.post(
+ f'{self.http}://{self.host}/{spreed_v1_path}/{channel}',
+ data=body,
+ headers=self.headers(),
+ auth=(self.user, self.token),
+ verify=self.ssl
+ )
+
+ if r.status_code == 201:
+ return r, True
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def request_passwords_session(self):
+ '''Request a Passwords API session'''
+ r = self.get("index.php/apps/passwords/api/1.0/session/request")
+ if r.status_code == 200:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def open_passwords_session(self, password_hash):
+ '''Open a Passwords API session'''
+ post_obj = {
+ 'challenge': password_hash
+ }
+
+ r = requests.post(
+ f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/session/open',
+ data=post_obj,
+ headers=self.headers(),
+ auth=(self.user, self.token),
+ verify=self.ssl
+ )
+ if r.status_code == 200:
+ self.x_api_session = r.headers.get('X-API-SESSION')
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def close_passwords_session(self):
+ '''Close Passwords API session'''
+ r = self.get("index.php/apps/passwords/api/1.0/session/close")
+ if r.status_code == 200:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def list_passwords(self):
+ '''List all passwords'''
+ r = self.get("index.php/apps/passwords/api/1.0/password/list")
+ if r.status_code == 200:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def list_passwords_folders(self):
+ '''List passwords folders'''
+ r = self.get("index.php/apps/passwords/api/1.0/folder/list")
+ if r.status_code == 200:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def delete_passwords_folder(self, name):
+ '''Delete a passwords folder'''
+ all_folders = self.list_passwords_folders()
+ folder_id = None
+ for folder in all_folders:
+ if folder['label'] == name:
+ folder_id = folder['id']
+ self._log.debug(
+ "Found folder with id '%s' to delete",
+ folder['id']
+ )
+ if folder_id:
+ post_obj = {
+ 'id': folder_id
+ }
+ r = requests.delete(
+ f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/delete',
+ data=post_obj,
+ headers=self.headers(),
+ auth=(self.user, self.token),
+ verify=self.ssl
+ )
+ if r.status_code == 201:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+ else:
+ self._log.error(
+ "Fodler '{name}' not found."
+ )
+
+ def exists_passwords_folder(self, name):
+ '''Test if a passwords folder exists'''
+ for folder in self.list_passwords_folders():
+ if folder.get('label') == name:
+ return True
+ return False
+
+ def create_passwords_folder(self, name):
+ '''Create passwords folder'''
+ if not self.exists_passwords_folder(name):
+ post_obj = {
+ 'label': name
+ }
+
+ r = requests.post(
+ f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/create',
+ data=post_obj,
+ headers=self.headers(),
+ auth=(self.user, self.token),
+ verify=self.ssl
+ )
+
+ if r.status_code == 201:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+ else:
+ self._log.warning(
+ "Password folder '%s' already exists",
+ name
+ )
+
+ def get_passwords_folder(self, name):
+ '''Get Passwords folder'''
+ for folder in self.list_passwords_folders():
+ if folder.get('label') == name:
+ return folder.get('id')
+ return None
+
+ def get_password(self, name):
+ '''Get a password'''
+ r = self.list_passwords()
+ ret = []
+ for item in r:
+ if item['cseType'] == 'CSEv1r1' and self.e2e_password:
+ item_key = self.keychain['keys'].get(item['cseKey'])
+ if item_key:
+ if decrypt_item(item_key, item, 'label') == name:
+ if self.details:
+ item['password'] = decrypt_item(item_key, item, 'password')
+ item['url'] = decrypt_item(item_key, item, 'url')
+ item['username'] = decrypt_item(item_key, item, 'username')
+ item['label'] = decrypt_item(item_key, item, 'label')
+ item['notes'] = decrypt_item(item_key, item, 'notes')
+ item['customFields'] = decrypt_item(item_key, item, 'customFields')
+ ret.append(item)
+ else:
+ ret.append(decrypt_item(item_key, item, 'password'))
+ else:
+ if item['label'] == name:
+ if self.details:
+ ret.append(item)
+ else:
+ ret.append(item['password'])
+ return ret
+
+ def fetch_generated_password(self):
+ '''Fetch a generated password'''
+ r = self.get('index.php/apps/passwords/api/1.0/service/password')
+ if r.status_code == 200:
+ return [r.json().get('password')]
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def create_password(self, post_obj):
+ '''Create/add a password'''
+ r = requests.post(
+ f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create',
+ data=post_obj,
+ headers=self.headers(),
+ auth=(self.user, self.token),
+ verify=self.ssl
+ )
+
+ if r.status_code == 201:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def delete_password(self, post_obj):
+ '''Delete a password'''
+ r = requests.delete(
+ f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete',
+ data=post_obj,
+ headers=self.headers(),
+ auth=(self.user, self.token),
+ verify=self.ssl
+ )
+
+ if r.status_code == 200:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def update_password(self, post_obj):
+ '''Update a password'''
+ r = requests.patch(
+ f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update',
+ data=post_obj,
+ headers=self.headers(),
+ auth=(self.user, self.token),
+ verify=self.ssl
+ )
+
+ if r.status_code == 200:
+ return r.json()
+ else:
+ self.exit.status_code_error(r.status_code)
+
+ def user(self):
+ '''Get user'''
+ return self.user
class NcPasswordClient:
'''Nextcloud Password Client'''
@@ -50,10 +633,22 @@ class NcPasswordClient:
'''List all passwords'''
print(json.dumps(self.nc.list_passwords(), indent=2))
+ def list_passwords_folders(self):
+ '''List all password folders'''
+ print(json.dumps(self.nc.list_passwords_folders(), indent=2))
+
def create_password(self, obj):
'''Create a password with an object'''
print(json.dumps(self.nc.create_password(obj), indent=2))
+ def create_passwords_folder(self, name):
+ '''Create a passwords folder'''
+ print(json.dumps(self.nc.create_passwords_folder(name), indent=2))
+
+ def delete_passwords_folder(self, name):
+ '''Delete a passwords folder'''
+ print(json.dumps(self.nc.delete_passwords_folder(name), indent=2))
+
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
@@ -132,7 +727,9 @@ class NcPasswordClient:
def cli(ctx, debug_level, log_file, host, user, api_token, cse_password):
'''Client function to pass context'''
ctx.ensure_object(dict)
- ctx.obj['NcPasswordClient'] = NcPasswordClient(debug_level, log_file, host, user, api_token, cse_password)
+ ctx.obj['NcPasswordClient'] = NcPasswordClient(
+ debug_level, log_file, host, user, api_token, cse_password
+ )
@cli.command()
@click.option('--name', '-n', required=True, help='Name of the password to show')
@@ -145,7 +742,7 @@ def show(ctx, name):
@cli.command()
@click_config_file.configuration_option()
@click.pass_context
-def list(ctx):
+def ls(ctx):
'''Show all password'''
ctx.obj['NcPasswordClient'].list_passwords()
@@ -154,8 +751,31 @@ def list(ctx):
@click_config_file.configuration_option()
@click.pass_context
def create_password(ctx, obj):
- '''Show all password'''
+ '''Create a password'''
ctx.obj['NcPasswordClient'].create_password(json.loads(obj))
+@cli.command()
+@click.option('--name', '-n', required=True, help='Name of the passwords folder to create')
+@click_config_file.configuration_option()
+@click.pass_context
+def create_passwords_folder(ctx, name):
+ '''Create a password folder'''
+ ctx.obj['NcPasswordClient'].create_passwords_folder(name)
+
+@cli.command()
+@click_config_file.configuration_option()
+@click.pass_context
+def list_passwords_folders(ctx):
+ '''List all password folders'''
+ ctx.obj['NcPasswordClient'].list_passwords_folders()
+
+@cli.command()
+@click.option('--name', '-n', required=True, help='Name of the passwords folder to delete')
+@click_config_file.configuration_option()
+@click.pass_context
+def delete_passwords_folder(ctx, name):
+ '''Delete a password folder'''
+ ctx.obj['NcPasswordClient'].delete_passwords_folder(name)
+
if __name__ == "__main__":
cli(obj={})
diff --git a/nc_password_client/nextcloud.py b/nc_password_client/nextcloud.py
deleted file mode 100644
index b7d40ac..0000000
--- a/nc_password_client/nextcloud.py
+++ /dev/null
@@ -1,498 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-# Based on https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py
-"""Nextcloud handler class"""
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-import os
-import sys
-import traceback
-import logging
-from logging.handlers import SysLogHandler
-from xml.dom import minidom
-import binascii
-import json
-
-try:
- import requests
- HAS_REQUESTS_LIB = True
-except ImportError:
- HAS_REQUESTS_LIB = False
- IMPORT_ERROR = traceback.format_exc()
-
-try:
- import pysodium
- HAS_PYSODIUM_LIB = True
-except ImportError:
- HAS_PYSODIUM_LIB = False
- IMPORT_ERROR = traceback.format_exc()
-
-
-dios_mio = """
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-"""
-
-
-class NextcloudErrorHandler:
- def __init__(self):
- self.config = {}
- self._init_log()
-
- def _init_log(self):
- ''' Initialize log object '''
- self._log = logging.getLogger("nc_password_client")
- self._log.setLevel(logging.DEBUG)
-
- sysloghandler = SysLogHandler()
- sysloghandler.setLevel(logging.DEBUG)
- self._log.addHandler(sysloghandler)
-
- streamhandler = logging.StreamHandler(sys.stdout)
- streamhandler.setLevel(
- logging.getLevelName(self.config.get("debug_level", 'INFO'))
- )
- self._log.addHandler(streamhandler)
-
- if 'log_file' in self.config:
- log_file = self.config['log_file']
- else:
- home_folder = os.environ.get(
- 'HOME', os.environ.get('USERPROFILE', '')
- )
- log_folder = os.path.join(home_folder, "log")
- log_file = os.path.join(log_folder, "nc_password_client.log")
-
- if not os.path.exists(os.path.dirname(log_file)):
- os.mkdir(os.path.dirname(log_file))
-
- filehandler = logging.handlers.RotatingFileHandler(
- log_file, maxBytes=102400000
- )
- # create formatter
- formatter = logging.Formatter(
- '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
- )
- filehandler.setFormatter(formatter)
- filehandler.setLevel(logging.DEBUG)
- self._log.addHandler(filehandler)
- return True
-
- def status_code_error(self, status):
- try:
- self._log.error('Nextcloud returned with status code {SC}'.format(SC=status))
- except Exception:
- self._log.error('Nextcloud returned with status code {SC}'.format(SC=status))
-
-
-def parameter_spects(spec_arguments):
- argument_spec = dict(
- host=dict(required=False, type='str'),
- api_token=dict(required=False, type='str', no_log=True, aliases=['access_token']),
- user=dict(required=False, type='str'),
- ssl_mode=dict(required=False, type='str', default='https')
- )
-
- return {**argument_spec, **spec_arguments}
-
-
-def decrypt(encrypted, key):
- nonce = encrypted[0:pysodium.crypto_secretbox_NONCEBYTES]
- ciphertext = encrypted[pysodium.crypto_secretbox_NONCEBYTES:]
- return pysodium.crypto_secretbox_open(ciphertext, nonce, key)
-
-
-def decrypt_item(item_key, item, key='label'):
- if len(item[key]) == 0:
- return ""
- vals = binascii.unhexlify(item[key])
- return decrypt(vals, binascii.unhexlify(item_key)).decode()
-
-
-class NextcloudHandler:
- def __init__(self, params):
- self._init_log(params)
- self.exit = NextcloudErrorHandler()
- self.HTTP = 'https'
- self.ssl = True
- if params.get('ssl_mode') == 'http':
- self.HTTP = 'http'
- elif params.get('ssl_mode') == 'skip':
- self.ssl = False
- elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'http':
- self.HTTP = 'http'
- elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'skip':
- self.ssl = False
-
- self.details = params.get('details') or False
- self.x_api_session = None
-
- self.HOST = params.get('host') or os.environ.get('NEXTCLOUD_HOST')
- if self.HOST is None:
- self.exit.status_code_error('Unable to continue. No Nextcloud Host is given.')
-
- self.USER = params.get('user') or os.environ.get('NEXTCLOUD_USER')
- if self.USER is None:
- self.exit.status_code_error('Unable to continue. No Nextcloud User is given.')
-
- self.TOKEN = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN')
- if self.TOKEN is None:
- self.exit.status_code_error('Unable to continue. No Nextcloud Token is given.')
-
- self.E2E_PASSWORD = False
- if params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD'):
- """
- manage passwords client side encryption
- """
- self.E2E_PASSWORD = True
- password = params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD')
- retval = self.request_passwords_session()
- passwordSalt = binascii.unhexlify(retval['challenge']['salts'][0])
- genericHashKey = binascii.unhexlify(retval['challenge']['salts'][1])
- passwordHashSalt = binascii.unhexlify(retval['challenge']['salts'][2])
-
- input = password.encode() + passwordSalt
- genericHash = pysodium.crypto_generichash(
- input,
- k=genericHashKey,
- outlen=pysodium.crypto_generichash_BYTES_MAX
- )
- passwordHash = pysodium.crypto_pwhash(
- pysodium.crypto_box_SEEDBYTES,
- genericHash,
- passwordHashSalt,
- pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
- pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
- pysodium.crypto_pwhash_ALG_ARGON2ID13
- )
- # self.cse['keys']['CSEv1r1']
- self.cse = self.open_passwords_session(passwordHash.hex())
- # decrypt CSEv1r1 chain
- vals = binascii.unhexlify(self.cse['keys']['CSEv1r1'])
- salt = vals[0:pysodium.crypto_pwhash_SALTBYTES]
- text = vals[pysodium.crypto_pwhash_SALTBYTES:]
- key = pysodium.crypto_pwhash(
- pysodium.crypto_box_SEEDBYTES,
- password,
- salt,
- pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
- pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
- pysodium.crypto_pwhash_ALG_ARGON2ID13
- )
- self.keychain = json.loads(decrypt(text, key))
-
- def _init_log(self, params):
- ''' Initialize log object '''
- self._log = logging.getLogger("nc_password_client")
- self._log.setLevel(logging.DEBUG)
-
- sysloghandler = SysLogHandler()
- sysloghandler.setLevel(logging.DEBUG)
- self._log.addHandler(sysloghandler)
-
- streamhandler = logging.StreamHandler(sys.stdout)
- streamhandler.setLevel(
- logging.getLevelName(params.get("debug_level", 'INFO'))
- )
- self._log.addHandler(streamhandler)
-
- if 'log_file' in params:
- log_file = params['log_file']
- else:
- home_folder = os.environ.get(
- 'HOME', os.environ.get('USERPROFILE', '')
- )
- log_folder = os.path.join(home_folder, "log")
- log_file = os.path.join(log_folder, "nc_password_client.log")
-
- if not os.path.exists(os.path.dirname(log_file)):
- os.mkdir(os.path.dirname(log_file))
-
- filehandler = logging.handlers.RotatingFileHandler(
- log_file, maxBytes=102400000
- )
- # create formatter
- formatter = logging.Formatter(
- '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
- )
- filehandler.setFormatter(formatter)
- filehandler.setLevel(logging.DEBUG)
- self._log.addHandler(filehandler)
- return True
-
- def headers(self):
- headers = {
- 'Accept': 'application/json',
- 'OCS-APIRequest': 'true'
- }
- if self.x_api_session:
- headers['X-API-SESSION'] = self.x_api_session
- return headers
-
- def get(self, path):
- r = requests.get(
- '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
- auth=(self.USER, self.TOKEN), verify=self.ssl, headers=self.headers()
- )
-
- if r.status_code == 200:
- return r
- elif r.status_code == 404:
- self.exit.status_code_error('File {FILE} does not exist'.format(FILE=path))
- else:
- self.exit.status_code_error(r.status_code)
-
- def propfind(self, path):
- s = requests.Session()
- s.auth = (self.USER, self.TOKEN)
- r = s.request(
- method='PROPFIND',
- url='{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
- headers={'Depth': '0'},
- data=dios_mio,
- verify=self.ssl
- )
-
- if r.status_code == 207:
- dom = minidom.parseString(r.text.encode('ascii', 'xmlcharrefreplace'))
- try:
- return {
- 'last_modified': dom.getElementsByTagName('d:getlastmodified')[0].firstChild.data,
- 'content_type': dom.getElementsByTagName('d:getcontenttype')[0].firstChild.data,
- 'file_id': int(dom.getElementsByTagName('oc:fileid')[0].firstChild.data),
- 'size': int(dom.getElementsByTagName('oc:size')[0].firstChild.data),
- 'favorite': int(dom.getElementsByTagName('oc:favorite')[0].firstChild.data),
- 'owner': dom.getElementsByTagName('oc:owner-display-name')[0].firstChild.data,
- 'href': dom.getElementsByTagName('d:href')[0].firstChild.data
- }
- except Exception:
- # I guess it's folder, because it has no content_type
- return {
- 'last_modified': dom.getElementsByTagName('d:getlastmodified')[0].firstChild.data,
- 'content_type': 'inode/directory',
- 'file_id': dom.getElementsByTagName('oc:fileid')[0].firstChild.data,
- 'size': dom.getElementsByTagName('oc:size')[0].firstChild.data,
- 'favorite': dom.getElementsByTagName('oc:favorite')[0].firstChild.data,
- 'owner': dom.getElementsByTagName('oc:owner-display-name')[0].firstChild.data,
- 'href': dom.getElementsByTagName('d:href')[0].firstChild.data
- }
-
- elif r.status_code == 404:
- return {}
-
- else:
- self.exit.status_code_error(r.status_code)
-
- def put(self, path, src=None):
-
- if src:
- r = requests.put(
- '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
- data=open(src, 'rb'), auth=(self.USER, self.TOKEN), verify=self.ssl
- )
- else:
- r = requests.put(
- '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
- headers=self.headers, auth=(self.USER, self.TOKEN), verify=self.ssl
- )
-
- if r.status_code in [200, 201, 204]:
- return r, True
- else:
- self.exit.status_code_error(r.status_code)
-
- def delete(self, path):
- r = requests.delete(
- '{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
- auth=(self.USER, self.TOKEN), verify=self.ssl
- )
-
- if r.status_code in [200, 204]:
- return r, True
- elif r.status_code == 404:
- return r, False
- else:
- self.exit.status_code_error(r.status_code)
-
- def talk(self, message, channel):
- body = {
- 'message': message,
- 'replyTo': 0
- }
-
- spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat"
-
- r = requests.post(
- '{HTTP}://{HOST}/{V1}/{CHANNEL}'.format(HTTP=self.HTTP, HOST=self.HOST, V1=spreed_v1_path, CHANNEL=channel),
- data=body,
- headers=self.headers(),
- auth=(self.USER, self.TOKEN),
- verify=self.ssl
- )
-
- if r.status_code == 201:
- return r, True
- else:
- self.exit.status_code_error(r.status_code)
-
- def request_passwords_session(self):
- r = self.get("index.php/apps/passwords/api/1.0/session/request")
- if r.status_code == 200:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def open_passwords_session(self, passwordHash):
- post_obj = {
- 'challenge': passwordHash
- }
-
- r = requests.post(
- '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/session/open'.format(HTTP=self.HTTP, HOST=self.HOST),
- data=post_obj,
- headers=self.headers(),
- auth=(self.USER, self.TOKEN),
- verify=self.ssl
- )
- if r.status_code == 200:
- self.x_api_session = r.headers.get('X-API-SESSION')
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def close_passwords_session(self):
- r = self.get("index.php/apps/passwords/api/1.0/session/close")
- if r.status_code == 200:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def list_passwords(self):
- r = self.get("index.php/apps/passwords/api/1.0/password/list")
- if r.status_code == 200:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def list_passwords_folders(self):
- r = self.get("index.php/apps/passwords/api/1.0/folder/list")
- if r.status_code == 200:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def create_passwords_folder(self, name):
- post_obj = {
- 'label': name
- }
-
- r = requests.post(
- '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/folder/create'.format(HTTP=self.HTTP, HOST=self.HOST),
- data=post_obj,
- headers=self.headers(),
- auth=(self.USER, self.TOKEN),
- verify=self.ssl
- )
-
- if r.status_code == 201:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def get_passwords_folder(self, name):
- for folder in self.list_passwords_folders():
- if folder.get('label') == name:
- return folder.get('id')
- return None
-
- def get_password(self, name):
- r = self.list_passwords()
- ret = []
- for item in r:
- if item['cseType'] == 'CSEv1r1' and self.E2E_PASSWORD:
- item_key = self.keychain['keys'].get(item['cseKey'])
- if item_key:
- if decrypt_item(item_key, item, 'label') == name:
- if self.details:
- item['password'] = decrypt_item(item_key, item, 'password')
- item['url'] = decrypt_item(item_key, item, 'url')
- item['username'] = decrypt_item(item_key, item, 'username')
- item['label'] = decrypt_item(item_key, item, 'label')
- item['notes'] = decrypt_item(item_key, item, 'notes')
- item['customFields'] = decrypt_item(item_key, item, 'customFields')
- ret.append(item)
- else:
- ret.append(decrypt_item(item_key, item, 'password'))
- else:
- if item['label'] == name:
- if self.details:
- ret.append(item)
- else:
- ret.append(item['password'])
- return ret
-
- def fetch_generated_password(self):
- r = self.get('index.php/apps/passwords/api/1.0/service/password')
- if r.status_code == 200:
- return [r.json().get('password')]
- else:
- self.exit.status_code_error(r.status_code)
-
- def create_password(self, post_obj):
- r = requests.post(
- '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/create'.format(HTTP=self.HTTP, HOST=self.HOST),
- data=post_obj,
- headers=self.headers(),
- auth=(self.USER, self.TOKEN),
- verify=self.ssl
- )
-
- if r.status_code == 201:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def delete_password(self, post_obj):
- r = requests.delete(
- '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/delete'.format(HTTP=self.HTTP, HOST=self.HOST),
- data=post_obj,
- headers=self.headers(),
- auth=(self.USER, self.TOKEN),
- verify=self.ssl
- )
-
- if r.status_code == 200:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def update_password(self, post_obj):
- r = requests.patch(
- '{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/update'.format(HTTP=self.HTTP, HOST=self.HOST),
- data=post_obj,
- headers=self.headers(),
- auth=(self.USER, self.TOKEN),
- verify=self.ssl
- )
-
- if r.status_code == 200:
- return r.json()
- else:
- self.exit.status_code_error(r.status_code)
-
- def user(self):
- return self.USER
diff --git a/pyproject.toml b/pyproject.toml
index 6c659a0..737b280 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -22,4 +22,4 @@ dependencies = [
"click",
"click_config_file",
]
-requires-python = ">=3"
\ No newline at end of file
+requires-python = ">=3"
diff --git a/requirements.txt b/requirements.txt
index 66bf966..5751fe7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,4 @@
click
-click_config_file
\ No newline at end of file
+click_config_file
+requests
+pysodium
diff --git a/setup.py b/setup.py
index 85a0451..38ba080 100644
--- a/setup.py
+++ b/setup.py
@@ -16,8 +16,12 @@ setuptools.setup(
author_email="ad@susurrando.com",
url="https://repos.susurrando.com/adelgado/nc_password_client",
description="Nextcloud Password client",
+ packages=setuptools.find_packages(),
+ zip_safe=False,
+ include_package_data=True,
long_description="README.md",
long_description_content_type="text/markdown",
license="GPLv3",
+ platforms='any',
# keywords=["my", "script", "does", "things"]
)