try: import json except ImportError: import ujson as json def sensor(cls): Sensor.register(cls) return cls class Sensor(object): hostname = None platform = None _sensor_classes = {} _mqtt_callbacks = {} HAS_SENSOR_DATA = True def __init__(self, sensor_id, name, sensor_type, pin, description, **sensor_conf): self.id = sensor_id self.name = name self.type = sensor_type self.pin = pin self.description = description @classmethod def configure(cls, hostname, platform): cls.hostname = hostname cls.platform = platform @classmethod def register(cls, new_cls): cls._sensor_classes[new_cls.sensor_class] = new_cls @classmethod def make_sensor(cls, **kwargs): sensor_cls = cls._sensor_classes[kwargs['type']] return sensor_cls(**kwargs) def get_data(self): raise NotImplementedError("You missed a spot!") def gen_datapoint(self): return { 'type': 'measurement', 'tags': { 'name': self.name, 'sub-id': self.id, 'sensor': self.type, 'hostname': self.hostname, 'platform': self.platform, }, 'fields': self.get_data() } def register_callbacks(self): pass def get_mqtt_topics(self): return [] @classmethod def register_callback(cls, mqtt_topic, callback): cls._mqtt_callbacks[mqtt_topic] = callback @classmethod def run_mqtt_callback(cls, mqtt_topic, data): try: data = json.loads(data) except ValueError: print("Could not parse data {}".format(data)) if mqtt_topic in cls._mqtt_callbacks: cls._mqtt_callbacks[mqtt_topic](data) else: print("Unknown callback for", mqtt_topic) def get_topics(self): return []