2021-09-06 14:52:20 +02:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2021 Antonio J. Delgado
#
import sys
import os
import logging
import json
import click
import click_config_file
from logging . handlers import SysLogHandler
import face_recognition
import exif
2021-09-06 15:15:02 +02:00
import PIL
2021-09-06 14:52:20 +02:00
class CustomFormatter ( logging . Formatter ) :
""" Logging colored formatter, adapted from https://stackoverflow.com/a/56944256/3638629 """
grey = ' \x1b [38;21m '
blue = ' \x1b [38;5;39m '
yellow = ' \x1b [38;5;226m '
red = ' \x1b [38;5;196m '
bold_red = ' \x1b [31;1m '
reset = ' \x1b [0m '
def __init__ ( self , fmt ) :
super ( ) . __init__ ( )
self . fmt = fmt
self . FORMATS = {
logging . DEBUG : self . grey + self . fmt + self . reset ,
logging . INFO : self . blue + self . fmt + self . reset ,
logging . WARNING : self . yellow + self . fmt + self . reset ,
logging . ERROR : self . red + self . fmt + self . reset ,
logging . CRITICAL : self . bold_red + self . fmt + self . reset
}
def format ( self , record ) :
log_fmt = self . FORMATS . get ( record . levelno )
formatter = logging . Formatter ( log_fmt )
return formatter . format ( record )
class image_classifier :
2021-09-06 15:05:04 +02:00
def __init__ ( self , debug_level , log_file , faces_directory , directory ) :
2021-09-06 14:52:20 +02:00
''' Initial function called when object is created '''
self . debug_level = debug_level
if log_file is None :
log_file = os . path . join ( os . environ . get ( ' HOME ' , os . environ . get ( ' USERPROFILE ' , os . getcwd ( ) ) ) , ' log ' , ' image_classifier.log ' )
self . log_file = log_file
self . _init_log ( )
2021-09-06 15:05:04 +02:00
self . faces_directory = faces_directory
2021-09-06 14:52:20 +02:00
self . directory = directory
self . known_people = self . load_known_people ( )
2021-09-06 15:06:18 +02:00
2021-09-06 14:52:20 +02:00
if os . access ( directory , os . R_OK ) :
with os . scandir ( directory ) as directory_item :
for entry in directory_item :
if not entry . name . startswith ( ' . ' ) and entry . is_file ( ) :
2021-09-06 15:11:21 +02:00
self . process_file ( directory + os . sep + entry . name )
2021-09-06 14:52:20 +02:00
def process_file ( self , file ) :
''' Process a file, find faces, add EXIF information and
move it to the folder of the day '''
2021-09-06 14:57:48 +02:00
self . _log . debug ( f " Processing file ' { file } ' ... " )
2021-09-06 14:52:20 +02:00
people = self . find_faces ( file )
2021-09-06 15:17:53 +02:00
if people :
2021-09-06 15:47:27 +02:00
self . _log . debug ( f " Found { len ( people ) } known people in the image. " )
2021-09-06 15:54:51 +02:00
self . _log . debug ( json . dumps ( people , indent = 2 ) )
2021-09-06 15:17:53 +02:00
with open ( file , ' rb ' ) as image_file :
2021-09-06 16:04:19 +02:00
self . exif_info = exif . Image ( image_file )
if self . exif_info . has_exif :
2021-09-06 16:29:22 +02:00
# for key in dir(self.exif_info):
# if not key.startswith("_"):
# sys.stdout.write(f"{key}: ")
# sys.stdout.write(f"{self.exif_info[key]}")
2021-09-06 16:20:06 +02:00
self . append_people_to_exif ( people )
with open ( file , ' wb ' ) as new_image_file :
new_image_file . write ( self . exif_info . get_file ( ) )
2021-09-06 15:49:07 +02:00
else :
self . _log . debug ( " No exif info in the image. " )
2021-09-06 15:17:53 +02:00
# get date
# move to destination
2021-09-06 15:49:07 +02:00
else :
self . _log . debug ( " Doesn ' t seem to be an image. " )
2021-09-06 14:52:20 +02:00
2021-09-06 16:04:19 +02:00
def append_people_to_exif ( self , people ) :
2021-09-06 16:10:01 +02:00
if self . is_json ( self . exif_info . get ( ' user_comment ' ) ) :
2021-09-06 16:04:19 +02:00
data = json . loads ( self . exif_info [ ' user_comment ' ] )
if ' PeopleDetected ' not in data :
data [ ' PeopleDetected ' ] = list ( )
else :
2021-09-06 16:10:01 +02:00
data = dict ( )
if self . exif_info . get ( ' user_comment ' ) :
data [ " previous_user_comment " ] = self . exif_info . get ( ' user_comment ' )
2021-09-06 16:04:19 +02:00
data [ ' PeopleDetected ' ] = list ( )
for person in people :
data [ ' PeopleDetected ' ] . append ( person )
2021-09-06 16:15:42 +02:00
self . exif_info . set ( " user_comment " , json . dumps ( data ) )
2021-09-06 16:04:19 +02:00
2021-09-06 15:54:51 +02:00
def is_json ( self , data ) :
2021-09-06 16:31:42 +02:00
try :
result = json . loads ( data )
except TypeError :
return False
2021-09-06 15:54:51 +02:00
return True
2021-09-06 14:52:20 +02:00
def load_known_people ( self ) :
known_people = list ( )
2021-09-06 15:10:57 +02:00
self . _log . debug ( f " Looking for known faces in directory ' { self . faces_directory } ' ... " )
2021-09-06 15:05:04 +02:00
if os . access ( self . faces_directory , os . R_OK ) :
with os . scandir ( self . faces_directory ) as faces_items :
2021-09-06 14:52:20 +02:00
for entry in faces_items :
if not entry . name . startswith ( ' . ' ) and entry . is_file ( ) :
2021-09-06 15:14:22 +02:00
self . _log . debug ( f " Detecting known person in file ' { entry . name } ' ... " )
2021-09-06 14:52:20 +02:00
person = dict ( )
2021-09-06 15:09:35 +02:00
person [ ' filename ' ] = face_recognition . load_image_file ( self . faces_directory + os . sep + entry . name )
2021-09-06 15:56:41 +02:00
person [ ' name ' ] = os . path . basename ( os . path . splitext ( self . faces_directory + os . sep + entry . name ) [ 0 ] )
2021-09-06 14:52:20 +02:00
person [ ' encoding ' ] = face_recognition . face_encodings ( person [ ' filename ' ] ) [ 0 ]
known_people . append ( person )
return known_people
def find_faces ( self , file ) :
''' Find faces in an image/video file '''
2021-09-06 15:47:27 +02:00
people = list ( )
2021-09-06 15:14:22 +02:00
try :
image = face_recognition . load_image_file ( file )
encodings = face_recognition . face_encodings ( image )
2021-09-06 15:34:27 +02:00
self . _log . debug ( f " Found { len ( encodings ) } faces. " )
2021-09-06 15:14:22 +02:00
for known_person in self . known_people :
2021-09-06 15:37:13 +02:00
for encoding in encodings :
2021-09-06 15:43:33 +02:00
if face_recognition . compare_faces ( [ known_person [ ' encoding ' ] ] , encoding ) [ 0 ] :
2021-09-06 16:13:28 +02:00
if known_person [ ' name ' ] not in people :
people . append ( known_person [ ' name ' ] )
2021-09-06 15:14:22 +02:00
except PIL . UnidentifiedImageError as error :
self . _log . debug ( f " File ' { file } ' don ' t seem to be an image. " )
2021-09-06 15:17:53 +02:00
return False
2021-09-06 15:47:27 +02:00
return people
2021-09-06 14:52:20 +02:00
def _init_log ( self ) :
''' Initialize log object '''
self . _log = logging . getLogger ( " image_classifier " )
self . _log . setLevel ( logging . DEBUG )
sysloghandler = SysLogHandler ( )
sysloghandler . setLevel ( logging . DEBUG )
self . _log . addHandler ( sysloghandler )
streamhandler = logging . StreamHandler ( sys . stdout )
streamhandler . setLevel ( logging . getLevelName ( self . debug_level ) )
#formatter = '%(asctime)s | %(levelname)8s | %(message)s'
formatter = ' [ %(levelname)s ] %(message)s '
streamhandler . setFormatter ( CustomFormatter ( formatter ) )
self . _log . addHandler ( streamhandler )
if not os . path . exists ( os . path . dirname ( self . log_file ) ) :
os . mkdir ( os . path . dirname ( self . log_file ) )
filehandler = logging . handlers . RotatingFileHandler ( self . log_file , maxBytes = 102400000 )
# create formatter
formatter = logging . Formatter ( ' %(asctime)s %(name)-12s %(levelname)-8s %(message)s ' )
filehandler . setFormatter ( formatter )
filehandler . setLevel ( logging . DEBUG )
self . _log . addHandler ( filehandler )
return True
@click.command ( )
@click.option ( " --debug-level " , " -d " , default = " INFO " ,
type = click . Choice (
[ " CRITICAL " , " ERROR " , " WARNING " , " INFO " , " DEBUG " , " NOTSET " ] ,
case_sensitive = False ,
) , help = ' Set the debug level for the standard output. ' )
@click.option ( ' --log-file ' , ' -l ' , help = " File to store all debug messages. " )
@click.option ( " --faces-directory " , " -f " , required = True , help = " Folder containing the pictures that identify people. The filename would be used as the name for the person. Just one person per picture. " )
@click.option ( " --directory " , " -d " , required = True , help = " Folder containing the pictures to classify. " )
@click_config_file.configuration_option ( )
2021-09-06 15:05:04 +02:00
def __main__ ( debug_level , log_file , faces_directory , directory ) :
object = image_classifier ( debug_level , log_file , faces_directory , directory )
2021-09-06 14:52:20 +02:00
if __name__ == " __main__ " :
__main__ ( )