Fix remove duplicates command

This commit is contained in:
Antonio J. Delgado 2024-11-28 20:02:32 +02:00
parent 2192e1b895
commit 63d703daff

View file

@ -123,6 +123,11 @@ class NextcloudHandler:
'.cache', '.cache',
'nc_password_client.cache' 'nc_password_client.cache'
) )
if 'comparation_fields' not in params:
self.comparation_fields = [
'username', 'password', 'url'
]
self.field_replacements = {} self.field_replacements = {}
for field_replacement in params['field_replacements']: for field_replacement in params['field_replacements']:
key, value = field_replacement.split(':') key, value = field_replacement.split(':')
@ -944,25 +949,33 @@ class NextcloudHandler:
def delete_password(self, post_obj): def delete_password(self, post_obj):
'''Delete a password''' '''Delete a password'''
min_obj = {
'id': post_obj['id'],
}
try: try:
r = self.session.delete( r = self.session.delete(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete', f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete',
data=post_obj, data=min_obj,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl, timeout=self.timeout verify=self.ssl, timeout=self.timeout
) )
if r.status_code < 299: if r.status_code < 299:
for password in self.cache['cached_passwords']: if r.json():
if self.cache['cached_passwords']['id'] == post_obj['id']: counter = 0
self.cache['cached_passwords'].pop(password.numerator-1) for password in self.cache['cached_passwords']:
self._write_cache() if password['id'] == post_obj['id']:
self.cache['cached_passwords'].pop(counter)
self._write_cache()
counter += 1
return r.json() return r.json()
self.error( self.error(
{ {
"action": "delete_password", "action": "delete_password",
"object": min_obj,
"message": f"Nextcloud instance returned status code {r.status_code}", "message": f"Nextcloud instance returned status code {r.status_code}",
"returned_content": r.content,
} }
) )
except requests.exceptions.ReadTimeout as error: except requests.exceptions.ReadTimeout as error:
@ -1035,7 +1048,7 @@ class NextcloudHandler:
return True return True
return False return False
def is_same_password(self, obj1, obj2): def is_same_password(self, obj1, obj2, ignore_empty_fields=False):
'''Test if two password objects are the same or similar''' '''Test if two password objects are the same or similar'''
safer_obj1 = dict(obj1, **{ 'password': '***' }) safer_obj1 = dict(obj1, **{ 'password': '***' })
safer_obj2 = dict(obj2, **{ 'password': '***' }) safer_obj2 = dict(obj2, **{ 'password': '***' })
@ -1060,22 +1073,51 @@ class NextcloudHandler:
# } # }
# } # }
# ) # )
if self.is_same_key('username', obj1, obj2): match = True
if self.is_same_key('password', obj1, obj2): for field in self.comparation_fields:
if self.is_same_key('url', obj1, obj2): if ignore_empty_fields and (obj1[field] == '' or obj2[field] == ''):
#if self.is_same_key('folder', obj1, obj2): continue
self.debug( if not self.is_same_key(field, obj1, obj2):
{ if field == 'url':
"action": "notify_match", url1 = self._split_url(obj1[field])
"object": { url2 = self._split_url(obj2[field])
"obj1": safer_obj1, if ('hostname' in url1 and 'hostname' in url2 and
"obj2": safer_obj2 url1['hostname'] == url2['hostname']):
self.debug(
{
"action": "is_same_password",
"message": f"Hostname in URL of both passwords match but URL is no match '{obj1[field]}' != '{obj2[field]}'"
} }
} )
) continue
return True match = False
break
if match:
self.debug(
{
"action": "notify_match",
"object": {
"obj1": safer_obj1,
"obj2": safer_obj2
}
}
)
return True
return False return False
def _split_url(self, url):
result = {}
match = re.search('([a-z]*)://([^/]*)/(.*)', url)
if match:
result['protocol'] = match.group(1)
result['hostname'] = match.group(2)
result['path'] = match.group(3)
split_hostname = result['hostname'].split('.')
result['tld'] = split_hostname[len(split_hostname)-1]
result['domain'] = f"{split_hostname[len(split_hostname)-1]}.{split_hostname[len(split_hostname)-2]}"
return result
def get_folder_id(self, folder_name): def get_folder_id(self, folder_name):
'''Get a folder id from the name''' '''Get a folder id from the name'''
for folder in self.list_passwords_folders(): for folder in self.list_passwords_folders():
@ -1253,12 +1295,13 @@ class NcPasswordClient:
sys.exit(2) sys.exit(2)
for password in self.nc.list_passwords(): for password in self.nc.list_passwords():
if password[field] == name: if password[field] == name:
safer_obj = dict(password, **{ 'password': '***' })
self.debug( self.debug(
{ "action": "delete_password", "object": safer_obj } {
) "action": "deleted_password",
self.debug( "message": "Result of deleting password",
{ "action": "deleted_password", "object": self.nc.delete_password(password)} "object": password,
"result": self.nc.delete_password(password)
}
) )
return True return True
self.warning( self.warning(
@ -1360,8 +1403,9 @@ class NcPasswordClient:
self.nc.delete_password(item) self.nc.delete_password(item)
return True return True
def remove_duplicates(self, limit): def remove_duplicates(self, limit, comparation_fields):
'''Remove duplicate passwords''' '''Remove duplicate passwords'''
self.nc.comparation_fields = comparation_fields
checked_passwords = [] checked_passwords = []
count = 0 count = 0
if limit == 0: if limit == 0:
@ -1725,11 +1769,17 @@ def create_passwords_folder(ctx, name):
default=-1, default=-1,
help='Maximun number of passwords to remove. -1 for unlimited.' help='Maximun number of passwords to remove. -1 for unlimited.'
) )
@click.option(
'--comparation-fields', '-c',
multiple=True,
default=['username', 'password', 'url'],
help='Field names to compare'
)
@click_config_file.configuration_option() @click_config_file.configuration_option()
@click.pass_context @click.pass_context
def remove_duplicates(ctx, limit): def remove_duplicates(ctx, limit, comparation_fields):
'''Remove duplicate passwords''' '''Remove duplicate passwords'''
ctx.obj['NcPasswordClient'].remove_duplicates(limit) ctx.obj['NcPasswordClient'].remove_duplicates(limit, comparation_fields)
@cli.command() @cli.command()
@click_config_file.configuration_option() @click_config_file.configuration_option()