2021-09-06 14:52:20 +02:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2021 Antonio J. Delgado
#
import sys
import os
import logging
import json
2021-09-06 22:41:35 +02:00
import shutil
2021-09-06 22:54:14 +02:00
import re
2021-09-06 14:52:20 +02:00
import click
import click_config_file
from logging . handlers import SysLogHandler
import face_recognition
2021-09-06 20:47:28 +02:00
import pyexiv2
2021-09-06 15:15:02 +02:00
import PIL
2021-09-06 14:52:20 +02:00
class CustomFormatter(logging.Formatter):
    """Logging formatter that colours each record according to its level.

    Adapted from https://stackoverflow.com/a/56944256/3638629
    """

    # ANSI escape sequences used to colour the output.
    grey = '\x1b[38;21m'
    blue = '\x1b[38;5;39m'
    yellow = '\x1b[38;5;226m'
    red = '\x1b[31;1m'.replace('31;1', '38;5;196')
    bold_red = '\x1b[31;1m'
    reset = '\x1b[0m'

    def __init__(self, fmt):
        """Prepare one coloured formatter per standard logging level.

        Args:
            fmt: logging format string (%-style) applied to every record.
        """
        super().__init__(fmt)
        self.fmt = fmt
        self.FORMATS = {
            logging.DEBUG: self.grey + self.fmt + self.reset,
            logging.INFO: self.blue + self.fmt + self.reset,
            logging.WARNING: self.yellow + self.fmt + self.reset,
            logging.ERROR: self.red + self.fmt + self.reset,
            logging.CRITICAL: self.bold_red + self.fmt + self.reset,
        }
        # Build the formatters once instead of creating one per record.
        self._formatters = {
            level: logging.Formatter(level_fmt)
            for level, level_fmt in self.FORMATS.items()
        }

    def format(self, record):
        """Return the record as text, coloured according to its level.

        Records whose level is not one of the five standard levels are
        rendered with the plain (uncoloured) format string.
        """
        formatter = self._formatters.get(record.levelno)
        if formatter is None:
            return super().format(record)
        return formatter.format(record)
class image_classifier:
    """Tag images with the known people found in them and file them by date.

    Creating an instance does all the work: it sets up logging, loads the
    reference faces from ``faces_directory`` and then processes every
    non-hidden regular file found directly inside ``directory``.
    """

    # XMP tag that receives the names of the recognised people.
    PEOPLE_TAG = 'Xmp.iptcExt.PersonInImage'
    # EXIF tag read to decide the destination (year/month/day) folder.
    DATE_TAG = 'Exif.Photo.DateTimeOriginal'
    # Filename date patterns, tried in order: year-first, then day-first.
    # Separators between the three parts are optional.
    _DATE_PATTERNS = (
        re.compile(r'(?P<year>20[0-9]{2})[\-/\._]?(?P<month>[0-1]?[0-9])[\-/\._]?(?P<day>[0-3]?[0-9])'),
        re.compile(r'(?P<day>[0-3]?[0-9])[\-/\._]?(?P<month>[0-1]?[0-9])[\-/\._]?(?P<year>20[0-9]{2})'),
    )

    def __init__(self, debug_level, log_file, faces_directory, directory, no_move):
        """Set up the classifier and process every file in ``directory``.

        Args:
            debug_level: name of the logging level used for standard output.
            log_file: path of the log file; when None a file named
                'image_classifier.log' inside a 'log' folder of the user's
                home (or the current directory) is used.
            faces_directory: folder with one picture per known person; the
                file name (without extension) is used as the person's name.
            directory: folder with the pictures to classify.
            no_move: when true, files are tagged but not moved.
        """
        self.debug_level = debug_level
        if log_file is None:
            home = os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd()))
            log_file = os.path.join(home, 'log', 'image_classifier.log')
        self.log_file = log_file
        self._init_log()

        self.faces_directory = faces_directory
        self.directory = directory
        self.no_move = no_move
        self.metadata = None
        self.known_people = self.load_known_people()

        if not os.access(directory, os.R_OK):
            self._log.error(f"The directory '{directory}' is not readable.")
            return
        # Snapshot the listing first: processing moves files out of the
        # directory, which should not happen while it is being iterated.
        with os.scandir(directory) as directory_items:
            files = sorted(
                os.path.join(directory, entry.name)
                for entry in directory_items
                if not entry.name.startswith('.') and entry.is_file()
            )
        for file in files:
            self.process_file(file)

    def process_file(self, file):
        """Process a file: find faces, add the people to its metadata and
        move it to the folder of the day.

        Files that are unreadable or that PIL cannot open as images are left
        untouched. An existing destination file is never overwritten.
        """
        self._log.debug(f"Looking for faces in file '{file}'...")
        if not os.access(file, os.R_OK):
            self._log.error(f"The file '{file}' is not readable.")
            return
        if not self.is_image(file):
            return

        self.metadata = pyexiv2.ImageMetadata(file)
        self.metadata.read()
        people = self.find_faces(file)
        if people:
            self._tag_people(file, people)

        new_path = self._destination_for(file)
        if self.no_move:
            self._log.info(f"NOT moving file '{file}' to '{new_path}' because of --no-move")
            return
        if os.path.exists(new_path):
            # Moving would replace the existing file; keep both instead.
            self._log.error(f"NOT moving file '{file}' to '{new_path}' because the destination already exists.")
            return
        # Only create the destination folder when the file is really moved.
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        self._log.info(f"Moving file '{file}' to '{new_path}'...")
        shutil.move(file, new_path)

    def _tag_people(self, file, people):
        """Add ``people`` to the loaded metadata and write it back to ``file``."""
        self._log.debug(f"Found {len(people)} known people in the image.")
        self._log.debug(json.dumps(people, indent=2))
        if self.PEOPLE_TAG in self.metadata.xmp_keys:
            before = self.metadata[self.PEOPLE_TAG].raw_value
            self._log.debug(f"People (before): {before} (type: {type(before)})")
        self.append_people(people)
        after = self.metadata[self.PEOPLE_TAG].raw_value
        self._log.debug(f"People (after): {after} (type: {type(after)})")
        self.metadata.write()
        self._log.debug(f"Updated file '{file}'.")

    def _destination_for(self, file):
        """Return the path ``file`` should be moved to.

        The date folder comes from the EXIF original date when present and
        usable, otherwise from a date found in the file name, otherwise the
        folder 'unknown-time' is used.
        """
        dirname = os.path.dirname(os.path.realpath(file))
        filename = os.path.basename(file)
        if self.DATE_TAG in self.metadata.exif_keys:
            original_date = self.metadata[self.DATE_TAG].value
            self._log.debug(f"File creation time: {original_date} (type: {type(original_date)})")
            # The tag value is only usable when it is a date-like object.
            if hasattr(original_date, 'strftime'):
                date_parts = original_date.strftime('%Y/%m/%d').split('/')
                return os.path.join(dirname, *date_parts, filename)
        date_parts = self._date_parts_from_name(filename)
        if date_parts:
            return os.path.join(dirname, *date_parts, filename)
        return os.path.join(dirname, 'unknown-time', filename)

    @staticmethod
    def _date_parts_from_name(filename):
        """Return ('YYYY', 'MM', 'DD') found in ``filename`` or None.

        Month and day are zero padded so the folders match the ones built
        from EXIF dates; matches with an impossible month or day are ignored.
        """
        for pattern in image_classifier._DATE_PATTERNS:
            match = pattern.search(filename)
            if not match:
                continue
            month = int(match.group('month'))
            day = int(match.group('day'))
            if 1 <= month <= 12 and 1 <= day <= 31:
                return (match.group('year'), f"{month:02d}", f"{day:02d}")
        return None

    def print_metadata(self):
        """Print every IPTC, EXIF and XMP key of the loaded metadata."""
        groups = (
            ("IPTC keys:", self.metadata.iptc_keys),
            ("EXIF keys:", self.metadata.exif_keys),
            ("XMP keys:", self.metadata.xmp_keys),
        )
        for title, keys in groups:
            print(title)
            for key in keys:
                print(f"{key}: '{self.metadata[key].raw_value}'")

    def append_people(self, people):
        """Merge ``people`` into the people tag of the loaded metadata.

        Names already present in the tag are kept and not duplicated.
        """
        has_tag = self.PEOPLE_TAG in self.metadata.xmp_keys
        new_list = list(self.metadata[self.PEOPLE_TAG].raw_value) if has_tag else []
        for person in people:
            if person not in new_list:
                self._log.debug(f"Adding person '{person}'...")
                new_list.append(person)
        if has_tag:
            self.metadata[self.PEOPLE_TAG].value = new_list
        else:
            self.metadata[self.PEOPLE_TAG] = pyexiv2.XmpTag(self.PEOPLE_TAG, new_list)

    def is_json(self, data):
        """Return True when ``data`` can be parsed as JSON, False otherwise."""
        try:
            json.loads(data)
        except (TypeError, ValueError):
            # json.JSONDecodeError (malformed text) is a ValueError subclass.
            return False
        return True

    def load_known_people(self):
        """Return the list of known people found in ``faces_directory``.

        Each item is a dict with the keys 'filename' (path of the reference
        picture), 'name' (file name without extension) and 'encoding' (the
        first face encoding found in the picture). Pictures without a
        detectable face are skipped.
        """
        known_people = []
        self._log.debug(f"Looking for known faces in directory '{self.faces_directory}'...")
        if not os.access(self.faces_directory, os.R_OK):
            self._log.warning(f"The directory '{self.faces_directory}' is not readable.")
            return known_people
        with os.scandir(self.faces_directory) as faces_items:
            for entry in faces_items:
                if entry.name.startswith('.') or not entry.is_file():
                    continue
                path = os.path.join(self.faces_directory, entry.name)
                if not self.is_image(path):
                    continue
                self._log.debug(f"Identifying face in file '{entry.name}'...")
                encodings = face_recognition.face_encodings(
                    face_recognition.load_image_file(path)
                )
                if not encodings:
                    self._log.info(f"No faces found in file '{entry.name}'.")
                    continue
                # Keep only the path, not the decoded image, so the pixel
                # data of every reference picture is not held in memory.
                known_people.append({
                    'filename': path,
                    'name': os.path.splitext(entry.name)[0],
                    'encoding': encodings[0],
                })
        return known_people

    def find_faces(self, file):
        """Find known faces in an image file.

        Returns:
            A list with the names of the known people recognised in the
            image (possibly empty), or False when the file is not an image.
        """
        if not self.is_image(file):
            return False
        image = face_recognition.load_image_file(file)
        encodings = face_recognition.face_encodings(image)
        self._log.debug(f"Found {len(encodings)} faces.")
        people = []
        for known_person in self.known_people:
            if known_person['name'] in people:
                continue
            if any(
                face_recognition.compare_faces([known_person['encoding']], encoding)[0]
                for encoding in encodings
            ):
                people.append(known_person['name'])
        return people

    def is_image(self, file):
        """Return True when PIL can open ``file`` as an image."""
        # 'import PIL' alone does not load the Image submodule.
        import PIL.Image
        try:
            # The context manager closes the file handle PIL opens.
            with PIL.Image.open(file):
                pass
        except OSError as error:
            # PIL.UnidentifiedImageError is a subclass of OSError.
            self._log.debug(f"File '{file}' is not an image recognizable by PIL. {error}")
            return False
        return True

    def _init_log(self):
        """Initialize the log object.

        Messages go to syslog and to the log file at DEBUG level, and to
        standard output (coloured) at the level given by ``debug_level``.
        """
        self._log = logging.getLogger("image_classifier")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        # Fall back to INFO when the name is not a known logging level.
        level = logging.getLevelName(str(self.debug_level).upper())
        if not isinstance(level, int):
            level = logging.INFO
        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(level)
        streamhandler.setFormatter(CustomFormatter('[%(levelname)s] %(message)s'))
        self._log.addHandler(streamhandler)

        # A log file given without a folder lives in the current directory.
        log_directory = os.path.dirname(self.log_file)
        if log_directory:
            os.makedirs(log_directory, exist_ok=True)
        # Rotation only happens when backupCount is greater than zero.
        filehandler = logging.handlers.RotatingFileHandler(
            self.log_file, maxBytes=102400000, backupCount=5
        )
        filehandler.setFormatter(
            logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        )
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
@click.command()
# '-d' is the short form of --directory (declared below). Declaring it here
# as well would leave this option without a working short form, so the
# debug level uses '-D'.
@click.option("--debug-level", "-D", default="INFO",
    type=click.Choice(
        ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ), help='Set the debug level for the standard output.')
@click.option('--log-file', '-l', help="File to store all debug messages.")
@click.option("--faces-directory", "-f", required=True, help="Folder containing the pictures that identify people. The filename would be used as the name for the person. Just one person per picture.")
@click.option("--directory", "-d", required=True, help="Folder containing the pictures to classify.")
@click.option("--no-move", "-n", is_flag=True, help="Don't move files, just add people's tag.")
@click_config_file.configuration_option()
def __main__(debug_level, log_file, faces_directory, directory, no_move):
    """Command line entry point: classify the pictures in DIRECTORY.

    All the work is done while the classifier object is created.
    """
    image_classifier(debug_level, log_file, faces_directory, directory, no_move)
2021-09-06 14:52:20 +02:00
# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    __main__()