extract_words/extract_words/extract_words.py

239 lines
7 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""Extract Words from a file"""
import sys
import os
import json
import time
import re
import logging
from logging.handlers import SysLogHandler
import click
import click_config_file
import yaml
HOME_FOLDER = os.environ.get('HOME', os.environ.get('USERPROFILE', '/'))
if HOME_FOLDER == '/':
CACHE_FOLDER = '/var/cache'
LOG_FOLDER = '/var/log/'
else:
CACHE_FOLDER = f"{HOME_FOLDER}/.local/"
LOG_FOLDER = f"{HOME_FOLDER}/log/"
class ExtractWords:
"""Extract Words from a file"""
def __init__(self, **kwargs):
self.config = kwargs
if 'log_file' not in kwargs or kwargs['log_file'] is None:
self.config['log_file'] = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'log',
'extract_words.log'
)
self._init_log()
self._default_data = {
"last_update": 0,
}
self.data = self._read_cached_data()
self._debug(
f"Reading file '{self.config['input_file'].name}'..."
)
content = self.config['input_file'].read()
self._debug(
"Unsplitting separated words..."
)
content = re.sub('- ?\n', '', content)
content = content.lower()
self._debug(
f"Splitting file of {len(content)} bytes..."
)
words = content.split()
self._debug(
f"Found {len(words)} a total (non-unique) words"
)
all_words = []
for word in words:
match = re.match(r'\w*', word)
if match:
if match.group(0) != '':
match_numbers = re.match(r'[0-9]', match.group(0))
if not match_numbers:
if match.group(0) not in all_words:
all_words.append(match.group(0))
self._debug(
f"A total of {len(all_words)} unique words"
)
all_words.sort()
self.config['output_file'].write('\n'.join(all_words))
def close(self):
'''Close class and save data'''
self._save_cached_data(self.data)
def _read_cached_data(self):
if os.path.exists(self.config['cache_file']):
with open(self.config['cache_file'], 'r', encoding='utf-8') as cache_file:
try:
cached_data = json.load(cache_file)
if (
'last_update' in cached_data and
cached_data['last_update'] + self.config['max_cache_age'] > time.time()
):
cached_data = self._default_data
except json.decoder.JSONDecodeError:
cached_data = self._default_data
return cached_data
else:
return self._default_data
def _save_cached_data(self, data):
data['last_update'] = time.time()
with open(self.config['cache_file'], 'w', encoding='utf-8') as cache_file:
json.dump(data, cache_file, indent=2)
self._debug(
f"Saved cached data in '{self.config['cache_file']}'",
)
def _output(self, message):
if self.config['output_format'] == 'JSON':
return json.dumps(message, indent=2)
elif self.config['output_format'] == 'YAML':
return yaml.dump(message, Dumper=yaml.Dumper)
elif self.config['output_format'] == 'PLAIN':
return f"{message}"
else:
self._log.warning(
"Output format '%s' not supported",
self.config['output_format']
)
return message
def _info(self, message):
return self._log.info(self._output(message))
def _warning(self, message):
return self._log.warning(self._output(message))
def _error(self, message):
return self._log.error(self._output(message))
def _debug(self, message):
return self._log.debug(self._output(message))
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("extract_words")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "extract_words.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
@click.command()
@click.option(
"--debug-level",
"-d",
default="INFO",
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
case_sensitive=False,
),
help='Set the debug level for the standard output.'
)
@click.option(
"--output-format",
"-o",
default="JSON",
type=click.Choice(
["JSON", "YAML", "CSV", "PLAIN"],
case_sensitive=False,
),
help='Set the output format.'
)
@click.option(
'--log-file',
'-l',
default=f"{LOG_FOLDER}/extract_words.log",
help="File to store all debug messages."
)
@click.option(
'--cache-file',
'-f',
default=f"{CACHE_FOLDER}/extract_words.json",
help='Cache file to store data from each run',
)
@click.option(
'--max-cache-age',
'-a',
default=60*60*24*7,
help='Max age in seconds for the cache'
)
@click.option(
'--input-file',
'-i',
required=True,
type=click.File('r'),
help='File containing words mixed with other things (like a book) in plain text',
)
@click.option(
'--output-file',
'-O',
required=True,
type=click.File('w'),
help='File to write the words',
)
# @click.option("--dummy","-n", is_flag=True,
# help="Don't do anything, just show what would be done.")
@click_config_file.configuration_option()
def __main__(**kwargs):
obj = ExtractWords(**kwargs)
obj.close()
if __name__ == "__main__":
__main__()