You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

249 lines
8.1 KiB

import os
import subprocess
import shlex
import re
import math
from threading import Thread
import time
from logging import (
getLogger,
DEBUG,
Formatter,
StreamHandler,
)
# Log line layout: timestamp, level, source file and line number, message.
log_format = "%(asctime)s [ %(levelname)s ] %(filename)s %(lineno)d: %(message)s"
formatter = Formatter(fmt=log_format, datefmt="%Y-%m-%d %H:%M:%S")

# Stream handler that accepts every record from DEBUG upwards.
stream_handler = StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(DEBUG)

# Dedicated "battery" logger; its records are not propagated to ancestors.
logger = getLogger("battery")
logger.setLevel(DEBUG)
logger.addHandler(stream_handler)
logger.propagate = False

# Module-wide flag used by Battery.full_update to skip overlapping updates.
running = False
class Battery:
    """Combined state of the batteries listed under ``pspath``.

    Battery directories (``BAT...``) and the AC adapter are discovered by
    scanning ``pspath``; on every update the attributes of each present
    battery are read and summed, the aggregated values are stored on the
    instance and all registered callbacks are invoked.

    ``_batteries`` and ``callbacks`` are class attributes, so the lists are
    shared between instances until an instance rebinds its own
    ``_batteries`` in :meth:`onerror`.
    """

    _batteries = []
    ac = "AC0"
    pspath = "/sys/class/power_supply/"
    perc = -1
    status = "N/A"
    ac_status = "N/A"
    capacity = 0
    time = ""
    # NOTE: `watt` is the int 0 while idle/full and a "%.2f" string otherwise.
    watt = 0
    callbacks = []

    # Per-battery readings that are summed over all present batteries.
    _SUMMED = (
        "rate_current",
        "rate_power",
        "rate_energy",
        "energy_now",
        "energy_full",
        "charge_full",
        "charge_design",
    )

    def __init__(self):
        """Discover the power supplies if needed, start listening, update once."""
        # NOTE(review): assumes only the directory scan is conditional, as in
        # the lain widget this class is based on - confirm against callers.
        if not self._batteries:
            scandir_line(self.pspath, self._update_batteries)
        start_timer(self.full_update, self.onerror)
        self.full_update()

    def connect(self, callback):
        """Register ``callback``; it is called without arguments after updates."""
        self.callbacks.append(callback)

    def disconnect(self, callback):
        """Unregister ``callback``; raises ``ValueError`` if it is not registered."""
        self.callbacks.remove(callback)

    def onerror(self):
        """Forget the known batteries and notify every callback."""
        self._batteries = []
        # Iterate over a copy so a callback may disconnect itself safely.
        for cb in list(self.callbacks):
            cb()

    def _update_batteries(self, line):
        """Classify one ``pspath`` entry as a battery or as the AC adapter.

        Entries that are neither a battery (``BAT...``) nor an adapter
        (``A...``) are ignored.
        """
        battery = re.match(r"BAT\w+", line)
        if battery:
            self._batteries.append(dict(
                name=battery.group(),
                status="N/A",
                perc=0,
                capacity=0,
            ))
            return
        adapter = re.match(r"A\w+", line)
        if adapter:
            self.ac = adapter.group()

    # Based on "bat" widget from "lain" awesome-wm library
    # * (c) 2013, Luca CPZ
    # * (c) 2010-2012, Peter Hofmann
    # @see https://github.com/lcpz/lain/blob/master/widget/bat.lua
    def full_update(self):
        """Re-read every battery, recompute the totals and notify callbacks.

        A call made while another update is still in progress returns
        immediately without doing anything.
        """
        global running
        if running:
            return
        running = True
        try:
            self._refresh()
            for cb in list(self.callbacks):
                cb()
            # The guard stays set for another 0.1 s, so calls arriving right
            # after this update are skipped.
            time.sleep(0.1)
        finally:
            # Always release the guard; otherwise a single failed update
            # would make every later call return early.
            running = False

    def _refresh(self):
        """Read all batteries and update the aggregated attributes."""
        totals = dict.fromkeys(self._SUMMED, 0)
        for battery in self._batteries:
            readings = self._read_battery(battery)
            if readings is None:
                continue
            for key in totals:
                totals[key] += readings[key] or 0

        # Without any design capacity reading the ratio is undefined.
        if totals["charge_design"]:
            self.capacity = math.floor(
                min(100, totals["charge_full"] / totals["charge_design"] * 100))
        else:
            self.capacity = 0

        # Start from the first battery's status; a charging or discharging
        # battery overrides it (the last such battery wins).
        status = "N/A"
        if self._batteries:
            status = self._batteries[0]["status"] or "N/A"
        for battery in self._batteries:
            if battery["status"] in ("Discharging", "Charging"):
                status = battery["status"]
        self.status = status

        # Compare against None so that an offline adapter (0) is kept as 0.
        online = tonumber(read_first_line(self.pspath + self.ac + "/online"))
        self.ac_status = "N/A" if online is None else online

        if self.status == "Full":
            self.perc = 100
            self.time = "00:00"
            self.watt = 0
        elif self.status != "N/A":
            level = self._percentage(totals["energy_now"], totals["energy_full"])
            if level is not None:
                self.perc = level
            if totals["rate_power"] == 0 and self.ac_status == 1:
                # Plugged in but no power flowing: nothing to estimate.
                self.time = "00:00"
                self.watt = 0
            else:
                remaining = self._remaining_hours(
                    self.status,
                    totals["energy_now"],
                    totals["energy_full"],
                    totals["rate_power"],
                    totals["rate_current"],
                )
                hours = math.floor(remaining)
                minutes = math.floor((remaining - hours) * 60)
                self.time = "{:02d}:{:02d}".format(hours, minutes)
                self.watt = "{:.2f}".format(totals["rate_energy"] / 1e6)

        if self.perc is None:
            self.perc = 0

    def _read_battery(self, battery):
        """Refresh one battery dict in place and return its raw readings.

        Returns ``None`` when the battery is not reported as present,
        otherwise a dict with one entry per name in ``_SUMMED`` (``None``
        for attributes that are missing or not numeric).
        """
        base = self.pspath + battery["name"]
        if tonumber(read_first_line(base + "/present")) != 1:
            return None

        rate_current = tonumber(read_first_line(base + "/current_now"))
        rate_voltage = tonumber(read_first_line(base + "/voltage_now"))
        rate_power = tonumber(read_first_line(base + "/power_now"))
        charge_full = tonumber(read_first_line(base + "/charge_full"))
        charge_design = tonumber(read_first_line(base + "/charge_full_design"))
        energy_now = tonumber(read_first_line(base + "/energy_now")
                              or read_first_line(base + "/charge_now"))
        energy_full = tonumber(read_first_line(base + "/energy_full") or charge_full)

        # Prefer the reported percentage; otherwise derive it when possible.
        reported = read_first_line(base + "/capacity")
        if reported:
            level = tonumber(reported)
        elif energy_now is not None and energy_full:
            level = math.floor(energy_now / energy_full * 100)
        else:
            level = None

        battery["status"] = read_first_line(base + "/status") or "N/A"
        # A valid reading of 0 must replace the previous value as well.
        if level is not None:
            battery["perc"] = level
        if charge_full is not None and charge_design:
            battery["capacity"] = math.floor(charge_full / charge_design * 100)
        else:
            battery["capacity"] = 0

        return {
            "rate_current": rate_current,
            "rate_power": rate_power,
            "rate_energy": rate_power
            or (((rate_voltage or 0) * (rate_current or 0)) / 1e6),
            "energy_now": energy_now,
            "energy_full": energy_full,
            "charge_full": charge_full,
            "charge_design": charge_design,
        }

    @staticmethod
    def _percentage(energy_now, energy_full):
        """Return the rounded level capped at 100, or ``None`` if unknown."""
        if not energy_full:
            return None
        return math.floor(min(100, energy_now / energy_full * 100) + 0.5)

    @staticmethod
    def _remaining_hours(status, energy_now, energy_full, rate_power, rate_current):
        """Estimate the hours until full (charging) or empty (otherwise).

        Returns 0 when neither a power nor a current rate is available.
        """
        if rate_power > 0:
            rate = rate_power
        elif rate_current > 0:
            rate = rate_current
        else:
            return 0
        if status == "Charging":
            hours = (energy_full - energy_now) / rate
        else:
            hours = energy_now / rate
        # Estimates below 0.01 h are scaled up by a power of ten into the
        # 0.01-0.1 range; presumably this compensates for readings reported
        # in mismatched units (as done in the lain widget).
        if 0 < hours < 0.01:
            magnitude = abs(math.floor(math.log10(hours)))
            hours = hours * 10 ** (magnitude - 2)
        # A negative remaining time has no meaning; clamp it.
        return max(hours, 0)

    def get_name(self):
        """Return the first battery's name, or ``"N/A"`` if none is known."""
        if not self._batteries:
            return "N/A"
        return self._batteries[0]["name"]

    def get_level(self):
        """Return the aggregated charge percentage."""
        return self.perc

    def get_state(self):
        """Return the aggregated status string."""
        return self.status

    def get_capacity(self):
        """Return full charge relative to design charge, in percent."""
        return self.capacity

    def get_time(self):
        """Return the remaining time formatted as ``HH:MM``."""
        return self.time

    def get_watt(self):
        """Return the current power draw (0 or a ``"%.2f"`` string)."""
        return self.watt
# Number of times the listener has been restarted so far (never reset).
acpi_tries = 0


def acpi_listen(callback, onerror):
    """Run ``acpi_listen`` and call ``callback`` on battery/AC adapter events.

    Blocks the calling thread. Every output line of the ``acpi_listen``
    program that contains ``battery`` or ``ac_adapter`` triggers one
    ``callback()`` call. When the program exits it is restarted, at most
    five times in total (counted in the module-level ``acpi_tries``).

    Any failure - the program cannot be started, the callback raises, or the
    restart limit is exceeded - is logged and reported through ``onerror()``,
    after which the function returns.
    """
    global acpi_tries
    try:
        while True:
            listener = subprocess.Popen(shlex.split("acpi_listen"),
                                        stdout=subprocess.PIPE, text=True)
            try:
                # readline() returns "" only at end of file, i.e. once the
                # program has closed its output.
                for line in iter(listener.stdout.readline, ""):
                    if re.search(r"battery|ac_adapter", line):
                        callback()
            finally:
                # Reap the child even if the callback raised, so neither the
                # process nor its pipe is left behind.
                if listener.poll() is None:
                    listener.terminate()
                listener.stdout.close()
                listener.wait()
            logger.warning("acpi_listen terminated")
            if acpi_tries >= 5:
                raise RuntimeError("acpi_listen exceeded 5 restarts")
            acpi_tries += 1
            logger.debug("Restarting acpi_listen")
    except Exception as err:
        # Boundary of the listener thread: report instead of dying silently.
        logger.error("Battery error: " + str(err))
        onerror()
def scandir_line(path, callback):
    """Call ``callback`` once for every visible entry of directory ``path``.

    Entries are visited in sorted order and hidden entries (leading dot) are
    skipped. Each name is passed with a trailing newline, so callbacks that
    expect line-oriented input keep working. If the directory cannot be
    listed, ``callback`` is never called.
    """
    try:
        names = sorted(os.listdir(path))
    except OSError:
        # Best effort: a missing or unreadable directory yields no entries.
        return
    for name in names:
        if not name.startswith("."):
            callback(name + "\n")
def read_first_line(path):
    """Return the first line of the file at ``path`` without its newline.

    Returns ``""`` for an empty file and ``None`` when the file cannot be
    opened, read or decoded.
    """
    try:
        # The context manager closes the file even if reading fails.
        with open(path, "r") as handle:
            return handle.readline().replace("\n", "")
    except (OSError, ValueError):
        # ValueError covers undecodable content (UnicodeDecodeError).
        return None
def tonumber(asa):
    """Convert ``asa`` with ``int()``; return ``None`` if that fails."""
    try:
        number = int(asa)
    except Exception:
        number = None
    return number
def start_timer(callback, onerror):
    """Run ``acpi_listen(callback, onerror)`` on a background daemon thread."""
    listener = Thread(target=acpi_listen, args=(callback, onerror), daemon=True)
    listener.start()