151 lines
4.3 KiB
Python
151 lines
4.3 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
# Copyright (c) 2023 Claudio Maradonna
|
||
|
|
||
|
# PYTHON_ARGCOMPLETE_OK
|
||
|
|
||
|
from luma.core.interface.serial import i2c
|
||
|
from luma.core.render import canvas
|
||
|
from luma.oled.device import ssd1306, ssd1325, ssd1331, sh1106
|
||
|
from PIL import ImageFont, ImageDraw, Image
|
||
|
|
||
|
from time import sleep
|
||
|
import time
|
||
|
|
||
|
import re
|
||
|
import os
|
||
|
from shutil import which
|
||
|
|
||
|
from pprint import pprint
|
||
|
|
||
|
# Display wiring: I2C bus 0, controller at address 0x3C, driven as an SSD1306.
# rotate=2 is handed to the luma driver to select the panel orientation.
serial = i2c(port=0, address=0x3C)
device = ssd1306(serial, rotate=2)

# Screen dimensions in pixels, used by the drawing code for right-alignment
# and for placing the bottom line.
s_width, s_height = 128, 64
|
||
|
|
||
|
def zpool_parse_data(data):
    """Parse the text printed by ``zpool status`` into a dictionary.

    The text is cut at every ``label:`` that opens a line; each label becomes
    a key and the text up to the next label becomes its value.

    Args:
        data: Raw ``zpool status`` output.

    Returns:
        A dict mapping each label to its text. The ``'config'`` entry is
        always present and holds the list of row dicts produced by
        ``zpool_parse_config`` (empty when there is no ``config`` section).
    """
    # ``flags`` has to be passed by keyword: the third positional parameter
    # of ``re.split`` is ``maxsplit``, so a positional ``re.MULTILINE`` (== 8)
    # never sets the flag and instead caps the parse at eight sections,
    # folding any later section (e.g. ``errors``) into the previous value.
    parts = re.split(
        r'(?:\n|^)\s*(\w*):\s*', data.strip(), flags=re.MULTILINE
    )[1:]

    # After dropping the text before the first label, ``parts`` alternates
    # label, text, label, text, ...
    parsed = dict(zip(parts[::2], parts[1::2]))
    return {
        **parsed,
        'config': zpool_parse_config(parsed.get('config', '')),
    }
|
||
|
|
||
|
def zpool_parse_config(data):
    """Turn a whitespace-separated table into a list of row dictionaries.

    Blank lines are ignored. The first remaining line supplies the column
    names; each following line is split on whitespace and paired with those
    names by position. Cells beyond the last column name, and column names
    beyond the last cell, are dropped.

    Args:
        data: Table text with a header line followed by data lines.

    Returns:
        One dict per data line, or an empty list when no header line exists.
    """
    table = []
    for raw_line in data.splitlines():
        cells = raw_line.split()
        if cells:
            table.append(cells)

    if not table:
        return []

    header, *rows = table
    return [dict(zip(header, row)) for row in rows]
|
||
|
|
||
|
def upsc_parse_data(data):
    """Parse ``key: value`` lines, as printed by ``upsc``, into a dictionary.

    Args:
        data: Raw text holding one ``key: value`` pair per line.

    Returns:
        A dict mapping each key to its value, both stripped of surrounding
        whitespace. Lines without a colon or without a key (blank lines
        included) are skipped, so empty input yields an empty dict.
    """
    result = {}
    for line in data.splitlines():
        # Cut at the first colon only: a value may itself contain ": ", and a
        # key may have an empty value, neither of which should raise.
        key, colon, value = line.partition(":")
        key = key.strip()
        if not colon or not key:
            continue
        result[key] = value.strip()
    return result
|
||
|
|
||
|
def zpool_print_status():
    """Draw the ZFS pool page on the display.

    Runs ``zpool status`` and ``zpool list -o name,size,free`` and renders:
    a header with the pool name and an ON/OFF flag, a right-aligned
    free/size line for the pool, one line per remaining ``config`` row with
    its state on the right, and the ``errors`` text on the bottom line.
    """
    # ``with`` closes each pipe as soon as its output has been read instead
    # of leaving the open handle to the garbage collector.
    with os.popen('zpool status') as status_pipe:
        zpool_status_data = zpool_parse_data(status_pipe.read())

    with os.popen('zpool list -o name,size,free') as list_pipe:
        zpool_list_size_data = zpool_parse_config(list_pipe.read())

    with canvas(device) as draw:
        draw.text((0, 0), "ZFS ->", fill=1)

        pool_name = zpool_status_data.get("pool")
        if pool_name is None:
            # Nothing parseable came back (e.g. no pool is imported): show a
            # short notice rather than dying on a KeyError.
            draw.text((0, 9), "no pool", fill=1)
            return

        pool_status = "ON" if zpool_status_data.get("state") == "ONLINE" else "OFF"
        final_pool_name = pool_name + ": " + pool_status
        pool_name_width = draw.textlength(final_pool_name)
        draw.text((s_width - pool_name_width, 0), final_pool_name, fill=1)

        row_y = 0
        for entry in zpool_status_data["config"]:
            entry_name = entry.get("NAME", "")

            if entry_name == pool_name:
                # The pool's own row is shown as its free/size summary.
                # NOTE(review): indentation of the original was lost; the
                # ``continue`` is taken to skip the name/state drawing for
                # the pool row - confirm against the upstream file.
                for sizes in zpool_list_size_data:
                    if sizes["NAME"] != pool_name:
                        continue
                    row_y += 9
                    summary = sizes["FREE"] + "/" + sizes["SIZE"]
                    summary_width = draw.textlength(summary)
                    draw.text((s_width - summary_width, row_y), summary, fill=1)
                continue

            row_y += 9

            # Rows such as section headings carry a name but no STATE cell.
            entry_state = entry.get("STATE", "")
            draw.text((0, row_y), entry_name, fill=1)
            state_width = draw.textlength(entry_state)
            draw.text((s_width - state_width, row_y), entry_state, fill=1)

        draw.text((0, s_height - 10), zpool_status_data.get("errors", ""), fill=1)
|
||
|
|
||
|
def ups_print_status():
    """Draw the UPS page on the display.

    Lists the UPS names known to the server at 127.0.0.1 with ``upsc -l``,
    queries the first one, and renders: a header with the UPS name, a
    battery-charge bar, then the ``device.model``, ``outlet.power`` and
    ``output.voltage`` values. Values the UPS does not report show as ``--``.
    """
    # ``with`` closes each pipe as soon as its output has been read.
    with os.popen('upsc -l 127.0.0.1') as name_pipe:
        ups_name = name_pipe.read().split("\n")[0]

    upsc_status_data = {}
    if ups_name:
        # NOTE(review): ups_name is taken from the command output above and
        # interpolated into a shell command line unquoted.
        with os.popen('upsc ' + ups_name + '@127.0.0.1:3493') as status_pipe:
            upsc_output = status_pipe.read()
        # Only parse when something was printed, so a failed query leaves
        # the page with placeholders instead of raising.
        if upsc_output.strip():
            upsc_status_data = upsc_parse_data(upsc_output)

    with canvas(device) as draw:
        draw.text((0, 0), "UPS ->", fill=1)

        ups_name_width = draw.textlength(ups_name)
        draw.text((s_width - ups_name_width, 0), ups_name, fill=1)

        # The bar occupies the width left over after the "[", "]" and "%"
        # glyphs; measured against the shared screen width constant.
        used_width = draw.textlength("[]%")
        bar_width = s_width - used_width

        # float() accepts both "100" and "99.5"; a missing or malformed
        # reading simply leaves the bar empty.
        try:
            battery_charge = float(upsc_status_data["battery.charge"])
        except (KeyError, ValueError):
            battery_charge = None

        draw.text((0, 9), "[", fill=1)
        if battery_charge is not None:
            battery_charge = min(max(battery_charge, 0.0), 100.0)
            real_width = (battery_charge * bar_width) / 100
            # The fill starts at x=5. Skip it while the charge maps to the
            # left of that, because a box whose right edge precedes its left
            # edge is rejected by recent Pillow releases (ValueError).
            if real_width >= 5:
                draw.rectangle((5, 13, real_width, 17), fill=1)
        draw.text((bar_width, 9), "]%", fill=1)

        detail_lines = (
            (20, "device.model", ""),
            (28, "outlet.power", "W"),
            (36, "output.voltage", "V"),
        )
        for line_y, key, unit in detail_lines:
            draw.text((0, line_y), upsc_status_data.get(key, "--") + unit, fill=1)
|
||
|
|
||
|
def main():
    """Check for the required tools, then alternate the two pages forever.

    Exits with status 1 and a message on stderr when ``zpool`` or ``upsc``
    cannot be found on ``PATH``. Otherwise clears and redraws the display
    every 5 seconds, switching between the ZFS page and the UPS page. The
    loop has no exit condition of its own; it ends only when an exception
    (such as ``KeyboardInterrupt``) propagates out.

    Raises:
        SystemExit: If one of the required executables is missing.
    """
    for tool in ('zpool', 'upsc'):
        if which(tool) is None:
            # The bare ``exit`` name is injected by the ``site`` module and
            # is not guaranteed to exist. Raising SystemExit always works; a
            # string argument is printed to stderr and gives exit status 1.
            raise SystemExit(f"Cannot find `{tool}` executable.")

    # A boolean toggle replaces the ever-growing counter that was only ever
    # tested for parity.
    show_zpool = True
    while True:
        device.clear()

        if show_zpool:
            zpool_print_status()
        else:
            ups_print_status()

        show_zpool = not show_zpool
        time.sleep(5)
|
||
|
|
||
|
if __name__ == "__main__":
    # main() loops until interrupted; Ctrl+C is swallowed here so the script
    # ends without printing a traceback.
    try:
        main()
    except KeyboardInterrupt:
        pass
|