Add cache, command to delete all passwords, re-use sessions, add proxy

This commit is contained in:
Antonio J. Delgado 2024-11-11 22:52:43 +02:00
parent 6cb7783bed
commit a9f44fd140

View file

@ -13,6 +13,7 @@ import traceback
import json
import sys
import os
import time
import logging
from logging.handlers import SysLogHandler
from xml.dom import minidom
@ -87,6 +88,15 @@ class NextcloudHandler:
self.http = 'https'
self.timeout = params.get('timeout', 3)
self.ssl = True
self.cached_passwords = []
self.last_cache = -1
self.session = requests.Session()
proxies = {
'http': params.get('https_proxy', ''),
'https': params.get('https_proxy', ''),
}
self.session.proxies.update(proxies)
self.cache_duration = params.get('cache_duration', 300)
if params.get('ssl_mode') == 'http':
self.http = 'http'
elif params.get('ssl_mode') == 'skip':
@ -187,9 +197,16 @@ class NextcloudHandler:
def error(self, obj):
'''Show error information'''
try:
self._log.error(
json.dumps(obj, indent=2)
)
except TypeError as error:
self._log.error(
'Additional error showing the error from %s. %s',
obj,
error
)
def _init_log(self, params):
''' Initialize log object '''
@ -242,9 +259,9 @@ class NextcloudHandler:
def get(self, path):
'''Do a GET request'''
self.debug({ "action": "get", "message": "Requesting {path}" })
self.debug({ "action": "get", "message": f"Requesting {path}" })
try:
r = requests.get(
r = self.session.get(
f'{self.http}://{self.host}/{path}',
auth=(self.user, self.token), verify=self.ssl, headers=self.headers(),
timeout=self.timeout
@ -277,15 +294,15 @@ class NextcloudHandler:
{
"action": "get",
"message": f"Timeout ({self.timeout} sec) error doing GET request. %s",
"error": error
"error": f"{error}"
}
)
sys.exit(5)
# sys.exit(5)
return None
def propfind(self, path):
'''Do a PROPFIND request'''
s = requests.Session()
s = self.session
s.auth = (self.user, self.token)
try:
r = s.request(
@ -369,7 +386,7 @@ class NextcloudHandler:
'''Do a PUT request'''
try:
if src:
r = requests.put(
r = self.session.put(
f'{self.http}://{self.host}/{path}',
data=open(src, 'rb'),
auth=(self.user, self.token),
@ -377,7 +394,7 @@ class NextcloudHandler:
timeout=self.timeout
)
else:
r = requests.put(
r = self.session.put(
f'{self.http}://{self.host}/{path}',
headers=self.headers,
auth=(self.user, self.token),
@ -405,7 +422,7 @@ class NextcloudHandler:
def delete(self, path):
'''Do a DELETE request'''
try:
r = requests.delete(
r = self.session.delete(
f'{self.http}://{self.host}/{path}',
auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout
)
@ -440,7 +457,7 @@ class NextcloudHandler:
spreed_v1_path = "ocs/v2.php/apps/spreed/api/v1/chat"
try:
r = requests.post(
r = self.session.post(
f'{self.http}://{self.host}/{spreed_v1_path}/{channel}',
data=body,
headers=self.headers(),
@ -478,7 +495,7 @@ class NextcloudHandler:
'challenge': password_hash
}
r = requests.post(
r = self.session.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/session/open',
data=post_obj,
headers=self.headers(),
@ -503,7 +520,18 @@ class NextcloudHandler:
def list_passwords(self):
'''List all passwords'''
return self.get("index.php/apps/passwords/api/1.0/password/list")
if self.last_cache < (time.time() - self.cache_duration):
self.debug(
{ "action": "list_passwords", "message": "Updating cached list of password" }
)
self.cached_passwords = self.get("index.php/apps/passwords/api/1.0/password/list")
if self.cached_passwords:
self.last_cache = time.time()
else:
self.debug(
{ "action": "list_passwords", "message": "Reusing cached list of password" }
)
return self.cached_passwords
def list_passwords_folders(self):
'''List passwords folders'''
@ -525,7 +553,7 @@ class NextcloudHandler:
post_obj = {
'id': folder_id
}
r = requests.delete(
r = self.session.delete(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/delete',
data=post_obj,
headers=self.headers(),
@ -566,7 +594,7 @@ class NextcloudHandler:
'label': name
}
r = requests.post(
r = self.session.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/folder/create',
data=post_obj,
headers=self.headers(),
@ -637,7 +665,9 @@ class NextcloudHandler:
self.debug(
{ "action": "check_exists_password", "object": obj }
)
for password in self.list_passwords():
all_passwords = self.list_passwords()
if all_passwords:
for password in all_passwords:
if self.is_same_password(obj, password):
return True
return False
@ -659,7 +689,7 @@ class NextcloudHandler:
self.debug(
{ "action": "create_password", "object": safer_obj }
)
r = requests.post(
r = self.session.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create',
data=post_obj,
headers=self.headers(),
@ -668,6 +698,10 @@ class NextcloudHandler:
)
if r.status_code == 201:
if self.cached_passwords:
self.cached_passwords.append(post_obj)
else:
self.cached_passwords = [ post_obj ]
return r.json()
self.error(r.json())
self.error(
@ -699,7 +733,7 @@ class NextcloudHandler:
def delete_password(self, post_obj):
'''Delete a password'''
try:
r = requests.delete(
r = self.session.delete(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete',
data=post_obj,
headers=self.headers(),
@ -728,7 +762,7 @@ class NextcloudHandler:
def update_password(self, post_obj):
'''Update a password'''
try:
r = requests.patch(
r = self.session.patch(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update',
data=post_obj,
headers=self.headers(),
@ -795,7 +829,10 @@ class NextcloudHandler:
class NcPasswordClient:
'''Nextcloud Password Client'''
def __init__(self, debug_level, log_file, host, user, api_token, cse_password, timeout):
def __init__(
self, debug_level, log_file, host, user, api_token, cse_password,
timeout, cache_duration, https_proxy
):
self.config = {}
self.config['debug_level'] = debug_level
if log_file is None:
@ -817,6 +854,8 @@ class NcPasswordClient:
"api_token": api_token,
"cse_password": cse_password,
"timeout": timeout,
"cache_duration": cache_duration,
"https_proxy": https_proxy,
}
self.nc = NextcloudHandler(params)
@ -922,11 +961,30 @@ class NcPasswordClient:
if field == 'login':
field = 'username'
obj[field] = value
new_password = self.nc.create_password(obj)
if new_password:
self.debug(
{ "action": "created_password", "object": self.nc.create_password(obj) }
{ "action": "created_password", "object": new_password }
)
count += 1
def delete_all_passwords(self, yes_i_am_sure):
'''DANGEROUS! Delete ALL passwords from your Nextcloud Password instance'''
if not yes_i_am_sure:
answer = input(
'WARNING! Are you completely sure you want to delete ALL your passwords? (Y/n)'
)
if answer != "Y":
self.info(
{ "action": "delete_all_passwords", "message": "Aborting after answering something different from 'Y'" }
)
return False
for item in self.nc.list_passwords():
self.debug(
{ "action": "delete_all_passwords", "message": "Deleting password", "object": item }
)
self.nc.delete_password(item)
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
@ -1006,13 +1064,27 @@ class NcPasswordClient:
type=click.INT,
help='Nextcloud user\'s end-to-end encryption password'
)
@click.option(
'--cache-duration', '-c',
default=300,
type=click.INT,
help='Number of seconds to hold the list of passwords'
)
@click.option(
'--https-proxy', '-P',
help='HTTPS proxy to use to connect to the Nextcloud instance'
)
@click_config_file.configuration_option()
@click.pass_context
def cli(ctx, debug_level, log_file, host, user, api_token, cse_password, timeout):
def cli(
ctx, debug_level, log_file, host, user, api_token, cse_password,
timeout, cache_duration, https_proxy
):
'''Client function to pass context'''
ctx.ensure_object(dict)
ctx.obj['NcPasswordClient'] = NcPasswordClient(
debug_level, log_file, host, user, api_token, cse_password, timeout
debug_level, log_file, host, user, api_token, cse_password,
timeout, cache_duration, https_proxy
)
@cli.command()
@ -1087,5 +1159,17 @@ def migrate_pass(ctx, limit):
'''Migrate password store passwords to Nextcloud Passwords'''
ctx.obj['NcPasswordClient'].migrate_pass(limit)
@cli.command()
@click.option(
'--yes-I-am-sure',
default=False,
help="DAGEROUS! Don't prompt for confirmation before deleting all passwords."
)
@click_config_file.configuration_option()
@click.pass_context
def delete_all_passwords(ctx, yes_i_am_sure):
'''Delete all passwords'''
ctx.obj['NcPasswordClient'].delete_all_passwords(yes_i_am_sure)
if __name__ == "__main__":
cli(obj={})