Add tries for more requests

This commit is contained in:
Antonio J. Delgado 2024-11-11 10:46:46 +02:00
parent 88b4dbf276
commit a9b53b884e

View file

@ -312,17 +312,22 @@ class NextcloudHandler:
if src: if src:
r = requests.put( r = requests.put(
f'{self.http}://{self.host}/{path}', f'{self.http}://{self.host}/{path}',
data=open(src, 'rb'), auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout data=open(src, 'rb'),
auth=(self.user, self.token),
verify=self.ssl,
timeout=self.timeout
) )
else: else:
r = requests.put( r = requests.put(
f'{self.http}://{self.host}/{path}', f'{self.http}://{self.host}/{path}',
headers=self.headers, auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout headers=self.headers,
auth=(self.user, self.token),
verify=self.ssl,
timeout=self.timeout
) )
if r.status_code in [200, 201, 204]: if r.status_code in [200, 201, 204]:
return r, True return r, True
else:
self._log.error( self._log.error(
"Nextcloud instance returned status code: %s", "Nextcloud instance returned status code: %s",
r.status_code r.status_code
@ -568,8 +573,8 @@ r.status_code
else: else:
self._log.error( self._log.error(
"Nextcloud instance returned status code: %s", "Nextcloud instance returned status code: %s",
r.status_code r.status_code
) )
def exists_password(self, obj): def exists_password(self, obj):
'''Test if a password exist with the same name''' '''Test if a password exist with the same name'''
@ -583,6 +588,7 @@ r.status_code
if 'folder' in post_obj: if 'folder' in post_obj:
post_obj['folder'] = self.get_folder_id(post_obj['folder']) post_obj['folder'] = self.get_folder_id(post_obj['folder'])
if not self.exists_password(post_obj): if not self.exists_password(post_obj):
try:
r = requests.post( r = requests.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create', f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create',
data=post_obj, data=post_obj,
@ -593,12 +599,17 @@ r.status_code
if r.status_code == 201: if r.status_code == 201:
return r.json() return r.json()
else:
self._log.error(r.json()) self._log.error(r.json())
self._log.error( self._log.error(
"Nextcloud instance returned status code: %s", "Nextcloud instance returned status code: %s",
r.status_code r.status_code
) )
except requests.exceptions.ReadTimeout as error:
self._log.error(
"Timeout (%s sec) error doing GET request. %s",
self.timeout,
error
)
else: else:
self._log.warning( self._log.warning(
"Password with name '%s' already exists", "Password with name '%s' already exists",
@ -607,6 +618,7 @@ r.status_code
def delete_password(self, post_obj): def delete_password(self, post_obj):
'''Delete a password''' '''Delete a password'''
try:
r = requests.delete( r = requests.delete(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete', f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/delete',
data=post_obj, data=post_obj,
@ -620,11 +632,19 @@ r.status_code
else: else:
self._log.error( self._log.error(
"Nextcloud instance returned status code: %s", "Nextcloud instance returned status code: %s",
r.status_code r.status_code
) )
except requests.exceptions.ReadTimeout as error:
self._log.error(
"Timeout (%s sec) error doing GET request. %s",
self.timeout,
error
)
def update_password(self, post_obj): def update_password(self, post_obj):
'''Update a password''' '''Update a password'''
try:
r = requests.patch( r = requests.patch(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update', f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/update',
data=post_obj, data=post_obj,
@ -638,8 +658,14 @@ r.status_code
else: else:
self._log.error( self._log.error(
"Nextcloud instance returned status code: %s", "Nextcloud instance returned status code: %s",
r.status_code r.status_code
) )
except requests.exceptions.ReadTimeout as error:
self._log.error(
"Timeout (%s sec) error doing GET request. %s",
self.timeout,
error
)
def is_same_key(self, key, dict1, dict2): def is_same_key(self, key, dict1, dict2):
'''Test if two dictionaries have the same key with the same value''' '''Test if two dictionaries have the same key with the same value'''
@ -655,7 +681,8 @@ r.status_code
if ( if (
self.is_same_key('username', obj1, obj2) and self.is_same_key('username', obj1, obj2) and
self.is_same_key('password', obj1, obj2) and self.is_same_key('password', obj1, obj2) and
self.is_same_key('url', obj1, obj2) self.is_same_key('url', obj1, obj2) and
self.is_same_key('folder', obj1, obj2)
): ):
return True return True
return False return False
@ -910,7 +937,11 @@ def delete_passwords_folder(ctx, name):
ctx.obj['NcPasswordClient'].delete_passwords_folder(name) ctx.obj['NcPasswordClient'].delete_passwords_folder(name)
@cli.command() @cli.command()
@click.option('--limit', '-l', default=-1, help='Maximun number of passwords to migrate. -1 for unlimited.') @click.option(
'--limit', '-l',
default=-1,
help='Maximun number of passwords to migrate. -1 for unlimited.'
)
@click_config_file.configuration_option() @click_config_file.configuration_option()
@click.pass_context @click.pass_context
def migrate_pass(ctx, limit): def migrate_pass(ctx, limit):