211 lines
9.4 KiB
Python
Executable file
211 lines
9.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
#
|
|
# This script is licensed under GNU GPL version 2.0 or above
|
|
# (c) 2021 Antonio J. Delgado
|
|
#
|
|
|
|
import sys
|
|
import os
|
|
import logging
|
|
import json
|
|
import shutil
|
|
import click
|
|
import click_config_file
|
|
from logging.handlers import SysLogHandler
|
|
import face_recognition
|
|
import pyexiv2
|
|
import PIL
|
|
|
|
class CustomFormatter(logging.Formatter):
    """Logging colored formatter, adapted from https://stackoverflow.com/a/56944256/3638629

    Each standard level gets its own ANSI colour wrapped around the format
    string given to the constructor. Records with any other level are
    formatted with the plain, uncoloured format string.
    """

    grey = '\x1b[38;21m'
    blue = '\x1b[38;5;39m'
    yellow = '\x1b[38;5;226m'
    red = '\x1b[38;5;196m'
    bold_red = '\x1b[31;1m'
    reset = '\x1b[0m'

    def __init__(self, fmt):
        """Build one formatter per standard level from the format string ``fmt``."""
        super().__init__(fmt)
        self.fmt = fmt
        self.FORMATS = {
            logging.DEBUG: self.grey + self.fmt + self.reset,
            logging.INFO: self.blue + self.fmt + self.reset,
            logging.WARNING: self.yellow + self.fmt + self.reset,
            logging.ERROR: self.red + self.fmt + self.reset,
            logging.CRITICAL: self.bold_red + self.fmt + self.reset,
        }
        # Build the per-level formatters once instead of once per record.
        self._formatters = {
            level: logging.Formatter(colored_fmt)
            for level, colored_fmt in self.FORMATS.items()
        }
        # Custom levels have no colour assigned; keep the caller's format
        # for them rather than silently dropping it.
        self._plain_formatter = logging.Formatter(self.fmt)

    def format(self, record):
        """Return ``record`` rendered with the colour matching its level."""
        formatter = self._formatters.get(record.levelno, self._plain_formatter)
        return formatter.format(record)
|
|
|
|
class image_classifier:
    """Tag pictures with the known people found in them and file them by date.

    Creating an instance does all the work: the log is set up, the known
    faces are loaded from ``faces_directory`` and every non-hidden regular
    file directly inside ``directory`` is passed to :meth:`process_file`.
    """

    # Metadata keys used by this class.
    PERSON_TAG = 'Xmp.iptcExt.PersonInImage'
    DATE_TAG = 'Exif.Photo.DateTimeOriginal'

    def __init__(self, debug_level, log_file, faces_directory, directory, no_move):
        """Set up logging, load the known faces and process ``directory``.

        Args:
            debug_level: Name of the logging level used for standard output.
            log_file: Path of the log file; when ``None`` it defaults to
                ``log/image_classifier.log`` under ``$HOME``,
                ``%USERPROFILE%`` or the current directory.
            faces_directory: Folder with one picture per known person; the
                file name (without extension) is used as the person's name.
            directory: Folder containing the pictures to classify.
            no_move: When true, files are tagged but never moved.
        """
        self.debug_level = debug_level
        if log_file is None:
            home = os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd()))
            log_file = os.path.join(home, 'log', 'image_classifier.log')
        self.log_file = log_file
        self._init_log()
        self.faces_directory = faces_directory
        self.directory = directory
        self.no_move = no_move
        self.known_people = self.load_known_people()

        if not os.access(directory, os.R_OK):
            self._log.error(f"The directory '{directory}' is not readable.")
            return

        # Take a snapshot of the files first: processing moves files and
        # creates sub-folders inside this same directory, and what a live
        # scandir() iterator yields after such changes is unspecified.
        with os.scandir(directory) as directory_items:
            files = [
                os.path.join(directory, entry.name)
                for entry in directory_items
                if not entry.name.startswith('.') and entry.is_file()
            ]
        for file in files:
            self.process_file(file)

    def process_file(self, file):
        """Find known people in ``file``, record them in its XMP metadata
        and, unless ``no_move`` is set, move it to the folder of the day.

        Files that are unreadable, are not images or show no known person
        are left untouched.
        """
        self._log.debug(f"Looking for faces in file '{file}'...")
        if not os.access(file, os.R_OK):
            self._log.error(f"The file '{file}' is not readable.")
            return

        people = self.find_faces(file)
        if people is False:
            # find_faces() returns False only when the file is not an image.
            self._log.debug("Doesn't seem to be an image.")
            return
        if not people:
            self._log.debug(f"No known people found in file '{file}'.")
            return

        self._log.debug(f"Found {len(people)} known people in the image.")
        self._log.debug(json.dumps(people, indent=2))
        self.metadata = pyexiv2.ImageMetadata(file)
        self.metadata.read()
        if self.PERSON_TAG in self.metadata.xmp_keys:
            before = self.metadata[self.PERSON_TAG].raw_value
            self._log.debug(f"People (before): {before} (type: {type(before)})")
        self.append_people(people)
        after = self.metadata[self.PERSON_TAG].raw_value
        self._log.debug(f"People (after): {after} (type: {type(after)})")
        self.metadata.write()
        self._log.debug(f"Updated file '{file}'.")

        if not self.no_move:
            self._move_to_date_folder(file)

    def _move_to_date_folder(self, file):
        """Move ``file`` into a ``YYYY/MM/DD`` folder next to it, using the
        original date stored in the metadata already loaded in ``self.metadata``."""
        if self.DATE_TAG not in self.metadata.exif_keys:
            return
        original_date = self.metadata[self.DATE_TAG].value
        self._log.debug(f"File creation time: {original_date} (type: {type(original_date)})")
        # NOTE(review): presumably pyexiv2 can hand back something other than
        # a datetime when the tag cannot be parsed - confirm against its docs.
        if not hasattr(original_date, 'strftime'):
            self._log.warning(f"Can't read the original date of file '{file}', not moving it.")
            return
        dirname = os.path.dirname(os.path.realpath(file))
        filename = os.path.basename(file)
        new_path = os.path.join(dirname, original_date.strftime('%Y/%m/%d'), filename)
        self._log.debug(f"New path: {new_path}")
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        shutil.move(file, new_path)

    def print_metadata(self):
        """Print every IPTC, EXIF and XMP key of ``self.metadata`` with its raw value."""
        sections = (
            ("IPTC keys:", "iptc_keys"),
            ("EXIF keys:", "exif_keys"),
            ("XMP keys:", "xmp_keys"),
        )
        for title, attribute in sections:
            print(title)
            for key in getattr(self.metadata, attribute):
                print(f" {key}: '{self.metadata[key].raw_value}'")

    def append_people(self, people):
        """Add the names in ``people`` to the person tag of ``self.metadata``.

        Names already present in the tag are kept and not duplicated. The tag
        is created when the metadata does not have it yet. Nothing is written
        to disk here.
        """
        tag_present = self.PERSON_TAG in self.metadata.xmp_keys
        new_list = list(self.metadata[self.PERSON_TAG].raw_value) if tag_present else []
        for person in people:
            if person not in new_list:
                self._log.debug(f"Adding person '{person}'...")
                new_list.append(person)
        if tag_present:
            self.metadata[self.PERSON_TAG].value = new_list
        else:
            self.metadata[self.PERSON_TAG] = pyexiv2.XmpTag(self.PERSON_TAG, new_list)

    def is_json(self, data):
        """Return True when ``data`` can be parsed as JSON, False otherwise."""
        try:
            json.loads(data)
        except (TypeError, ValueError):
            # TypeError: data is not str/bytes/bytearray.
            # ValueError: covers json.JSONDecodeError for malformed text.
            return False
        return True

    def load_known_people(self):
        """Return the known people found in ``self.faces_directory``.

        Each non-hidden regular file yields one dict with the keys ``name``
        (file name without extension), ``encoding`` (first face encoding
        found in the picture) and ``filename``. Files that are not images or
        contain no face are skipped.
        """
        known_people = []
        self._log.debug(f"Looking for known faces in directory '{self.faces_directory}'...")
        if not os.access(self.faces_directory, os.R_OK):
            self._log.warning(f"The faces directory '{self.faces_directory}' is not readable.")
            return known_people

        with os.scandir(self.faces_directory) as faces_items:
            for entry in faces_items:
                if entry.name.startswith('.') or not entry.is_file():
                    continue
                self._log.debug(f"Identifying face in file '{entry.name}'...")
                path = os.path.join(self.faces_directory, entry.name)
                try:
                    image = face_recognition.load_image_file(path)
                except PIL.UnidentifiedImageError:
                    # Same error find_faces() handles; one stray file must
                    # not abort the whole run.
                    self._log.warning(f"File '{entry.name}' don't seem to be an image.")
                    continue
                encodings = face_recognition.face_encodings(image)
                if not encodings:
                    self._log.info(f"No faces found in file '{entry.name}'.")
                    continue
                known_people.append({
                    # NOTE: despite its name this key has always held the
                    # loaded image, not a path; kept as is for compatibility.
                    'filename': image,
                    'name': os.path.splitext(entry.name)[0],
                    'encoding': encodings[0],
                })
        return known_people

    def find_faces(self, file):
        """Return the names of the known people whose face appears in ``file``.

        Returns:
            A list of names (possibly empty), or ``False`` when ``file``
            cannot be identified as an image.
        """
        try:
            image = face_recognition.load_image_file(file)
        except PIL.UnidentifiedImageError:
            self._log.debug(f"File '{file}' don't seem to be an image.")
            return False

        encodings = face_recognition.face_encodings(image)
        self._log.debug(f"Found {len(encodings)} faces.")
        people = []
        for known_person in self.known_people:
            name = known_person['name']
            if name in people:
                continue
            for encoding in encodings:
                if face_recognition.compare_faces([known_person['encoding']], encoding)[0]:
                    people.append(name)
                    # One match is enough for this person.
                    break
        return people

    def _init_log(self):
        """Create ``self._log`` with syslog, standard output and file handlers.

        Syslog and the log file receive everything from DEBUG up; standard
        output is limited to ``self.debug_level`` and is coloured.
        """
        self._log = logging.getLogger("image_classifier")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        streamhandler.setLevel(logging.getLevelName(self.debug_level))
        formatter = '[%(levelname)s] %(message)s'
        streamhandler.setFormatter(CustomFormatter(formatter))
        self._log.addHandler(streamhandler)

        # makedirs() also creates missing parents; an empty dirname means the
        # log file lives in the current directory and nothing must be created.
        log_directory = os.path.dirname(self.log_file)
        if log_directory:
            os.makedirs(log_directory, exist_ok=True)

        # Rollover never happens while backupCount is 0, so keep one backup
        # to make the maxBytes limit effective.
        filehandler = logging.handlers.RotatingFileHandler(
            self.log_file, maxBytes=102400000, backupCount=1)
        formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        filehandler.setFormatter(formatter)
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
|
|
|
|
@click.command()
@click.option("--debug-level", "-D", default="INFO",
              type=click.Choice(
                  ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
                  case_sensitive=False,
              ), help='Set the debug level for the standard output.')
@click.option('--log-file', '-l', help="File to store all debug messages.")
@click.option("--faces-directory", "-f", required=True, help="Folder containing the pictures that identify people. The filename would be used as the name for the person. Just one person per picture.")
@click.option("--directory", "-d", required=True, help="Folder containing the pictures to classify.")
@click.option("--no-move", "-n", is_flag=True, help="Don't move files, just add people's tag.")
@click_config_file.configuration_option()
def __main__(debug_level, log_file, faces_directory, directory, no_move):
    """Tag the pictures of a folder with the known people found in them."""
    # "-d" used to be declared for both --debug-level and --directory; it is
    # kept for --directory and --debug-level now uses "-D".
    # Creating the classifier runs the whole job; the instance is not needed.
    image_classifier(debug_level, log_file, faces_directory, directory, no_move)


if __name__ == "__main__":
    __main__()
|
|
|