JezerM
3 years ago
5 changed files with 227 additions and 51 deletions
@ -0,0 +1,210 @@ |
|||||||
|
import os |
||||||
|
import subprocess |
||||||
|
import shlex |
||||||
|
import re |
||||||
|
import math |
||||||
|
from threading import Thread |
||||||
|
import time |
||||||
|
|
||||||
|
running = False |
||||||
|
|
||||||
|
class Battery: |
||||||
|
|
||||||
|
_batteries = [] |
||||||
|
ac = "AC0" |
||||||
|
pspath = "/sys/class/power_supply/" |
||||||
|
perc = -1 |
||||||
|
status = "N/A" |
||||||
|
capacity = 0 |
||||||
|
time = "" |
||||||
|
watt = 0 |
||||||
|
|
||||||
|
callbacks = [] |
||||||
|
|
||||||
|
def __init__(self): |
||||||
|
if self._batteries.__len__() == 0: |
||||||
|
scandir_line(self.pspath, self._update_batteries) |
||||||
|
start_timer(self.full_update) |
||||||
|
self.full_update() |
||||||
|
|
||||||
|
def connect(self, callback): |
||||||
|
self.callbacks.append(callback) |
||||||
|
|
||||||
|
def disconnect(self, callback): |
||||||
|
self.callbacks.remove(callback) |
||||||
|
|
||||||
|
def _update_batteries(self, line): |
||||||
|
bstr = re.match(r"BAT\w+", line) |
||||||
|
if bstr: |
||||||
|
self._batteries.append(dict( |
||||||
|
name = bstr.group(), |
||||||
|
status = "N/A", |
||||||
|
perc = 0, |
||||||
|
capacity = 0, |
||||||
|
)) |
||||||
|
else: |
||||||
|
self.ac = re.match(r"A\w+", line).group() or self.ac |
||||||
|
|
||||||
|
# Based on "bat" widget from "lain" awesome-wm library |
||||||
|
# * (c) 2013, Luca CPZ |
||||||
|
# * (c) 2010-2012, Peter Hofmann |
||||||
|
# @see https://github.com/lcpz/lain/blob/master/widget/bat.lua |
||||||
|
def full_update(self): |
||||||
|
global running |
||||||
|
if running: |
||||||
|
return |
||||||
|
running = True |
||||||
|
|
||||||
|
sum_rate_current = 0 |
||||||
|
sum_rate_voltage = 0 |
||||||
|
sum_rate_power = 0 |
||||||
|
sum_rate_energy = 0 |
||||||
|
sum_energy_now = 0 |
||||||
|
sum_energy_full = 0 |
||||||
|
sum_charge_full = 0 |
||||||
|
sum_charge_design = 0 |
||||||
|
|
||||||
|
for i in range(len(self._batteries)): |
||||||
|
battery = self._batteries[i] |
||||||
|
bstr = self.pspath + battery["name"] |
||||||
|
present = read_first_line(bstr + "/present") |
||||||
|
|
||||||
|
if tonumber(present) == 1: |
||||||
|
rate_current = tonumber(read_first_line(bstr + "/current_now")) |
||||||
|
rate_voltage = tonumber(read_first_line(bstr + "/voltage_now")) |
||||||
|
rate_power = tonumber(read_first_line((bstr + "/power_now"))) |
||||||
|
charge_full = tonumber(read_first_line(bstr + "/charge_full")) |
||||||
|
charge_design = tonumber(read_first_line(bstr + "/charge_full_design")) |
||||||
|
|
||||||
|
energy_now = tonumber(read_first_line(bstr + "/energy_now") |
||||||
|
or read_first_line(bstr + "/charge_now")) |
||||||
|
energy_full = tonumber(read_first_line(bstr + "/energy_full") or charge_full) |
||||||
|
energy_percentage = tonumber(read_first_line(bstr + "/capacity") |
||||||
|
or math.floor(energy_now / energy_full * 100)) |
||||||
|
|
||||||
|
self._batteries[i]["status"] = read_first_line(bstr + "/status") or "N/A" |
||||||
|
self._batteries[i]["perc"] = energy_percentage or self._batteries[i].perc |
||||||
|
|
||||||
|
if not charge_design or charge_design == 0: |
||||||
|
self._batteries[i]["capacity"] = 0 |
||||||
|
else: |
||||||
|
self._batteries[i]["capacity"] = math.floor( |
||||||
|
charge_full / charge_design * 100) |
||||||
|
|
||||||
|
sum_rate_current = sum_rate_current + (rate_current or 0) |
||||||
|
sum_rate_voltage = sum_rate_voltage + (rate_voltage or 0) |
||||||
|
sum_rate_power = sum_rate_power + (rate_power or 0) |
||||||
|
sum_rate_energy = sum_rate_energy + (rate_power or (((rate_voltage or 0) * (rate_current or 0)) / 1e6)) |
||||||
|
sum_energy_now = sum_energy_now + (energy_now or 0) |
||||||
|
sum_energy_full = sum_energy_full + (energy_full or 0) |
||||||
|
sum_charge_full = sum_charge_full + (charge_full or 0) |
||||||
|
sum_charge_design = sum_charge_design + (charge_design or 0) |
||||||
|
|
||||||
|
self.capacity = math.floor(min(100, sum_charge_full / sum_charge_design * 100)) |
||||||
|
self.status = len(self._batteries) > 0 and self._batteries[0]["status"] or "N/A" |
||||||
|
|
||||||
|
for i in range(len(self._batteries)): |
||||||
|
battery = self._batteries[i] |
||||||
|
if battery["status"] == "Discharging" or battery["status"] == "Charging": |
||||||
|
self.status = battery["status"] |
||||||
|
|
||||||
|
self.ac_status = tonumber(read_first_line(self.pspath + self.ac + "/online")) or "N/A" |
||||||
|
|
||||||
|
if self.status != "N/A": |
||||||
|
if self.status != "Full" and sum_rate_power == 0 and self.ac_status == 1: |
||||||
|
self.perc = math.floor(min(100, |
||||||
|
sum_energy_now / sum_energy_full * 100 + 0.5)) |
||||||
|
self.time = "00:00" |
||||||
|
self.watt = 0 |
||||||
|
elif self.status != "Full": |
||||||
|
rate_time = 0 |
||||||
|
if (sum_rate_power > 0 or sum_rate_current > 0): |
||||||
|
div = (sum_rate_power > 0 and sum_rate_power) or sum_rate_current |
||||||
|
|
||||||
|
if self.status == "Charging": |
||||||
|
rate_time = (sum_energy_full - sum_energy_now) / div |
||||||
|
else: |
||||||
|
rate_time = sum_energy_now / div |
||||||
|
|
||||||
|
if 0 < rate_time and rate_time < 0.01: |
||||||
|
rate_time_magnitude = tonumber(abs(math.floor(math.log10(rate_time)))) |
||||||
|
rate_time = rate_time * 10 ^ (rate_time_magnitude - 2) |
||||||
|
|
||||||
|
hours = math.floor(rate_time) |
||||||
|
minutes = math.floor((rate_time - hours) * 60) |
||||||
|
self.perc = math.floor(min(100, (sum_energy_now / sum_energy_full) * 100) + 0.5) |
||||||
|
self.time = "{:02d}:{:02d}".format(hours, minutes) |
||||||
|
self.watt = "{:.2f}".format(sum_rate_energy / 1e6) |
||||||
|
|
||||||
|
self.perc = self.perc == None and 0 or self.perc |
||||||
|
|
||||||
|
for cb in self.callbacks: |
||||||
|
cb() |
||||||
|
|
||||||
|
time.sleep(0.1) |
||||||
|
|
||||||
|
running = False |
||||||
|
|
||||||
|
def get_name(self): |
||||||
|
return self._batteries[0]["name"] |
||||||
|
|
||||||
|
def get_level(self): |
||||||
|
return self.perc |
||||||
|
|
||||||
|
def get_state(self): |
||||||
|
return self.status |
||||||
|
|
||||||
|
def get_capacity(self): |
||||||
|
return self.capacity |
||||||
|
|
||||||
|
def get_time(self): |
||||||
|
return self.time |
||||||
|
|
||||||
|
def get_watt(self): |
||||||
|
return self.watt |
||||||
|
|
||||||
|
def acpi_listen(callback): |
||||||
|
main = subprocess.Popen(shlex.split("acpi_listen"), stdout=subprocess.PIPE, text=True) |
||||||
|
awky = subprocess.Popen(shlex.split("grep --line-buffered -E 'battery|ac_adapter'"), |
||||||
|
stdout=subprocess.PIPE, stdin=main.stdout, text=True) |
||||||
|
while True: |
||||||
|
output = awky.stdout.readline() |
||||||
|
if output == "" and awky.poll() != None: |
||||||
|
break |
||||||
|
if output: |
||||||
|
callback() |
||||||
|
rc = main.poll() |
||||||
|
return rc |
||||||
|
|
||||||
|
def scandir_line(path, callback): |
||||||
|
main = subprocess.Popen(shlex.split("ls -1 {}".format(path)), |
||||||
|
stdout=subprocess.PIPE, text=True) |
||||||
|
while True: |
||||||
|
line = main.stdout.readline() |
||||||
|
if line == "" and main.poll() != None: |
||||||
|
break |
||||||
|
if line: |
||||||
|
callback(line) |
||||||
|
|
||||||
|
def read_first_line(path): |
||||||
|
try: |
||||||
|
file = open(path, "r") |
||||||
|
first = None |
||||||
|
if file: |
||||||
|
first = file.readline() |
||||||
|
first = first.replace("\n", "") |
||||||
|
file.close() |
||||||
|
return first |
||||||
|
except Exception: |
||||||
|
return None |
||||||
|
|
||||||
|
def tonumber(asa): |
||||||
|
try: |
||||||
|
return int(asa) |
||||||
|
except Exception: |
||||||
|
return None |
||||||
|
|
||||||
|
def start_timer(callback): |
||||||
|
thread = Thread(target = acpi_listen, args=(callback,)) |
||||||
|
thread.daemon = True |
||||||
|
thread.start() |
Loading…
Reference in new issue