xplora_ha/xplora_ha/xplora_ha.py

345 lines
10 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
#
# This script is licensed under GNU GPL version 2.0 or above
# (c) 2025 Antonio J. Delgado
"""Integrate Xplora watches into Home Assistant"""
import sys
import os
import json
import time
import logging
from logging.handlers import SysLogHandler
import click
import click_config_file
import yaml
from pyxplora_api.pyxplora_api import PyXploraApi
import requests
HOME_FOLDER = os.environ.get('HOME', os.environ.get('USERPROFILE', '/'))
if HOME_FOLDER == '/':
CACHE_FOLDER = '/var/cache'
LOG_FOLDER = '/var/log/'
else:
CACHE_FOLDER = f"{HOME_FOLDER}/.local/"
LOG_FOLDER = f"{HOME_FOLDER}/log/"
WATCH_STATES = [
{
"function": "getWatchBattery",
"entity_name": "battery_level",
"type": "percentage"
},
{
"function": "getWatchIsCharging",
"entity_name": "battery_charging",
"type": "boolean"
},
{
"function": "getWatchIsInSafeZone",
"entity_name": "in_safe_zone",
"type": "boolean"
},
{
"function": "getWatchLastLocation",
"entity_name": "last_location"
},
{
"function": "getWatchOnlineStatus",
"entity_name": "online_status"
},
{
"function": "getWatchSafeZoneLabel",
"entity_name": "safe_zone_label",
"type": "string"
},
# {
# "function": "getWatchState",
# "entity_name": "state"
# },
{
"function": "getWatchUnReadChatMsgCount",
"entity_name": "unread_chat_count",
"type": "integer"
},
{
"function": "getWatchUserCurrentStep",
"entity_name": "current_step",
"type": "integer"
},
{
"function": "getWatchUserSteps",
"params": {
"date": time.time()
},
"entity_name": "user_steps",
"type": "integer"
},
{
"function": "getWatchUserTotalStep",
"entity_name": "user_total_step",
"type": "integer"
}
]
class XploraHa:
"""Integrate Xplora watches into Home Assistant"""
def __init__(self, **kwargs):
self.config = kwargs
if 'log_file' not in kwargs or kwargs['log_file'] is None:
self.config['log_file'] = os.path.join(
os.environ.get(
'HOME',
os.environ.get(
'USERPROFILE',
os.getcwd()
)
),
'log',
'xplora_ha.log'
)
self._init_log()
self._default_data = {
"last_update": 0,
}
self.data = self._read_cached_data()
headers = {
"Authorization": f"Bearer {self.config['home_assistant_token']}",
"content-type": "application/json",
}
self.ha_session = requests.Session()
self.ha_session.headers.update(headers)
self.xplora = PyXploraApi(
self.config['xplora_country_code'],
self.config['xplora_phone_number'],
self.config['xplora_password']
)
self.xplora.init(forceLogin=False, signup=True)
while True:
self._process_watches()
time.sleep(self.config['interval'])
def _process_watches(self):
for watch in self.xplora.watchs:
self._debug(watch['ward'])
base_sensor_entity = f"sensor.xplora_watch_{watch['ward']['name']}_"
self._publish_ha_state(
f"{base_sensor_entity}last_update",
{
"state": time.time(),
'device_class': 'timestamp'
}
)
for watch_state in WATCH_STATES:
function = getattr(self.xplora, watch_state['function'])
state = {
"state": ""
}
if 'params' in watch_state:
if 'date' in watch_state['params']:
watch_state['params']['date'] = time.time()
state['state'] = function(watch['ward']['id'], **watch_state['params'])
else:
state['state'] = function(watch['ward']['id'])
if 'type' in watch_state:
if watch_state['type'] == 'percentage':
state['unit_of_measurementstate'] = "%"
elif watch_state['type'] == 'date':
state['device_class'] = 'timestamp'
self._publish_ha_state(
f"{base_sensor_entity}{watch_state['entity_name']}",
state
)
def _publish_ha_state(self, entity_id, state):
self._debug(
{ "posting": { "entity": entity_id, "state": state } }
)
result = self.ha_session.post(
f"{self.config['home_assistant_url']}/api/states/{entity_id}",
json=state
)
self._debug( { "result": result.json() })
def close(self):
'''Close class and save data'''
self._save_cached_data(self.data)
def _read_cached_data(self):
if os.path.exists(self.config['cache_file']):
with open(self.config['cache_file'], 'r', encoding='utf-8') as cache_file:
try:
cached_data = json.load(cache_file)
if (
'last_update' in cached_data and
cached_data['last_update'] + self.config['max_cache_age'] > time.time()
):
cached_data = self._default_data
except json.decoder.JSONDecodeError:
cached_data = self._default_data
return cached_data
else:
return self._default_data
def _save_cached_data(self, data):
data['last_update'] = time.time()
with open(self.config['cache_file'], 'w', encoding='utf-8') as cache_file:
json.dump(data, cache_file, indent=2)
self._debug(
f"Saved cached data in '{self.config['cache_file']}'",
)
def _output(self, message):
if self.config['output_format'] == 'JSON':
return json.dumps(message, indent=2)
elif self.config['output_format'] == 'YAML':
return yaml.dump(message, Dumper=yaml.Dumper)
elif self.config['output_format'] == 'PLAIN':
return f"{message}"
else:
self._log.warning(
"Output format '%s' not supported",
self.config['output_format']
)
return message
def _info(self, message):
return self._log.info(self._output(message))
def _warning(self, message):
return self._log.warning(self._output(message))
def _error(self, message):
return self._log.error(self._output(message))
def _debug(self, message):
return self._log.debug(self._output(message))
def _init_log(self):
''' Initialize log object '''
self._log = logging.getLogger("xplora_ha")
self._log.setLevel(logging.DEBUG)
sysloghandler = SysLogHandler()
sysloghandler.setLevel(logging.DEBUG)
self._log.addHandler(sysloghandler)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setLevel(
logging.getLevelName(self.config.get("debug_level", 'INFO'))
)
self._log.addHandler(streamhandler)
if 'log_file' in self.config:
log_file = self.config['log_file']
else:
home_folder = os.environ.get(
'HOME', os.environ.get('USERPROFILE', '')
)
log_folder = os.path.join(home_folder, "log")
log_file = os.path.join(log_folder, "xplora_ha.log")
if not os.path.exists(os.path.dirname(log_file)):
os.mkdir(os.path.dirname(log_file))
filehandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=102400000
)
# create formatter
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
)
filehandler.setFormatter(formatter)
filehandler.setLevel(logging.DEBUG)
self._log.addHandler(filehandler)
return True
@click.command()
@click.option(
"--debug-level",
"-d",
default="INFO",
type=click.Choice(
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
case_sensitive=False,
),
help='Set the debug level for the standard output.'
)
@click.option(
"--output-format",
"-o",
default="JSON",
type=click.Choice(
["JSON", "YAML", "CSV", "PLAIN"],
case_sensitive=False,
),
help='Set the output format.'
)
@click.option(
'--log-file',
'-l',
default=f"{LOG_FOLDER}/xplora_ha.log",
help="File to store all debug messages."
)
@click.option(
'--cache-file',
'-f',
default=f"{CACHE_FOLDER}/xplora_ha.json",
help='Cache file to store data from each run',
)
@click.option(
'--max-cache-age',
'-a',
default=60*60*24*7,
help='Max age in seconds for the cache'
)
@click.option(
'--home-assistant-url',
'-u',
required=True,
help='Home Assistant URL, like http://example.org:8123'
)
@click.option(
'--home-assistant-token',
'-t',
required=True,
help='Home Assistant token'
)
@click.option(
'--xplora-country-code',
'-c',
required=True,
help='Xplora phone country code'
)
@click.option(
'--xplora-phone-number',
'-p',
required=True,
help='Xplora phone number of your account'
)
@click.option(
'--xplora-password',
'-P',
required=True,
help='Xplora account password'
)
@click.option(
'--interval',
'-i',
default=30,
help='Interval in seconds between checks on the Xplora API'
)
# @click.option("--dummy","-n", is_flag=True,
# help="Don't do anything, just show what would be done.")
@click_config_file.configuration_option()
def __main__(**kwargs):
obj = XploraHa(**kwargs)
obj.close()
if __name__ == "__main__":
__main__()