image_classifier/image_classifier/image_classifier.py

280 lines
12 KiB
Python
Raw Normal View History

2021-09-06 14:52:20 +02:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2021 Antonio J. Delgado
2022-05-02 12:56:32 +02:00
#
2021-09-06 14:52:20 +02:00
import sys
import os
import logging
import json
2021-09-06 22:41:35 +02:00
import shutil
2021-09-06 22:54:14 +02:00
import re
2021-09-06 14:52:20 +02:00
import click
import click_config_file
from logging.handlers import SysLogHandler
import face_recognition
2021-09-06 20:47:28 +02:00
import pyexiv2
2021-09-06 15:15:02 +02:00
import PIL
2021-09-06 14:52:20 +02:00
2022-05-02 12:56:32 +02:00
2021-09-06 14:52:20 +02:00
class CustomFormatter(logging.Formatter):
    """Logging colored formatter, adapted from https://stackoverflow.com/a/56944256/3638629

    Wraps a single format string in an ANSI colour escape sequence chosen by
    the level of each record. Records whose level is not one of the five
    standard levels are rendered with the plain, uncoloured format string.
    """
    grey = '\x1b[38;21m'
    blue = '\x1b[38;5;39m'
    yellow = '\x1b[38;5;226m'
    red = '\x1b[38;5;196m'
    bold_red = '\x1b[31;1m'
    reset = '\x1b[0m'

    def __init__(self, fmt):
        """Store the format string and build the per-level coloured variants.

        Args:
            fmt: a ``%``-style logging format string.
        """
        super().__init__(fmt)
        self.fmt = fmt
        self.FORMATS = {
            logging.DEBUG: self.grey + self.fmt + self.reset,
            logging.INFO: self.blue + self.fmt + self.reset,
            logging.WARNING: self.yellow + self.fmt + self.reset,
            logging.ERROR: self.red + self.fmt + self.reset,
            logging.CRITICAL: self.bold_red + self.fmt + self.reset,
        }

    def format(self, record):
        """Return the record formatted with the colour of its level."""
        # Custom levels have no entry in FORMATS; fall back to the plain
        # format instead of None, which would silently discard the layout.
        log_fmt = self.FORMATS.get(record.levelno, self.fmt)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)
2022-05-02 12:56:32 +02:00
2021-09-06 14:52:20 +02:00
class image_classifier:
    ''' Tag pictures with the known people found in them and sort them into
    date based folders.

    Creating an instance does all the work: it sets up logging, loads the
    reference faces from `faces_directory` and then processes every
    non-hidden regular file found directly inside `directory`.
    '''

    # Dates embedded in file names, tried in order: year-month-day first,
    # then day-month-year. The separator between the parts is optional.
    _SEPARATOR = r'[\-/\._]?'
    _DATE_PATTERNS = (
        re.compile(r'(?P<year>20[0-9]{2})' + _SEPARATOR +
                   r'(?P<month>[0-1]?[0-9])' + _SEPARATOR +
                   r'(?P<day>[0-3]?[0-9])'),
        re.compile(r'(?P<day>[0-3]?[0-9])' + _SEPARATOR +
                   r'(?P<month>[0-1]?[0-9])' + _SEPARATOR +
                   r'(?P<year>20[0-9]{2})'),
    )

    def __init__(self, debug_level, log_file, faces_directory, directory, no_move,
                 people_folder):
        ''' Initial function called when object is created

        debug_level: name of the logging level used for the standard output.
        log_file: path of the log file; when None it defaults to
            'log/image_classifier.log' under the home directory.
        faces_directory: folder with one reference picture per known person.
        directory: folder with the pictures to classify.
        no_move: when true, files are tagged (and copied) but not moved.
        people_folder: optional base folder receiving a copy of each picture
            per person found in it.
        '''
        self.debug_level = debug_level
        if log_file is None:
            home_path = os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd()))
            log_file = os.path.join(home_path, 'log', 'image_classifier.log')
        self.log_file = log_file
        self._init_log()
        self.faces_directory = faces_directory
        self.directory = directory
        self.known_people = self.load_known_people()
        self.no_move = no_move
        self.people_folder = people_folder

        if not os.access(directory, os.R_OK):
            self._log.error(f"The directory '{directory}' is not readable.")
            return
        # Take a snapshot of the files first: process_file() moves files out
        # of this directory and creates sub-folders in it, and the listing
        # must not be modified while it is being iterated.
        with os.scandir(directory) as directory_item:
            files = [os.path.join(directory, entry.name)
                     for entry in directory_item
                     if not entry.name.startswith('.') and entry.is_file()]
        for file in files:
            self.process_file(file)

    @classmethod
    def _date_from_filename(cls, filename):
        ''' Return the date found in a file name as 'YYYY.MM.DD', or None
        when the name holds no plausible date. '''
        for pattern in cls._DATE_PATTERNS:
            match = pattern.search(filename)
            if not match:
                continue
            year = int(match.group('year'))
            month = int(match.group('month'))
            day = int(match.group('day'))
            # The patterns are permissive (they accept e.g. month 19 or 00),
            # so reject values that cannot be a calendar date.
            if 1 <= month <= 12 and 1 <= day <= 31:
                return f"{year:04d}.{month:02d}.{day:02d}"
        return None

    def process_metadata(self, file):
        ''' Read the metadata of a file into self.metadata and log the
        people already tagged in it, if any. '''
        self.metadata = pyexiv2.ImageMetadata(file)
        self.metadata.read()
        if 'Xmp.iptcExt.PersonInImage' in self.metadata.xmp_keys:
            current = self.metadata['Xmp.iptcExt.PersonInImage'].raw_value
            self._log.debug(f"People (before): {current} (type: {type(current)})")

    def process_file(self, file):
        ''' Process a file, find faces, add EXIF information and
        move it to the folder of the day

        Files that are not images are still sorted by the date found in their
        name. The date folder is created next to the file; unreadable files
        are reported and left untouched.
        '''
        self._log.debug(f"Looking for faces in file '{file}'...")
        if not os.access(file, os.R_OK):
            self._log.error(f"The file '{file}' is not readable.")
            return
        folder_date = 'unknown-time'
        dirname = os.path.dirname(os.path.realpath(file))
        filename = os.path.basename(file)
        people = list()
        if self.is_image(file):
            self.process_metadata(file)
            # find_faces() returns False for non-images; keep `people` a list
            # so the per-person loop below is always safe.
            people = self.find_faces(file) or list()
            if people:
                self._log.debug(f"Found {len(people)} known people in the image.")
                self._log.debug(json.dumps(people, indent=2))
                self.append_people(file, people)
            if 'Exif.Photo.DateTimeOriginal' in self.metadata.exif_keys:
                original_date = self.metadata['Exif.Photo.DateTimeOriginal'].value
                self._log.debug(f"File creation time: {original_date} "
                                f"(type: {type(original_date)})")
                # NOTE(review): pyexiv2 presumably returns the raw string when
                # it cannot parse the date - confirm. Only use real dates.
                if hasattr(original_date, 'strftime'):
                    folder_date = original_date.strftime('%Y.%m.%d')
        if folder_date == 'unknown-time':
            # No usable EXIF date: fall back to a date in the file name.
            folder_date = self._date_from_filename(filename) or folder_date
        self._log.debug(f"Time based folder name section '{folder_date}'")
        new_path = os.path.join(dirname, folder_date)
        os.makedirs(new_path, exist_ok=True)
        if self.people_folder:
            for person in people:
                person_path = os.path.join(self.people_folder,
                                           person.replace(' ', '.'),
                                           folder_date)
                os.makedirs(person_path, exist_ok=True)
                self._log.info(f"Copying file '{file}' to person '{person}' folder, "
                               f"'{person_path}'...")
                shutil.copy(file, person_path)
        if self.no_move:
            self._log.info(f"NOT moving file '{file}' to '{new_path}' because of --no-move")
            return
        destination = os.path.join(new_path, filename)
        if os.path.exists(destination):
            # shutil.move() refuses to overwrite a file inside a directory.
            self._log.debug(f"Destination '{new_path}/{filename}' exists, removing it...")
            os.remove(destination)
        self._log.info(f"Moving file '{file}' to '{new_path}'...")
        shutil.move(file, new_path)

    def print_metadata(self):
        ''' Print every IPTC, EXIF and XMP key of self.metadata with its raw value. '''
        sections = (
            ("IPTC keys:", self.metadata.iptc_keys),
            ("EXIF keys:", self.metadata.exif_keys),
            ("XMP keys:", self.metadata.xmp_keys),
        )
        for title, keys in sections:
            print(title)
            for key in keys:
                print(f" {key}: '{self.metadata[key].raw_value}'")

    def append_people(self, file, people):
        ''' Add the given people to the 'Xmp.iptcExt.PersonInImage' tag of
        self.metadata, keeping the ones already present, and write it.

        A failure to write the metadata is logged, not raised.
        '''
        tag_name = 'Xmp.iptcExt.PersonInImage'
        tag_exists = tag_name in self.metadata.xmp_keys
        new_list = list(self.metadata[tag_name].raw_value) if tag_exists else list()
        for person in people:
            if person not in new_list:
                self._log.debug(f"Adding person '{person}'...")
                new_list.append(person)
        if tag_exists:
            self.metadata[tag_name].value = new_list
        else:
            self.metadata[tag_name] = pyexiv2.XmpTag(tag_name, new_list)
        updated = self.metadata[tag_name].raw_value
        self._log.debug(f"People (after): {updated} (type: {type(updated)})")
        try:
            self.metadata.write()
            self._log.debug(f"Updated file '{file}'.")
        except OSError as error:
            self._log.error(f"Error writing metadata to picture file. {error}")

    def is_json(self, data):
        ''' Return True when data can be parsed as JSON, False otherwise. '''
        try:
            json.loads(data)
        except (TypeError, ValueError):
            # TypeError: data is not str/bytes. ValueError: malformed JSON
            # (json.JSONDecodeError is a subclass of ValueError).
            return False
        return True

    def load_known_people(self):
        ''' Load the reference faces from self.faces_directory.

        Returns a list with one dict per picture in which a face was found:
        'name' is the file name without extension, 'encoding' the first face
        encoding found and 'filename' the loaded image data (the key name is
        kept for compatibility).
        '''
        known_people = list()
        self._log.debug(f"Looking for known faces in directory '{self.faces_directory}'...")
        if not os.access(self.faces_directory, os.R_OK):
            self._log.warning(f"The faces directory '{self.faces_directory}' is not readable.")
            return known_people
        with os.scandir(self.faces_directory) as faces_items:
            for entry in faces_items:
                if entry.name.startswith('.') or not entry.is_file():
                    continue
                face_path = os.path.join(self.faces_directory, entry.name)
                if not self.is_image(face_path):
                    continue
                self._log.debug(f"Identifying face in file '{entry.name}'...")
                image = face_recognition.load_image_file(face_path)
                encodings = face_recognition.face_encodings(image)
                if len(encodings) == 0:
                    self._log.info(f"No faces found in file '{entry.name}'.")
                    continue
                known_people.append({
                    'filename': image,
                    'name': os.path.splitext(entry.name)[0],
                    # Just one person per reference picture: use the first face.
                    'encoding': encodings[0],
                })
        return known_people

    def find_faces(self, file):
        ''' Find faces in an image/video file

        Returns the list of names of the known people recognised in the
        file (possibly empty), or False when the file is not an image.
        '''
        if not self.is_image(file):
            return False
        people = list()
        image = face_recognition.load_image_file(file)
        encodings = face_recognition.face_encodings(image)
        self._log.debug(f"Found {len(encodings)} faces.")
        for known_person in self.known_people:
            for encoding in encodings:
                if face_recognition.compare_faces([known_person['encoding']], encoding)[0]:
                    if known_person['name'] not in people:
                        people.append(known_person['name'])
        return people

    def is_image(self, file):
        ''' Return True when PIL can open the file as an image. '''
        # A bare `import PIL` does not load the Image submodule.
        import PIL.Image
        try:
            # The context manager closes the file handle opened by PIL.
            with PIL.Image.open(file):
                pass
        except PIL.UnidentifiedImageError as error:
            # Must come before OSError, which is its parent class.
            self._log.debug(f"File '{file}' is not an image recognizable by PIL. {error}")
            return False
        except OSError as error:
            self._log.debug(f"File '{file}' is not readable by PIL. {error}")
            return False
        return True

    def _init_log(self):
        ''' Initialize log object

        Sends everything to syslog and to self.log_file, and the messages at
        or above self.debug_level to the standard output. Returns True.
        '''
        self._log = logging.getLogger("image_classifier")
        self._log.setLevel(logging.DEBUG)

        sysloghandler = SysLogHandler()
        sysloghandler.setLevel(logging.DEBUG)
        self._log.addHandler(sysloghandler)

        streamhandler = logging.StreamHandler(sys.stdout)
        level = self.debug_level
        if isinstance(level, str):
            # Level names are upper case; accept e.g. 'debug' as well.
            level = level.upper()
        streamhandler.setLevel(level)
        streamhandler.setFormatter(CustomFormatter('[%(levelname)s] %(message)s'))
        self._log.addHandler(streamhandler)

        log_directory = os.path.dirname(self.log_file)
        if log_directory:
            # Create missing parents too; a bare file name has no directory.
            os.makedirs(log_directory, exist_ok=True)
        # With backupCount=0 the handler never rolls over and maxBytes would
        # be ignored, so keep one backup file.
        filehandler = logging.handlers.RotatingFileHandler(self.log_file,
                                                           maxBytes=102400000,
                                                           backupCount=1)
        filehandler.setFormatter(
            logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
        filehandler.setLevel(logging.DEBUG)
        self._log.addHandler(filehandler)
        return True
2022-05-02 12:56:32 +02:00
2021-09-06 14:52:20 +02:00
@click.command()
# `-d` is the short flag of --directory below; declaring it here as well made
# it ambiguous (the later declaration wins), so the level uses `-D`.
@click.option("--debug-level", "-D", default="INFO",
              type=click.Choice(
                  ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
                  case_sensitive=False,
              ),
              help='Set the debug level for the standard output.')
@click.option('--log-file', '-l', help="File to store all debug messages.")
@click.option("--faces-directory", "-f", required=True,
              help="Folder containing the pictures that identify people. "
                   "Filename would be used as the name for the person. "
                   "Just one person per picture.")
@click.option("--directory", "-d", required=True, help="Folder with the pictures to classify.")
@click.option("--no-move", "-n", is_flag=True, help="Don't move files, just add people's tag.")
@click.option('--people-folder', '-p',
              help="Define a folder for people's folders and copy pictures to each "
                   "person's folder. Be sure to have deduplication in the filesystem "
                   "to avoid using too much storage.")
@click_config_file.configuration_option()
def __main__(debug_level, log_file, faces_directory, directory, no_move, people_folder):
    """Tag the people found in the pictures of a folder and sort the pictures by date."""
    # Building the object runs the whole classification.
    return image_classifier(debug_level, log_file, faces_directory, directory, no_move,
                            people_folder)


if __name__ == "__main__":
    __main__()