Compare commits

..

3 commits

2 changed files with 72 additions and 139 deletions

View file

@ -5,18 +5,17 @@
# (c) 2022 Antonio J. Delgado # (c) 2022 Antonio J. Delgado
''' Find duplicate contacts ''' ''' Find duplicate contacts '''
import json
import sys import sys
import os import os
import logging import logging
from logging.handlers import SysLogHandler from logging.handlers import SysLogHandler
import shutil import shutil
from pprint import pprint from pprint import pprint
import json
import click import click
import click_config_file import click_config_file
import vobject import vobject
import deepdiff import deepdiff
from webdav3.client import Client
class FindDuplicateContacts: class FindDuplicateContacts:
@ -38,119 +37,78 @@ class FindDuplicateContacts:
'__project_codename__.log' '__project_codename__.log'
) )
self._init_log() self._init_log()
self._log.debug(
"Will ignore fields '%s'",
self.config['ignore_field']
)
self.entries = []
self.duplicates_folder = os.path.join( self.duplicates_folder = os.path.join(
self.config['directory'], self.config['directory'],
self.config['duplicates_destination'] self.config['duplicates_destination']
) )
if 'uri' in self.config and self.config['uri'] != '' and self.config['uri']:
if not self._check_connection():
sys.exit(1)
self.mode = 'web'
self._web_mode()
else:
self.mode = 'local'
self._local_mode()
self._read_cards()
self.compare_cards()
def _local_mode(self):
if not os.path.exists(self.duplicates_folder): if not os.path.exists(self.duplicates_folder):
os.mkdir(self.duplicates_folder) os.mkdir(self.duplicates_folder)
self.entries = []
for entry in os.scandir(self.config['directory']): for entry in os.scandir(self.config['directory']):
self._log.debug( self.entries.append(entry)
"Found entry '%s' in '%s'",
entry.path,
self.config['directory']
)
if not entry.is_dir():
self.entries.append(entry.path)
def _web_mode(self): self.read_cards()
if not self.client.check(self.duplicates_folder):
self.client.mkdir(self.duplicates_folder)
for entry in self.client.list(self.config['directory']):
if entry != f"{self.config['directory']}/":
entry_path = os.path.join(self.config['directory'], entry)
self._log.debug(
"Found entry '%s' in '%s'",
entry,
self.config['directory']
)
if not self.client.is_dir(os.path.join(self.config['directory'], entry)):
self.entries.append(entry_path)
def _check_connection(self): self.compare_cards()
options = {
'webdav_hostname': f"{self.config['uri']}/",
'webdav_login': self.config['username'],
'webdav_password': self.config['password']
}
self._log.debug(
"Checking connection to '%s' as '%s'...",
self.config['uri'],
self.config['username']
)
self.client = Client(options)
return self.client.check('/')
def _read_cards(self): def read_cards(self):
'''Read all vCards''' '''Read all vCards'''
self.cards = [] self.cards = []
for entry in self.entries: for entry in self.entries:
self._log.debug( self._log.debug(
"Reading vcard '%s'...", "Reading vcard '%s'...",
entry entry.path
) )
card = {} card = {}
card['filename'] = entry card['filename'] = entry.path
card['content'] = {} card['content'] = {}
if self.mode == 'local': if not entry.is_dir():
with open(entry, 'r', encoding='UTF-8') as filep: with open(entry.path, 'r', encoding='UTF-8') as filep:
content=filep.read() content=filep.read()
else: if len(content) > 0:
content = self.client.read(entry) vcard = vobject.readOne(content)
if len(content) > 0: for key in vcard.contents.keys():
vcard = vobject.readOne(content) if key not in self.config['ignore_field']:
card['content'][key] = list()
for key in vcard.contents.keys(): for item in vcard.contents[key]:
if key not in self.config['ignore_field']: card['content'][key].append(item.value)
card['content'][key] = [] self.cards.append(card)
for item in vcard.contents[key]:
card['content'][key].append(item.value)
self.cards.append(card)
def compare_cards(self): def compare_cards(self):
'''Compare all vCards''' '''Compare all vCards'''
checked_cards = [] checked_cards = []
self.removed_cards = []
count = 0 count = 0
for card in self.cards: for card in self.cards:
count +=1 if card['filename'] not in self.removed_cards:
sys.stdout.write(f"Checking contact {count} of {len(self.cards)}\r") count +=1
duplicated = False print(f"Contact {count} of {len(self.cards)}:\b")
for checked_card in checked_cards: duplicated = False
if self.are_same_dict(card['content'], checked_card['content']): for checked_card in checked_cards:
duplicated = True if checked_card['filename'] not in self.removed_cards:
self._log.info( if self.are_same_dict(card['content'], checked_card['content']):
"Totally duplicates:\n '%s'\n '%s", duplicated = True
card['filename'], self._log.info(
checked_card['filename'] "Exact duplicates:\n '%s'\n '%s",
) card['filename'],
self._move( checked_card['filename']
card['filename'], )
os.path.join(self.duplicates_folder, os.path.basename(card['filename'])) if not self.config['dummy']:
) shutil.move(
if self.are_partially_same_dict(card['content'], checked_card['content'], key='fn'): card['filename'],
if self.manual_check_cards(card, checked_card): os.path.join(self.duplicates_folder, os.path.basename(card['filename']))
duplicated = True )
if not duplicated: else:
checked_cards.append(card) print(f"{card['content']}\n{checked_card['content']}.\nI would move '{card['filename']}'")
else:
if self.are_partially_same_dict(card['content'], checked_card['content'], key='fn'):
if self.manual_check_cards(card, checked_card):
duplicated = True
if not duplicated:
checked_cards.append(card)
self._log.info( self._log.info(
"Found %s unique cards", "Found %s unique cards",
len(checked_cards) len(checked_cards)
@ -175,6 +133,8 @@ class FindDuplicateContacts:
print("Differences:") print("Differences:")
ddiff = deepdiff.DeepDiff(card1['content'], card2['content'], ignore_order=True) ddiff = deepdiff.DeepDiff(card1['content'], card2['content'], ignore_order=True)
pprint(ddiff) pprint(ddiff)
if len(ddiff.keys()) == 0:
print('Exact matches')
advice1 = "" advice1 = ""
advice2 = "" advice2 = ""
if len(ddiff.keys()) == 1: if len(ddiff.keys()) == 1:
@ -188,60 +148,42 @@ class FindDuplicateContacts:
print('Anything else and we keep both') print('Anything else and we keep both')
option = input('What to do?') option = input('What to do?')
if option == "1": if option == "1":
self._move( if not self.config['dummy']:
card2['filename'], shutil.move(
os.path.join(self.duplicates_folder, os.path.basename(card2['filename'])) card2['filename'],
) os.path.join(self.duplicates_folder, os.path.basename(card2['filename']))
)
self.removed_cards.append(card2['filename'])
return True return True
elif option == "2": elif option == "2":
self._move( if not self.config['dummy']:
card1['filename'], shutil.move(
os.path.join(self.duplicates_folder, os.path.basename(card1['filename'])) card1['filename'],
) os.path.join(self.duplicates_folder, os.path.basename(card1['filename']))
)
self.removed_cards.append(card1['filename'])
return True return True
else: else:
print('Doing nothing.') print('Doing nothing.')
return False return False
def _move(self, source, destination):
if self.mode == 'local':
shutil.move(source, destination)
else:
self.client.move(source, destination)
def are_partially_same_dict(self, d1, d2, key='id'): def are_partially_same_dict(self, d1, d2, key='id'):
'''Test if two dictionaries are similar''' '''Test if two dictionaries are similar'''
if not isinstance(d1[key], list): if not isinstance(d1[key], list):
d1[key] = [ d1[key] ] d1[key] = [ d1[key] ]
d2[key] = [ d2[key] ] d2[key] = [ d2[key] ]
return False
if d1[key][0] == d2[key][0] or d1[key][0].lower() == d2[key][0].lower(): if d1[key][0] == d2[key][0] or d1[key][0].lower() == d2[key][0].lower():
return True return True
def are_same_dict(self, d1, d2): def are_same_dict(self, d1, d2):
'''Test if two dictionaries are equal''' '''Test if two dictionaries are equal'''
ddiff = deepdiff.DeepDiff(d1, d2, ignore_order=True) ddiff = deepdiff.DeepDiff(d1, d2, ignore_order=True)
if not ddiff: if len(ddiff.keys()) != 0:
return True if ('dictionary_item_added' in ddiff or
if 'dictionary_item_added' in ddiff or 'dictionary_item_removed' in ddiff: 'dictionary_item_removed' in ddiff or
return False 'values_changed' in ddiff):
if 'values_changed' in ddiff:
real_change = False
for key in ddiff['values_changed'].keys():
if isinstance(
ddiff['values_changed'][key]['new_value'],
str
):
if (ddiff['values_changed'][key]['new_value'].lower()
!=
ddiff['values_changed'][key]['old_value'].lower()):
real_change = True
if real_change:
# return False
# else:
# # print(ddiff)
return False return False
return False return True
def _init_log(self): def _init_log(self):
''' Initialize log object ''' ''' Initialize log object '''
@ -305,19 +247,11 @@ class FindDuplicateContacts:
help='Fields to ignore when considering duplicate contacts.', help='Fields to ignore when considering duplicate contacts.',
) )
@click.option( @click.option(
'--uri', '--dummy',
'-u', '-n',
help='URI to the WebDAV folder.' is_flag=True,
) default=False,
@click.option( help='Run without moving duplicate files.'
'--username',
'-U',
help='WebDAV user name for authentication.'
)
@click.option(
'--password',
'-p',
help='User password for WebDAV.'
) )
@click_config_file.configuration_option() @click_config_file.configuration_option()
def __main__(**kwargs): def __main__(**kwargs):

View file

@ -1,5 +1,4 @@
click click
click_config_file click_config_file
vobject vobject
deepdiff deepdiff
webdavclient3