mqtt-sensord/mqtt_sensord/sensorlib/misc_sensors.py

165 lines
4.4 KiB
Python
Raw Normal View History

2019-07-09 22:01:39 +02:00
import random
import time
from .base import Sensor, sensor
try:
import fcntl
import threading
CO2_IMPORTS = True
except ImportError:
CO2_IMPORTS = False
2019-07-09 22:01:39 +02:00
@sensor
class FakeSensor(Sensor):
    """Dummy sensor that reports a random integer instead of a real reading."""

    sensor_class = 'fake'

    def __init__(self, **sensor_conf):
        # Nothing sensor-specific to set up: every option goes to the base class.
        super().__init__(**sensor_conf)

    def get_data(self):
        """Return a dict with the single key ``random`` (an int in 0..20 inclusive)."""
        reading = random.randint(0, 20)
        return {'random': reading}
2019-07-31 22:31:50 +02:00
@sensor
class TestSink(Sensor):
    """Debug sink: subscribes to one MQTT topic and prints whatever arrives."""

    sensor_class = 'testsink'
    # Class-level flag set to False here; presumably tells the framework this
    # sensor produces no readings of its own - verify against the base class.
    HAS_SENSOR_DATA = False

    def __init__(self, mqtt_topic, **sensor_conf):
        # NOTE(review): unlike the sibling sensors, the base-class constructor
        # is not invoked and ``sensor_conf`` is ignored - confirm this is intended.
        self.mqtt_topic = mqtt_topic

    def get_topics(self):
        """Return a one-element list holding the configured topic."""
        topics = [self.mqtt_topic]
        return topics

    def register_callbacks(self):
        """Register :meth:`recvd` as the handler for the configured topic."""
        topic, handler = self.mqtt_topic, self.recvd
        self.register_callback(topic, handler)

    def recvd(self, data):
        """Print the payload that was delivered for the topic."""
        print("I got called! Even with data:", data)
2019-07-09 22:01:39 +02:00
@sensor
class CO2Meter(Sensor):
    """Sensor that reads CO2 and temperature values from a HID CO2 meter.

    A background daemon thread continuously reads 8-byte reports from the
    device node at ``dev_path``, decrypts them and stores the most recent
    raw value per opcode in ``self.values``.  The ``get_*`` methods convert
    those raw values on demand.
    """

    sensor_class = 'co2meter'
    # ioctl request used to send the 9-byte feature report carrying the key.
    HIDIOCSFEATURE_9 = 0xC0094806
    # Opcodes (first byte of a decrypted report) under which values are stored.
    TEMP = 0x42
    CO2 = 0x50
    HUM = 0x44

    def __init__(self, dev_path, **sensor_conf):
        """Open the device, send the key and start the reader thread.

        :param dev_path: path of the device node to read reports from
        :param sensor_conf: remaining options, passed on to ``Sensor``
        :raises RuntimeError: if ``fcntl``/``threading`` could not be imported
        :raises OSError: if the device cannot be opened or initialised
        """
        super(CO2Meter, self).__init__(**sensor_conf)
        if not CO2_IMPORTS:
            raise RuntimeError("Missing threading or fcntl to run this sensor")
        self.dev_path = dev_path
        self.key = b'\x11\x11\x11\x11\x11\x11\x11\x11'
        self._dev = None
        self._stop = False
        self._worker_thread = None
        self.values = {}
        self.init_meter()
        self.start()

    def init_meter(self):
        """(Re)open the device and send it the key as a feature report.

        ``self._dev`` is only replaced once the ioctl succeeded; on failure
        the freshly opened handle is closed again so it does not leak and a
        half-initialised device is never read from.
        """
        dev = open(self.dev_path, "a+b", 0)
        try:
            # Report payload is a zero byte followed by the 8-byte key.
            fcntl.ioctl(dev, self.HIDIOCSFEATURE_9, b'\x00' + self.key)
        except Exception:
            dev.close()
            raise
        self._dev = dev

    def decrypt(self, data):
        """Decrypt one 8-byte report and return it as a ``bytearray`` of 8 bytes.

        Stages: byte shuffle, XOR with the key, rotation of the whole 64-bit
        block right by 3 bits, then byte-wise subtraction (mod 256) of the
        nibble-swapped constant ``cstate``.
        """
        cstate = [0x48, 0x74, 0x65, 0x6D, 0x70, 0x39, 0x39, 0x65]  # ASCII "Htemp99e"
        shuffle = [2, 4, 0, 7, 1, 6, 5, 3]
        phase1 = bytearray(8)
        for i, o in enumerate(shuffle):
            phase1[o] = data[i]
        phase2 = bytearray(8)
        for i in range(8):
            phase2[i] = phase1[i] ^ self.key[i]
        phase3 = bytearray(8)
        for i in range(8):
            # i - 1 is -1 for i == 0, i.e. it wraps around to the last byte.
            phase3[i] = (phase2[i] >> 3 | phase2[i - 1] << 5) & 0xff
        ctmp = bytearray(8)
        for i in range(8):
            ctmp[i] = (cstate[i] >> 4 | cstate[i] << 4) & 0xff
        out = bytearray(8)
        for i in range(8):
            out[i] = (0x100 + phase3[i] - ctmp[i]) & 0xff
        return out

    def start(self):
        """Start the background reader thread.

        Calling this while the current worker is still alive is a no-op, so
        the constructor and an explicit caller cannot end up with two threads
        reading from the same device.
        """
        worker = self._worker_thread
        if worker is not None and worker.is_alive():
            return
        self._stop = False
        self._worker_thread = threading.Thread(target=self._worker, daemon=True)
        self._worker_thread.start()

    def stop(self):
        """Ask the reader thread to exit.

        The request is cooperative: the worker notices it after its current
        read (or reopen attempt) returns.  The thread is not joined.
        """
        self._stop = True
        self._worker_thread = None

    def _reopen(self):
        """Close the device and retry ``init_meter`` once per second.

        Returns once the device is initialised again or a stop was requested.
        """
        try:
            self._dev.close()
        except Exception:
            # Best effort only: the handle is being discarded anyway.
            pass
        while not self._stop:
            time.sleep(1)
            try:
                self.init_meter()
                return
            except (IOError, OSError) as e:
                print("Device error: {} - trying to reopen".format(e))

    def _worker(self):
        """Reader loop: store every valid report in ``self.values``."""
        while not self._stop:
            try:
                data = self._dev.read(8)
                if not data or len(data) != 8:
                    # A short/empty read cannot be decrypted; treat it like
                    # any other device failure instead of crashing the thread.
                    raise IOError("short read ({} bytes)".format(len(data) if data else 0))
            except (IOError, OSError, ValueError) as e:
                # ValueError is what read() raises on an already closed file.
                print("Device error: {} - trying to reopen".format(e))
                self.values = {}
                self._reopen()
                continue
            decrypted = self.decrypt(data)
            # Byte 4 must be 0x0d and byte 3 the low byte of the sum of bytes 0..2.
            if decrypted[4] == 0x0d and sum(decrypted[:3]) & 0xff == decrypted[3]:
                op = decrypted[0]
                val = decrypted[1] << 8 | decrypted[2]
                self.values[op] = val
            else:
                print("Error decrypting data:", data, '==>', decrypted)

    def get_co2(self):
        """Return the last raw CO2 value, or ``None`` if none was received yet."""
        # Single .get() lookup: the worker may rebind self.values at any time.
        return self.values.get(self.CO2)

    def get_temp(self):
        """Return the last temperature as raw / 16 - 273.15, or ``None``."""
        raw = self.values.get(self.TEMP)
        if raw is None:
            return None
        return raw / 16.0 - 273.15

    def get_humidity(self):
        """Return the last humidity value as raw / 100, or ``None``."""
        raw = self.values.get(self.HUM)
        if raw is None:
            return None
        return raw / 100.0

    def get_data(self):
        """Return ``{'temp': ..., 'co2': ...}``; a value is ``None`` until seen."""
        # NOTE(review): humidity is not part of the published data - confirm intended.
        return dict(temp=self.get_temp(), co2=self.get_co2())
if __name__ == '__main__':
    # Small command-line monitor: print temperature and CO2 every two seconds.
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("device", help="Device do read from")
    args = parser.parse_args()
    # The constructor already starts the reader thread; calling start() again
    # here would launch a second worker reading from the same device.
    co2 = CO2Meter(args.device)
    try:
        while True:
            # Read each value once so the check and the output use the same
            # reading, and compare against None so that 0.0 is still printed.
            temp = co2.get_temp()
            ppm = co2.get_co2()
            if temp is not None and ppm is not None:
                print("{:2.4}°C {:4} PPM CO2".format(temp, ppm))
            time.sleep(2)
    except KeyboardInterrupt:
        pass
    finally:
        co2.stop()