Compare commits

..

12 commits
v0.0.5 ... main

4 changed files with 177 additions and 28 deletions

View file

@ -112,6 +112,7 @@ class NextcloudHandler:
self._log = params['logger'] self._log = params['logger']
else: else:
self._init_log(params) self._init_log(params)
self.params = params
self.http = 'https' self.http = 'https'
self.timeout = params.get('timeout', 3) self.timeout = params.get('timeout', 3)
self.ssl = True self.ssl = True
@ -294,10 +295,7 @@ class NextcloudHandler:
"message": "Fernet token for passwords local cache is invalid, discarding the local cache.", "message": "Fernet token for passwords local cache is invalid, discarding the local cache.",
} }
) )
self.cache = { self.reset_cache()
"last_update": -1,
"cached_passwords": []
}
else: else:
self.debug( self.debug(
@ -306,10 +304,7 @@ class NextcloudHandler:
"message": "The cache file was empty, so initializing cache" "message": "The cache file was empty, so initializing cache"
} }
) )
self.cache = { self.reset_cache()
"last_update": -1,
"cached_passwords": []
}
else: else:
self.debug( self.debug(
{ {
@ -317,6 +312,10 @@ class NextcloudHandler:
"message": "There wasn't a cache file, so initializing cache" "message": "There wasn't a cache file, so initializing cache"
} }
) )
self.reset_cache()
def reset_cache(self):
'''Reset cached password'''
self.cache = { self.cache = {
"last_update": -1, "last_update": -1,
"cached_passwords": [] "cached_passwords": []
@ -350,14 +349,34 @@ class NextcloudHandler:
return obj return obj
def _output(self, obj, unsafe=False): def _output(self, obj, unsafe=False):
if 'timestamp' not in obj:
obj['timestamp'] = time.time()
if unsafe: if unsafe:
out_obj = obj out_obj = obj
else: else:
out_obj = self._safer_obj(obj) out_obj = self._safer_obj(obj)
if self.output_format == 'json': if self.output_format == 'json':
try:
return json.dumps(out_obj, indent=2) return json.dumps(out_obj, indent=2)
except json.decoder.JSONDecodeError as error:
self.error(
{
'action': '_outpout',
'message': 'Error decoding JSON',
'error': error,
}
)
if self.output_format == 'yaml': if self.output_format == 'yaml':
try:
return dump(out_obj, Dumper=Dumper) return dump(out_obj, Dumper=Dumper)
except Exception as error:
self.error(
{
'action': '_outpout',
'message': 'Error decoding YAML',
'error': error,
}
)
output = '' output = ''
if isinstance(out_obj, list): if isinstance(out_obj, list):
output += '-' * os.get_terminal_size(0)[0] output += '-' * os.get_terminal_size(0)[0]
@ -373,6 +392,10 @@ class NextcloudHandler:
def debug(self, obj, unsafe=False): def debug(self, obj, unsafe=False):
'''Show debug information''' '''Show debug information'''
obj['connection'] = {
"host": self.params['host'],
"user": self.params['user']
}
self._log.debug( self._log.debug(
self._output(obj, unsafe=unsafe) self._output(obj, unsafe=unsafe)
) )
@ -426,7 +449,7 @@ class NextcloudHandler:
def get(self, path): def get(self, path):
'''Do a GET request''' '''Do a GET request'''
self.debug({ "action": "get", "message": f"Requesting {path}" }) self.debug({ "action": "get", "message": f"Requesting '{self.http}://{self.host}/{path}'" })
try: try:
r = self.session.get( r = self.session.get(
f'{self.http}://{self.host}/{path}', f'{self.http}://{self.host}/{path}',
@ -438,6 +461,8 @@ class NextcloudHandler:
self.debug( self.debug(
{"action": "get", "status_code": r.status_code, "size": len(r.content)} {"action": "get", "status_code": r.status_code, "size": len(r.content)}
) )
if r.headers['Content-Type'] == 'text/html; charset=UTF-8':
return None
return r.json() return r.json()
if r.status_code == 404: if r.status_code == 404:
self.error( self.error(
@ -755,7 +780,7 @@ class NextcloudHandler:
"folder_name": name "folder_name": name
} }
) )
return False return None
def exists_passwords_folder(self, name): def exists_passwords_folder(self, name):
'''Test if a passwords folder exists''' '''Test if a passwords folder exists'''
@ -764,6 +789,13 @@ class NextcloudHandler:
return folder return folder
return False return False
def _is_uuid(self, string):
'''Test if a string is a UUID'''
match = re.match(r'^[0-9a-f\-]*$', string)
if match:
return True
return False
def create_passwords_folder(self, name): def create_passwords_folder(self, name):
'''Create passwords folder''' '''Create passwords folder'''
if not self.exists_passwords_folder(name): if not self.exists_passwords_folder(name):
@ -883,9 +915,14 @@ class NextcloudHandler:
} }
) )
if new_obj.get('folder', '') != '': if new_obj.get('folder', '') != '':
if self._is_uuid(new_obj['folder']):
folder_id = new_obj['folder']
else:
folder_id = self.get_folder_id(new_obj['folder']) folder_id = self.get_folder_id(new_obj['folder'])
if not folder_id: if not folder_id:
folder_id = self.create_passwords_folder(new_obj['folder'])['id'] created_folder = self.create_passwords_folder(new_obj['folder'])
if created_folder:
folder_id = created_folder['id']
new_obj['folder'] = folder_id new_obj['folder'] = folder_id
else: else:
new_obj.pop('folder', None) new_obj.pop('folder', None)
@ -934,9 +971,17 @@ class NextcloudHandler:
} }
) )
else: else:
if update: if update and 'id' in exists_password:
new_obj['id'] = exists_password['id'] new_obj['id'] = exists_password['id']
return self.update_password(new_obj) return self.update_password(new_obj)
if 'id' not in exists_password:
self.debug(
{
"action": "create_password",
"message": "Found an existing password without an 'id'",
"existing_password": exists_password
}
)
self.warning( self.warning(
{ {
"action": "create_password", "action": "create_password",
@ -975,7 +1020,7 @@ class NextcloudHandler:
"action": "delete_password", "action": "delete_password",
"object": min_obj, "object": min_obj,
"message": f"Nextcloud instance returned status code {r.status_code}", "message": f"Nextcloud instance returned status code {r.status_code}",
"returned_content": r.content, "returned_content": f"{r.content}",
} }
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
@ -983,7 +1028,7 @@ class NextcloudHandler:
{ {
"action": "delete_password", "action": "delete_password",
"message": f"Timeout ({self.timeout} sec) error doing GET request", "message": f"Timeout ({self.timeout} sec) error doing GET request",
"error": error, "error": f"{error}",
} }
) )
return False return False
@ -1115,12 +1160,48 @@ class NextcloudHandler:
result['path'] = match.group(3) result['path'] = match.group(3)
split_hostname = result['hostname'].split('.') split_hostname = result['hostname'].split('.')
result['tld'] = split_hostname[len(split_hostname)-1] result['tld'] = split_hostname[len(split_hostname)-1]
result['domain'] = f"{split_hostname[len(split_hostname)-1]}.{split_hostname[len(split_hostname)-2]}" result['domain'] = f"{split_hostname[len(split_hostname)-2]}.{result['tld']}"
return result
match = re.search('([a-z]*)://([^/]*)/$', url)
if match:
result['protocol'] = match.group(1)
result['hostname'] = match.group(2)
result['path'] = ''
split_hostname = result['hostname'].split('.')
result['tld'] = split_hostname[len(split_hostname)-1]
result['domain'] = f"{split_hostname[len(split_hostname)-2]}.{result['tld']}"
return result
match = re.search('([a-z]*)://([^/]*)$', url)
if match:
result['protocol'] = match.group(1)
result['hostname'] = match.group(2)
result['path'] = ''
split_hostname = result['hostname'].split('.')
result['tld'] = split_hostname[len(split_hostname)-1]
result['domain'] = f"{split_hostname[len(split_hostname)-2]}.{result['tld']}"
return result
match = re.search('([^/]*)$', url)
if match:
result['protocol'] = ''
result['hostname'] = match.group(1)
result['path'] = ''
split_hostname = result['hostname'].split('.')
result['tld'] = split_hostname[len(split_hostname)-1]
result['domain'] = f"{split_hostname[len(split_hostname)-2]}.{result['tld']}"
return result
self.debug(
{
"action": "_split_url",
"message": f"URL '{url}' can't be split, no match for our regular expressions"
}
)
return result return result
def get_folder_id(self, folder_name): def get_folder_id(self, folder_name):
'''Get a folder id from the name''' '''Get a folder id from the name'''
for folder in self.list_passwords_folders(): passwords_folders = self.list_passwords()
if passwords_folders:
for folder in passwords_folders:
if folder['label'] == folder_name: if folder['label'] == folder_name:
return folder['id'] return folder['id']
return False return False
@ -1148,12 +1229,17 @@ class NcPasswordClient:
'nc_password_client.log' 'nc_password_client.log'
) )
self._init_log() self._init_log()
self.timeout = timeout
self.cache_duration = cache_duration
self.https_proxy = https_proxy
self.output_format = output_format
self.field_replacements = field_replacements
params = { params = {
"host": host, "host": host,
"user": user, "user": user,
"api_token": api_token, "api_token": api_token,
"cse_password": cse_password, "cse_password": cse_password,
"timeout": timeout, "timeout": self.timeout,
"cache_duration": cache_duration, "cache_duration": cache_duration,
"https_proxy": https_proxy, "https_proxy": https_proxy,
"logger": self._log, "logger": self._log,
@ -1161,6 +1247,7 @@ class NcPasswordClient:
"field_replacements": field_replacements, "field_replacements": field_replacements,
} }
self.nc = NextcloudHandler(params) self.nc = NextcloudHandler(params)
self.destination_nc = None
def _safer_obj(self, obj, fields=None): def _safer_obj(self, obj, fields=None):
if fields is None: if fields is None:
@ -1406,6 +1493,7 @@ class NcPasswordClient:
def remove_duplicates(self, limit, comparation_fields): def remove_duplicates(self, limit, comparation_fields):
'''Remove duplicate passwords''' '''Remove duplicate passwords'''
self.nc.comparation_fields = comparation_fields self.nc.comparation_fields = comparation_fields
self.nc.reset_cache()
checked_passwords = [] checked_passwords = []
count = 0 count = 0
if limit == 0: if limit == 0:
@ -1542,6 +1630,35 @@ class NcPasswordClient:
) )
return True return True
def migrate_passwords(
self,
destination_host,
destination_user,
destination_api_token,
destination_cse_password
):
'''Migrate passwords from one NextCloud instance to another'''
params = {
"host": destination_host,
"user": destination_user,
"api_token": destination_api_token,
"cse_password": destination_cse_password,
"timeout": self.timeout,
"cache_duration": self.cache_duration,
"https_proxy": self.https_proxy,
"logger": self._log,
"output_format": self.output_format,
"field_replacements": self.field_replacements,
}
self.destination_nc = NextcloudHandler(params)
all_passwords = self.nc.list_passwords()
if all_passwords:
for password in all_passwords:
self.destination_nc.create_password(password, update=True)
else:
return False
return True
def _init_log(self): def _init_log(self):
''' Initialize log object ''' ''' Initialize log object '''
self._log = logging.getLogger("nc_password_client") self._log = logging.getLogger("nc_password_client")
@ -1640,6 +1757,37 @@ def cli(
timeout, cache_duration, https_proxy, output_format, field_replacements timeout, cache_duration, https_proxy, output_format, field_replacements
) )
@cli.command()
@click.option(
'--destination-host', '-D',
required=True,
help='Destination host'
)
@click.option(
'--destination-user', '-U',
required=True,
help='Destination host user name'
)
@click.option(
'--destination-api-token', '-A',
required=True,
help="Destination host user's API token "
)
@click.option(
'--destination-cse-password', '-C',
help='Destination Nextcloud host user\'s end-to-end encryption password'
)
@click_config_file.configuration_option()
@click.pass_context
def migrate_passwords(ctx, destination_host, destination_user, destination_api_token, destination_cse_password):
'''Migrate passwords between two NextCloud Password instances'''
ctx.obj['NcPasswordClient'].migrate_passwords(
destination_host,
destination_user,
destination_api_token,
destination_cse_password
)
@cli.command() @cli.command()
@click.option('--name', '-n', required=True, help='Name of the password to show') @click.option('--name', '-n', required=True, help='Name of the password to show')
@click.option( @click.option(

View file

@ -7,7 +7,7 @@ Homepage = "https://repos.susurrando.com/adelgado/nc_password_client"
[project] [project]
name = "nc_password_client" name = "nc_password_client"
version = "0.0.5" version = "0.0.6"
description = "Nextcloud Password client" description = "Nextcloud Password client"
readme = "README.md" readme = "README.md"
authors = [{ name = "Antonio J. Delgado", email = "ad@susurrando.com" }] authors = [{ name = "Antonio J. Delgado", email = "ad@susurrando.com" }]

View file

@ -5,3 +5,4 @@ pysodium
passpy passpy
secretstorage secretstorage
pyyaml pyyaml
libsodium

View file

@ -1,6 +1,6 @@
[metadata] [metadata]
name = nc_password_client name = nc_password_client
version = 0.0.5 version = 0.0.6
[options] [options]
packages = nc_password_client packages = nc_password_client