Incorporte class to single file and add more API endpoints

This commit is contained in:
Antonio J. Delgado 2024-11-10 18:38:23 +02:00
parent f8af1c4896
commit 679a4300b7
6 changed files with 634 additions and 504 deletions

View file

@ -0,0 +1,2 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

View file

@ -3,17 +3,600 @@
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2024 Antonio J. Delgado
# Based on https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py
"""Nextcloud Password client"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import traceback
import json
import sys
import os
import logging
from logging.handlers import SysLogHandler
from xml.dom import minidom
import binascii
import click
import click_config_file
from nextcloud import NextcloudHandler
try:
import requests
HAS_REQUESTS_LIB = True
except ImportError:
HAS_REQUESTS_LIB = False
IMPORT_ERROR = traceback.format_exc()
try:
import pysodium
HAS_PYSODIUM_LIB = True
except ImportError:
HAS_PYSODIUM_LIB = False
IMPORT_ERROR = traceback.format_exc()
DIOS_MIO = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<d:getlastmodified />
<d:getetag />
<d:getcontenttype />
<d:resourcetype />
<oc:fileid />
<oc:permissions />
<oc:size />
<oc:checksums />
<oc:favorite />
<oc:owner-display-name />
<oc:share-types />
</d:prop>
</d:propfind>
"""
class NextcloudErrorHandler:
'''Handle errors in Nextcloud'''
def __init__(self):
self.config = {}
self._init_log()
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "nc_password_client.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
def status_code_error(self, status):
'''Process status code error???'''
try:
self._log.error(
'Nextcloud returned with status code %s',
status
)
except Exception:
self._log.error(
'Nextcloud returned with status code %s',
status
)
def parameter_spects(spec_arguments):
'''Specification of parameters???'''
argument_spec = dict(
host=dict(required=False, type='str'),
api_token=dict(required=False, type='str', no_log=True, aliases=['access_token']),
user=dict(required=False, type='str'),
ssl_mode=dict(required=False, type='str', default='https')
)
return {**argument_spec, **spec_arguments}
def decrypt(encrypted, key):
'''Decrypt encrypted data'''
nonce = encrypted[0:pysodium.crypto_secretbox_NONCEBYTES]
ciphertext = encrypted[pysodium.crypto_secretbox_NONCEBYTES:]
return pysodium.crypto_secretbox_open(ciphertext, nonce, key)
def decrypt_item(item_key, item, key='label'):
'''Decrypt single item'''
if len(item[key]) == 0:
return ""
vals = binascii.unhexlify(item[key])
return decrypt(vals, binascii.unhexlify(item_key)).decode()
class NextcloudHandler:
'''Handle Nextcloud Password API'''
def __init__(self, params):
self._init_log(params)
self.exit = NextcloudErrorHandler()
self.http = 'https'
self.ssl = True
if params.get('ssl_mode') == 'http':
self.http = 'http'
elif params.get('ssl_mode') == 'skip':
self.ssl = False
elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'http':
self.http = 'http'
elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'skip':
self.ssl = False
self.details = params.get('details') or False
self.x_api_session = None
self.host = params.get('host') or os.environ.get('NEXTCLOUD_HOST')
if self.host is None:
self.exit.status_code_error('Unable to continue. No Nextcloud Host is given.')
self.user = params.get('user') or os.environ.get('NEXTCLOUD_USER')
if self.user is None:
self.exit.status_code_error('Unable to continue. No Nextcloud User is given.')
self.token = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN')
if self.token is None:
self.exit.status_code_error('Unable to continue. No Nextcloud Token is given.')
self.e2e_password = False
if params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD'):
# manage passwords client side encryption
self.e2e_password = True
password = params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD')
retval = self.request_passwords_session()
password_salt = binascii.unhexlify(retval['challenge']['salts'][0])
generic_hash_key = binascii.unhexlify(retval['challenge']['salts'][1])
password_hash_salt = binascii.unhexlify(retval['challenge']['salts'][2])
generic_hash = pysodium.crypto_generichash(
password.encode() + password_salt,
k=generic_hash_key,
outlen=pysodium.crypto_generichash_BYTES_MAX
)
password_hash = pysodium.crypto_pwhash(
pysodium.crypto_box_SEEDBYTES,
generic_hash,
password_hash_salt,
pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_ALG_ARGON2ID13
)
# self.cse['keys']['CSEv1r1']
self.cse = self.open_passwords_session(password_hash.hex())
# decrypt CSEv1r1 chain
vals = binascii.unhexlify(self.cse['keys']['CSEv1r1'])
salt = vals[0:pysodium.crypto_pwhash_SALTBYTES]
text = vals[pysodium.crypto_pwhash_SALTBYTES:]
key = pysodium.crypto_pwhash(
pysodium.crypto_box_SEEDBYTES,
password,
salt,
pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_ALG_ARGON2ID13
)
self.keychain = json.loads(decrypt(text, key))
def _init_log(self, params):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(params.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in params:
log_file = params['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "nc_password_client.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
def headers(self):
'''Generate headers'''
headers = {
'Accept': 'application/json',
'OCS-APIRequest': 'true'
}
if self.x_api_session:
headers['X-API-SESSION'] = self.x_api_session
return headers
def get(self, path):
'''Do a GET request'''
r = requests.get(
f'{self.http}://{self.host}/{path}',
auth=(self.user, self.token), verify=self.ssl, headers=self.headers()
)
if r.status_code == 200:
return r
elif r.status_code == 404:
self.exit.status_code_error(f'File {path} does not exist')
else:
self.exit.status_code_error(r.status_code)
def propfind(self, path):
'''Do a PROPFIND request'''
s = requests.Session()
s.auth = (self.user, self.token)
r = s.request(
method='PROPFIND',
url=f'{self.http}://{self.host}/{path}',
headers={'Depth': '0'},
data=DIOS_MIO,
verify=self.ssl
)
if r.status_code == 207:
dom = minidom.parseString(r.text.encode('ascii', 'xmlcharrefreplace'))
try:
return {
'last_modified': dom.getElementsByTagName(
'd:getlastmodified'
)[0].firstChild.data,
'content_type': dom.getElementsByTagName(
'd:getcontenttype'
)[0].firstChild.data,
'file_id': int(dom.getElementsByTagName(
'oc:fileid'
)[0].firstChild.data),
'size': int(dom.getElementsByTagName(
'oc:size'
)[0].firstChild.data),
'favorite': int(dom.getElementsByTagName(
'oc:favorite'
)[0].firstChild.data),
'owner': dom.getElementsByTagName(
'oc:owner-display-name'
)[0].firstChild.data,
'href': dom.getElementsByTagName(
'd:href'
)[0].firstChild.data
}
except Exception:
# I guess it's folder, because it has no content_type
return {
'last_modified': dom.getElementsByTagName(
'd:getlastmodified'
)[0].firstChild.data,
'content_type': 'inode/directory',
'file_id': dom.getElementsByTagName(
'oc:fileid'
)[0].firstChild.data,
'size': dom.getElementsByTagName(
'oc:size'
)[0].firstChild.data,
'favorite': dom.getElementsByTagName(
'oc:favorite'
)[0].firstChild.data,
'owner': dom.getElementsByTagName(
'oc:owner-display-name'
)[0].firstChild.data,
'href': dom.getElementsByTagName(
'd:href'
)[0].firstChild.data
}
elif r.status_code == 404:
return {}
else:
self.exit.status_code_error(r.status_code)
def put(self, path, src=None):
'''Do a PUT request'''
if src:
r = requests.put(
f'{self.http}://{self.host}/{path}',
data=open(src, 'rb'), auth=(self.user, self.token), verify=self.ssl
)
else:
r = requests.put(
f'{self.http}://{self.host}/{path}',
headers=self.headers, auth=(self.user, self.token), verify=self.ssl
)
if r.status_code in [200, 201, 204]:
return r, True
else:
self.exit.status_code_error(r.status_code)
def delete(self, path):
'''Do a DELETE request'''
r = requests.delete(
f'{self.http}://{self.host}/{path}',
auth=(self.user, self.token), verify=self.ssl
)
if r.status_code in [200, 204]:
return r, True
elif r.status_code == 404:
return r, False
else:
self.exit.status_code_error(r.status_code)
def talk(self, message, channel):
'''Post in Talk/Chat'''
body = {
'message': message,
'replyTo': 0
}
spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat"
r = requests.post(
f'{self.http}://{self.host}/{spreed_v1_path}/{channel}',
data=body,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl
)
if r.status_code == 201:
return r, True
else:
self.exit.status_code_error(r.status_code)
def request_passwords_session(self):
'''Request a Passwords API session'''
r = self.get("index.php/apps/passwords/api/1.0/session/request")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def open_passwords_session(self, password_hash):
'''Open a Passwords API session'''
post_obj = {
'challenge': password_hash
}
r = requests.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/session/open',
data=post_obj,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl
)
if r.status_code == 200:
self.x_api_session = r.headers.get('X-API-SESSION')
return r.json()
else:
self.exit.status_code_error(r.status_code)
def close_passwords_session(self):
'''Close Passwords API session'''
r = self.get("index.php/apps/passwords/api/1.0/session/close")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def list_passwords(self):
'''List all passwords'''
r = self.get("index.php/apps/passwords/api/1.0/password/list")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def list_passwords_folders(self):
'''List passwords folders'''
r = self.get("index.php/apps/passwords/api/1.0/folder/list")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def delete_passwords_folder(self, name):
'''Delete a passwords folder'''
all_folders = self.list_passwords_folders()
folder_id = None
for folder in all_folders:
if folder['label'] == name:
folder_id = folder['id']
self._log.debug(
"Found folder with id '%s' to delete",
folder['id']
)
if folder_id:
post_obj = {
'id': folder_id
}
r = requests.delete(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/delete',
data=post_obj,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl
)
if r.status_code == 201:
return r.json()
else:
self.exit.status_code_error(r.status_code)
else:
self._log.error(
"Fodler '{name}' not found."
)
def exists_passwords_folder(self, name):
'''Test if a passwords folder exists'''
for folder in self.list_passwords_folders():
if folder.get('label') == name:
return True
return False
def create_passwords_folder(self, name):
'''Create passwords folder'''
if not self.exists_passwords_folder(name):
post_obj = {
'label': name
}
r = requests.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/create',
data=post_obj,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl
)
if r.status_code == 201:
return r.json()
else:
self.exit.status_code_error(r.status_code)
else:
self._log.warning(
"Password folder '%s' already exists",
name
)
def get_passwords_folder(self, name):
'''Get Passwords folder'''
for folder in self.list_passwords_folders():
if folder.get('label') == name:
return folder.get('id')
return None
def get_password(self, name):
'''Get a password'''
r = self.list_passwords()
ret = []
for item in r:
if item['cseType'] == 'CSEv1r1' and self.e2e_password:
item_key = self.keychain['keys'].get(item['cseKey'])
if item_key:
if decrypt_item(item_key, item, 'label') == name:
if self.details:
item['password'] = decrypt_item(item_key, item, 'password')
item['url'] = decrypt_item(item_key, item, 'url')
item['username'] = decrypt_item(item_key, item, 'username')
item['label'] = decrypt_item(item_key, item, 'label')
item['notes'] = decrypt_item(item_key, item, 'notes')
item['customFields'] = decrypt_item(item_key, item, 'customFields')
ret.append(item)
else:
ret.append(decrypt_item(item_key, item, 'password'))
else:
if item['label'] == name:
if self.details:
ret.append(item)
else:
ret.append(item['password'])
return ret
def fetch_generated_password(self):
'''Fetch a generated password'''
r = self.get('index.php/apps/passwords/api/1.0/service/password')
if r.status_code == 200:
return [r.json().get('password')]
else:
self.exit.status_code_error(r.status_code)
def create_password(self, post_obj):
'''Create/add a password'''
r = requests.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create',
data=post_obj,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl
)
if r.status_code == 201:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def delete_password(self, post_obj):
'''Delete a password'''
r = requests.delete(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete',
data=post_obj,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl
)
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def update_password(self, post_obj):
'''Update a password'''
r = requests.patch(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update',
data=post_obj,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl
)
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def user(self):
'''Get user'''
return self.user
class NcPasswordClient:
'''Nextcloud Password Client'''
@ -50,10 +633,22 @@ class NcPasswordClient:
'''List all passwords'''
print(json.dumps(self.nc.list_passwords(), indent=2))
def list_passwords_folders(self):
'''List all password folders'''
print(json.dumps(self.nc.list_passwords_folders(), indent=2))
def create_password(self, obj):
'''Create a password with an object'''
print(json.dumps(self.nc.create_password(obj), indent=2))
def create_passwords_folder(self, name):
'''Create a passwords folder'''
print(json.dumps(self.nc.create_passwords_folder(name), indent=2))
def delete_passwords_folder(self, name):
'''Delete a passwords folder'''
print(json.dumps(self.nc.delete_passwords_folder(name), indent=2))
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
@ -132,7 +727,9 @@ class NcPasswordClient:
def cli(ctx, debug_level, log_file, host, user, api_token, cse_password):
'''Client function to pass context'''
ctx.ensure_object(dict)
ctx.obj['NcPasswordClient'] = NcPasswordClient(debug_level, log_file, host, user, api_token, cse_password)
ctx.obj['NcPasswordClient'] = NcPasswordClient(
debug_level, log_file, host, user, api_token, cse_password
)
@cli.command()
@click.option('--name', '-n', required=True, help='Name of the password to show')
@ -145,7 +742,7 @@ def show(ctx, name):
@cli.command()
@click_config_file.configuration_option()
@click.pass_context
def list(ctx):
def ls(ctx):
'''Show all password'''
ctx.obj['NcPasswordClient'].list_passwords()
@ -154,8 +751,31 @@ def list(ctx):
@click_config_file.configuration_option()
@click.pass_context
def create_password(ctx, obj):
'''Show all password'''
'''Create a password'''
ctx.obj['NcPasswordClient'].create_password(json.loads(obj))
@cli.command()
@click.option('--name', '-n', required=True, help='Name of the passwords folder to create')
@click_config_file.configuration_option()
@click.pass_context
def create_passwords_folder(ctx, name):
'''Create a password folder'''
ctx.obj['NcPasswordClient'].create_passwords_folder(name)
@cli.command()
@click_config_file.configuration_option()
@click.pass_context
def list_passwords_folders(ctx):
'''List all password folders'''
ctx.obj['NcPasswordClient'].list_passwords_folders()
@cli.command()
@click.option('--name', '-n', required=True, help='Name of the passwords folder to delete')
@click_config_file.configuration_option()
@click.pass_context
def delete_passwords_folder(ctx, name):
'''Delete a password folder'''
ctx.obj['NcPasswordClient'].delete_passwords_folder(name)
if __name__ == "__main__":
cli(obj={})

View file

@ -1,498 +0,0 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Based on https://github.com/markuman/markuman.nextcloud/blob/latest/plugins/modules/password.py
"""Nextcloud handler class"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import os
import sys
import traceback
import logging
from logging.handlers import SysLogHandler
from xml.dom import minidom
import binascii
import json
try:
import requests
HAS_REQUESTS_LIB = True
except ImportError:
HAS_REQUESTS_LIB = False
IMPORT_ERROR = traceback.format_exc()
try:
import pysodium
HAS_PYSODIUM_LIB = True
except ImportError:
HAS_PYSODIUM_LIB = False
IMPORT_ERROR = traceback.format_exc()
dios_mio = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<d:getlastmodified />
<d:getetag />
<d:getcontenttype />
<d:resourcetype />
<oc:fileid />
<oc:permissions />
<oc:size />
<oc:checksums />
<oc:favorite />
<oc:owner-display-name />
<oc:share-types />
</d:prop>
</d:propfind>
"""
class NextcloudErrorHandler:
def __init__(self):
self.config = {}
self._init_log()
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "nc_password_client.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
def status_code_error(self, status):
try:
self._log.error('Nextcloud returned with status code {SC}'.format(SC=status))
except Exception:
self._log.error('Nextcloud returned with status code {SC}'.format(SC=status))
def parameter_spects(spec_arguments):
argument_spec = dict(
host=dict(required=False, type='str'),
api_token=dict(required=False, type='str', no_log=True, aliases=['access_token']),
user=dict(required=False, type='str'),
ssl_mode=dict(required=False, type='str', default='https')
)
return {**argument_spec, **spec_arguments}
def decrypt(encrypted, key):
nonce = encrypted[0:pysodium.crypto_secretbox_NONCEBYTES]
ciphertext = encrypted[pysodium.crypto_secretbox_NONCEBYTES:]
return pysodium.crypto_secretbox_open(ciphertext, nonce, key)
def decrypt_item(item_key, item, key='label'):
if len(item[key]) == 0:
return ""
vals = binascii.unhexlify(item[key])
return decrypt(vals, binascii.unhexlify(item_key)).decode()
class NextcloudHandler:
def __init__(self, params):
self._init_log(params)
self.exit = NextcloudErrorHandler()
self.HTTP = 'https'
self.ssl = True
if params.get('ssl_mode') == 'http':
self.HTTP = 'http'
elif params.get('ssl_mode') == 'skip':
self.ssl = False
elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'http':
self.HTTP = 'http'
elif os.environ.get('NEXTCLOUD_SSL_MODE') == 'skip':
self.ssl = False
self.details = params.get('details') or False
self.x_api_session = None
self.HOST = params.get('host') or os.environ.get('NEXTCLOUD_HOST')
if self.HOST is None:
self.exit.status_code_error('Unable to continue. No Nextcloud Host is given.')
self.USER = params.get('user') or os.environ.get('NEXTCLOUD_USER')
if self.USER is None:
self.exit.status_code_error('Unable to continue. No Nextcloud User is given.')
self.TOKEN = params.get('api_token') or os.environ.get('NEXTCLOUD_TOKEN')
if self.TOKEN is None:
self.exit.status_code_error('Unable to continue. No Nextcloud Token is given.')
self.E2E_PASSWORD = False
if params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD'):
"""
manage passwords client side encryption
"""
self.E2E_PASSWORD = True
password = params.get('cse_password') or os.environ.get('NEXTCLOUD_CSE_PASSWORD')
retval = self.request_passwords_session()
passwordSalt = binascii.unhexlify(retval['challenge']['salts'][0])
genericHashKey = binascii.unhexlify(retval['challenge']['salts'][1])
passwordHashSalt = binascii.unhexlify(retval['challenge']['salts'][2])
input = password.encode() + passwordSalt
genericHash = pysodium.crypto_generichash(
input,
k=genericHashKey,
outlen=pysodium.crypto_generichash_BYTES_MAX
)
passwordHash = pysodium.crypto_pwhash(
pysodium.crypto_box_SEEDBYTES,
genericHash,
passwordHashSalt,
pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_ALG_ARGON2ID13
)
# self.cse['keys']['CSEv1r1']
self.cse = self.open_passwords_session(passwordHash.hex())
# decrypt CSEv1r1 chain
vals = binascii.unhexlify(self.cse['keys']['CSEv1r1'])
salt = vals[0:pysodium.crypto_pwhash_SALTBYTES]
text = vals[pysodium.crypto_pwhash_SALTBYTES:]
key = pysodium.crypto_pwhash(
pysodium.crypto_box_SEEDBYTES,
password,
salt,
pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE,
pysodium.crypto_pwhash_ALG_ARGON2ID13
)
self.keychain = json.loads(decrypt(text, key))
def _init_log(self, params):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(params.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in params:
log_file = params['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "nc_password_client.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
def headers(self):
headers = {
'Accept': 'application/json',
'OCS-APIRequest': 'true'
}
if self.x_api_session:
headers['X-API-SESSION'] = self.x_api_session
return headers
def get(self, path):
r = requests.get(
'{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
auth=(self.USER, self.TOKEN), verify=self.ssl, headers=self.headers()
)
if r.status_code == 200:
return r
elif r.status_code == 404:
self.exit.status_code_error('File {FILE} does not exist'.format(FILE=path))
else:
self.exit.status_code_error(r.status_code)
def propfind(self, path):
s = requests.Session()
s.auth = (self.USER, self.TOKEN)
r = s.request(
method='PROPFIND',
url='{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
headers={'Depth': '0'},
data=dios_mio,
verify=self.ssl
)
if r.status_code == 207:
dom = minidom.parseString(r.text.encode('ascii', 'xmlcharrefreplace'))
try:
return {
'last_modified': dom.getElementsByTagName('d:getlastmodified')[0].firstChild.data,
'content_type': dom.getElementsByTagName('d:getcontenttype')[0].firstChild.data,
'file_id': int(dom.getElementsByTagName('oc:fileid')[0].firstChild.data),
'size': int(dom.getElementsByTagName('oc:size')[0].firstChild.data),
'favorite': int(dom.getElementsByTagName('oc:favorite')[0].firstChild.data),
'owner': dom.getElementsByTagName('oc:owner-display-name')[0].firstChild.data,
'href': dom.getElementsByTagName('d:href')[0].firstChild.data
}
except Exception:
# I guess it's folder, because it has no content_type
return {
'last_modified': dom.getElementsByTagName('d:getlastmodified')[0].firstChild.data,
'content_type': 'inode/directory',
'file_id': dom.getElementsByTagName('oc:fileid')[0].firstChild.data,
'size': dom.getElementsByTagName('oc:size')[0].firstChild.data,
'favorite': dom.getElementsByTagName('oc:favorite')[0].firstChild.data,
'owner': dom.getElementsByTagName('oc:owner-display-name')[0].firstChild.data,
'href': dom.getElementsByTagName('d:href')[0].firstChild.data
}
elif r.status_code == 404:
return {}
else:
self.exit.status_code_error(r.status_code)
def put(self, path, src=None):
if src:
r = requests.put(
'{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
data=open(src, 'rb'), auth=(self.USER, self.TOKEN), verify=self.ssl
)
else:
r = requests.put(
'{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
headers=self.headers, auth=(self.USER, self.TOKEN), verify=self.ssl
)
if r.status_code in [200, 201, 204]:
return r, True
else:
self.exit.status_code_error(r.status_code)
def delete(self, path):
r = requests.delete(
'{HTTP}://{HOST}/{PATH}'.format(HTTP=self.HTTP, HOST=self.HOST, PATH=path),
auth=(self.USER, self.TOKEN), verify=self.ssl
)
if r.status_code in [200, 204]:
return r, True
elif r.status_code == 404:
return r, False
else:
self.exit.status_code_error(r.status_code)
def talk(self, message, channel):
body = {
'message': message,
'replyTo': 0
}
spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat"
r = requests.post(
'{HTTP}://{HOST}/{V1}/{CHANNEL}'.format(HTTP=self.HTTP, HOST=self.HOST, V1=spreed_v1_path, CHANNEL=channel),
data=body,
headers=self.headers(),
auth=(self.USER, self.TOKEN),
verify=self.ssl
)
if r.status_code == 201:
return r, True
else:
self.exit.status_code_error(r.status_code)
def request_passwords_session(self):
r = self.get("index.php/apps/passwords/api/1.0/session/request")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def open_passwords_session(self, passwordHash):
post_obj = {
'challenge': passwordHash
}
r = requests.post(
'{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/session/open'.format(HTTP=self.HTTP, HOST=self.HOST),
data=post_obj,
headers=self.headers(),
auth=(self.USER, self.TOKEN),
verify=self.ssl
)
if r.status_code == 200:
self.x_api_session = r.headers.get('X-API-SESSION')
return r.json()
else:
self.exit.status_code_error(r.status_code)
def close_passwords_session(self):
r = self.get("index.php/apps/passwords/api/1.0/session/close")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def list_passwords(self):
r = self.get("index.php/apps/passwords/api/1.0/password/list")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def list_passwords_folders(self):
r = self.get("index.php/apps/passwords/api/1.0/folder/list")
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def create_passwords_folder(self, name):
post_obj = {
'label': name
}
r = requests.post(
'{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/folder/create'.format(HTTP=self.HTTP, HOST=self.HOST),
data=post_obj,
headers=self.headers(),
auth=(self.USER, self.TOKEN),
verify=self.ssl
)
if r.status_code == 201:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def get_passwords_folder(self, name):
for folder in self.list_passwords_folders():
if folder.get('label') == name:
return folder.get('id')
return None
def get_password(self, name):
r = self.list_passwords()
ret = []
for item in r:
if item['cseType'] == 'CSEv1r1' and self.E2E_PASSWORD:
item_key = self.keychain['keys'].get(item['cseKey'])
if item_key:
if decrypt_item(item_key, item, 'label') == name:
if self.details:
item['password'] = decrypt_item(item_key, item, 'password')
item['url'] = decrypt_item(item_key, item, 'url')
item['username'] = decrypt_item(item_key, item, 'username')
item['label'] = decrypt_item(item_key, item, 'label')
item['notes'] = decrypt_item(item_key, item, 'notes')
item['customFields'] = decrypt_item(item_key, item, 'customFields')
ret.append(item)
else:
ret.append(decrypt_item(item_key, item, 'password'))
else:
if item['label'] == name:
if self.details:
ret.append(item)
else:
ret.append(item['password'])
return ret
def fetch_generated_password(self):
r = self.get('index.php/apps/passwords/api/1.0/service/password')
if r.status_code == 200:
return [r.json().get('password')]
else:
self.exit.status_code_error(r.status_code)
def create_password(self, post_obj):
r = requests.post(
'{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/create'.format(HTTP=self.HTTP, HOST=self.HOST),
data=post_obj,
headers=self.headers(),
auth=(self.USER, self.TOKEN),
verify=self.ssl
)
if r.status_code == 201:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def delete_password(self, post_obj):
r = requests.delete(
'{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/delete'.format(HTTP=self.HTTP, HOST=self.HOST),
data=post_obj,
headers=self.headers(),
auth=(self.USER, self.TOKEN),
verify=self.ssl
)
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def update_password(self, post_obj):
r = requests.patch(
'{HTTP}://{HOST}/index.php/apps/passwords/api/1.0/password/update'.format(HTTP=self.HTTP, HOST=self.HOST),
data=post_obj,
headers=self.headers(),
auth=(self.USER, self.TOKEN),
verify=self.ssl
)
if r.status_code == 200:
return r.json()
else:
self.exit.status_code_error(r.status_code)
def user(self):
return self.USER

View file

@ -22,4 +22,4 @@ dependencies = [
"click",
"click_config_file",
]
requires-python = ">=3"
requires-python = ">=3"

View file

@ -1,2 +1,4 @@
click
click_config_file
click_config_file
requests
pysodium

View file

@ -16,8 +16,12 @@ setuptools.setup(
author_email="ad@susurrando.com",
url="https://repos.susurrando.com/adelgado/nc_password_client",
description="Nextcloud Password client",
packages=setuptools.find_packages(),
zip_safe=False,
include_package_data=True,
long_description="README.md",
long_description_content_type="text/markdown",
license="GPLv3",
platforms='any',
# keywords=["my", "script", "does", "things"]
)