You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

239 lines
8.0 KiB

import subprocess
import shlex
import re
import math
from threading import Thread
import time
from logger import logger
from shutil import which
running = False
class Battery:
_batteries = []
ac = "AC0"
pspath = "/sys/class/power_supply/"
perc = -1
status = "N/A"
capacity = 0
time = ""
watt = 0
callbacks = []
def __init__(self):
if self._batteries.__len__() == 0:
scandir_line(self.pspath, self._update_batteries)
start_timer(self.full_update, self.onerror)
self.full_update()
def connect(self, callback):
self.callbacks.append(callback)
def disconnect(self, callback):
self.callbacks.remove(callback)
def onerror(self):
self._batteries = []
for cb in self.callbacks:
cb()
def _update_batteries(self, line):
bstr = re.match(r"BAT\w+", line)
if bstr:
self._batteries.append(dict(
name = bstr.group(),
status = "N/A",
perc = 0,
capacity = 0,
))
else:
self.ac = re.match(r"A\w+", line).group() or self.ac
# Based on "bat" widget from "lain" awesome-wm library
# * (c) 2013, Luca CPZ
# * (c) 2010-2012, Peter Hofmann
# @see https://github.com/lcpz/lain/blob/master/widget/bat.lua
def full_update(self):
global running
if running:
return
running = True
sum_rate_current = 0
sum_rate_voltage = 0
sum_rate_power = 0
sum_rate_energy = 0
sum_energy_now = 0
sum_energy_full = 0
sum_charge_full = 0
sum_charge_design = 0
for i in range(len(self._batteries)):
battery = self._batteries[i]
bstr = self.pspath + battery["name"]
present = read_first_line(bstr + "/present")
if tonumber(present) == 1:
rate_current = tonumber(read_first_line(bstr + "/current_now")) or 0
rate_voltage = tonumber(read_first_line(bstr + "/voltage_now")) or 0
rate_power = tonumber(read_first_line((bstr + "/power_now"))) or 0
charge_full = tonumber(read_first_line(bstr + "/charge_full")) or 0
charge_design = tonumber(read_first_line(bstr + "/charge_full_design")) or 0
energy_now = tonumber(read_first_line(bstr + "/energy_now")
or read_first_line(bstr + "/charge_now")) or 0
energy_full = tonumber(read_first_line(bstr + "/energy_full") or charge_full) or 0
energy_percentage = tonumber(read_first_line(bstr + "/capacity")
or math.floor(energy_now / energy_full * 100)) or 0
self._batteries[i]["status"] = read_first_line(bstr + "/status") or "N/A"
self._batteries[i]["perc"] = energy_percentage or self._batteries[i].perc
if not charge_design or charge_design == 0:
self._batteries[i]["capacity"] = 0
else:
self._batteries[i]["capacity"] = math.floor(
charge_full / charge_design * 100)
sum_rate_current = sum_rate_current + rate_current
sum_rate_voltage = sum_rate_voltage + rate_voltage
sum_rate_power = sum_rate_power + rate_power
sum_rate_energy = sum_rate_energy + (rate_power or ((rate_voltage * rate_current) / 1e6))
sum_energy_now = sum_energy_now + energy_now
sum_energy_full = sum_energy_full + energy_full
sum_charge_full = sum_charge_full + charge_full
sum_charge_design = sum_charge_design + charge_design
self.capacity = math.floor(min(100, sum_charge_full / sum_charge_design * 100))
self.status = len(self._batteries) > 0 and self._batteries[0]["status"] or "N/A"
for i in range(len(self._batteries)):
battery = self._batteries[i]
if battery["status"] == "Discharging" or battery["status"] == "Charging":
self.status = battery["status"]
self.ac_status = tonumber(read_first_line(self.pspath + self.ac + "/online")) or "N/A"
if self.status != "N/A":
if self.status != "Full" and sum_rate_power == 0 and self.ac_status == 1:
self.perc = math.floor(min(100,
sum_energy_now / sum_energy_full * 100 + 0.5))
self.time = "00:00"
self.watt = 0
elif self.status != "Full":
rate_time = 0
if (sum_rate_power > 0 or sum_rate_current > 0):
div = (sum_rate_power > 0 and sum_rate_power) or sum_rate_current
if self.status == "Charging":
rate_time = (sum_energy_full - sum_energy_now) / div
else:
rate_time = sum_energy_now / div
if 0 < rate_time and rate_time < 0.01:
rate_time_magnitude = tonumber(abs(math.floor(math.log10(rate_time)))) or 0
rate_time = int(rate_time * 10) ^ (rate_time_magnitude - 2)
hours = math.floor(rate_time)
minutes = math.floor((rate_time - hours) * 60)
self.perc = math.floor(min(100, (sum_energy_now / sum_energy_full) * 100) + 0.5)
self.time = "{:02d}:{:02d}".format(hours, minutes)
self.watt = "{:.2f}".format(sum_rate_energy / 1e6)
elif self.status == "Full":
self.perc = 100
self.time = "00:00"
self.watt = 0
self.perc = self.perc == None and 0 or self.perc
for cb in self.callbacks:
cb()
time.sleep(0.1)
running = False
def get_name(self):
return self._batteries[0]["name"]
def get_level(self):
return self.perc
def get_state(self):
return self.status
def get_ac_status(self):
return self.ac_status
def get_capacity(self):
return self.capacity
def get_time(self):
return self.time
def get_watt(self):
return self.watt
acpi_tries = 0
def acpi_listen(callback, onerror):
if not which("acpi_listen"):
return
global acpi_tries
try:
main = subprocess.Popen(shlex.split("acpi_listen"),
stdout=subprocess.PIPE, text=True)
awky = subprocess.Popen(shlex.split("grep --line-buffered -E 'battery|ac_adapter'"),
stdout=subprocess.PIPE, stdin=main.stdout, text=True)
while True:
output = awky.stdout.readline()
if output == "" and awky.poll() != None:
break
if output:
callback()
logger.warning("acpi_listen terminated")
if acpi_tries < 5:
acpi_tries += 1
logger.debug("Restarting acpi_listen")
return acpi_listen(callback, onerror)
else:
raise Exception("acpi_listen exceeded 5 restarts")
except Exception as err:
logger.error("Battery error: " + err.__str__())
onerror()
def scandir_line(path, callback):
main = subprocess.Popen(shlex.split("ls -1 {}".format(path)),
stdout=subprocess.PIPE, text=True)
while True:
line = main.stdout.readline()
if line == "" and main.poll() != None:
break
if line:
callback(line)
def read_first_line(path):
try:
file = open(path, "r")
first = None
if file:
first = file.readline()
first = first.replace("\n", "")
file.close()
return first
except Exception:
return None
def tonumber(asa):
try:
return int(asa)
except Exception:
return None
def start_timer(callback, onerror):
thread = Thread(target = acpi_listen, args=(callback, onerror,))
thread.daemon = True
thread.start()