#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) 2023 Claudio Maradonna # PYTHON_ARGCOMPLETE_OK from luma.core.interface.serial import i2c from luma.core.render import canvas from luma.oled.device import ssd1306, ssd1325, ssd1331, sh1106 from PIL import ImageFont, ImageDraw, Image from time import sleep import time import re import os from shutil import which from pprint import pprint serial = i2c(port=0, address=0x3C) device = ssd1306(serial, rotate=2) s_width = 128 s_height = 64 def zpool_parse_data(data): parts = re.split(r'(?:\n|^)\s*(\w*):\s*', data.strip(), re.MULTILINE)[1:] parsed = dict(zip(parts[::2], parts[1::2])) return { **parsed, 'config': zpool_parse_config(parsed.get('config', '')) } def zpool_parse_config(data): lines = [v.strip().split() for v in data.splitlines() if v.strip()] if lines: return [ dict(zip(lines[0], v)) for v in lines[1:] ] return [] def upsc_parse_data(data): lines = data.strip().split("\n") result = {} for line in lines: key, value = line.split(": ") result[key] = value return result def zpool_print_status(): zpool_status = os.popen('zpool status') zpool_output = zpool_status.read() zpool_status_data = zpool_parse_data(zpool_output) zpool_list_size = os.popen('zpool list -o name,size,free') zpool_list_output = zpool_list_size.read() zpool_list_size_data = zpool_parse_config(zpool_list_output) with canvas(device) as draw: draw.text((0,0), "ZFS ->", fill=1) pool_name = zpool_status_data["pool"] pool_status = "ON" if zpool_status_data["state"] == "ONLINE" else "OFF" final_pool_name = pool_name + ": " + pool_status pool_name_width = draw.textlength(final_pool_name) draw.text((s_width - pool_name_width, 0), final_pool_name, fill=1) starting_height = 0 for drive in zpool_status_data["config"]: if drive["NAME"] == pool_name: for drive in zpool_list_size_data: if drive["NAME"] == pool_name: starting_height += 9 final_string = drive["FREE"] + "/" + drive["SIZE"] fs_width = draw.textlength(final_string) draw.text((s_width - fs_width,starting_height), final_string, fill=1) continue starting_height += 9 draw.text((0,starting_height), drive["NAME"], fill=1) drive_state_width = draw.textlength(drive["STATE"]) draw.text((s_width - drive_state_width,starting_height), drive["STATE"], fill=1) draw.text((0, s_height - 10), zpool_status_data["errors"], fill=1) def upsc_print_status(): ups_name_finder = os.popen('upsc -l 127.0.0.1') ups_name_finder_output = ups_name_finder.read().split("\n") ups_name = ups_name_finder_output[0] upsc_status = os.popen('upsc ' + ups_name + '@127.0.0.1:3493') upsc_output = upsc_status.read() upsc_status_data = upsc_parse_data(upsc_output) with canvas(device) as draw: draw.text((0,0), "UPS ->", fill=1) ups_name_width = draw.textlength(ups_name) draw.text((s_width - ups_name_width, 0), ups_name, fill=1) battery_charge = upsc_status_data["battery.charge"] used_width = draw.textlength("[] "+battery_charge+"%") bar_width = (128 - used_width) real_width = (int(battery_charge) * bar_width) / 100 draw.text((0,9), "[", fill=1) draw.rectangle((5,13, real_width, 17), fill=1) draw.text((bar_width, 9), "] "+battery_charge+"%", fill=1) what_to_show = { 'device.model': '%{value}', 'outlet.power': '%{value}W', 'output.voltage': '%{value}V' } starting_height = 19 if int(battery_charge) <= int(upsc_status_data['battery.charge.low']): draw.text((0, starting_height + 9), "*********************", fill=1) draw.text((0, starting_height + 18), "**** BATTERY LOW ****", fill=1) draw.text((0, starting_height + 27), "*********************", fill=1) else: starting_height += 9 for key in what_to_show: value = what_to_show[key].replace('%{value}', upsc_status_data[key]) draw.text((0, starting_height), value, fill=1) starting_height += 9 def main(): modules_enabled = [] if which('zpool') is not None: modules_enabled.append('zpool') if which('upsc') is not None: modules_enabled.append('upsc') if not modules_enabled: print("This script cannot run any module; check dependencies") exit(1) possibles = globals().copy() possibles.update(locals()) start_count = 0 last_print = '' while True: module_to_run = modules_enabled[start_count % 2] method = possibles.get(module_to_run + "_print_status") if not method: print("Module "+module_to_run+" not implemented") exit(1) else: if module_to_run != last_print: device.clear() method() last_print = module_to_run if start_count == (len(modules_enabled) - 1): start_count = 0 else: start_count += 1 time.sleep(5) if __name__ == "__main__": try: main() except KeyboardInterrupt: pass