Add migration command

This commit is contained in:
Antonio J. Delgado 2024-11-11 08:16:32 +02:00
parent 679a4300b7
commit 3a9c124707
2 changed files with 115 additions and 34 deletions

130
nc_password_client/nc_password_client.py Normal file → Executable file
View file

@ -19,6 +19,7 @@ from xml.dom import minidom
import binascii import binascii
import click import click
import click_config_file import click_config_file
import passpy
try: try:
import requests import requests
HAS_REQUESTS_LIB = True HAS_REQUESTS_LIB = True
@ -99,18 +100,18 @@ class NextcloudErrorHandler:
def status_code_error(self, status): def status_code_error(self, status):
'''Process status code error???''' '''Process status code error???'''
try: if status > 399:
self._log.error( self._log.error(
'Nextcloud returned with status code %s', 'Nextcloud returned status code %s',
status status
) )
except Exception: sys.exit(1)
self._log.error( else:
'Nextcloud returned with status code %s', self._log.debug(
'Nextcloud returned status %s',
status status
) )
def parameter_spects(spec_arguments): def parameter_spects(spec_arguments):
'''Specification of parameters???''' '''Specification of parameters???'''
argument_spec = dict( argument_spec = dict(
@ -144,6 +145,7 @@ class NextcloudHandler:
self._init_log(params) self._init_log(params)
self.exit = NextcloudErrorHandler() self.exit = NextcloudErrorHandler()
self.http = 'https' self.http = 'https'
self.timeout = params.get('timeout', 3)
self.ssl = True self.ssl = True
if params.get('ssl_mode') == 'http': if params.get('ssl_mode') == 'http':
self.http = 'http' self.http = 'http'
@ -261,7 +263,8 @@ class NextcloudHandler:
'''Do a GET request''' '''Do a GET request'''
r = requests.get( r = requests.get(
f'{self.http}://{self.host}/{path}', f'{self.http}://{self.host}/{path}',
auth=(self.user, self.token), verify=self.ssl, headers=self.headers() auth=(self.user, self.token), verify=self.ssl, headers=self.headers(),
timeout=self.timeout
) )
if r.status_code == 200: if r.status_code == 200:
@ -344,12 +347,12 @@ class NextcloudHandler:
if src: if src:
r = requests.put( r = requests.put(
f'{self.http}://{self.host}/{path}', f'{self.http}://{self.host}/{path}',
data=open(src, 'rb'), auth=(self.user, self.token), verify=self.ssl data=open(src, 'rb'), auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout
) )
else: else:
r = requests.put( r = requests.put(
f'{self.http}://{self.host}/{path}', f'{self.http}://{self.host}/{path}',
headers=self.headers, auth=(self.user, self.token), verify=self.ssl headers=self.headers, auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout
) )
if r.status_code in [200, 201, 204]: if r.status_code in [200, 201, 204]:
@ -361,7 +364,7 @@ class NextcloudHandler:
'''Do a DELETE request''' '''Do a DELETE request'''
r = requests.delete( r = requests.delete(
f'{self.http}://{self.host}/{path}', f'{self.http}://{self.host}/{path}',
auth=(self.user, self.token), verify=self.ssl auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout
) )
if r.status_code in [200, 204]: if r.status_code in [200, 204]:
@ -385,7 +388,7 @@ class NextcloudHandler:
data=body, data=body,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl verify=self.ssl, timeout=self.timeout
) )
if r.status_code == 201: if r.status_code == 201:
@ -412,7 +415,7 @@ class NextcloudHandler:
data=post_obj, data=post_obj,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl verify=self.ssl, timeout=self.timeout
) )
if r.status_code == 200: if r.status_code == 200:
self.x_api_session = r.headers.get('X-API-SESSION') self.x_api_session = r.headers.get('X-API-SESSION')
@ -464,7 +467,8 @@ class NextcloudHandler:
data=post_obj, data=post_obj,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl verify=self.ssl,
timeout=self.timeout
) )
if r.status_code == 201: if r.status_code == 201:
return r.json() return r.json()
@ -494,7 +498,8 @@ class NextcloudHandler:
data=post_obj, data=post_obj,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl verify=self.ssl,
timeout=self.timeout
) )
if r.status_code == 201: if r.status_code == 201:
@ -549,20 +554,33 @@ class NextcloudHandler:
else: else:
self.exit.status_code_error(r.status_code) self.exit.status_code_error(r.status_code)
def exists_password(self, name):
for password in self.list_passwords():
if password['label'] == name:
return True
return False
def create_password(self, post_obj): def create_password(self, post_obj):
'''Create/add a password''' '''Create/add a password'''
if not self.exists_password(post_obj['label']):
r = requests.post( r = requests.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create', f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create',
data=post_obj, data=post_obj,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl verify=self.ssl, timeout=self.timeout
) )
if r.status_code == 201: if r.status_code == 201:
return r.json() return r.json()
else: else:
self._log.error(r.json())
self.exit.status_code_error(r.status_code) self.exit.status_code_error(r.status_code)
else:
self._log.warning(
"Password with name '%s' already exists",
post_obj['label']
)
def delete_password(self, post_obj): def delete_password(self, post_obj):
'''Delete a password''' '''Delete a password'''
@ -571,7 +589,7 @@ class NextcloudHandler:
data=post_obj, data=post_obj,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl verify=self.ssl, timeout=self.timeout
) )
if r.status_code == 200: if r.status_code == 200:
@ -586,7 +604,7 @@ class NextcloudHandler:
data=post_obj, data=post_obj,
headers=self.headers(), headers=self.headers(),
auth=(self.user, self.token), auth=(self.user, self.token),
verify=self.ssl verify=self.ssl, timeout=self.timeout
) )
if r.status_code == 200: if r.status_code == 200:
@ -594,14 +612,10 @@ class NextcloudHandler:
else: else:
self.exit.status_code_error(r.status_code) self.exit.status_code_error(r.status_code)
def user(self):
'''Get user'''
return self.user
class NcPasswordClient: class NcPasswordClient:
'''Nextcloud Password Client''' '''Nextcloud Password Client'''
def __init__(self, debug_level, log_file, host, user, api_token, cse_password): def __init__(self, debug_level, log_file, host, user, api_token, cse_password, timeout):
self.config = {} self.config = {}
self.config['debug_level'] = debug_level self.config['debug_level'] = debug_level
if log_file is None: if log_file is None:
@ -621,7 +635,8 @@ class NcPasswordClient:
"host": host, "host": host,
"user": user, "user": user,
"api_token": api_token, "api_token": api_token,
"cse_password": cse_password "cse_password": cse_password,
"timeout": timeout,
} }
self.nc = NextcloudHandler(params) self.nc = NextcloudHandler(params)
@ -639,8 +654,27 @@ class NcPasswordClient:
def create_password(self, obj): def create_password(self, obj):
'''Create a password with an object''' '''Create a password with an object'''
self._log.debug(
"Creating password %s",
obj
)
print(json.dumps(self.nc.create_password(obj), indent=2)) print(json.dumps(self.nc.create_password(obj), indent=2))
def delete_password(self, name):
'''Delete a password'''
for password in self.nc.list_passwords():
if password['label'] == name:
self._log.debug(
"Deleting password:%s",
json.dumps(password)
)
print(json.dumps(self.nc.delete_password(password), indent=2))
return True
self._log.warning(
"Password with name '%s' doesn't exist",
name
)
def create_passwords_folder(self, name): def create_passwords_folder(self, name):
'''Create a passwords folder''' '''Create a passwords folder'''
print(json.dumps(self.nc.create_passwords_folder(name), indent=2)) print(json.dumps(self.nc.create_passwords_folder(name), indent=2))
@ -649,6 +683,31 @@ class NcPasswordClient:
'''Delete a passwords folder''' '''Delete a passwords folder'''
print(json.dumps(self.nc.delete_passwords_folder(name), indent=2)) print(json.dumps(self.nc.delete_passwords_folder(name), indent=2))
def migrate_pass(self):
'''Migrate password store to Nextcloud pass'''
store = passpy.store.Store()
for item in store.find(''):
obj = {
"label": os.path.basename(item),
}
folder = os.path.dirname(item)
if folder != '':
obj['folder'] = folder
raw_values = store.get_key(item).split('\n')
obj['password'] = raw_values[0]
raw_values.pop(0)
for line in raw_values:
if ': ' in line:
split_line = line.split(': ')
field = split_line[0]
value = split_line[1]
else:
field = line
value = ''
if field != '' and value != '':
obj[field] = value
print(json.dumps(self.nc.create_password(obj), indent=2))
def _init_log(self): def _init_log(self):
''' Initialize log object ''' ''' Initialize log object '''
self._log = logging.getLogger("nc_password_client") self._log = logging.getLogger("nc_password_client")
@ -722,13 +781,19 @@ class NcPasswordClient:
'--cse-password', '-p', '--cse-password', '-p',
help='Nextcloud user\'s end-to-end encryption password' help='Nextcloud user\'s end-to-end encryption password'
) )
@click.option(
'--timeout', '-t',
default=3,
type=click.INT,
help='Nextcloud user\'s end-to-end encryption password'
)
@click_config_file.configuration_option() @click_config_file.configuration_option()
@click.pass_context @click.pass_context
def cli(ctx, debug_level, log_file, host, user, api_token, cse_password): def cli(ctx, debug_level, log_file, host, user, api_token, cse_password, timeout):
'''Client function to pass context''' '''Client function to pass context'''
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj['NcPasswordClient'] = NcPasswordClient( ctx.obj['NcPasswordClient'] = NcPasswordClient(
debug_level, log_file, host, user, api_token, cse_password debug_level, log_file, host, user, api_token, cse_password, timeout
) )
@cli.command() @cli.command()
@ -754,6 +819,14 @@ def create_password(ctx, obj):
'''Create a password''' '''Create a password'''
ctx.obj['NcPasswordClient'].create_password(json.loads(obj)) ctx.obj['NcPasswordClient'].create_password(json.loads(obj))
@cli.command()
@click.option('--name', '-n', required=True, help='Name of the password to delete')
@click_config_file.configuration_option()
@click.pass_context
def delete_password(ctx, name):
'''Delete a password'''
ctx.obj['NcPasswordClient'].delete_password(name)
@cli.command() @cli.command()
@click.option('--name', '-n', required=True, help='Name of the passwords folder to create') @click.option('--name', '-n', required=True, help='Name of the passwords folder to create')
@click_config_file.configuration_option() @click_config_file.configuration_option()
@ -777,5 +850,12 @@ def delete_passwords_folder(ctx, name):
'''Delete a password folder''' '''Delete a password folder'''
ctx.obj['NcPasswordClient'].delete_passwords_folder(name) ctx.obj['NcPasswordClient'].delete_passwords_folder(name)
@cli.command()
@click_config_file.configuration_option()
@click.pass_context
def migrate_pass(ctx):
'''Migrate password store passwords to Nextcloud Passwords'''
ctx.obj['NcPasswordClient'].migrate_pass()
if __name__ == "__main__": if __name__ == "__main__":
cli(obj={}) cli(obj={})

View file

@ -2,3 +2,4 @@ click
click_config_file click_config_file
requests requests
pysodium pysodium
passpy