#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2023 Antonio J. Delgado
# Given two directories, find files in first directory that are present in the second by checking hashes
import sys
import os
import logging
import click
import click_config_file
from logging . handlers import SysLogHandler
import zlib
import sqlite3

class find_duplicate_files:
    '''Scan two directories, hash every file (zlib.adler32, cached in a
    SQLite database), and log an ``rm`` command for each file in the first
    directory whose hash also appears in the second directory.

    Nothing is ever deleted: the ``rm`` commands are only logged, so the
    user can review and run them by hand.
    '''

    def __init__(self, debug_level, log_file, dummy, first_directory, second_directory):
        '''Initial function called when object is created.

        :param debug_level: logging level name for stdout (e.g. 'INFO').
        :param log_file: path of the debug log file, or None for the
            default under $HOME/log (or %USERPROFILE%, or the cwd).
        :param dummy: dry-run flag from the CLI (kept for compatibility;
            the scan never deletes anything either way).
        :param first_directory: directory whose duplicates are reported.
        :param second_directory: directory to compare against.
        '''
        self.config = dict()
        self.config['debug_level'] = debug_level
        if log_file is None:
            log_file = os.path.join(
                os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd())),
                'log',
                'find_duplicate_files.log',
            )
        self.config['log_file'] = log_file
        self._init_log()
        self.dummy = dummy
        self.first_directory = first_directory
        self.second_directory = second_directory
        self._init_db_cache()

        first_files = self.recursive_scandir(self.first_directory)
        self._log.debug(f"Found {len(first_files)} files in first directory '{first_directory}'")
        second_files = self.recursive_scandir(self.second_directory)
        self._log.debug(f"Found {len(second_files)} files in second directory '{second_directory}'")

        total = len(first_files)
        count = 0
        for file_hash in first_files:
            count += 1
            self._log.info(f"# Checking file {count} of {total}")
            # Same hash present in both trees -> report the first-directory
            # copy as a duplicate (only logged, never executed).
            if file_hash in second_files:
                self._log.info(f"#File '{first_files[file_hash]}' is dupe with '{second_files[file_hash]}'.")
                self._log.info(f"rm '{first_files[file_hash]}'")

    def _init_db_cache(self, cache_file='/var/cache/find_duplicate_files.cache.sql'):
        '''Open (and create if needed) the SQLite hash cache.

        :param cache_file: path of the SQLite database file.
            NOTE(review): the default under /var/cache is usually not
            writable for unprivileged users — confirm intended location.
        '''
        self.cache_file = cache_file
        self.cache_db = sqlite3.connect(self.cache_file)
        self.cur = self.cache_db.cursor()
        self.cur.execute("CREATE TABLE IF NOT EXISTS files(hash, file)")
        self.cache_db.commit()

    def _check_file_cache(self, file):
        '''Return the cached hash for *file*, or False when not cached.'''
        # Parameterized query: an f-string here breaks (and is injectable)
        # as soon as a path contains a single quote.
        result = self.cur.execute("SELECT hash FROM files WHERE file = ?", (file,))
        row = result.fetchone()
        if row and len(row) > 0:
            return row[0]
        return False

    def _cache_file(self, file, hash):
        '''Store the (hash, file) pair in the cache and return the cursor.'''
        # Bug fix: the original INSERT listed columns (hash, file) but
        # supplied values (file, hash), so lookups by file never matched.
        # Parameterized to survive quotes in file names.
        result = self.cur.execute(
            "INSERT INTO files (hash, file) VALUES (?, ?)",
            (hash, file),
        )
        self.cache_db.commit()
        return result

    def recursive_scandir(self, path, ignore_hidden_files=True):
        '''Recursively scan a directory for files.

        :param path: directory to walk.
        :param ignore_hidden_files: when True (default), skip entries whose
            name starts with a dot. (The original accepted this flag but
            always skipped hidden entries regardless; it is honored now.)
        :return: dict mapping adler32 hash -> file path. Note that files
            with identical content collapse onto one dict entry.
        '''
        files = dict()
        try:
            for entry in os.scandir(path):
                if ignore_hidden_files and entry.name.startswith('.'):
                    continue
                if entry.is_file():
                    cached_hash = self._check_file_cache(entry.path)
                    if cached_hash:
                        files[cached_hash] = entry.path
                    else:
                        # Whole-file read: adler32 is cheap, but this loads
                        # each file fully into memory.
                        with open(entry.path, 'rb') as file_pointer:
                            file_content = file_pointer.read()
                        file_hash = zlib.adler32(file_content)
                        files[file_hash] = entry.path
                        self._cache_file(entry.path, file_hash)
                elif entry.is_dir(follow_symlinks=False):
                    more_files = self.recursive_scandir(
                        entry.path,
                        ignore_hidden_files=ignore_hidden_files,
                    )
                    if more_files:
                        files = {**files, **more_files}
        except PermissionError:
            # Best-effort walk: log and keep whatever was gathered so far.
            self._log.warning(f"Permission denied accessing folder '{path}'")
        self._log.debug(f"Found {len(files)} files in '{path}'.")
        return files

    def _init_log(self):
        '''Initialize log object: syslog + stdout + rotating file handlers.'''
        self._log = logging.getLogger("find_duplicate_files")
        self._log.setLevel(logging.DEBUG)
        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)
        streamhandler = logging.StreamHandler(sys.stdout)
        # stdout honors the user-selected level; other handlers stay at DEBUG.
        streamhandler.setLevel(logging.getLevelName(self.config.get("debug_level", 'INFO')))
        self._log.addHandler(streamhandler)
        if 'log_file' in self.config:
            log_file = self.config['log_file']
        else:
            home_folder = os.environ.get('HOME', os.environ.get('USERPROFILE', ''))
            log_folder = os.path.join(home_folder, "log")
            log_file = os.path.join(log_folder, "find_duplicate_files.log")
        # makedirs(exist_ok=True): the original os.mkdir failed when the
        # parent directory was also missing.
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        filehandler = logging.handlers.RotatingFileHandler(log_file, maxBytes=102400000)
        # create formatter
        formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
@click.command()
@click.option(
    "--debug-level",
    "-d",
    default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
    help='Set the debug level for the standard output.',
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
# The dummy flag must also appear in the parameters of the main function below.
@click.option("--dummy", "-n", is_flag=True,
              help="Don't do anything, just show what would be done.")
@click.option('--first-directory', '-f', required=True,
              help='First directory to find files AND TO DELETE FILES FROM!!!')
@click.option('--second-directory', '-s', required=True,
              help='Second directory to find files')
@click_config_file.configuration_option()
def __main__(debug_level, log_file, dummy, first_directory, second_directory):
    # CLI entry point: constructing the object runs the whole scan.
    return find_duplicate_files(debug_level, log_file, dummy,
                                first_directory, second_directory)


if __name__ == "__main__":
    __main__()