From 3a086962c0215fda25715a1cb9b062c2b0724996 Mon Sep 17 00:00:00 2001 From: Sebastian Lohff Date: Sun, 10 Jan 2021 23:32:09 +0100 Subject: [PATCH] WIP: MQTT combined mode --- mqtt_sensord/mqtt_sensord.py | 37 +++++++++++++++++++++++++------ mqtt_sensord/mqttlib/base.py | 3 +++ mqtt_sensord/mqttlib/paho_mqtt.py | 5 +++++ mqtt_sensord/sensorlib/base.py | 8 +++---- 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/mqtt_sensord/mqtt_sensord.py b/mqtt_sensord/mqtt_sensord.py index ae4efb9..c0f6f1f 100755 --- a/mqtt_sensord/mqtt_sensord.py +++ b/mqtt_sensord/mqtt_sensord.py @@ -28,6 +28,7 @@ def main(): debug = debug or config.get('debug') config['debug'] = debug + esp_reset_time = config['host'].get('esp-reset-time') sensorlib.Sensor.configure(config['host']['name'], sys.platform) mcfg = config['mqtt'] @@ -36,6 +37,7 @@ def main(): else: client_id = None + mqtt_mode = config['mqtt'].get('mode', 'split') mqtt = MQTTClient(mcfg['host'], mcfg['user'], mcfg['password'], client_id) default_mqtt_topic = mcfg.get('sensor_topic') print("Connected to", mqtt.host, "with default topic", default_mqtt_topic) @@ -54,19 +56,35 @@ def main(): print("Subscribe to", topic) mqtt.subscribe(topic) - last_measurement = time.time() + last_measurement = start_time = time.time() while True: # handle regular sensor data if debug: print("Getting values from sensors...") - for sensor in sensors: - if sensor.HAS_SENSOR_DATA: - data = sensor.gen_datapoint() - if debug: - print(default_mqtt_topic, data) - mqtt.send_data(default_mqtt_topic, data) + if mqtt_mode == 'split': + for sensor in sensors: + if sensor.HAS_SENSOR_DATA: + data = sensor.gen_datapoint() + if debug: + print(default_mqtt_topic, data) + mqtt.send_data(default_mqtt_topic, data) + if not mqtt.is_async(): + mqtt.process_mqtt() + elif mqtt_mode == 'combined': + data = { + 'hostname': config['host']['name'], + 'platform': sys.platform, + } + for sensor in sensors: + if sensor.HAS_SENSOR_DATA: + data[sensor.id] = sensor.get_data() + if debug: + print(default_mqtt_topic, data) + mqtt.send_data(default_mqtt_topic, data) if not mqtt.is_async(): mqtt.process_mqtt() + else: + raise RuntimeError("Unknown mqtt mode {}".format(mqtt_mode)) if mqtt.is_async(): time.sleep(max(0.0, time.time() + config['host']['interval'] - last_measurement)) @@ -76,6 +94,11 @@ def main(): time.sleep(0.1) last_measurement = time.time() + if sys.implementation.name == 'micropython' and esp_reset_time and \ + last_measurement - start_time > esp_reset_time: + import machine + machine.reset() + if __name__ == '__main__': main() diff --git a/mqtt_sensord/mqttlib/base.py b/mqtt_sensord/mqttlib/base.py index 1a171e2..85fb616 100644 --- a/mqtt_sensord/mqttlib/base.py +++ b/mqtt_sensord/mqttlib/base.py @@ -15,3 +15,6 @@ class MQTTClientBase: def is_async(self): return False + + def process_mqtt(self): + pass diff --git a/mqtt_sensord/mqttlib/paho_mqtt.py b/mqtt_sensord/mqttlib/paho_mqtt.py index 2b37642..fac933f 100644 --- a/mqtt_sensord/mqttlib/paho_mqtt.py +++ b/mqtt_sensord/mqttlib/paho_mqtt.py @@ -9,8 +9,13 @@ class MQTTClient(MQTTClientBase): def _connect(self): self._mqtt = mqtt.Client(self.client_id) self._mqtt.username_pw_set(self.user, self.password) + self._mqtt.on_connect = self.on_connect self._mqtt.connect(self.host) + def on_connect(self, *args, **kwargs): + #print("Connected! client={} userdata={} msg={}".format(client, userdata, msg)) + print("Connected!", *args, **kwargs) + def send_data(self, topic, data): self._mqtt.publish(topic, json.dumps(data)) diff --git a/mqtt_sensord/sensorlib/base.py b/mqtt_sensord/sensorlib/base.py index d4225d0..037ac6e 100644 --- a/mqtt_sensord/sensorlib/base.py +++ b/mqtt_sensord/sensorlib/base.py @@ -17,10 +17,10 @@ class Sensor(object): _mqtt_callbacks = {} HAS_SENSOR_DATA = True - def __init__(self, sensor_id, name, sensor_type, pin, description, **sensor_conf): - self.id = sensor_id + def __init__(self, id, name, type, pin, description, **sensor_conf): + self.id = id self.name = name - self.type = sensor_type + self.type = type self.pin = pin self.description = description @@ -41,7 +41,7 @@ class Sensor(object): def get_data(self): raise NotImplementedError("You missed a spot!") - def gen_datapoint(self): + def gen_datapoint(self, legacy=True): return { 'type': 'measurement', 'tags': {