Add option to copy to people's folders
This commit is contained in:
parent
87a30a0dbb
commit
df640aa230
1 changed files with 66 additions and 36 deletions
|
@ -3,7 +3,7 @@
|
|||
#
|
||||
# This script is licensed under GNU GPL version 2.0 or above
|
||||
# (c) 2021 Antonio J. Delgado
|
||||
#
|
||||
#
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
@ -18,6 +18,7 @@ import face_recognition
|
|||
import pyexiv2
|
||||
import PIL
|
||||
|
||||
|
||||
class CustomFormatter(logging.Formatter):
|
||||
"""Logging colored formatter, adapted from https://stackoverflow.com/a/56944256/3638629"""
|
||||
|
||||
|
@ -44,19 +45,23 @@ class CustomFormatter(logging.Formatter):
|
|||
formatter = logging.Formatter(log_fmt)
|
||||
return formatter.format(record)
|
||||
|
||||
|
||||
class image_classifier:
|
||||
|
||||
def __init__(self, debug_level, log_file, faces_directory, directory, no_move):
|
||||
def __init__(self, debug_level, log_file, faces_directory, directory, no_move,
|
||||
people_folder):
|
||||
''' Initial function called when object is created '''
|
||||
self.debug_level = debug_level
|
||||
if log_file is None:
|
||||
log_file = os.path.join(os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd())), 'log', 'image_classifier.log')
|
||||
home_path = os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd()))
|
||||
log_file = os.path.join(home_path, 'log', 'image_classifier.log')
|
||||
self.log_file = log_file
|
||||
self._init_log()
|
||||
self.faces_directory = faces_directory
|
||||
self.directory = directory
|
||||
self.known_people = self.load_known_people()
|
||||
self.no_move = no_move
|
||||
self.people_folder = people_folder
|
||||
|
||||
if os.access(directory, os.R_OK):
|
||||
with os.scandir(directory) as directory_item:
|
||||
|
@ -82,36 +87,41 @@ class image_classifier:
|
|||
self._log.debug(f"Found {len(people)} known people in the image.")
|
||||
self._log.debug(json.dumps(people, indent=2))
|
||||
if 'Xmp.iptcExt.PersonInImage' in self.metadata.xmp_keys:
|
||||
self._log.debug(f"People (before): {self.metadata['Xmp.iptcExt.PersonInImage'].raw_value} (type: {type(self.metadata['Xmp.iptcExt.PersonInImage'].raw_value)})")
|
||||
self.append_people(people)
|
||||
self._log.debug(f"People (after): {self.metadata['Xmp.iptcExt.PersonInImage'].raw_value} (type: {type(self.metadata['Xmp.iptcExt.PersonInImage'].raw_value)})")
|
||||
try:
|
||||
self.metadata.write()
|
||||
self._log.debug(f"Updated file '{file}'.")
|
||||
except OSError as error:
|
||||
self._log.error(f"Error writing metadata to picture file. {error}")
|
||||
if 'Exif.Photo.DateTimeOriginal' in self.metadata.exif_keys:
|
||||
original_date = self.metadata['Exif.Photo.DateTimeOriginal'].value
|
||||
self._log.debug(f"File creation time: {original_date} (type: {type(original_date)})")
|
||||
folder = os.path.join(dirname, original_date.strftime('%Y.%m.%d'), filename)
|
||||
self._log.debug(f"People (before): \
|
||||
{self.metadata['Xmp.iptcExt.PersonInImage'].raw_value} \
|
||||
(type: {type(self.metadata['Xmp.iptcExt.PersonInImage'].raw_value)})")
|
||||
self.append_people(file, people)
|
||||
if 'Exif.Photo.DateTimeOriginal' in self.metadata.exif_keys:
|
||||
original_date = self.metadata['Exif.Photo.DateTimeOriginal'].value
|
||||
self._log.debug(f"File creation time: {original_date} \
|
||||
(type: {type(original_date)})")
|
||||
folder = os.path.join(dirname, original_date.strftime('%Y.%m.%d'), filename)
|
||||
if not folder:
|
||||
match = re.search(r'(?P<year>20[0-9]{2})[\-/\._]?(?P<month>[0-1]?[0-9])[\-/\._]?(?P<day>[0-3]?[0-9])', filename)
|
||||
match = re.search(r'(?P<year>20[0-9]{2})[\-/\._]?\
|
||||
(?P<month>[0-1]?[0-9])[\-/\._]?(?P<day>[0-3]?[0-9])', filename)
|
||||
if match:
|
||||
folder = f"{match.group('year')}.{match.group('month')}.{match.group('day')}"
|
||||
else:
|
||||
match = re.search(r'(?P<day>[0-3]?[0-9])[\-/\._]?(?P<month>[0-1]?[0-9])[\-/\._]?(?P<year>20[0-9]{2})', filename)
|
||||
match = re.search(r'(?P<day>[0-3]?[0-9])[\-/\._]?\
|
||||
(?P<month>[0-1]?[0-9])[\-/\._]?(?P<year>20[0-9]{2})', filename)
|
||||
if match:
|
||||
folder = f"{match.group('year')}.{match.group('month')}.{match.group('day')}"
|
||||
folder = f"{match.group('year')}.{match.group('month')}.\
|
||||
{match.group('day')}"
|
||||
else:
|
||||
folder = 'unknown-time'
|
||||
new_path = os.path.join(dirname, folder, filename)
|
||||
os.makedirs(os.path.dirname(new_path), exist_ok=True)
|
||||
if self.no_move == False:
|
||||
if self.people_folder:
|
||||
for person in people:
|
||||
person_path = os.path.join(self.people_folder, person, folder, filename)
|
||||
self._log.debug(f"Copying file '{file}' to '{person_path}'...")
|
||||
shutil.copy(file, person_path)
|
||||
if not self.no_move:
|
||||
self._log.info(f"Moving file '{file}' to '{new_path}'...")
|
||||
shutil.move(file, new_path)
|
||||
else:
|
||||
self._log.info(f"NOT moving file '{file}' to '{new_path}' because of --no-move")
|
||||
|
||||
|
||||
def print_metadata(self):
|
||||
print("IPTC keys:")
|
||||
for key in self.metadata.iptc_keys:
|
||||
|
@ -123,7 +133,7 @@ class image_classifier:
|
|||
for key in self.metadata.xmp_keys:
|
||||
print(f" {key}: '{self.metadata[key].raw_value}'")
|
||||
|
||||
def append_people(self, people):
|
||||
def append_people(self, file, people):
|
||||
new_list = list()
|
||||
if 'Xmp.iptcExt.PersonInImage' in self.metadata.xmp_keys:
|
||||
for person in self.metadata['Xmp.iptcExt.PersonInImage'].raw_value:
|
||||
|
@ -135,11 +145,21 @@ class image_classifier:
|
|||
if 'Xmp.iptcExt.PersonInImage' in self.metadata.xmp_keys:
|
||||
self.metadata['Xmp.iptcExt.PersonInImage'].value = new_list
|
||||
else:
|
||||
self.metadata['Xmp.iptcExt.PersonInImage'] = pyexiv2.XmpTag('Xmp.iptcExt.PersonInImage', new_list)
|
||||
self.metadata['Xmp.iptcExt.PersonInImage'] = pyexiv2.XmpTag('Xmp.iptcExt.PersonInImage',
|
||||
new_list)
|
||||
self._log.debug(f"People (after): \
|
||||
{self.metadata['Xmp.iptcExt.PersonInImage'].raw_value} \
|
||||
(type: {type(self.metadata['Xmp.iptcExt.PersonInImage'].raw_value)})")
|
||||
try:
|
||||
self.metadata.write()
|
||||
self._log.debug(f"Updated file '{file}'.")
|
||||
except OSError as error:
|
||||
self._log.error(f"Error writing metadata to picture file. {error}")
|
||||
|
||||
def is_json(self, data):
|
||||
try:
|
||||
result = json.loads(data)
|
||||
self._log.debug(f"{result} is JSON data")
|
||||
except TypeError:
|
||||
return False
|
||||
return True
|
||||
|
@ -150,11 +170,14 @@ class image_classifier:
|
|||
if os.access(self.faces_directory, os.R_OK):
|
||||
with os.scandir(self.faces_directory) as faces_items:
|
||||
for entry in faces_items:
|
||||
if not entry.name.startswith('.') and entry.is_file() and self.is_image(self.faces_directory + os.sep + entry.name):
|
||||
if (not entry.name.startswith('.') and entry.is_file() and
|
||||
self.is_image(self.faces_directory + os.sep + entry.name)):
|
||||
self._log.debug(f"Identifying face in file '{entry.name}'...")
|
||||
person = dict()
|
||||
person['filename'] = face_recognition.load_image_file(self.faces_directory + os.sep + entry.name)
|
||||
person['name'] = os.path.basename(os.path.splitext(self.faces_directory + os.sep + entry.name)[0])
|
||||
person['filename'] = face_recognition.load_image_file(self.faces_directory +
|
||||
os.sep + entry.name)
|
||||
person['name'] = os.path.basename(os.path.splitext(self.faces_directory +
|
||||
os.sep + entry.name)[0])
|
||||
encodings = face_recognition.face_encodings(person['filename'])
|
||||
if len(encodings) > 0:
|
||||
person['encoding'] = encodings[0]
|
||||
|
@ -182,6 +205,7 @@ class image_classifier:
|
|||
def is_image(self, file):
|
||||
try:
|
||||
image_object = PIL.Image.open(file)
|
||||
self._log.debug(f"{image_object} is an image for PIL")
|
||||
except OSError as error:
|
||||
self._log.debug(f"File '{file}' is not readable by PIL. {error}")
|
||||
return False
|
||||
|
@ -201,7 +225,7 @@ class image_classifier:
|
|||
|
||||
streamhandler = logging.StreamHandler(sys.stdout)
|
||||
streamhandler.setLevel(logging.getLevelName(self.debug_level))
|
||||
#formatter = '%(asctime)s | %(levelname)8s | %(message)s'
|
||||
# formatter = '%(asctime)s | %(levelname)8s | %(message)s'
|
||||
formatter = '[%(levelname)s] %(message)s'
|
||||
streamhandler.setFormatter(CustomFormatter(formatter))
|
||||
self._log.addHandler(streamhandler)
|
||||
|
@ -217,20 +241,26 @@ class image_classifier:
|
|||
self._log.addHandler(filehandler)
|
||||
return True
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option("--debug-level", "-d", default="INFO",
|
||||
type=click.Choice(
|
||||
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
|
||||
case_sensitive=False,
|
||||
), help='Set the debug level for the standard output.')
|
||||
type=click.Choice(
|
||||
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
|
||||
case_sensitive=False,
|
||||
),
|
||||
help='Set the debug level for the standard output.')
|
||||
@click.option('--log-file', '-l', help="File to store all debug messages.")
|
||||
@click.option("--faces-directory","-f", required=True, help="Folder containing the pictures that identify people. The filename would be used as the name for the person. Just one person per picture.")
|
||||
@click.option("--directory","-d", required=True, help="Folder containing the pictures to classify.")
|
||||
@click.option("--no-move","-n", is_flag=True, help="Don't move files, just add people's tag.")
|
||||
@click.option("--faces-directory", "-f", required=True, help="Folder containing the pictures that \
|
||||
identify people. Filename would be used as the name for the person. Just one person per picture.")
|
||||
@click.option("--directory", "-d", required=True, help="Folder with the pictures to classify.")
|
||||
@click.option("--no-move", "-n", is_flag=True, help="Don't move files, just add people's tag.")
|
||||
@click.option('--people-folder', '-p', help="Define a folder for people's folders and copy pictures \
|
||||
to each person's folder. Be sure to have deduplication in the filesystem to avoid using too much storage.")
|
||||
@click_config_file.configuration_option()
|
||||
def __main__(debug_level, log_file, faces_directory, directory, no_move):
|
||||
object = image_classifier(debug_level, log_file, faces_directory, directory, no_move)
|
||||
def __main__(debug_level, log_file, faces_directory, directory, no_move, people_folder):
|
||||
return image_classifier(debug_level, log_file, faces_directory, directory, no_move,
|
||||
people_folder)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
__main__()
|
||||
|
||||
|
|
Loading…
Reference in a new issue