diff --git a/nc_password_client/nc_password_client.py b/nc_password_client/nc_password_client.py index 764af00..800be99 100755 --- a/nc_password_client/nc_password_client.py +++ b/nc_password_client/nc_password_client.py @@ -17,6 +17,7 @@ import os import time import re import logging +import csv from xml.dom import minidom import binascii import click @@ -121,6 +122,16 @@ class NextcloudHandler: '.cache', 'nc_password_client.cache' ) + self.field_replacements = {} + for field_replacement in params['field_replacements']: + key, value = field_replacement.split(':') + self.field_replacements[key] = value + self.debug( + { + "action": "Initializing Netcloud handler", + "field_replacements": self.field_replacements + } + ) self._read_cache() self.session = requests.Session() headers = { @@ -830,27 +841,43 @@ class NextcloudHandler: def create_password(self, post_obj, update): '''Create/add a password''' - if post_obj.get('folder', '') != '': - folder_id = self.get_folder_id(post_obj['folder']) + new_obj = {} + for key in post_obj.keys(): + if key in self.field_replacements: + new_obj[self.field_replacements[key]] = post_obj[key] + else: + new_obj[key] = post_obj[key] + self.debug( + { + "action": "create_password", + "message": "Result of replacing fields", + "original_object": post_obj, + "new_object": new_obj + } + ) + if new_obj.get('folder', '') != '': + folder_id = self.get_folder_id(new_obj['folder']) if not folder_id: - folder_id = self.create_passwords_folder(post_obj['folder'])['id'] - post_obj['folder'] = folder_id + folder_id = self.create_passwords_folder(new_obj['folder'])['id'] + new_obj['folder'] = folder_id else: - post_obj.pop('folder', None) - if not 'username' in post_obj: - post_obj['username'] = '' - if not 'url' in post_obj: - post_obj['url'] = '' - safer_obj = dict(post_obj, **{ 'password': '***' }) - exists_password = self.exists_password(post_obj) + new_obj.pop('folder', None) + if not 'username' in new_obj: + new_obj['username'] = '' + if not 'url' in new_obj: + new_obj['url'] = '' + exists_password = self.exists_password(new_obj) if not exists_password: try: self.debug( - { "action": "create_password", "object": safer_obj } + { + "action": "create_password", + "object": new_obj + } ) r = self.session.post( f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create', - data=post_obj, + data=new_obj, headers=self.headers(), auth=(self.user, self.token), verify=self.ssl, timeout=self.timeout @@ -858,16 +885,16 @@ class NextcloudHandler: if r.status_code == 201: if self.cache['cached_passwords']: - self.cache['cached_passwords'].append(post_obj) + self.cache['cached_passwords'].append(new_obj) else: - self.cache['cached_passwords'] = [ post_obj ] + self.cache['cached_passwords'] = [ new_obj ] return r.json() self.error(r.json()) self.error( { "action": "create_password", "message": f"Nextcloud instance returned status code: {r.status_code}", - "object": post_obj + "object": new_obj } ) except requests.exceptions.ReadTimeout as error: @@ -875,20 +902,20 @@ class NextcloudHandler: { "action": "create_password", "message": f"Timeout ({self.timeout} sec) error doing GET request", - "object": post_obj, + "object": new_obj, "error": error } ) else: if update: - post_obj['id'] = exists_password['id'] - return self.update_password(post_obj) + new_obj['id'] = exists_password['id'] + return self.update_password(new_obj) self.warning( { "action": "create_password", "message": "Password with that name/label already exists", - "object": safer_obj, - "label": post_obj['label'] + "object": new_obj, + "label": new_obj['label'] } ) return False @@ -1035,7 +1062,7 @@ class NcPasswordClient: def __init__( self, debug_level, log_file, host, user, api_token, cse_password, - timeout, cache_duration, https_proxy, output_format + timeout, cache_duration, https_proxy, output_format, field_replacements ): self.config = {} self.output_format = output_format @@ -1063,6 +1090,7 @@ class NcPasswordClient: "https_proxy": https_proxy, "logger": self._log, "output_format": output_format, + "field_replacements": field_replacements, } self.nc = NextcloudHandler(params) @@ -1146,8 +1174,9 @@ class NcPasswordClient: '''List all password folders''' self.info(self.nc.list_passwords_folders()) - def create_password(self, obj, update, label, password, folder, username, - url, notes, custom_fields, sha1_hash, hidden, favorite): + def create_password( + self, obj, update, label=None, password=None, folder=None, username=None, + url=None, notes=None, custom_fields=None, sha1_hash=None, hidden=None, favorite=None): '''Create a password with an object''' if obj is None: obj = {} @@ -1377,6 +1406,72 @@ class NcPasswordClient: ) return False + def import_csv( + self, limit, update, csv_file, encoding='utf-8', + delimiter=';', quotechar='"', first_line_fields=True, fieldnames=None + ): + '''Import passwords from a CSV file''' + self.debug( + { + "action": "import_csv", + "message": "Importing from CSV file", + "csv_file": csv_file + } + ) + if first_line_fields: + with open(csv_file, encoding=encoding) as file_object: + reader = csv.reader(file_object, delimiter=delimiter, quotechar=quotechar) + for field_names in reader: + self.debug( + { + "action": "import_csv", + "message": "Fields in the first row", + "field_names": field_names + } + ) + break + else: + if fieldnames is None: + self.error( + { + "action": "import_csv", + "message": "You must indicate a list of fieldnames or first-line-fields" + } + ) + sys.exit(4) + field_names = fieldnames.split(',') + with open(csv_file, encoding=encoding) as file_object: + reader = csv.DictReader( + file_object, fieldnames=field_names, + delimiter=delimiter, quotechar=quotechar + ) + count = 0 + r_count = 0 + for row in reader: + if r_count > 0: + password = row + self.debug( + { + "action": "import_csv", + "message": "Importing password", + "object": password + } + ) + if self.create_password(password, update=update): + count += 1 + if count == limit: + break + r_count += 1 + self.debug( + { + "action": "import_csv", + "message": "Importation summary", + "rows_in_file": r_count, + "imported_password": count + } + ) + return True + def _init_log(self): ''' Initialize log object ''' self._log = logging.getLogger("nc_password_client") @@ -1449,17 +1544,30 @@ class NcPasswordClient: '--https-proxy', '-P', help='HTTPS proxy to use to connect to the Nextcloud instance' ) +@click.option( + '--field-replacements', '-F', + multiple=True, + default=[ + "login:username", + "uri:url", + "pass:password", + "name:label", + "user_name:username", + "web:url", + ], + help='Pairs of field names to be replaced separated by a : (colon). Like: -F login:username' +) @click_config_file.configuration_option() @click.pass_context def cli( ctx, debug_level, log_file, host, user, api_token, cse_password, - timeout, cache_duration, https_proxy, output_format + timeout, cache_duration, https_proxy, output_format, field_replacements ): '''Client function to pass context''' ctx.ensure_object(dict) ctx.obj['NcPasswordClient'] = NcPasswordClient( debug_level, log_file, host, user, api_token, cse_password, - timeout, cache_duration, https_proxy, output_format + timeout, cache_duration, https_proxy, output_format, field_replacements ) @cli.command() @@ -1556,7 +1664,7 @@ def ls(ctx): '--update', '-u', is_flag=True, default=False, - help='Update exdisting passwords with the data from Password Store' + help='Update existing passwords with the data from Password Store' ) @click_config_file.configuration_option() @click.pass_context @@ -1639,7 +1747,7 @@ def search(ctx, pattern, limit): '--update', '-u', is_flag=True, default=False, - help='Update exdisting passwords with the data from Password Store' + help='Update existing passwords with the data from Password Store' ) @click_config_file.configuration_option() @click.pass_context @@ -1659,5 +1767,59 @@ def delete_all_passwords(ctx, yes_i_am_sure): '''Delete all passwords''' ctx.obj['NcPasswordClient'].delete_all_passwords(yes_i_am_sure) +@cli.command() +@click.option( + '--limit', '-l', + default=-1, + help='Maximun number of passwords to migrate. -1 for unlimited.' +) +@click.option( + '--update', '-u', + is_flag=True, + default=False, + help='Update existing passwords with the data from Password Store' +) +@click.option( + '--csv-file', '-c', + required=True, + type=click.Path(exists=True), + help='CSV file with your passwords to import' +) +@click.option( + '--encoding', '-e', + default='utf-8', + help='Character encoding of the CSV file' +) +@click.option( + '--delimiter', '-d', + default=';', + help='Character delimiter of columns in the rows of the CSV file' +) +@click.option( + '--quotechar', '-q', + default='"', + help='Character to quote strings in the rows of the CSV file' +) +@click.option( + '--first-line-fields', '-f', + is_flag=True, + default=True, + help='The first line contains the field names' +) +@click.option( + '--fieldnames', '-F', + default='name,username,password,url', + help='Field names separated by commans in case the first line does NOT contain the field names' +) +@click_config_file.configuration_option() +@click.pass_context +def import_csv( + ctx, limit, update, csv_file, encoding, + delimiter, quotechar, first_line_fields, fieldnames): + '''Migrate Password Store passwords to Nextcloud Passwords''' + ctx.obj['NcPasswordClient'].import_csv( + limit, update, csv_file, encoding, + delimiter, quotechar, first_line_fields, fieldnames) + if __name__ == "__main__": cli(obj={})