find_duplicate_files/find_duplicate_files/find_duplicate_files.py

188 lines
8.2 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2023 Antonio J. Delgado
# Given two directories, find files in first directory that are present in the second by checking hashes
import logging
import os
import re
import shlex
import sqlite3
import sys
import zlib
from logging.handlers import SysLogHandler

import click
import click_config_file
class find_duplicate_files:
    """Find files of a first directory whose content also exists in a second one.

    Files are compared by the Adler-32 checksum of their content. Checksums
    are cached in an SQLite database keyed by file path. Creating an instance
    runs the whole job: both directories are scanned and one ``rm`` command
    per duplicate found in the first directory is written to ``output_file``.

    NOTE(review): Adler-32 is a 32-bit checksum, so unrelated files can share
    a value, and the cache is never invalidated when a file changes. Review
    the generated commands before running them.
    """

    def __init__(self, debug_level, log_file, dummy, first_directory, second_directory, exclude, limit, output_file):
        """Configure logging and the cache, scan both directories and write the commands.

        Args:
            debug_level: Name of the logging level used for standard output.
            log_file: Path of the log file; ``None`` selects
                ``<home>/log/find_duplicate_files.log``.
            dummy: Stored on the instance; not consulted anywhere in this class.
            first_directory: Directory whose duplicated files get an ``rm`` command.
            second_directory: Directory the first one is compared against.
            exclude: Iterable of regular expressions; matching paths are skipped.
            limit: Maximum number of files collected per scanned folder (0 = no limit).
            output_file: File the ``rm`` commands are written to.
        """
        self.config = dict()
        self.config['debug_level'] = debug_level
        if log_file is None:
            log_file = os.path.join(os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd())), 'log', 'find_duplicate_files.log')
        self.config['log_file'] = log_file
        self._init_log()
        self.dummy = dummy
        self.first_directory = first_directory
        self.second_directory = second_directory
        self.exclude = exclude
        self.limit = limit
        self.output_file = output_file
        self._init_db_cache()
        first_files = self.recursive_scandir(self.first_directory)
        self._log.debug(f"# Found {len(first_files)} files in first directory '{first_directory}'")
        second_files = self.recursive_scandir(self.second_directory)
        self._log.debug(f"# Found {len(second_files)} files in second directory '{second_directory}'")
        self._write_delete_commands(first_files, second_files)

    def _write_delete_commands(self, first_files, second_files):
        """Write one ``rm`` command per file of ``first_files`` also present in ``second_files``.

        Args:
            first_files: Mapping of checksum to path for the first directory.
            second_files: Mapping of checksum to path for the second directory.
        """
        total = len(first_files)
        with open(self.output_file, 'w') as output_pointer:
            for count, (file_hash, first_path) in enumerate(first_files.items(), start=1):
                self._log.info(f"# Checking file {count} of {total}")
                if file_hash not in second_files:
                    continue
                second_path = second_files[file_hash]
                # When the directories overlap a file matches itself; deleting
                # it would remove the only copy, so it is never reported.
                if os.path.realpath(first_path) == os.path.realpath(second_path):
                    self._log.debug(f"# Skipping '{first_path}', both entries are the same file")
                    continue
                self._log.info(f"#File '{first_path}' is dupe with '{second_path}'.")
                # shlex.quote keeps the command valid (and harmless) for paths
                # containing quotes, spaces or other shell metacharacters.
                command = f"rm {shlex.quote(first_path)}"
                self._log.info(command)
                output_pointer.write(f"{command}\n")

    def _init_db_cache(self, cache_file='/var/cache/find_duplicate_files.cache.sql'):
        """Open (creating it if needed) the SQLite database used as checksum cache."""
        self.cache_file = cache_file
        self.cache_db = sqlite3.connect(self.cache_file)
        self.cur = self.cache_db.cursor()
        self.cur.execute("CREATE TABLE IF NOT EXISTS files(hash, file)")
        self.cache_db.commit()

    def _check_file_cache(self, file):
        """Return the cached checksum of ``file`` as an int, or ``False`` when not cached.

        The process exits with status 2 if the query cannot be executed.
        """
        query = "SELECT hash FROM files WHERE file = ?"
        try:
            row = self.cur.execute(query, (file,)).fetchone()
        except (sqlite3.Error, UnicodeError) as error:
            self._log.error(f"Error executing query '{query}'. {error}")
            sys.exit(2)
        if not row:
            return False
        try:
            # Normalise to int so cached and freshly computed checksums compare equal.
            return int(row[0])
        except (TypeError, ValueError):
            # Unusable record: treat it as a cache miss.
            return False

    def _cache_file(self, file, hash):
        """Store the checksum ``hash`` of ``file`` in the cache and commit.

        Returns the cursor produced by the INSERT. The process exits with
        status 3 if the statement cannot be executed.
        """
        query = "INSERT INTO files (hash, file) VALUES (?, ?)"
        try:
            # Values are bound in column order: checksum first, then path.
            result = self.cur.execute(query, (hash, file))
        except (sqlite3.Error, UnicodeError) as error:
            self._log.error(f"Error executing query '{query}'. {error}")
            sys.exit(3)
        self.cache_db.commit()
        return result

    def _cache_size(self):
        """Return the number of records in the cache (``False`` if the count is unavailable)."""
        row = self.cur.execute('SELECT count(*) FROM files').fetchone()
        return row[0] if row else False

    def _test_exclude(self, file_name):
        """Return True if ``file_name`` matches any of the exclusion patterns."""
        return any(re.search(pattern, file_name) for pattern in (self.exclude or ()))

    @staticmethod
    def _hash_file(path, chunk_size=1024 * 1024):
        """Return the Adler-32 checksum of the content of ``path``, read in chunks."""
        checksum = 1  # Default starting value of zlib.adler32.
        with open(path, 'rb') as file_pointer:
            for chunk in iter(lambda: file_pointer.read(chunk_size), b''):
                checksum = zlib.adler32(chunk, checksum)
        return checksum

    def recursive_scandir(self, path, ignore_hidden_files=True):
        """Recursively scan ``path`` and return a dict mapping checksum to file path.

        Args:
            path: Folder to scan.
            ignore_hidden_files: Skip entries whose name starts with a dot.

        Entries matching an exclusion pattern are skipped. When ``self.limit``
        is greater than zero, the scan of a folder stops once that many files
        have been collected for it. Files sharing a checksum overwrite each
        other in the result, so only one path per checksum is kept.
        """
        files = dict()
        if not os.path.exists(path):
            self._log.warning(f"# Folder '{path}' doesn't exist")
            return files
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if self.limit > 0 and len(files) >= self.limit:
                        self._log.debug(f"# Limit of {self.limit} reached ({len(files)})")
                        break
                    if ignore_hidden_files and entry.name.startswith('.'):
                        continue
                    if self._test_exclude(entry.path):
                        continue
                    if entry.is_file():
                        file_hash = self._check_file_cache(entry.path)
                        # Compare against False explicitly: 0 is a valid checksum.
                        if file_hash is False:
                            file_hash = self._hash_file(entry.path)
                            self._cache_file(entry.path, file_hash)
                        files[file_hash] = entry.path
                    elif entry.is_dir(follow_symlinks=False):
                        files.update(
                            self.recursive_scandir(
                                entry.path,
                                ignore_hidden_files=ignore_hidden_files
                            )
                        )
        except PermissionError:
            self._log.warning(f"# Permission denied accessing folder '{path}'")
        self._log.debug(f"# Found {len(files)} files in '{path}'. Cache contains {self._cache_size()} records.")
        return files

    def _init_log(self):
        """Create the logger and attach syslog, standard output and rotating file handlers."""
        self._log = logging.getLogger("find_duplicate_files")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(logging.getLevelName(self.config.get("debug_level", 'INFO')))
        self._log.addHandler(streamhandler)

        if 'log_file' in self.config:
            log_file = self.config['log_file']
        else:
            home_folder = os.environ.get('HOME', os.environ.get('USERPROFILE', ''))
            log_folder = os.path.join(home_folder, "log")
            log_file = os.path.join(log_folder, "find_duplicate_files.log")
        # A bare file name has no folder part; only create a folder when there is one.
        log_folder = os.path.dirname(log_file)
        if log_folder:
            os.makedirs(log_folder, exist_ok=True)
        # backupCount must be non-zero, otherwise maxBytes is ignored and the
        # file is never rotated.
        filehandler = logging.handlers.RotatingFileHandler(log_file, maxBytes=102400000, backupCount=1)
        # create formatter
        formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
# NOTE(review): '--log-file' and '--limit' both declared the short flag '-l'.
# '-l' is kept for '--limit' (the later declaration, which appears to be the
# one click resolves it to — confirm); '--log-file' now uses '-L'.
@click.command()
@click.option(
    "--debug-level", "-d", default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.',
)
@click.option('--log-file', '-L', help="File to store all debug messages.")
@click.option("--dummy", "-n", is_flag=True, help="Don't do anything, just show what would be done.")  # Don't forget to add dummy to parameters of main function
@click.option('--first-directory', '-f', required=True, help='First directory to find files AND TO DELETE FILES FROM!!!')
@click.option('--second-directory', '-s', required=True, help='Second directory to find files')
@click.option('--exclude', '-e', multiple=True, help='Regular expression pattern to exclude from files and directories.')
@click.option('--limit', '-l', default=0, type=int, help='Limit to a certain number of files to check.')
@click.option('--output-file', '-o', default='/tmp/delete_duplicates_commands.sh', help='File to write the commands to delete duplicate files. USE WITH CAUTION!')
@click_config_file.configuration_option()
def __main__(debug_level, log_file, dummy, first_directory, second_directory, exclude, limit, output_file):
    """Find files of the first directory that also exist in the second one.

    The commands that would delete the duplicates from the first directory
    are written to the output file; nothing is deleted by this program.
    """
    return find_duplicate_files(debug_level, log_file, dummy, first_directory, second_directory, exclude, limit, output_file)


# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    __main__()