Support mqtt callbacks
This commit is contained in:
parent
31ca8475c2
commit
41dfa6b262
|
@ -33,20 +33,39 @@ def main():
|
||||||
mqtt = MQTTClient(mcfg['host'], mcfg['user'], mcfg['password'], client_id)
|
mqtt = MQTTClient(mcfg['host'], mcfg['user'], mcfg['password'], client_id)
|
||||||
mqtt_topic = mcfg['sensor_topic']
|
mqtt_topic = mcfg['sensor_topic']
|
||||||
print("Connected to", mqtt.host, "with topic", mqtt_topic)
|
print("Connected to", mqtt.host, "with topic", mqtt_topic)
|
||||||
|
mqtt.set_callback(sensorlib.Sensor.run_mqtt_callback)
|
||||||
|
|
||||||
sensors = []
|
sensors = []
|
||||||
|
extra_topics = set()
|
||||||
for scfg in config['sensors']:
|
for scfg in config['sensors']:
|
||||||
s = sensorlib.Sensor.make_sensor(**scfg)
|
s = sensorlib.Sensor.make_sensor(**scfg)
|
||||||
|
s.register_callbacks()
|
||||||
|
for topic in s.get_topics():
|
||||||
|
extra_topics.add(topic)
|
||||||
sensors.append(s)
|
sensors.append(s)
|
||||||
|
|
||||||
|
for topic in extra_topics:
|
||||||
|
print("Subscribe to", topic)
|
||||||
|
mqtt.subscribe(topic)
|
||||||
|
|
||||||
last_measurement = time.time()
|
last_measurement = time.time()
|
||||||
while True:
|
while True:
|
||||||
|
# handle regular sensor data
|
||||||
print("Getting values from sensors...")
|
print("Getting values from sensors...")
|
||||||
for sensor in sensors:
|
for sensor in sensors:
|
||||||
|
if sensor.HAS_SENSOR_DATA:
|
||||||
data = sensor.gen_datapoint()
|
data = sensor.gen_datapoint()
|
||||||
print(mqtt_topic, data)
|
print(mqtt_topic, data)
|
||||||
mqtt.send_data(mqtt_topic, data)
|
mqtt.send_data(mqtt_topic, data)
|
||||||
|
if not mqtt.is_async():
|
||||||
|
mqtt.process_mqtt()
|
||||||
|
|
||||||
|
if mqtt.is_async():
|
||||||
time.sleep(max(0.0, time.time() + config['host']['interval'] - last_measurement))
|
time.sleep(max(0.0, time.time() + config['host']['interval'] - last_measurement))
|
||||||
|
else:
|
||||||
|
while time.time() - last_measurement < config['host']['interval']:
|
||||||
|
mqtt.process_mqtt()
|
||||||
|
time.sleep(0.1)
|
||||||
last_measurement = time.time()
|
last_measurement = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -12,3 +12,6 @@ class MQTTClientBase:
|
||||||
|
|
||||||
def _connect(self):
|
def _connect(self):
|
||||||
raise NotImplementedError("Connect not implemented")
|
raise NotImplementedError("Connect not implemented")
|
||||||
|
|
||||||
|
def is_async(self):
|
||||||
|
return False
|
||||||
|
|
|
@ -13,3 +13,12 @@ class MQTTClient(MQTTClientBase):
|
||||||
|
|
||||||
def send_data(self, topic, data):
|
def send_data(self, topic, data):
|
||||||
self._mqtt.publish(topic, json.dumps(data))
|
self._mqtt.publish(topic, json.dumps(data))
|
||||||
|
|
||||||
|
def set_callback(self, callback):
|
||||||
|
self._mqtt.on_message = lambda client, userdata, msg: callback(msg.topic, msg.payload.decode())
|
||||||
|
|
||||||
|
def process_mqtt(self):
|
||||||
|
self._mqtt.loop(0.01)
|
||||||
|
|
||||||
|
def subscribe(self, topic):
|
||||||
|
self._mqtt.subscribe(topic)
|
||||||
|
|
|
@ -8,6 +8,8 @@ class Sensor(object):
|
||||||
hostname = None
|
hostname = None
|
||||||
platform = None
|
platform = None
|
||||||
_sensor_classes = {}
|
_sensor_classes = {}
|
||||||
|
_mqtt_callbacks = {}
|
||||||
|
HAS_SENSOR_DATA = True
|
||||||
|
|
||||||
def __init__(self, id, name, type, pin, description, **sensor_conf):
|
def __init__(self, id, name, type, pin, description, **sensor_conf):
|
||||||
self.id = id
|
self.id = id
|
||||||
|
@ -45,3 +47,23 @@ class Sensor(object):
|
||||||
},
|
},
|
||||||
'fields': self.get_data()
|
'fields': self.get_data()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def register_callbacks(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_mqtt_topics(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def register_callback(cls, mqtt_topic, callback):
|
||||||
|
cls._mqtt_callbacks[mqtt_topic] = callback
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def run_mqtt_callback(cls, mqtt_topic, data):
|
||||||
|
if mqtt_topic in cls._mqtt_callbacks:
|
||||||
|
cls._mqtt_callbacks[mqtt_topic](data)
|
||||||
|
else:
|
||||||
|
print("Unknown callback for", mqtt_topic)
|
||||||
|
|
||||||
|
def get_topics(self):
|
||||||
|
return []
|
||||||
|
|
|
@ -22,6 +22,24 @@ class FakeSensor(Sensor):
|
||||||
return {'random': random.randint(0, 20)}
|
return {'random': random.randint(0, 20)}
|
||||||
|
|
||||||
|
|
||||||
|
@sensor
|
||||||
|
class TestSink(Sensor):
|
||||||
|
sensor_class = 'testsink'
|
||||||
|
HAS_SENSOR_DATA = False
|
||||||
|
|
||||||
|
def __init__(self, mqtt_topic, **sensor_conf):
|
||||||
|
self.mqtt_topic = mqtt_topic
|
||||||
|
|
||||||
|
def recvd(self, data):
|
||||||
|
print("I got called! Even with data:", data)
|
||||||
|
|
||||||
|
def register_callbacks(self):
|
||||||
|
self.register_callback(self.mqtt_topic, self.recvd)
|
||||||
|
|
||||||
|
def get_topics(self):
|
||||||
|
return [self.mqtt_topic]
|
||||||
|
|
||||||
|
|
||||||
@sensor
|
@sensor
|
||||||
class CO2Meter(Sensor):
|
class CO2Meter(Sensor):
|
||||||
sensor_class = 'co2meter'
|
sensor_class = 'co2meter'
|
||||||
|
|
Loading…
Reference in New Issue