"""Sensor plugins: a random-number fake sensor, an MQTT test sink and a
CO2 meter that is read from a HID device node in a background thread."""
import random
import time

from .base import Sensor, sensor

# fcntl is not available on every platform; the CO2 meter refuses to
# start when either optional import is missing.
try:
    import fcntl
    import threading
    CO2_IMPORTS = True
except ImportError:
    CO2_IMPORTS = False


@sensor
class FakeSensor(Sensor):
    """Sensor that reports a random integer; useful for testing a setup."""

    sensor_class = 'fake'

    def __init__(self, **sensor_conf):
        super(FakeSensor, self).__init__(**sensor_conf)

    def get_data(self):
        """Return a dict with one key, 'random', holding an int in [0, 20]."""
        return {'random': random.randint(0, 20)}


@sensor
class TestSink(Sensor):
    """Sink that prints whatever arrives on a configured MQTT topic."""

    sensor_class = 'testsink'
    HAS_SENSOR_DATA = False

    def __init__(self, mqtt_topic, **sensor_conf):
        # NOTE(review): unlike the other sensors in this file this does not
        # call Sensor.__init__ with sensor_conf -- confirm that is intended.
        self.mqtt_topic = mqtt_topic

    def recvd(self, data):
        """Callback for incoming messages: print the received data."""
        print("I got called! Even with data:", data)

    def register_callbacks(self):
        """Register recvd as the callback for the configured topic."""
        self.register_callback(self.mqtt_topic, self.recvd)

    def get_topics(self):
        """Return the list of topics this sink uses (just the one)."""
        return [self.mqtt_topic]


@sensor
class CO2Meter(Sensor):
    """CO2 / temperature / humidity meter read from a HID device node.

    A daemon thread continuously reads 8-byte frames from the device,
    decrypts and validates them and stores the latest raw value per
    opcode in ``self.values``. The ``get_*`` methods convert those raw
    values on demand and return None while no reading is available.
    """

    sensor_class = 'co2meter'

    # ioctl request used to send the 9-byte feature report carrying the key.
    HIDIOCSFEATURE_9 = 0xC0094806

    # Opcodes found in the first byte of a decrypted frame.
    TEMP = 0x42
    CO2 = 0x50
    HUM = 0x44

    # Size in bytes of one frame read from the device.
    _FRAME_SIZE = 8

    def __init__(self, dev_path, **sensor_conf):
        """Open the device at dev_path and start the reader thread.

        Raises:
            RuntimeError: if fcntl or threading could not be imported.
            IOError/OSError: if the device cannot be opened or initialized.
        """
        super(CO2Meter, self).__init__(**sensor_conf)

        if not CO2_IMPORTS:
            raise RuntimeError("Missing threading or fcntl to run this sensor")

        self.dev_path = dev_path
        self.key = b'\x11\x11\x11\x11\x11\x11\x11\x11'
        self._dev = None
        self._stop = False
        self._worker_thread = None
        self.values = {}

        self.init_meter()
        self.start()

    def init_meter(self):
        """Open the device and send it the key as a feature report.

        ``self._dev`` is only replaced once the ioctl has succeeded, so a
        failed attempt neither leaks the new file handle nor leaves a
        half-initialized device in place.
        """
        dev = open(self.dev_path, "a+b", 0)
        try:
            set_report = b'\x00' + self.key
            fcntl.ioctl(dev, self.HIDIOCSFEATURE_9, set_report)
        except Exception:
            dev.close()
            raise
        self._dev = dev

    def decrypt(self, data):
        """Decrypt one 8-byte frame and return it as a bytearray of 8 bytes."""
        cstate = [0x48, 0x74, 0x65, 0x6D, 0x70, 0x39, 0x39, 0x65]
        shuffle = [2, 4, 0, 7, 1, 6, 5, 3]

        # Phase 1: undo the byte shuffle.
        phase1 = bytearray(8)
        for src, dst in enumerate(shuffle):
            phase1[dst] = data[src]

        # Phase 2: XOR with the key that was sent to the device.
        phase2 = bytearray(p ^ k for p, k in zip(phase1, self.key))

        # Phase 3: rotate the whole 64-bit block right by three bits; index
        # -1 deliberately wraps around to the last byte for i == 0.
        phase3 = bytearray(
            (phase2[i] >> 3 | phase2[i - 1] << 5) & 0xff for i in range(8)
        )

        # Swap the nibbles of each constant byte, then subtract modulo 256.
        ctmp = bytearray((c >> 4 | c << 4) & 0xff for c in cstate)
        return bytearray(
            (0x100 + p - c) & 0xff for p, c in zip(phase3, ctmp)
        )

    def start(self):
        """Start the background reader thread.

        Does nothing when a reader is already running, so calling it again
        after construction does not spawn a second thread reading from the
        same device.
        """
        worker = getattr(self, '_worker_thread', None)
        if worker is not None and worker.is_alive():
            return
        self._stop = False
        self._worker_thread = threading.Thread(target=self._worker,
                                               daemon=True)
        self._worker_thread.start()

    def stop(self):
        """Ask the reader thread to exit; does not wait for it to finish."""
        self._stop = True
        self._worker_thread = None

    def _worker(self):
        """Reader loop: read frames until stopped, reopening on I/O errors."""
        while not self._stop:
            try:
                data = self._dev.read(self._FRAME_SIZE)
                if len(data) != self._FRAME_SIZE:
                    # A truncated frame cannot be decrypted; handle it like
                    # any other device failure instead of crashing the thread.
                    raise IOError(
                        "short read ({} bytes)".format(len(data)))
                self._process_frame(data)
            except (IOError, OSError) as e:
                print("Device error: {} - trying to reopen".format(e))
                self.values = {}
                self._reopen_device()

    def _process_frame(self, data):
        """Decrypt one frame and store its value if the checksum matches."""
        decrypted = self.decrypt(data)
        checksum_ok = sum(decrypted[:3]) & 0xff == decrypted[3]
        if decrypted[4] == 0x0d and checksum_ok:
            op = decrypted[0]
            val = decrypted[1] << 8 | decrypted[2]
            self.values[op] = val
        else:
            print("Error decrypting data:", data, '==>', decrypted)

    def _reopen_device(self):
        """Close the device and retry opening it once a second.

        Returns when the device is open again or stop() has been called.
        """
        try:
            self._dev.close()
        except Exception:
            # Best effort: the handle is being discarded anyway.
            pass

        while self._dev.closed and not self._stop:
            time.sleep(1)
            try:
                self.init_meter()
            except (IOError, OSError) as e:
                print("Device error: {} - trying to reopen".format(e))

    def get_co2(self):
        """Return the latest raw CO2 value, or None if none was received."""
        return self.values.get(self.CO2)

    def get_temp(self):
        """Return the latest temperature (raw / 16 - 273.15), or None."""
        raw = self.values.get(self.TEMP)
        if raw is None:
            return None
        return raw / 16.0 - 273.15

    def get_humidity(self):
        """Return the latest humidity value (raw / 100), or None."""
        raw = self.values.get(self.HUM)
        if raw is None:
            return None
        return raw / 100.0

    def get_data(self):
        """Return a dict with the current 'temp' and 'co2' readings."""
        return dict(temp=self.get_temp(), co2=self.get_co2())


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("device", help="Device do read from")
    args = parser.parse_args()

    # The constructor already starts the reader thread.
    co2 = CO2Meter(args.device)

    try:
        while True:
            # Read each value once so it cannot turn into None between the
            # check and the formatting; compare against None so that a
            # reading of exactly 0 is still printed.
            temp = co2.get_temp()
            ppm = co2.get_co2()
            if temp is not None and ppm is not None:
                print("{:2.4}°C {:4} PPM CO2".format(temp, ppm))
            time.sleep(2)
    except KeyboardInterrupt:
        pass