Compare commits

..

1 commit

Author SHA1 Message Date
771e93b316 Add webdav support 2024-09-25 19:20:44 +03:00
2 changed files with 142 additions and 75 deletions

View file

@ -5,17 +5,18 @@
# (c) 2022 Antonio J. Delgado
''' Find duplicate contacts '''
import json
import sys
import os
import logging
from logging.handlers import SysLogHandler
import shutil
from pprint import pprint
import json
import click
import click_config_file
import vobject
import deepdiff
from webdav3.client import Client
class FindDuplicateContacts:
@ -37,78 +38,119 @@ class FindDuplicateContacts:
'__project_codename__.log'
)
self._init_log()
self._log.debug(
"Will ignore fields '%s'",
self.config['ignore_field']
)
self.entries = []
self.duplicates_folder = os.path.join(
self.config['directory'],
self.config['duplicates_destination']
)
if not os.path.exists(self.duplicates_folder):
os.mkdir(self.duplicates_folder)
self.entries = []
for entry in os.scandir(self.config['directory']):
self.entries.append(entry)
self.read_cards()
if 'uri' in self.config and self.config['uri'] != '' and self.config['uri']:
if not self._check_connection():
sys.exit(1)
self.mode = 'web'
self._web_mode()
else:
self.mode = 'local'
self._local_mode()
self._read_cards()
self.compare_cards()
def read_cards(self):
def _local_mode(self):
    '''Collect the contact files found in the local directory.

    Makes sure the duplicates folder exists, then appends the path of
    every non-directory entry of ``self.config['directory']`` to
    ``self.entries``.
    '''
    # exist_ok avoids the check-then-create race of exists() + mkdir()
    # and also creates missing parents of a nested destination.
    os.makedirs(self.duplicates_folder, exist_ok=True)
    directory = self.config['directory']
    # The context manager releases the directory handle even when the
    # loop is interrupted by an exception.
    with os.scandir(directory) as listing:
        for entry in listing:
            self._log.debug(
                "Found entry '%s' in '%s'",
                entry.path,
                directory
            )
            if not entry.is_dir():
                self.entries.append(entry.path)
def _web_mode(self):
    '''Collect the contact resources found in the remote directory.

    Creates the duplicates folder on the server when the client reports
    it missing, then appends the remote path of every non-directory
    entry listed under ``self.config['directory']`` to ``self.entries``.
    '''
    # Remote paths always use '/', whatever separator the local OS uses.
    import posixpath

    if not self.client.check(self.duplicates_folder):
        self.client.mkdir(self.duplicates_folder)
    directory = self.config['directory']
    for entry in self.client.list(directory):
        # Skip the item that names the listed directory itself.
        if entry == f"{directory}/":
            continue
        entry_path = posixpath.join(directory, entry)
        self._log.debug(
            "Found entry '%s' in '%s'",
            entry,
            directory
        )
        if not self.client.is_dir(entry_path):
            self.entries.append(entry_path)
def _check_connection(self):
    '''Create the WebDAV client and test that the server answers.

    Builds the client options from ``self.config`` ('uri', 'username',
    'password'), stores the client in ``self.client`` and returns the
    result of checking the remote root.
    '''
    # Strip any trailing slash first so a URI that already ends in '/'
    # does not turn into a hostname ending in '//'.
    hostname = f"{self.config['uri'].rstrip('/')}/"
    options = {
        'webdav_hostname': hostname,
        'webdav_login': self.config['username'],
        'webdav_password': self.config['password']
    }
    self._log.debug(
        "Checking connection to '%s' as '%s'...",
        self.config['uri'],
        self.config['username']
    )
    self.client = Client(options)
    return self.client.check('/')
def _read_cards(self):
    '''Read all vCards

    Loads every path in ``self.entries`` (from disk in local mode,
    through the WebDAV client otherwise), parses it with vobject and
    stores one ``{'filename': path, 'content': {field: [values]}}``
    dictionary per non-empty card in ``self.cards``. Fields listed in
    ``self.config['ignore_field']`` are left out of 'content'.
    '''
    self.cards = []
    for entry in self.entries:
        self._log.debug(
            "Reading vcard '%s'...",
            entry
        )
        if self.mode == 'local':
            with open(entry, 'r', encoding='UTF-8') as filep:
                content = filep.read()
        else:
            # NOTE(review): confirm the WebDAV client really offers
            # `read`; webdavclient3 documents `download_from(buff,
            # remote_path)` for in-memory downloads.
            content = self.client.read(entry)
        if not content:
            # Nothing to parse, so the entry cannot be a contact.
            continue
        vcard = vobject.readOne(content)
        card = {
            'filename': entry,
            'content': {
                key: [item.value for item in items]
                for key, items in vcard.contents.items()
                if key not in self.config['ignore_field']
            },
        }
        self.cards.append(card)
def compare_cards(self):
'''Compare all vCards'''
checked_cards = []
self.removed_cards = []
count = 0
for card in self.cards:
if card['filename'] not in self.removed_cards:
count +=1
print(f"Contact {count} of {len(self.cards)}:\b")
duplicated = False
for checked_card in checked_cards:
if checked_card['filename'] not in self.removed_cards:
if self.are_same_dict(card['content'], checked_card['content']):
duplicated = True
self._log.info(
"Exact duplicates:\n '%s'\n '%s",
card['filename'],
checked_card['filename']
)
if not self.config['dummy']:
shutil.move(
card['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card['filename']))
)
else:
print(f"{card['content']}\n{checked_card['content']}.\nI would move '{card['filename']}'")
else:
if self.are_partially_same_dict(card['content'], checked_card['content'], key='fn'):
if self.manual_check_cards(card, checked_card):
duplicated = True
if not duplicated:
checked_cards.append(card)
count +=1
sys.stdout.write(f"Checking contact {count} of {len(self.cards)}\r")
duplicated = False
for checked_card in checked_cards:
if self.are_same_dict(card['content'], checked_card['content']):
duplicated = True
self._log.info(
"Totally duplicates:\n '%s'\n '%s",
card['filename'],
checked_card['filename']
)
self._move(
card['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card['filename']))
)
if self.are_partially_same_dict(card['content'], checked_card['content'], key='fn'):
if self.manual_check_cards(card, checked_card):
duplicated = True
if not duplicated:
checked_cards.append(card)
self._log.info(
"Found %s unique cards",
len(checked_cards)
@ -133,8 +175,6 @@ class FindDuplicateContacts:
print("Differences:")
ddiff = deepdiff.DeepDiff(card1['content'], card2['content'], ignore_order=True)
pprint(ddiff)
if len(ddiff.keys()) == 0:
print('Exact matches')
advice1 = ""
advice2 = ""
if len(ddiff.keys()) == 1:
@ -148,42 +188,60 @@ class FindDuplicateContacts:
print('Anything else and we keep both')
option = input('What to do?')
if option == "1":
if not self.config['dummy']:
shutil.move(
card2['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card2['filename']))
)
self.removed_cards.append(card2['filename'])
self._move(
card2['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card2['filename']))
)
return True
elif option == "2":
if not self.config['dummy']:
shutil.move(
card1['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card1['filename']))
)
self.removed_cards.append(card1['filename'])
self._move(
card1['filename'],
os.path.join(self.duplicates_folder, os.path.basename(card1['filename']))
)
return True
else:
print('Doing nothing.')
return False
def _move(self, source, destination):
    '''Move a contact from ``source`` to ``destination``.

    Uses ``shutil.move`` when ``self.mode`` is 'local' and the WebDAV
    client's ``move`` in any other mode.
    '''
    if self.mode == 'local':
        shutil.move(source, destination)
    else:
        self.client.move(source, destination)
def are_partially_same_dict(self, d1, d2, key='id'):
    '''Test if two dictionaries are similar

    Two dictionaries are similar when the first value stored under
    ``key`` is the same in both, ignoring case for strings. A value
    that is not a list is treated as a one-element list. Returns False
    when ``key`` is missing from either dictionary or has no values.
    '''
    if key not in d1 or key not in d2:
        return False
    # Normalise into local lists instead of rewriting the callers' data.
    first_values = d1[key] if isinstance(d1[key], list) else [d1[key]]
    second_values = d2[key] if isinstance(d2[key], list) else [d2[key]]
    if not first_values or not second_values:
        return False
    first = first_values[0]
    second = second_values[0]
    if first == second:
        return True
    # Only strings can differ by case alone.
    if isinstance(first, str) and isinstance(second, str):
        return first.lower() == second.lower()
    return False
def are_same_dict(self, d1, d2):
    '''Test if two dictionaries are equal

    The dictionaries count as equal when they have no differences, or
    when the only differences are string values that match once case is
    ignored. Any added or removed key or item, any type change, and any
    changed non-string value makes them different.
    '''
    if d1 == d2:
        # Plain equality already proves there is nothing to diff.
        return True
    ddiff = deepdiff.DeepDiff(d1, d2, ignore_order=True)
    for category, changes in ddiff.items():
        # Every category except a plain value change is a structural
        # difference, so it must not be hidden by a case-only change
        # reported next to it.
        if category != 'values_changed':
            return False
        for change in changes.values():
            old_value = change['old_value']
            new_value = change['new_value']
            if not (isinstance(old_value, str) and isinstance(new_value, str)):
                return False
            if old_value.lower() != new_value.lower():
                return False
    return True
def _init_log(self):
''' Initialize log object '''
@ -247,11 +305,19 @@ class FindDuplicateContacts:
help='Fields to ignore when considering duplicate contacts.',
)
@click.option(
'--dummy',
'-n',
is_flag=True,
default=False,
help='Run without moving duplicate files.'
'--uri',
'-u',
help='URI to the WebDAV folder.'
)
@click.option(
'--username',
'-U',
help='WebDAV user name for authentication.'
)
@click.option(
'--password',
'-p',
help='User password for WebDAV.'
)
@click_config_file.configuration_option()
def __main__(**kwargs):

View file

@ -1,4 +1,5 @@
click
click_config_file
vobject
deepdiff
deepdiff
webdavclient3