Compare commits

..

1 commit

Author SHA1 Message Date
771e93b316 Add webdav support 2024-09-25 19:20:44 +03:00
2 changed files with 142 additions and 75 deletions

View file

@ -5,17 +5,18 @@
# (c) 2022 Antonio J. Delgado # (c) 2022 Antonio J. Delgado
''' Find duplicate contacts ''' ''' Find duplicate contacts '''
import json
import sys import sys
import os import os
import logging import logging
from logging.handlers import SysLogHandler from logging.handlers import SysLogHandler
import shutil import shutil
from pprint import pprint from pprint import pprint
import json
import click import click
import click_config_file import click_config_file
import vobject import vobject
import deepdiff import deepdiff
from webdav3.client import Client
class FindDuplicateContacts: class FindDuplicateContacts:
@ -37,42 +38,90 @@ class FindDuplicateContacts:
'__project_codename__.log' '__project_codename__.log'
) )
self._init_log() self._init_log()
self._log.debug(
"Will ignore fields '%s'",
self.config['ignore_field']
)
self.entries = []
self.duplicates_folder = os.path.join( self.duplicates_folder = os.path.join(
self.config['directory'], self.config['directory'],
self.config['duplicates_destination'] self.config['duplicates_destination']
) )
if not os.path.exists(self.duplicates_folder):
os.mkdir(self.duplicates_folder)
self.entries = []
for entry in os.scandir(self.config['directory']):
self.entries.append(entry)
self.read_cards()
if 'uri' in self.config and self.config['uri'] != '' and self.config['uri']:
if not self._check_connection():
sys.exit(1)
self.mode = 'web'
self._web_mode()
else:
self.mode = 'local'
self._local_mode()
self._read_cards()
self.compare_cards() self.compare_cards()
def read_cards(self): def _local_mode(self):
if not os.path.exists(self.duplicates_folder):
os.mkdir(self.duplicates_folder)
for entry in os.scandir(self.config['directory']):
self._log.debug(
"Found entry '%s' in '%s'",
entry.path,
self.config['directory']
)
if not entry.is_dir():
self.entries.append(entry.path)
def _web_mode(self):
if not self.client.check(self.duplicates_folder):
self.client.mkdir(self.duplicates_folder)
for entry in self.client.list(self.config['directory']):
if entry != f"{self.config['directory']}/":
entry_path = os.path.join(self.config['directory'], entry)
self._log.debug(
"Found entry '%s' in '%s'",
entry,
self.config['directory']
)
if not self.client.is_dir(os.path.join(self.config['directory'], entry)):
self.entries.append(entry_path)
def _check_connection(self):
options = {
'webdav_hostname': f"{self.config['uri']}/",
'webdav_login': self.config['username'],
'webdav_password': self.config['password']
}
self._log.debug(
"Checking connection to '%s' as '%s'...",
self.config['uri'],
self.config['username']
)
self.client = Client(options)
return self.client.check('/')
def _read_cards(self):
'''Read all vCards''' '''Read all vCards'''
self.cards = [] self.cards = []
for entry in self.entries: for entry in self.entries:
self._log.debug( self._log.debug(
"Reading vcard '%s'...", "Reading vcard '%s'...",
entry.path entry
) )
card = {} card = {}
card['filename'] = entry.path card['filename'] = entry
card['content'] = {} card['content'] = {}
if not entry.is_dir(): if self.mode == 'local':
with open(entry.path, 'r', encoding='UTF-8') as filep: with open(entry, 'r', encoding='UTF-8') as filep:
content=filep.read() content=filep.read()
else:
content = self.client.read(entry)
if len(content) > 0: if len(content) > 0:
vcard = vobject.readOne(content) vcard = vobject.readOne(content)
for key in vcard.contents.keys(): for key in vcard.contents.keys():
if key not in self.config['ignore_field']: if key not in self.config['ignore_field']:
card['content'][key] = list() card['content'][key] = []
for item in vcard.contents[key]: for item in vcard.contents[key]:
card['content'][key].append(item.value) card['content'][key].append(item.value)
self.cards.append(card) self.cards.append(card)
@ -80,30 +129,23 @@ class FindDuplicateContacts:
def compare_cards(self): def compare_cards(self):
'''Compare all vCards''' '''Compare all vCards'''
checked_cards = [] checked_cards = []
self.removed_cards = []
count = 0 count = 0
for card in self.cards: for card in self.cards:
if card['filename'] not in self.removed_cards:
count +=1 count +=1
print(f"Contact {count} of {len(self.cards)}:\b") sys.stdout.write(f"Checking contact {count} of {len(self.cards)}\r")
duplicated = False duplicated = False
for checked_card in checked_cards: for checked_card in checked_cards:
if checked_card['filename'] not in self.removed_cards:
if self.are_same_dict(card['content'], checked_card['content']): if self.are_same_dict(card['content'], checked_card['content']):
duplicated = True duplicated = True
self._log.info( self._log.info(
"Exact duplicates:\n '%s'\n '%s", "Totally duplicates:\n '%s'\n '%s",
card['filename'], card['filename'],
checked_card['filename'] checked_card['filename']
) )
if not self.config['dummy']: self._move(
shutil.move(
card['filename'], card['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card['filename'])) os.path.join(self.duplicates_folder, os.path.basename(card['filename']))
) )
else:
print(f"{card['content']}\n{checked_card['content']}.\nI would move '{card['filename']}'")
else:
if self.are_partially_same_dict(card['content'], checked_card['content'], key='fn'): if self.are_partially_same_dict(card['content'], checked_card['content'], key='fn'):
if self.manual_check_cards(card, checked_card): if self.manual_check_cards(card, checked_card):
duplicated = True duplicated = True
@ -133,8 +175,6 @@ class FindDuplicateContacts:
print("Differences:") print("Differences:")
ddiff = deepdiff.DeepDiff(card1['content'], card2['content'], ignore_order=True) ddiff = deepdiff.DeepDiff(card1['content'], card2['content'], ignore_order=True)
pprint(ddiff) pprint(ddiff)
if len(ddiff.keys()) == 0:
print('Exact matches')
advice1 = "" advice1 = ""
advice2 = "" advice2 = ""
if len(ddiff.keys()) == 1: if len(ddiff.keys()) == 1:
@ -148,42 +188,60 @@ class FindDuplicateContacts:
print('Anything else and we keep both') print('Anything else and we keep both')
option = input('What to do?') option = input('What to do?')
if option == "1": if option == "1":
if not self.config['dummy']: self._move(
shutil.move(
card2['filename'], card2['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card2['filename'])) os.path.join(self.duplicates_folder, os.path.basename(card2['filename']))
) )
self.removed_cards.append(card2['filename'])
return True return True
elif option == "2": elif option == "2":
if not self.config['dummy']: self._move(
shutil.move(
card1['filename'], card1['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card1['filename'])) os.path.join(self.duplicates_folder, os.path.basename(card1['filename']))
) )
self.removed_cards.append(card1['filename'])
return True return True
else: else:
print('Doing nothing.') print('Doing nothing.')
return False return False
def _move(self, source, destination):
if self.mode == 'local':
shutil.move(source, destination)
else:
self.client.move(source, destination)
def are_partially_same_dict(self, d1, d2, key='id'): def are_partially_same_dict(self, d1, d2, key='id'):
'''Test if two dictionaries are similar''' '''Test if two dictionaries are similar'''
if not isinstance(d1[key], list): if not isinstance(d1[key], list):
d1[key] = [ d1[key] ] d1[key] = [ d1[key] ]
d2[key] = [ d2[key] ] d2[key] = [ d2[key] ]
return False
if d1[key][0] == d2[key][0] or d1[key][0].lower() == d2[key][0].lower(): if d1[key][0] == d2[key][0] or d1[key][0].lower() == d2[key][0].lower():
return True return True
def are_same_dict(self, d1, d2): def are_same_dict(self, d1, d2):
'''Test if two dictionaries are equal''' '''Test if two dictionaries are equal'''
ddiff = deepdiff.DeepDiff(d1, d2, ignore_order=True) ddiff = deepdiff.DeepDiff(d1, d2, ignore_order=True)
if len(ddiff.keys()) != 0: if not ddiff:
if ('dictionary_item_added' in ddiff or
'dictionary_item_removed' in ddiff or
'values_changed' in ddiff):
return False
return True return True
if 'dictionary_item_added' in ddiff or 'dictionary_item_removed' in ddiff:
return False
if 'values_changed' in ddiff:
real_change = False
for key in ddiff['values_changed'].keys():
if isinstance(
ddiff['values_changed'][key]['new_value'],
str
):
if (ddiff['values_changed'][key]['new_value'].lower()
!=
ddiff['values_changed'][key]['old_value'].lower()):
real_change = True
if real_change:
# return False
# else:
# # print(ddiff)
return False
return False
def _init_log(self): def _init_log(self):
''' Initialize log object ''' ''' Initialize log object '''
@ -247,11 +305,19 @@ class FindDuplicateContacts:
help='Fields to ignore when considering duplicate contacts.', help='Fields to ignore when considering duplicate contacts.',
) )
@click.option( @click.option(
'--dummy', '--uri',
'-n', '-u',
is_flag=True, help='URI to the WebDAV folder.'
default=False, )
help='Run without moving duplicate files.' @click.option(
'--username',
'-U',
help='WebDAV user name for authentication.'
)
@click.option(
'--password',
'-p',
help='User password for WebDAV.'
) )
@click_config_file.configuration_option() @click_config_file.configuration_option()
def __main__(**kwargs): def __main__(**kwargs):

View file

@ -2,3 +2,4 @@ click
click_config_file click_config_file
vobject vobject
deepdiff deepdiff
webdavclient3