From a9f44fd140667c07193fb654bd3febda8412d2a2 Mon Sep 17 00:00:00 2001 From: "Antonio J. Delgado" Date: Mon, 11 Nov 2024 22:52:43 +0200 Subject: [PATCH] Add cache, command to delete all passwords, re-use sessions, add proxy --- nc_password_client/nc_password_client.py | 140 ++++++++++++++++++----- 1 file changed, 112 insertions(+), 28 deletions(-) diff --git a/nc_password_client/nc_password_client.py b/nc_password_client/nc_password_client.py index e41f9ba..7bfcb4d 100755 --- a/nc_password_client/nc_password_client.py +++ b/nc_password_client/nc_password_client.py @@ -13,6 +13,7 @@ import traceback import json import sys import os +import time import logging from logging.handlers import SysLogHandler from xml.dom import minidom @@ -87,6 +88,15 @@ class NextcloudHandler: self.http = 'https' self.timeout = params.get('timeout', 3) self.ssl = True + self.cached_passwords = [] + self.last_cache = -1 + self.session = requests.Session() + proxies = { + 'http': params.get('https_proxy', ''), + 'https': params.get('https_proxy', ''), + } + self.session.proxies.update(proxies) + self.cache_duration = params.get('cache_duration', 300) if params.get('ssl_mode') == 'http': self.http = 'http' elif params.get('ssl_mode') == 'skip': @@ -187,9 +197,16 @@ class NextcloudHandler: def error(self, obj): '''Show error information''' - self._log.error( - json.dumps(obj, indent=2) - ) + try: + self._log.error( + json.dumps(obj, indent=2) + ) + except TypeError as error: + self._log.error( + 'Additional error showing the error from %s. %s', + obj, + error + ) def _init_log(self, params): ''' Initialize log object ''' @@ -242,9 +259,9 @@ class NextcloudHandler: def get(self, path): '''Do a GET request''' - self.debug({ "action": "get", "message": "Requesting {path}" }) + self.debug({ "action": "get", "message": f"Requesting {path}" }) try: - r = requests.get( + r = self.session.get( f'{self.http}://{self.host}/{path}', auth=(self.user, self.token), verify=self.ssl, headers=self.headers(), timeout=self.timeout @@ -277,15 +294,15 @@ class NextcloudHandler: { "action": "get", "message": f"Timeout ({self.timeout} sec) error doing GET request. %s", - "error": error + "error": f"{error}" } ) - sys.exit(5) + # sys.exit(5) return None def propfind(self, path): '''Do a PROPFIND request''' - s = requests.Session() + s = self.session s.auth = (self.user, self.token) try: r = s.request( @@ -369,7 +386,7 @@ class NextcloudHandler: '''Do a PUT request''' try: if src: - r = requests.put( + r = self.session.put( f'{self.http}://{self.host}/{path}', data=open(src, 'rb'), auth=(self.user, self.token), @@ -377,7 +394,7 @@ class NextcloudHandler: timeout=self.timeout ) else: - r = requests.put( + r = self.session.put( f'{self.http}://{self.host}/{path}', headers=self.headers, auth=(self.user, self.token), @@ -405,7 +422,7 @@ class NextcloudHandler: def delete(self, path): '''Do a DELETE request''' try: - r = requests.delete( + r = self.session.delete( f'{self.http}://{self.host}/{path}', auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout ) @@ -440,7 +457,7 @@ class NextcloudHandler: spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat" try: - r = requests.post( + r = self.session.post( f'{self.http}://{self.host}/{spreed_v1_path}/{channel}', data=body, headers=self.headers(), @@ -478,7 +495,7 @@ class NextcloudHandler: 'challenge': password_hash } - r = requests.post( + r = self.session.post( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/session/open', data=post_obj, headers=self.headers(), @@ -503,7 +520,18 @@ class NextcloudHandler: def list_passwords(self): '''List all passwords''' - return self.get("index.php/apps/passwords/api/1.0/password/list") + if self.last_cache < (time.time() - self.cache_duration): + self.debug( + { "action": "list_passwords", "message": "Updating cached list of password" } + ) + self.cached_passwords = self.get("index.php/apps/passwords/api/1.0/password/list") + if self.cached_passwords: + self.last_cache = time.time() + else: + self.debug( + { "action": "list_passwords", "message": "Reusing cached list of password" } + ) + return self.cached_passwords def list_passwords_folders(self): '''List passwords folders''' @@ -525,7 +553,7 @@ class NextcloudHandler: post_obj = { 'id': folder_id } - r = requests.delete( + r = self.session.delete( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/delete', data=post_obj, headers=self.headers(), @@ -566,7 +594,7 @@ class NextcloudHandler: 'label': name } - r = requests.post( + r = self.session.post( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/create', data=post_obj, headers=self.headers(), @@ -637,9 +665,11 @@ class NextcloudHandler: self.debug( { "action": "check_exists_password", "object": obj } ) - for password in self.list_passwords(): - if self.is_same_password(obj, password): - return True + all_passwords = self.list_passwords() + if all_passwords: + for password in all_passwords: + if self.is_same_password(obj, password): + return True return False def create_password(self, post_obj): @@ -659,7 +689,7 @@ class NextcloudHandler: self.debug( { "action": "create_password", "object": safer_obj } ) - r = requests.post( + r = self.session.post( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create', data=post_obj, headers=self.headers(), @@ -668,6 +698,10 @@ class NextcloudHandler: ) if r.status_code == 201: + if self.cached_passwords: + self.cached_passwords.append(post_obj) + else: + self.cached_passwords = [ post_obj ] return r.json() self.error(r.json()) self.error( @@ -699,7 +733,7 @@ class NextcloudHandler: def delete_password(self, post_obj): '''Delete a password''' try: - r = requests.delete( + r = self.session.delete( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete', data=post_obj, headers=self.headers(), @@ -728,7 +762,7 @@ class NextcloudHandler: def update_password(self, post_obj): '''Update a password''' try: - r = requests.patch( + r = self.session.patch( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update', data=post_obj, headers=self.headers(), @@ -795,7 +829,10 @@ class NextcloudHandler: class NcPasswordClient: '''Nextcloud Password Client''' - def __init__(self, debug_level, log_file, host, user, api_token, cse_password, timeout): + def __init__( + self, debug_level, log_file, host, user, api_token, cse_password, + timeout, cache_duration, https_proxy + ): self.config = {} self.config['debug_level'] = debug_level if log_file is None: @@ -817,6 +854,8 @@ class NcPasswordClient: "api_token": api_token, "cse_password": cse_password, "timeout": timeout, + "cache_duration": cache_duration, + "https_proxy": https_proxy, } self.nc = NextcloudHandler(params) @@ -922,10 +961,29 @@ class NcPasswordClient: if field == 'login': field = 'username' obj[field] = value - self.debug( - { "action": "created_password", "object": self.nc.create_password(obj) } + new_password = self.nc.create_password(obj) + if new_password: + self.debug( + { "action": "created_password", "object": new_password } + ) + count += 1 + + def delete_all_passwords(self, yes_i_am_sure): + '''DANGEROUS! Delete ALL passwords from your Nextcloud Password instance''' + if not yes_i_am_sure: + answer = input( + 'WARNING! Are you completely sure you want to delete ALL your passwords? (Y/n)' + ) + if answer != "Y": + self.info( + { "action": "delete_all_passwords", "message": "Aborting after answering something different from 'Y'" } ) - count += 1 + return False + for item in self.nc.list_passwords(): + self.debug( + { "action": "delete_all_passwords", "message": "Deleting password", "object": item } + ) + self.nc.delete_password(item) def _init_log(self): ''' Initialize log object ''' @@ -1006,13 +1064,27 @@ class NcPasswordClient: type=click.INT, help='Nextcloud user\'s end-to-end encryption password' ) +@click.option( + '--cache-duration', '-c', + default=300, + type=click.INT, + help='Number of seconds to hold the list of passwords' +) +@click.option( + '--https-proxy', '-P', + help='HTTPS proxy to use to connect to the Nextcloud instance' +) @click_config_file.configuration_option() @click.pass_context -def cli(ctx, debug_level, log_file, host, user, api_token, cse_password, timeout): +def cli( + ctx, debug_level, log_file, host, user, api_token, cse_password, + timeout, cache_duration, https_proxy + ): '''Client function to pass context''' ctx.ensure_object(dict) ctx.obj['NcPasswordClient'] = NcPasswordClient( - debug_level, log_file, host, user, api_token, cse_password, timeout + debug_level, log_file, host, user, api_token, cse_password, + timeout, cache_duration, https_proxy ) @cli.command() @@ -1087,5 +1159,17 @@ def migrate_pass(ctx, limit): '''Migrate password store passwords to Nextcloud Passwords''' ctx.obj['NcPasswordClient'].migrate_pass(limit) +@cli.command() +@click.option( + '--yes-I-am-sure', + default=False, + help="DAGEROUS! Don't prompt for confirmation before deleting all passwords." +) +@click_config_file.configuration_option() +@click.pass_context +def delete_all_passwords(ctx, yes_i_am_sure): + '''Delete all passwords''' + ctx.obj['NcPasswordClient'].delete_all_passwords(yes_i_am_sure) + if __name__ == "__main__": cli(obj={})