diff --git a/mqtt_sensord/mqtt_sensord.py b/mqtt_sensord/mqtt_sensord.py index 5d5c321..f87ebd6 100755 --- a/mqtt_sensord/mqtt_sensord.py +++ b/mqtt_sensord/mqtt_sensord.py @@ -33,20 +33,39 @@ def main(): mqtt = MQTTClient(mcfg['host'], mcfg['user'], mcfg['password'], client_id) mqtt_topic = mcfg['sensor_topic'] print("Connected to", mqtt.host, "with topic", mqtt_topic) + mqtt.set_callback(sensorlib.Sensor.run_mqtt_callback) sensors = [] + extra_topics = set() for scfg in config['sensors']: s = sensorlib.Sensor.make_sensor(**scfg) + s.register_callbacks() + for topic in s.get_topics(): + extra_topics.add(topic) sensors.append(s) + for topic in extra_topics: + print("Subscribe to", topic) + mqtt.subscribe(topic) + last_measurement = time.time() while True: + # handle regular sensor data print("Getting values from sensors...") for sensor in sensors: - data = sensor.gen_datapoint() - print(mqtt_topic, data) - mqtt.send_data(mqtt_topic, data) - time.sleep(max(0.0, time.time() + config['host']['interval'] - last_measurement)) + if sensor.HAS_SENSOR_DATA: + data = sensor.gen_datapoint() + print(mqtt_topic, data) + mqtt.send_data(mqtt_topic, data) + if not mqtt.is_async(): + mqtt.process_mqtt() + + if mqtt.is_async(): + time.sleep(max(0.0, time.time() + config['host']['interval'] - last_measurement)) + else: + while time.time() - last_measurement < config['host']['interval']: + mqtt.process_mqtt() + time.sleep(0.1) last_measurement = time.time() diff --git a/mqtt_sensord/mqttlib/base.py b/mqtt_sensord/mqttlib/base.py index b176de6..1a171e2 100644 --- a/mqtt_sensord/mqttlib/base.py +++ b/mqtt_sensord/mqttlib/base.py @@ -12,3 +12,6 @@ class MQTTClientBase: def _connect(self): raise NotImplementedError("Connect not implemented") + + def is_async(self): + return False diff --git a/mqtt_sensord/mqttlib/paho_mqtt.py b/mqtt_sensord/mqttlib/paho_mqtt.py index e92e7d1..2b37642 100644 --- a/mqtt_sensord/mqttlib/paho_mqtt.py +++ b/mqtt_sensord/mqttlib/paho_mqtt.py @@ -13,3 +13,12 @@ class MQTTClient(MQTTClientBase): def send_data(self, topic, data): self._mqtt.publish(topic, json.dumps(data)) + + def set_callback(self, callback): + self._mqtt.on_message = lambda client, userdata, msg: callback(msg.topic, msg.payload.decode()) + + def process_mqtt(self): + self._mqtt.loop(0.01) + + def subscribe(self, topic): + self._mqtt.subscribe(topic) diff --git a/mqtt_sensord/sensorlib/base.py b/mqtt_sensord/sensorlib/base.py index 54d048c..77687d6 100644 --- a/mqtt_sensord/sensorlib/base.py +++ b/mqtt_sensord/sensorlib/base.py @@ -8,6 +8,8 @@ class Sensor(object): hostname = None platform = None _sensor_classes = {} + _mqtt_callbacks = {} + HAS_SENSOR_DATA = True def __init__(self, id, name, type, pin, description, **sensor_conf): self.id = id @@ -45,3 +47,23 @@ class Sensor(object): }, 'fields': self.get_data() } + + def register_callbacks(self): + pass + + def get_mqtt_topics(self): + return [] + + @classmethod + def register_callback(cls, mqtt_topic, callback): + cls._mqtt_callbacks[mqtt_topic] = callback + + @classmethod + def run_mqtt_callback(cls, mqtt_topic, data): + if mqtt_topic in cls._mqtt_callbacks: + cls._mqtt_callbacks[mqtt_topic](data) + else: + print("Unknown callback for", mqtt_topic) + + def get_topics(self): + return [] diff --git a/mqtt_sensord/sensorlib/misc_sensors.py b/mqtt_sensord/sensorlib/misc_sensors.py index 20b38e4..cf3af14 100644 --- a/mqtt_sensord/sensorlib/misc_sensors.py +++ b/mqtt_sensord/sensorlib/misc_sensors.py @@ -22,6 +22,24 @@ class FakeSensor(Sensor): return {'random': random.randint(0, 20)} +@sensor +class TestSink(Sensor): + sensor_class = 'testsink' + HAS_SENSOR_DATA = False + + def __init__(self, mqtt_topic, **sensor_conf): + self.mqtt_topic = mqtt_topic + + def recvd(self, data): + print("I got called! Even with data:", data) + + def register_callbacks(self): + self.register_callback(self.mqtt_topic, self.recvd) + + def get_topics(self): + return [self.mqtt_topic] + + @sensor class CO2Meter(Sensor): sensor_class = 'co2meter'