Add import CSV file command

This commit is contained in:
Antonio J. Delgado 2024-11-22 14:52:15 +02:00
parent 9c18aee704
commit f539617594

View file

@ -17,6 +17,7 @@ import os
import time
import re
import logging
import csv
from xml.dom import minidom
import binascii
import click
@ -121,6 +122,16 @@ class NextcloudHandler:
'.cache',
'nc_password_client.cache'
)
self.field_replacements = {}
for field_replacement in params['field_replacements']:
key, value = field_replacement.split(':')
self.field_replacements[key] = value
self.debug(
{
"action": "Initializing Netcloud handler",
"field_replacements": self.field_replacements
}
)
self._read_cache()
self.session = requests.Session()
headers = {
@ -830,27 +841,43 @@ class NextcloudHandler:
def create_password(self, post_obj, update):
'''Create/add a password'''
if post_obj.get('folder', '') != '':
folder_id = self.get_folder_id(post_obj['folder'])
new_obj = {}
for key in post_obj.keys():
if key in self.field_replacements:
new_obj[self.field_replacements[key]] = post_obj[key]
else:
new_obj[key] = post_obj[key]
self.debug(
{
"action": "create_password",
"message": "Result of replacing fields",
"original_object": post_obj,
"new_object": new_obj
}
)
if new_obj.get('folder', '') != '':
folder_id = self.get_folder_id(new_obj['folder'])
if not folder_id:
folder_id = self.create_passwords_folder(post_obj['folder'])['id']
post_obj['folder'] = folder_id
folder_id = self.create_passwords_folder(new_obj['folder'])['id']
new_obj['folder'] = folder_id
else:
post_obj.pop('folder', None)
if not 'username' in post_obj:
post_obj['username'] = ''
if not 'url' in post_obj:
post_obj['url'] = ''
safer_obj = dict(post_obj, **{ 'password': '***' })
exists_password = self.exists_password(post_obj)
new_obj.pop('folder', None)
if not 'username' in new_obj:
new_obj['username'] = ''
if not 'url' in new_obj:
new_obj['url'] = ''
exists_password = self.exists_password(new_obj)
if not exists_password:
try:
self.debug(
{ "action": "create_password", "object": safer_obj }
{
"action": "create_password",
"object": new_obj
}
)
r = self.session.post(
f'{self.http}://{self.host}/index.php/apps/passwords/api/1.0/password/create',
data=post_obj,
data=new_obj,
headers=self.headers(),
auth=(self.user, self.token),
verify=self.ssl, timeout=self.timeout
@ -858,16 +885,16 @@ class NextcloudHandler:
if r.status_code == 201:
if self.cache['cached_passwords']:
self.cache['cached_passwords'].append(post_obj)
self.cache['cached_passwords'].append(new_obj)
else:
self.cache['cached_passwords'] = [ post_obj ]
self.cache['cached_passwords'] = [ new_obj ]
return r.json()
self.error(r.json())
self.error(
{
"action": "create_password",
"message": f"Nextcloud instance returned status code: {r.status_code}",
"object": post_obj
"object": new_obj
}
)
except requests.exceptions.ReadTimeout as error:
@ -875,20 +902,20 @@ class NextcloudHandler:
{
"action": "create_password",
"message": f"Timeout ({self.timeout} sec) error doing GET request",
"object": post_obj,
"object": new_obj,
"error": error
}
)
else:
if update:
post_obj['id'] = exists_password['id']
return self.update_password(post_obj)
new_obj['id'] = exists_password['id']
return self.update_password(new_obj)
self.warning(
{
"action": "create_password",
"message": "Password with that name/label already exists",
"object": safer_obj,
"label": post_obj['label']
"object": new_obj,
"label": new_obj['label']
}
)
return False
@ -1035,7 +1062,7 @@ class NcPasswordClient:
def __init__(
self, debug_level, log_file, host, user, api_token, cse_password,
timeout, cache_duration, https_proxy, output_format
timeout, cache_duration, https_proxy, output_format, field_replacements
):
self.config = {}
self.output_format = output_format
@ -1063,6 +1090,7 @@ class NcPasswordClient:
"https_proxy": https_proxy,
"logger": self._log,
"output_format": output_format,
"field_replacements": field_replacements,
}
self.nc = NextcloudHandler(params)
@ -1146,8 +1174,9 @@ class NcPasswordClient:
'''List all password folders'''
self.info(self.nc.list_passwords_folders())
def create_password(self, obj, update, label, password, folder, username,
url, notes, custom_fields, sha1_hash, hidden, favorite):
def create_password(
self, obj, update, label=None, password=None, folder=None, username=None,
url=None, notes=None, custom_fields=None, sha1_hash=None, hidden=None, favorite=None):
'''Create a password with an object'''
if obj is None:
obj = {}
@ -1377,6 +1406,72 @@ class NcPasswordClient:
)
return False
def import_csv(
self, limit, update, csv_file, encoding='utf-8',
delimiter=';', quotechar='"', first_line_fields=True, fieldnames=None
):
'''Import passwords from a CSV file'''
self.debug(
{
"action": "import_csv",
"message": "Importing from CSV file",
"csv_file": csv_file
}
)
if first_line_fields:
with open(csv_file, encoding=encoding) as file_object:
reader = csv.reader(file_object, delimiter=delimiter, quotechar=quotechar)
for field_names in reader:
self.debug(
{
"action": "import_csv",
"message": "Fields in the first row",
"field_names": field_names
}
)
break
else:
if fieldnames is None:
self.error(
{
"action": "import_csv",
"message": "You must indicate a list of fieldnames or first-line-fields"
}
)
sys.exit(4)
field_names = fieldnames.split(',')
with open(csv_file, encoding=encoding) as file_object:
reader = csv.DictReader(
file_object, fieldnames=field_names,
delimiter=delimiter, quotechar=quotechar
)
count = 0
r_count = 0
for row in reader:
if r_count > 0:
password = row
self.debug(
{
"action": "import_csv",
"message": "Importing password",
"object": password
}
)
if self.create_password(password, update=update):
count += 1
if count == limit:
break
r_count += 1
self.debug(
{
"action": "import_csv",
"message": "Importation summary",
"rows_in_file": r_count,
"imported_password": count
}
)
return True
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("nc_password_client")
@ -1449,17 +1544,30 @@ class NcPasswordClient:
'--https-proxy', '-P',
help='HTTPS proxy to use to connect to the Nextcloud instance'
)
@click.option(
'--field-replacements', '-F',
multiple=True,
default=[
"login:username",
"uri:url",
"pass:password",
"name:label",
"user_name:username",
"web:url",
],
help='Pairs of field names to be replaced separated by a : (colon). Like: -F login:username'
)
@click_config_file.configuration_option()
@click.pass_context
def cli(
ctx, debug_level, log_file, host, user, api_token, cse_password,
timeout, cache_duration, https_proxy, output_format
timeout, cache_duration, https_proxy, output_format, field_replacements
):
'''Client function to pass context'''
ctx.ensure_object(dict)
ctx.obj['NcPasswordClient'] = NcPasswordClient(
debug_level, log_file, host, user, api_token, cse_password,
timeout, cache_duration, https_proxy, output_format
timeout, cache_duration, https_proxy, output_format, field_replacements
)
@cli.command()
@ -1556,7 +1664,7 @@ def ls(ctx):
'--update', '-u',
is_flag=True,
default=False,
help='Update exdisting passwords with the data from Password Store'
help='Update existing passwords with the data from Password Store'
)
@click_config_file.configuration_option()
@click.pass_context
@ -1639,7 +1747,7 @@ def search(ctx, pattern, limit):
'--update', '-u',
is_flag=True,
default=False,
help='Update exdisting passwords with the data from Password Store'
help='Update existing passwords with the data from Password Store'
)
@click_config_file.configuration_option()
@click.pass_context
@ -1659,5 +1767,59 @@ def delete_all_passwords(ctx, yes_i_am_sure):
'''Delete all passwords'''
ctx.obj['NcPasswordClient'].delete_all_passwords(yes_i_am_sure)
@cli.command()
@click.option(
'--limit', '-l',
default=-1,
help='Maximun number of passwords to migrate. -1 for unlimited.'
)
@click.option(
'--update', '-u',
is_flag=True,
default=False,
help='Update existing passwords with the data from Password Store'
)
@click.option(
'--csv-file', '-c',
required=True,
type=click.Path(exists=True),
help='CSV file with your passwords to import'
)
@click.option(
'--encoding', '-e',
default='utf-8',
help='Character encoding of the CSV file'
)
@click.option(
'--delimiter', '-d',
default=';',
help='Character delimiter of columns in the rows of the CSV file'
)
@click.option(
'--quotechar', '-q',
default='"',
help='Character to quote strings in the rows of the CSV file'
)
@click.option(
'--first-line-fields', '-f',
is_flag=True,
default=True,
help='The first line contains the field names'
)
@click.option(
'--fieldnames', '-F',
default='name,username,password,url',
help='Field names separated by commans in case the first line does NOT contain the field names'
)
@click_config_file.configuration_option()
@click.pass_context
def import_csv(
ctx, limit, update, csv_file, encoding,
delimiter, quotechar, first_line_fields, fieldnames):
'''Migrate Password Store passwords to Nextcloud Passwords'''
ctx.obj['NcPasswordClient'].import_csv(
limit, update, csv_file, encoding,
delimiter, quotechar, first_line_fields, fieldnames)
if __name__ == "__main__":
cli(obj={})