Author Topic: Monitoring EPMA status using Python and Home Assistant  (Read 493 times)

Changkun

  • Professor
  • ****
  • Posts: 30
Monitoring EPMA status using Python and Home Assistant
« on: June 14, 2022, 09:06:29 PM »
We are using Home Assistant (https://www.home-assistant.io) to monitor the EPMA lab. Here I'd like to introduce how it works.

A few months ago, Hwayoung wrote a Python script that reads the numbers and text shown in the JEOL Maintenance window (the window must be visible on the screen).

Code: [Select]
import win32gui
import win32con
import paho.mqtt.client as mqtt
import time
import sys

def handleToString(handle):
    """Return the text of a window or control as a Python string.

    Sends ``WM_GETTEXT`` to ``handle`` and decodes the UTF-16 text that is
    copied into the receive buffer.

    Args:
        handle: Window handle of the control whose text should be read.

    Returns:
        The control's text, limited to ``max_chars - 1`` characters.
    """
    # WM_GETTEXT takes its limit as a number of *characters* (terminating
    # NUL included) while PyMakeBuffer takes a size in *bytes*. The text is
    # returned as UTF-16, i.e. 2 bytes per character, so the buffer must be
    # twice the character limit; otherwise a caption longer than 127
    # characters could be written past the end of the buffer.
    max_chars = 255
    buffer = win32gui.PyMakeBuffer(max_chars * 2)
    length = win32gui.SendMessage(handle, win32con.WM_GETTEXT, max_chars, buffer)
    # `length` is the number of characters copied, so the payload occupies
    # the first `length * 2` bytes of the buffer.
    return buffer[0:length * 2].tobytes().decode("utf-16")

def colorToRGB(RGBint):
    """Split a packed 24-bit colour integer into three 8-bit channels.

    The value is read as ``0xRRGGBB``: bits 16-23 are returned first,
    bits 8-15 second and bits 0-7 last. Any higher bits are ignored.

    NOTE(review): a Win32 COLORREF (as returned by GetPixel) is laid out as
    0x00BBGGRR, so for such input the first and last elements are swapped.
    The middle (green) channel is unaffected, so a comparison against
    (0, 255, 0) gives the same answer either way -- confirm before reusing
    this helper for other colours.

    Args:
        RGBint: Packed colour value.

    Returns:
        Tuple ``(red, green, blue)`` with each element in 0..255.
    """
    red, green, blue = ((RGBint >> shift) & 0xFF for shift in (16, 8, 0))
    return (red, green, blue)

def selectTab(tabNum):
    """Give focus to one tab of the "Maintenance" window.

    Looks up the window titled "Maintenance", finds its "SysTabControl32"
    child and sends it TCM_SETCURFOCUS with ``tabNum - 1``.

    Args:
        tabNum: Tab number as used by the callers in this file (1 for the
            first tab); it is decremented by one before being sent.
    """
    # Message ID spelled out here because the script does not take it from
    # win32con.
    TCM_SETCURFOCUS = 0x1330

    maintenance_window = win32gui.FindWindow(None, "Maintenance")
    tab_control = win32gui.FindWindowEx(
        maintenance_window, None, "SysTabControl32", None
    )
    tab_index = tabNum - 1
    win32gui.SendMessage(tab_control, TCM_SETCURFOCUS, tab_index, 0)

def getHandles(parent_window, className=None, tabNum=1):
    """Collect the visible grandchild controls of a window for one tab.

    Switches to tab ``tabNum``, pauses briefly, then walks every child
    window of ``parent_window`` and, inside each of those, every child
    matching ``className``.

    Args:
        parent_window: Handle of the top-level window to scan.
        className: Window class the controls must have; None matches any.
        tabNum: Tab to select before scanning (passed to selectTab).

    Returns:
        Dict mapping dialog control ID -> window handle for every control
        that is currently visible. If two visible controls share an ID,
        the one found later wins.
    """
    def _children(parent, class_name):
        # FindWindowEx returns the child that follows `previous`; None
        # starts at the first child and a result of 0 ends the walk.
        previous = None
        while True:
            child = win32gui.FindWindowEx(parent, previous, class_name, None)
            if child == 0:
                return
            yield child
            previous = child

    selectTab(tabNum)
    # Presumably gives the dialog time to show the newly selected tab
    # before visibility is queried -- TODO confirm the delay is sufficient.
    time.sleep(0.2)

    handles = {}
    for sub_window in _children(parent_window, None):
        for item in _children(sub_window, className):
            ctrl_id = win32gui.GetDlgCtrlID(item)
            if win32gui.IsWindowVisible(item):
                handles[ctrl_id] = item
    return handles

class MQTT:
    """Thin wrapper around a paho-mqtt client used to publish readings.

    The constructor connects to the broker, starts paho's background
    network loop and returns only once the broker has accepted the
    connection; otherwise the script is terminated with an error message.

    NOTE(review): ``mqtt.Client()`` is called without arguments, which
    matches the paho-mqtt 1.x API -- confirm against the installed version.
    """

    # Seconds to wait for the broker to answer the connection request.
    CONNECT_TIMEOUT = 10

    def __init__(self, ip_addr, port):
        """Connect to the broker at ``ip_addr``:``port``.

        Args:
            ip_addr: Host name or IP address of the MQTT broker.
            port: TCP port of the MQTT broker.

        Raises:
            SystemExit: If the connection cannot be opened, the broker
                rejects it, or no answer arrives within CONNECT_TIMEOUT.
        """
        # Local import keeps this class self-contained; the file's import
        # block does not pull in threading.
        import threading

        # Filled in by on_connect(), which runs on paho's network thread.
        self._connected = threading.Event()
        self._connect_rc = None

        client = mqtt.Client()
        self.client = client
        client.on_connect = self.on_connect
        client.username_pw_set(username="username", password="password")
        try:
            client.connect(ip_addr, port, 5)
        except Exception as e:
            sys.exit(">> MQTT connection error: {}".format(e))
        client.loop_start()

        # Exit from the calling thread: sys.exit() inside the on_connect
        # callback would only end the network thread and the script would
        # carry on publishing to a broker that refused it.
        answered = self._connected.wait(self.CONNECT_TIMEOUT)
        if not answered or self._connect_rc != 0:
            self.disconnect()
            sys.exit(">> Can not connect to mqtt server.")

    def on_connect(self, client, userdata, flags, rc):
        """Record the broker's answer to the connection request.

        Args:
            client: The paho client that invoked the callback (unused).
            userdata: User data set on the client (unused).
            flags: Response flags sent by the broker (unused).
            rc: Connection result; 0 means the connection was accepted.
        """
        self._connect_rc = rc
        self._connected.set()

    def publish(self, topic, msg):
        """Publish ``msg`` on ``topic`` with QoS 1 and the retain flag set."""
        self.client.publish(topic, msg, 1, retain=True)

    def disconnect(self):
        """Stop the background network loop, then disconnect the client."""
        self.client.loop_stop()
        self.client.disconnect()


# --- MQTT broker -----------------------------------------------------------
MQTT_IP_ADDR = "xxx.xxx.xxx.xxx"
MQTT_PORT = 1883

# --- Topic layout: <TOPIC_1ST>/<TOPIC_2ND_TABn>/<field label> ---------------
TOPIC_1ST = "EPMA"
TOPIC_2ND_TAB1 = "GUN_VAC"
TOPIC_2ND_TAB2 = "Condition"

# --- Console output column widths (label column, value column) -------------
OUTPUT_WIDTH_COL_1 = 45
OUTPUT_WIDTH_COL_2 = 8

# Tab number -> {dialog control ID -> label}. Only controls whose ID is
# listed here are read. The labels are also used to build the MQTT topic,
# so their exact spelling and capitalisation matter.
FIELDS = {
    # Tab 1: values read as text.
    1: {
        1387: "Gun Chamber state",
        2313: "Specimen Chamber State",
        2308: "Filament Current",
        2309: "Extract Voltage",
        2310: "Emission Current",
        2311: "Gun Chamber",
        2312: "1st IC",
        2151: "Specimen Chamber",
    },
    # Tab 2: indicators read as ON/OFF from their colour.
    2: {
        1411: "Vacuum Failure of Gun Chamber",
        1412: "Vacuum Failure of Intermediate Chamber",
        1416: "Ion pump protection Gun Chamber",
        1417: "Ion pump protection Intermediate Chamber",
        2160: "Vacuum Failure of Specimen Chamber",
        2161: "Vacuum Failure of Pre-Pumping",
        2162: "Vacuum Failure of Rough Pumping",
        1394: "RP Failure",
        1403: "TMP Failure",
        1418: "The Wire of PIG Broken",
        1393: "Check Pressure of N2 Gas",
        2333: "Excess Temperature at CL coil",
        2163: "Excess Temperature at Power Amp",
        2164: "Excess Temperature at OL Coil",
        1392: "Check water flow",
        2321: "Check water pressure",
        1400: "Excess Emission Current",
        2165: "The GUN Cable is not connected",
        1422: "Instrument communication failure",
        1420: "Stage initialization failure",
    },
}

# Dialog control ID -> unit appended to the console output only.
UNITS = {
    2308: "A",
    2309: "kV",
    2310: "μA",
    2311: "Pa",
    2312: "Pa",
    2151: "Pa",
}


# Find "Maintenance" window
# The main "PC-SEM" window is restored first, then the Maintenance dialog
# is looked up by its title.
win32gui.ShowWindow(win32gui.FindWindow(None, "PC-SEM"), win32con.SW_RESTORE)
window = win32gui.FindWindow(None, "Maintenance")
if not win32gui.IsWindowVisible(window):
    sys.exit(">> Maintenance window is not opened.")

# Best effort only: a failure to raise the window is printed, not fatal.
try:
    win32gui.SetForegroundWindow(window)
except Exception as e:
    print(e)


# Connect to MQTT server
# Bound to its own name on purpose: rebinding `mqtt` here would replace the
# imported paho module that the MQTT class looks up under that name.
mqtt_client = MQTT(MQTT_IP_ADDR, MQTT_PORT)

try:
    # Find pre-determined handles by matching CtrlId in each tab
    for tab, subfields in FIELDS.items():
        for CtrlId, item in getHandles(window, None, tab).items():
            if CtrlId not in subfields:
                continue

            if tab == 1:
                # Tab1 : get string value
                topic_2nd = TOPIC_2ND_TAB1
                try:
                    data = handleToString(item)
                except Exception as e:
                    print(e)
                    data = "N/A"
            elif tab == 2:
                # Tab2 : determine ON/OFF (whether pixel RGB is green or not)
                topic_2nd = TOPIC_2ND_TAB2
                try:
                    hdc = win32gui.GetDC(item)
                    try:
                        color = colorToRGB(win32gui.GetPixel(hdc, 7, 7))
                    finally:
                        # Every GetDC must be paired with ReleaseDC, or the
                        # device context is leaked.
                        win32gui.ReleaseDC(item, hdc)
                    data = "ON" if color == (0, 255, 0) else "OFF"
                except Exception as e:
                    print(e)
                    # NOTE(review): a failed read is reported as "ON", the
                    # same value as a green indicator, so read errors never
                    # look like a warning -- confirm this is intended.
                    data = "ON"
            else:
                # No reader is defined for other tabs; skip rather than
                # publish a stale topic/value from a previous iteration.
                continue

            label = subfields[CtrlId]

            # Text output in console
            output = "{:<{width1}}{:<{width2}}".format(
                label, data,
                width1=OUTPUT_WIDTH_COL_1, width2=OUTPUT_WIDTH_COL_2,
            )
            if CtrlId in UNITS:
                output += "({})".format(UNITS[CtrlId])
            print(output)

            # MQTT publishing
            topic = "{}/{}/{}".format(TOPIC_1ST, topic_2nd, label.replace(" ", "_"))
            mqtt_client.publish(topic, data)
finally:
    # Runs even when reading fails part-way, so the broker connection is
    # closed and the dialog is left on its first tab.
    try:
        # Close connection to MQTT server
        mqtt_client.disconnect()
    finally:
        # Go back to Tab1
        selectTab(1)

The Python program reads the numbers and text from the window and sends them to the Home Assistant server by publishing MQTT messages. It runs every 10 minutes via Windows Task Scheduler.



The MQTT broker of Home Assistant receives the messages and exposes them as sensors and binary sensors.



When a warning appears in the Maintenance window, the corresponding binary sensor in Home Assistant turns off. This triggers an automation that sends us a warning message via a social networking service (such as Telegram).

Home Assistant is powerful and versatile. It supports wired (serial) and wireless (wifi, zigbee) communication protocols. Home Assistant can be your Lab Assistant!
« Last Edit: June 15, 2022, 07:07:40 AM by John Donovan »