408 lines
16 KiB
Python
Executable file
408 lines
16 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
#
|
|
# This script is licensed under GNU GPL version 2.0 or above
|
|
# (c) 2025 Antonio J. Delgado
|
|
"""Android Manager"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import time
|
|
import re
|
|
import logging
|
|
from logging.handlers import SysLogHandler
|
|
import click
|
|
import click_config_file
|
|
from ppadb.client import Client as AdbClient
|
|
import ppadb
|
|
import requests
|
|
import yaml
|
|
|
|
|
|
HOME_FOLDER = os.environ.get('HOME', os.environ.get('USERPROFILE', '/'))
|
|
if HOME_FOLDER == '/':
|
|
CACHE_FOLDER = '/var/cache'
|
|
LOG_FOLDER = '/var/log/'
|
|
else:
|
|
CACHE_FOLDER = f"{HOME_FOLDER}/.local/"
|
|
LOG_FOLDER = f"{HOME_FOLDER}/log/"
|
|
|
|
|
|
class AndroidManager:
|
|
"""Android Manager"""
|
|
|
|
def __init__(self, **kwargs):
|
|
self.config = kwargs
|
|
if 'log_file' not in kwargs or kwargs['log_file'] is None:
|
|
self.config['log_file'] = os.path.join(
|
|
os.environ.get(
|
|
'HOME',
|
|
os.environ.get(
|
|
'USERPROFILE',
|
|
os.getcwd()
|
|
)
|
|
),
|
|
'log',
|
|
'android_manager.log'
|
|
)
|
|
self._init_log()
|
|
self._default_data = {
|
|
"last_update": 0,
|
|
}
|
|
self.data = self._read_cached_data()
|
|
self.lines = None
|
|
self.client = AdbClient(host=self.config['host'], port=self.config['port'])
|
|
self.devices = []
|
|
for device_object in self.client.devices():
|
|
dev = {}
|
|
dev['object'] = device_object
|
|
try:
|
|
dev['properties'] = device_object.get_properties()
|
|
if 'ro.semc.product.name' in dev['properties']:
|
|
dev['name'] = dev['properties']['ro.semc.product.name']
|
|
elif 'ro.vendor.oplus.market.name' in dev['properties']:
|
|
dev['name'] = dev['properties']['ro.vendor.oplus.market.name']
|
|
elif 'ro.product.name' in dev['properties']:
|
|
dev['name'] = dev['properties']['ro.product.name']
|
|
else:
|
|
with open('/tmp/tmp.properties.json', 'w', encoding='utf-8') as debug_file:
|
|
debug_file.write(json.dumps(dev['properties'], indent=2))
|
|
self._log.warning(
|
|
"Device product name not found. Check the /tmp/tmp.properties.json file for the right property to get a product name and notify the developer."
|
|
)
|
|
device_object.shell('dumpsys bluetooth_manager', handler=self._handle_output)
|
|
for line in self.lines:
|
|
match = re.search(r'^ Name: (.*)$', line)
|
|
if match:
|
|
dev['name'] = match.group(1)
|
|
match = re.search(r'^ address: (.*)$', line)
|
|
if match:
|
|
dev['bt_address'] = match.group(1)
|
|
self.devices.append(dev)
|
|
self._log.debug(
|
|
"Found Android device: %s",
|
|
dev['name']
|
|
)
|
|
except Exception as error:
|
|
self._log.error(f"Error reading device properties. Maybe the device is off-line? {error}")
|
|
|
|
self.get_installed_apps()
|
|
|
|
def get_installed_apps(self, device_name=None):
|
|
'''Get a list of installed app in a device or all'''
|
|
for device in self.devices:
|
|
if device_name == device['name'] or device_name is None:
|
|
device['object'].shell("pm list packages -3 -f", handler=self._handle_output)
|
|
device['apps'] = []
|
|
for line in self.lines:
|
|
app = {}
|
|
match = re.search(r'^package:(.*\.apk)=(.*)$', line)
|
|
if match:
|
|
app['path'] = os.path.dirname(match.group(1))
|
|
app['id'] = match.group(2)
|
|
else:
|
|
self._log.error(
|
|
"RegExp failed to identify app in line: '%s'",
|
|
line
|
|
)
|
|
device['apps'].append(app)
|
|
|
|
def restore_apps(self, backup_text_file, device_product_name):
|
|
'''Restore apps from a text file using F-Droid repository'''
|
|
if not os.path.exists(backup_text_file):
|
|
self._log.error(
|
|
"The backup file '%s' doesn't exist.",
|
|
backup_text_file
|
|
)
|
|
return False
|
|
session = requests.Session()
|
|
with open(backup_text_file, 'r', encoding='utf-8') as backup_file:
|
|
content = backup_file.read()
|
|
for app_code in content.split():
|
|
self._log.info(
|
|
"Processing app '%s' for restoration...",
|
|
app_code
|
|
)
|
|
app_url = f"https://gitlab.com/fdroid/fdroiddata/-/raw/master/metadata/{app_code}.yml"
|
|
result = session.get(app_url)
|
|
if result.status_code > 399:
|
|
self._log.error(
|
|
"Error %s fetching app data file from '%s'.",
|
|
result.status_code,
|
|
app_url
|
|
)
|
|
continue
|
|
app_data = yaml.load(result.content, Loader=yaml.Loader)
|
|
latest_build = 0
|
|
if not app_data:
|
|
self._log.error(
|
|
"Error loading YAML data from remote application file '%s'. Result: %s",
|
|
app_url,
|
|
result.content
|
|
)
|
|
continue
|
|
if 'Builds' not in app_data:
|
|
self._log.error(
|
|
"'Builds' section not found in application data. %s",
|
|
app_data
|
|
)
|
|
continue
|
|
for build in app_data['Builds']:
|
|
if build['versionCode'] > latest_build:
|
|
build_url = f"https://verification.f-droid.org/{app_code}_{build['versionCode']}.apk.json"
|
|
result = session.get(
|
|
build_url
|
|
)
|
|
if result.status_code < 400:
|
|
latest_build = build['versionCode']
|
|
if latest_build == 0:
|
|
self._log.error(
|
|
"Unable to find the latest version code in the app data file '%s'",
|
|
app_url
|
|
)
|
|
continue
|
|
self._log.debug(
|
|
"Latest build version code name seems to be '%s'",
|
|
latest_build
|
|
)
|
|
build_url = f"https://verification.f-droid.org/{app_code}_{latest_build}.apk.json"
|
|
result = session.get(
|
|
build_url
|
|
)
|
|
if result.status_code > 399:
|
|
self._log.error(
|
|
"Error %s fetching build information from file '%s'.",
|
|
result.status_code,
|
|
build_url
|
|
)
|
|
continue
|
|
build_data = json.loads(result.content)
|
|
apk_file_url = None
|
|
for build in build_data.keys():
|
|
apk_file_url = build_data[build]['url']
|
|
self._log.debug(
|
|
"Found file: %s",
|
|
apk_file_url
|
|
)
|
|
if not apk_file_url:
|
|
self._log.error(
|
|
"Error finding APK file in '%s'. %s",
|
|
build_url,
|
|
result.content
|
|
)
|
|
continue
|
|
result = session.get(
|
|
apk_file_url
|
|
)
|
|
apk_file = os.path.join(os.path.dirname(backup_text_file), f"{app_code}.apk")
|
|
target_device = None
|
|
with open(apk_file, 'w+b') as apk_object_file:
|
|
apk_object_file.write(result.content)
|
|
if device_product_name:
|
|
for device in self.devices:
|
|
if device['name'] == device_product_name:
|
|
target_device = device
|
|
else:
|
|
target_device=self.devices[0]
|
|
if target_device:
|
|
self._log.debug(
|
|
"Installing app '%s' with version code '%s' to device '%s'...",
|
|
app_code,
|
|
latest_build,
|
|
target_device['name']
|
|
)
|
|
try:
|
|
target_device['object'].install(apk_file)
|
|
self._log.debug(
|
|
"Installed without errors."
|
|
)
|
|
except ppadb.InstallError as error:
|
|
if 'INSTALL_FAILED_ALREADY_EXISTS' in f"{error}":
|
|
self._log.debug(
|
|
'App already installed, not installing again. %s',
|
|
error
|
|
)
|
|
else:
|
|
self._log.error(
|
|
"Error installing application '%s' with version code '%s' to device '%s'. %s",
|
|
app_code,
|
|
latest_build,
|
|
target_device['name'],
|
|
error
|
|
)
|
|
return True
|
|
|
|
def backup_apps(self, backup_path, just_names):
|
|
'''Backup apps either to a text file or to a APK files'''
|
|
for device in self.devices:
|
|
local_directory = os.path.join(backup_path, device.get('name', 'unknown_device'))
|
|
if not os.path.exists(local_directory):
|
|
os.mkdir(local_directory)
|
|
if just_names and os.path.exists(os.path.join(local_directory, 'apks_list.txt')):
|
|
os.remove(os.path.join(local_directory, 'apks_list.txt'))
|
|
for app in device['apps']:
|
|
remote_filename = f"{app['path']}/base.apk"
|
|
if not just_names:
|
|
local_filename = os.path.join(local_directory, f"{app['id']}.apk")
|
|
self._log.info(
|
|
"Saving '%s' into '%s'...",
|
|
remote_filename,
|
|
local_filename
|
|
)
|
|
device['object'].pull(remote_filename, local_filename)
|
|
else:
|
|
file_name = os.path.join(local_directory, 'apks_list.txt')
|
|
self._log.info(
|
|
"Saving list of apps into file '%s'...",
|
|
file_name
|
|
)
|
|
with open(file_name, 'a', encoding='UTF-8') as list_file:
|
|
list_file.write(f"{app['id']}\n")
|
|
|
|
def _handle_output(self, connection):
|
|
buffer = ""
|
|
while True:
|
|
data = connection.read(1024)
|
|
if not data:
|
|
break
|
|
# print(data)
|
|
buffer += data.decode('utf-8', errors='ignore')
|
|
|
|
connection.close()
|
|
self.lines = buffer.strip().split('\n')
|
|
|
|
def close(self):
|
|
'''Close class and save data'''
|
|
self._save_cached_data(self.data)
|
|
|
|
def _read_cached_data(self):
|
|
if os.path.exists(self.config['cache_file']):
|
|
with open(self.config['cache_file'], 'r', encoding='utf-8') as cache_file:
|
|
try:
|
|
cached_data = json.load(cache_file)
|
|
if (
|
|
'last_update' in cached_data and
|
|
cached_data['last_update'] + self.config['max_cache_age'] > time.time()
|
|
):
|
|
cached_data = self._default_data
|
|
except json.decoder.JSONDecodeError:
|
|
cached_data = self._default_data
|
|
return cached_data
|
|
else:
|
|
return self._default_data
|
|
|
|
def _save_cached_data(self, data):
|
|
data['last_update'] = time.time()
|
|
with open(self.config['cache_file'], 'w', encoding='utf-8') as cache_file:
|
|
json.dump(data, cache_file, indent=2)
|
|
self._log.debug(
|
|
"Saved cached data in '%s'",
|
|
self.config['cache_file']
|
|
)
|
|
|
|
def _init_log(self):
|
|
''' Initialize log object '''
|
|
self._log = logging.getLogger("android_manager")
|
|
self._log.setLevel(logging.DEBUG)
|
|
|
|
sysloghandler = SysLogHandler()
|
|
sysloghandler.setLevel(logging.DEBUG)
|
|
self._log.addHandler(sysloghandler)
|
|
|
|
streamhandler = logging.StreamHandler(sys.stdout)
|
|
streamhandler.setLevel(
|
|
logging.getLevelName(self.config.get("debug_level", 'INFO'))
|
|
)
|
|
self._log.addHandler(streamhandler)
|
|
|
|
if 'log_file' in self.config:
|
|
log_file = self.config['log_file']
|
|
else:
|
|
home_folder = os.environ.get(
|
|
'HOME', os.environ.get('USERPROFILE', '')
|
|
)
|
|
log_folder = os.path.join(home_folder, "log")
|
|
log_file = os.path.join(log_folder, "android_manager.log")
|
|
|
|
if not os.path.exists(os.path.dirname(log_file)):
|
|
os.mkdir(os.path.dirname(log_file))
|
|
|
|
filehandler = logging.handlers.RotatingFileHandler(
|
|
log_file, maxBytes=102400000
|
|
)
|
|
# create formatter
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
|
|
)
|
|
filehandler.setFormatter(formatter)
|
|
filehandler.setLevel(logging.DEBUG)
|
|
self._log.addHandler(filehandler)
|
|
return True
|
|
|
|
|
|
@click.group()
|
|
@click.option(
|
|
"--debug-level",
|
|
"-d",
|
|
default="INFO",
|
|
type=click.Choice(
|
|
["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
|
|
case_sensitive=False,
|
|
),
|
|
help='Set the debug level for the standard output.'
|
|
)
|
|
@click.option(
|
|
'--log-file',
|
|
'-l',
|
|
default=f"{LOG_FOLDER}/android_manager.log",
|
|
help="File to store all debug messages."
|
|
)
|
|
@click.option(
|
|
'--cache-file',
|
|
'-f',
|
|
default=f"{CACHE_FOLDER}/android_manager.json",
|
|
help='Cache file to store data from each run',
|
|
)
|
|
@click.option(
|
|
'--max-cache-age',
|
|
'-a',
|
|
default=60*60*24*7,
|
|
help='Max age in seconds for the cache'
|
|
)
|
|
@click.option('--host', '-H', default='127.0.0.1', help='ADB server host name or IP.')
|
|
@click.option('--port', '-p', default=5037, help='ADB server port to use.')
|
|
# @click.option("--dummy","-n", is_flag=True,
|
|
# help="Don't do anything, just show what would be done.")
|
|
@click_config_file.configuration_option()
|
|
@click.pass_context
|
|
def cli(ctx, **kwargs):
|
|
'''CLI function'''
|
|
ctx.ensure_object(dict)
|
|
ctx.obj['config'] = kwargs
|
|
ctx.obj['android_manager'] = AndroidManager(**kwargs)
|
|
|
|
@cli.command()
|
|
@click.option('--backup-path', '-p', required=True, help='Path to save the backed APKs')
|
|
@click.option('--just-names', '-n', is_flag=True, help='Save only the filename')
|
|
@click_config_file.configuration_option()
|
|
@click.pass_context
|
|
def backup_apps(ctx, backup_path, just_names):
|
|
'''Backup Android apps'''
|
|
ctx.obj['android_manager'].backup_apps(backup_path, just_names)
|
|
|
|
@cli.command()
|
|
@click.option('--backup-text-file', '-p', required=True, help='Path to save the backed APKs')
|
|
@click.option(
|
|
'--device-product-name',
|
|
'-n',
|
|
help='Device product name to install the apps. If not indicated it will install in the first device found.'
|
|
)
|
|
@click_config_file.configuration_option()
|
|
@click.pass_context
|
|
def restore_apps(ctx, backup_text_file, device_product_name):
|
|
'''Restore Android apps'''
|
|
ctx.obj['android_manager'].restore_apps(backup_text_file, device_product_name)
|
|
|
|
if __name__ == "__main__":
|
|
cli(obj={})
|