From 5f421cc02494f722a0a975abb1a3278101356434 Mon Sep 17 00:00:00 2001 From: JezerM Date: Sun, 1 Aug 2021 22:17:15 -0600 Subject: [PATCH] Added battery signal. Now using acpi_listen, instead of ambiguous timer --- themes/gruvbox/js/battery.js | 1 - themes/gruvbox/js/index.js | 2 + web-greeter/bridge/Greeter.py | 33 ++---- web-greeter/bridge/__init__.py | 32 +---- web-greeter/utils/battery.py | 210 +++++++++++++++++++++++++++++++++ 5 files changed, 227 insertions(+), 51 deletions(-) create mode 100644 web-greeter/utils/battery.py diff --git a/themes/gruvbox/js/battery.js b/themes/gruvbox/js/battery.js index 64714be..dfe137f 100644 --- a/themes/gruvbox/js/battery.js +++ b/themes/gruvbox/js/battery.js @@ -6,7 +6,6 @@ class Battery { } _updateData() { - lightdm.batteryUpdate() this._info = lightdm.batteryData var level = this._info.level var state = this._info.state diff --git a/themes/gruvbox/js/index.js b/themes/gruvbox/js/index.js index 99c56e9..37d0705 100644 --- a/themes/gruvbox/js/index.js +++ b/themes/gruvbox/js/index.js @@ -28,6 +28,8 @@ async function initGreeter() { lightdm.brightness_update?.connect(() => brightness._updateData()) + lightdm.battery_update?.connect(() => battery._updateData()) + accounts = new Accounts() sessions = new Sessions() diff --git a/web-greeter/bridge/Greeter.py b/web-greeter/bridge/Greeter.py index 10479e0..b9c5307 100644 --- a/web-greeter/bridge/Greeter.py +++ b/web-greeter/bridge/Greeter.py @@ -43,7 +43,6 @@ from PyQt5.QtCore import QTimer # This Application from . import ( - Battery, language_to_dict, layout_to_dict, session_to_dict, @@ -52,6 +51,8 @@ from . import ( logger ) +import utils.battery as battery + LightDMGreeter = LightDM.Greeter() LightDMUsers = LightDM.UserList() @@ -81,21 +82,6 @@ def getBrightness(self): logger.error("Brightness: {}".format(err)) return -1 - -def updateBattery(self): - if self._config.features.battery != True: - return - try: - acpi = subprocess.run(["acpi", "-b"], stdout=subprocess.PIPE, - stderr=subprocess.PIPE, text=True, check=True) - - self._battery.update(acpi.stdout) - except Exception as err: - logger.error("Battery: {}".format(err)) - else: - self.property_changed.emit() - - class Greeter(BridgeObject): # LightDM.Greeter Signals @@ -107,11 +93,12 @@ class Greeter(BridgeObject): show_prompt = bridge.signal(str, LightDM.PromptType, arguments=('text', 'type')) brightness_update = bridge.signal() + battery_update = bridge.signal() noop_signal = bridge.signal() property_changed = bridge.signal() - _battery = Battery() + _battery = None def __init__(self, config, *args, **kwargs): super().__init__(name='LightDMGreeter', *args, **kwargs) @@ -120,6 +107,9 @@ class Greeter(BridgeObject): self._shared_data_directory = '' self._themes_directory = config.themes_dir + if self._config.features.battery == True: + self._battery = battery.Battery() + LightDMGreeter.connect_to_daemon_sync() self._connect_signals() @@ -152,6 +142,9 @@ class Greeter(BridgeObject): lambda greeter, msg, mtype: self._emit_signal(self.show_prompt, msg, mtype) ) + if self._battery: + self._battery.connect(lambda: self.battery_update.emit()) + def _emit_signal(self, _signal, *args): self.property_changed.emit() QTimer().singleShot(300, lambda: _signal.emit(*args)) @@ -172,7 +165,7 @@ class Greeter(BridgeObject): def autologin_user(self): return LightDMGreeter.get_autologin_user_hint() - @bridge.prop(Variant, notify=property_changed) + @bridge.prop(Variant, notify=battery_update) def batteryData(self): return battery_to_dict(self._battery) @@ -311,10 +304,6 @@ class Greeter(BridgeObject): LightDMGreeter.authenticate_as_guest() self.property_changed.emit() - @bridge.method() - def batteryUpdate(self): - return updateBattery(self) - @bridge.method(int) def brightnessSet(self, quantity): thread = threading.Thread(target=changeBrightness, diff --git a/web-greeter/bridge/__init__.py b/web-greeter/bridge/__init__.py index 11faec7..d005d9e 100644 --- a/web-greeter/bridge/__init__.py +++ b/web-greeter/bridge/__init__.py @@ -66,33 +66,6 @@ class setInterval: self.action() -class Battery: - _name = "" - _level = -1 - _state = "" - - def __init__(self): - pass - - def update(self, acpi: str): - formatted = re.sub("%|,|\n", "", acpi) - colon = formatted.split(": ") - splitted = colon[1].split(" ") - - self._name = colon[0] - self._level = int(splitted[1]) - self._state = splitted[0] - - def get_name(self): - return self._name - - def get_level(self): - return self._level - - def get_state(self): - return self._state - - def language_to_dict(lang): if (not lang): return dict() @@ -141,7 +114,10 @@ def battery_to_dict(battery): return dict( name = battery.get_name(), level = battery.get_level(), - state = battery.get_state() + state = battery.get_state(), + capacity = battery.get_capacity(), + time = battery.get_time(), + watt = battery.get_watt() ) diff --git a/web-greeter/utils/battery.py b/web-greeter/utils/battery.py new file mode 100644 index 0000000..ae6e2de --- /dev/null +++ b/web-greeter/utils/battery.py @@ -0,0 +1,210 @@ +import os +import subprocess +import shlex +import re +import math +from threading import Thread +import time + +running = False + +class Battery: + + _batteries = [] + ac = "AC0" + pspath = "/sys/class/power_supply/" + perc = -1 + status = "N/A" + capacity = 0 + time = "" + watt = 0 + + callbacks = [] + + def __init__(self): + if self._batteries.__len__() == 0: + scandir_line(self.pspath, self._update_batteries) + start_timer(self.full_update) + self.full_update() + + def connect(self, callback): + self.callbacks.append(callback) + + def disconnect(self, callback): + self.callbacks.remove(callback) + + def _update_batteries(self, line): + bstr = re.match(r"BAT\w+", line) + if bstr: + self._batteries.append(dict( + name = bstr.group(), + status = "N/A", + perc = 0, + capacity = 0, + )) + else: + self.ac = re.match(r"A\w+", line).group() or self.ac + + # Based on "bat" widget from "lain" awesome-wm library + # * (c) 2013, Luca CPZ + # * (c) 2010-2012, Peter Hofmann + # @see https://github.com/lcpz/lain/blob/master/widget/bat.lua + def full_update(self): + global running + if running: + return + running = True + + sum_rate_current = 0 + sum_rate_voltage = 0 + sum_rate_power = 0 + sum_rate_energy = 0 + sum_energy_now = 0 + sum_energy_full = 0 + sum_charge_full = 0 + sum_charge_design = 0 + + for i in range(len(self._batteries)): + battery = self._batteries[i] + bstr = self.pspath + battery["name"] + present = read_first_line(bstr + "/present") + + if tonumber(present) == 1: + rate_current = tonumber(read_first_line(bstr + "/current_now")) + rate_voltage = tonumber(read_first_line(bstr + "/voltage_now")) + rate_power = tonumber(read_first_line((bstr + "/power_now"))) + charge_full = tonumber(read_first_line(bstr + "/charge_full")) + charge_design = tonumber(read_first_line(bstr + "/charge_full_design")) + + energy_now = tonumber(read_first_line(bstr + "/energy_now") + or read_first_line(bstr + "/charge_now")) + energy_full = tonumber(read_first_line(bstr + "/energy_full") or charge_full) + energy_percentage = tonumber(read_first_line(bstr + "/capacity") + or math.floor(energy_now / energy_full * 100)) + + self._batteries[i]["status"] = read_first_line(bstr + "/status") or "N/A" + self._batteries[i]["perc"] = energy_percentage or self._batteries[i].perc + + if not charge_design or charge_design == 0: + self._batteries[i]["capacity"] = 0 + else: + self._batteries[i]["capacity"] = math.floor( + charge_full / charge_design * 100) + + sum_rate_current = sum_rate_current + (rate_current or 0) + sum_rate_voltage = sum_rate_voltage + (rate_voltage or 0) + sum_rate_power = sum_rate_power + (rate_power or 0) + sum_rate_energy = sum_rate_energy + (rate_power or (((rate_voltage or 0) * (rate_current or 0)) / 1e6)) + sum_energy_now = sum_energy_now + (energy_now or 0) + sum_energy_full = sum_energy_full + (energy_full or 0) + sum_charge_full = sum_charge_full + (charge_full or 0) + sum_charge_design = sum_charge_design + (charge_design or 0) + + self.capacity = math.floor(min(100, sum_charge_full / sum_charge_design * 100)) + self.status = len(self._batteries) > 0 and self._batteries[0]["status"] or "N/A" + + for i in range(len(self._batteries)): + battery = self._batteries[i] + if battery["status"] == "Discharging" or battery["status"] == "Charging": + self.status = battery["status"] + + self.ac_status = tonumber(read_first_line(self.pspath + self.ac + "/online")) or "N/A" + + if self.status != "N/A": + if self.status != "Full" and sum_rate_power == 0 and self.ac_status == 1: + self.perc = math.floor(min(100, + sum_energy_now / sum_energy_full * 100 + 0.5)) + self.time = "00:00" + self.watt = 0 + elif self.status != "Full": + rate_time = 0 + if (sum_rate_power > 0 or sum_rate_current > 0): + div = (sum_rate_power > 0 and sum_rate_power) or sum_rate_current + + if self.status == "Charging": + rate_time = (sum_energy_full - sum_energy_now) / div + else: + rate_time = sum_energy_now / div + + if 0 < rate_time and rate_time < 0.01: + rate_time_magnitude = tonumber(abs(math.floor(math.log10(rate_time)))) + rate_time = rate_time * 10 ^ (rate_time_magnitude - 2) + + hours = math.floor(rate_time) + minutes = math.floor((rate_time - hours) * 60) + self.perc = math.floor(min(100, (sum_energy_now / sum_energy_full) * 100) + 0.5) + self.time = "{:02d}:{:02d}".format(hours, minutes) + self.watt = "{:.2f}".format(sum_rate_energy / 1e6) + + self.perc = self.perc == None and 0 or self.perc + + for cb in self.callbacks: + cb() + + time.sleep(0.1) + + running = False + + def get_name(self): + return self._batteries[0]["name"] + + def get_level(self): + return self.perc + + def get_state(self): + return self.status + + def get_capacity(self): + return self.capacity + + def get_time(self): + return self.time + + def get_watt(self): + return self.watt + +def acpi_listen(callback): + main = subprocess.Popen(shlex.split("acpi_listen"), stdout=subprocess.PIPE, text=True) + awky = subprocess.Popen(shlex.split("grep --line-buffered -E 'battery|ac_adapter'"), + stdout=subprocess.PIPE, stdin=main.stdout, text=True) + while True: + output = awky.stdout.readline() + if output == "" and awky.poll() != None: + break + if output: + callback() + rc = main.poll() + return rc + +def scandir_line(path, callback): + main = subprocess.Popen(shlex.split("ls -1 {}".format(path)), + stdout=subprocess.PIPE, text=True) + while True: + line = main.stdout.readline() + if line == "" and main.poll() != None: + break + if line: + callback(line) + +def read_first_line(path): + try: + file = open(path, "r") + first = None + if file: + first = file.readline() + first = first.replace("\n", "") + file.close() + return first + except Exception: + return None + +def tonumber(asa): + try: + return int(asa) + except Exception: + return None + +def start_timer(callback): + thread = Thread(target = acpi_listen, args=(callback,)) + thread.daemon = True + thread.start()