Compare commits

...

1 commit

Author SHA1 Message Date
771e93b316 Add webdav support 2024-09-25 19:20:44 +03:00
2 changed files with 119 additions and 47 deletions

View file

@ -5,6 +5,7 @@
# (c) 2022 Antonio J. Delgado
''' Find duplicate contacts '''
import json
import sys
import os
import logging
@ -15,6 +16,7 @@ import click
import click_config_file
import vobject
import deepdiff
from webdav3.client import Client
class FindDuplicateContacts:
@ -36,45 +38,93 @@ class FindDuplicateContacts:
'__project_codename__.log'
)
self._init_log()
self._log.debug(
"Will ignore fields '%s'",
self.config['ignore_field']
)
self.entries = []
self.duplicates_folder = os.path.join(
self.config['directory'],
self.config['duplicates_destination']
)
if not os.path.exists(self.duplicates_folder):
os.mkdir(self.duplicates_folder)
self.entries = []
for entry in os.scandir(self.config['directory']):
self.entries.append(entry)
self.read_cards()
if 'uri' in self.config and self.config['uri'] != '' and self.config['uri']:
if not self._check_connection():
sys.exit(1)
self.mode = 'web'
self._web_mode()
else:
self.mode = 'local'
self._local_mode()
self._read_cards()
self.compare_cards()
def read_cards(self):
def _local_mode(self):
if not os.path.exists(self.duplicates_folder):
os.mkdir(self.duplicates_folder)
for entry in os.scandir(self.config['directory']):
self._log.debug(
"Found entry '%s' in '%s'",
entry.path,
self.config['directory']
)
if not entry.is_dir():
self.entries.append(entry.path)
def _web_mode(self):
if not self.client.check(self.duplicates_folder):
self.client.mkdir(self.duplicates_folder)
for entry in self.client.list(self.config['directory']):
if entry != f"{self.config['directory']}/":
entry_path = os.path.join(self.config['directory'], entry)
self._log.debug(
"Found entry '%s' in '%s'",
entry,
self.config['directory']
)
if not self.client.is_dir(os.path.join(self.config['directory'], entry)):
self.entries.append(entry_path)
def _check_connection(self):
options = {
'webdav_hostname': f"{self.config['uri']}/",
'webdav_login': self.config['username'],
'webdav_password': self.config['password']
}
self._log.debug(
"Checking connection to '%s' as '%s'...",
self.config['uri'],
self.config['username']
)
self.client = Client(options)
return self.client.check('/')
def _read_cards(self):
'''Read all vCards'''
self.cards = []
for entry in self.entries:
self._log.debug(
"Reading vcard '%s'...",
entry.path
entry
)
card = {}
card['filename'] = entry.path
card['filename'] = entry
card['content'] = {}
if not entry.is_dir():
with open(entry.path, 'r', encoding='UTF-8') as filep:
if self.mode == 'local':
with open(entry, 'r', encoding='UTF-8') as filep:
content=filep.read()
if len(content) > 0:
vcard = vobject.readOne(content)
else:
content = self.client.read(entry)
for key in vcard.contents.keys():
if key not in self.config['ignore_field']:
card['content'][key] = list()
for item in vcard.contents[key]:
card['content'][key].append(item.value)
self.cards.append(card)
if len(content) > 0:
vcard = vobject.readOne(content)
for key in vcard.contents.keys():
if key not in self.config['ignore_field']:
card['content'][key] = []
for item in vcard.contents[key]:
card['content'][key].append(item.value)
self.cards.append(card)
def compare_cards(self):
'''Compare all vCards'''
@ -82,7 +132,7 @@ class FindDuplicateContacts:
count = 0
for card in self.cards:
count +=1
print(f"Contact {count} of {len(self.cards)}:\b")
sys.stdout.write(f"Checking contact {count} of {len(self.cards)}\r")
duplicated = False
for checked_card in checked_cards:
if self.are_same_dict(card['content'], checked_card['content']):
@ -92,7 +142,7 @@ class FindDuplicateContacts:
card['filename'],
checked_card['filename']
)
shutil.move(
self._move(
card['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card['filename']))
)
@ -138,13 +188,13 @@ class FindDuplicateContacts:
print('Anything else and we keep both')
option = input('What to do?')
if option == "1":
shutil.move(
self._move(
card2['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card2['filename']))
)
return True
elif option == "2":
shutil.move(
self._move(
card1['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card1['filename']))
)
@ -153,39 +203,45 @@ class FindDuplicateContacts:
print('Doing nothing.')
return False
def _move(self, source, destination):
if self.mode == 'local':
shutil.move(source, destination)
else:
self.client.move(source, destination)
def are_partially_same_dict(self, d1, d2, key='id'):
'''Test if two dictionaries are similar'''
if not isinstance(d1[key], list):
d1[key] = [ d1[key] ]
d2[key] = [ d2[key] ]
return False
if d1[key][0] == d2[key][0] or d1[key][0].lower() == d2[key][0].lower():
return True
def are_same_dict(self, d1, d2):
'''Test if two dictionaries are equal'''
ddiff = deepdiff.DeepDiff(d1, d2, ignore_order=True)
if ddiff == dict():
if not ddiff:
return True
else:
if 'dictionary_item_added' in ddiff or 'dictionary_item_removed' in ddiff:
if 'dictionary_item_added' in ddiff or 'dictionary_item_removed' in ddiff:
return False
if 'values_changed' in ddiff:
real_change = False
for key in ddiff['values_changed'].keys():
if isinstance(
ddiff['values_changed'][key]['new_value'],
str
):
if (ddiff['values_changed'][key]['new_value'].lower()
!=
ddiff['values_changed'][key]['old_value'].lower()):
real_change = True
if real_change:
# return False
# else:
# # print(ddiff)
return False
else:
if 'values_changed' in ddiff:
real_change = False
for key in ddiff['values_changed'].keys():
if isinstance(
ddiff['values_changed'][key]['new_value'],
str
):
if (ddiff['values_changed'][key]['new_value'].lower()
!=
ddiff['values_changed'][key]['old_value'].lower()):
real_change = True
if real_change:
return False
else:
# print(ddiff)
return False
return False
def _init_log(self):
''' Initialize log object '''
@ -248,6 +304,21 @@ class FindDuplicateContacts:
],
help='Fields to ignore when considering duplicate contacts.',
)
@click.option(
'--uri',
'-u',
help='URI to the WebDAV folder.'
)
@click.option(
'--username',
'-U',
help='WebDAV user name for authentication.'
)
@click.option(
'--password',
'-p',
help='User password for WebDAV.'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
return FindDuplicateContacts(**kwargs)

View file

@ -1,4 +1,5 @@
click
click_config_file
vobject
deepdiff
deepdiff
webdavclient3