publish_active_windows/publish_active_windows/publish_active_windows.py

211 lines
6.6 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""Publish Gnome Active Windows to HomeAssistant"""
import sys
import os
import logging
from logging.handlers import SysLogHandler
import subprocess
import re
import json
import time
from datetime import datetime, timezone
import socket
import click
import click_config_file
import requests
class PublishActiveWindows:
"""Publish Gnome Active Windows to HomeAssistant"""
def __init__(self, **kwargs):
self.config = kwargs
if 'log_file' not in kwargs or kwargs['log_file'] is None:
self.config['log_file'] = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'log',
'publish_active_windows.log'
)
self._init_log()
self.session = requests.Session()
hostname = socket.gethostname()
while True:
windows = self._get_windows_list()
self._log.debug(
json.dumps(windows, indent=2)
)
self._log.debug(
'Sleeping %s seconds',
self.config['update_frequency']
)
for window in windows:
command_line = self._get_process_command_line(window['pid'])
if window['title']:
state = window['title']
else:
state = command_line
share_attributes = {
"command_line": command_line,
"hostname": hostname
}
window['pid'] = f"{window['pid']}"
window['id'] = f"{window['id']}"
window['maximized'] = f"{window['maximized']}"
attributes = {**share_attributes, **window}
if window['focus']:
entity_id = f'sensor.{hostname}_active_window'
attributes['focus'] = 'True'
else:
entity_id = f'sensor.{hostname}_open_window'
attributes['focus'] = 'False'
if not self._publish_state(entity_id=entity_id, state=state, attributes=attributes):
sys.exit(1)
time.sleep(self.config['update_frequency'])
def _get_process_command_line(self, process_id):
with open(f"/proc/{process_id}/cmdline", 'r', encoding='utf-8') as cmdline_file:
raw_cmdline = cmdline_file.read()
return ' '.join(raw_cmdline.split('\0')).strip()
return None
def _publish_state(self, entity_id, state, attributes):
now = datetime.now(timezone.utc)
data = {
"state": state,
"attributes": attributes,
"last_changed": now.strftime("%Y-%m-%dT%H:%M:%S.%f+00:00"),
"last_updated": now.strftime("%Y-%m-%dT%H:%M:%S.%f+00:00")
}
headers = {
"Authorization": f"Bearer {self.config['ha_token']}",
"Content-Type": "application/json"
}
uri = f"{self.config['ha_url']}/api/states/{entity_id}"
self._log.debug(
"Publishing data to HomeAssistant in '%s'. Data: '%s",
uri,
json.dumps(data, indent=2)
)
result = self.session.post(
uri,
json=data,
headers=headers
)
if result.status_code > 399:
self._log.error(
"Result: %s. %s. %s",
result.status_code,
result.reason,
result.text
)
return None
self._log.debug(
"Result: %s. %s. %s",
result.status_code,
result.reason,
result.text
)
return result.json
def _get_windows_list(self):
command = [
'gdbus',
'call',
'--session',
'--dest',
'org.gnome.Shell',
'--object-path',
'/org/gnome/Shell/Extensions/WindowsExt',
'--method',
'org.gnome.Shell.Extensions.WindowsExt.List'
]
result = subprocess.run(command, capture_output=True, check=True)
return json.loads(re.sub(b"',\\)$", b'', re.sub(b"^\\('", b'', result.stdout)))
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("publish_active_windows")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "publish_active_windows.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
@click.command()
@click.option(
"--debug-level",
"-d",
default="INFO",
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
case_sensitive=False,
),
help='Set the debug level for the standard output.'
)
@click.option('--log-file', '-l', help="File to store all debug messages.")
@click.option(
'--ha-token',
'-t',
required=True,
help='Token to talk with HomeAssistant'
)
@click.option(
'--ha-url',
'-u',
required=True,
help='HomeAssistant URL'
)
@click.option(
'--update-frequency',
'-f',
default=60,
help='Frequency in seconds to update HomeAssistant status'
)
@click_config_file.configuration_option()
def __main__(**kwargs):
return PublishActiveWindows(**kwargs)
if __name__ == "__main__":
__main__()