mqtt-sensord/mqtt_sensord/sensorlib/base.py

81 lines
1.9 KiB
Python
Raw Permalink Normal View History

2019-08-11 01:13:03 +02:00
try:
import json
except ImportError:
import ujson as json
2019-07-09 22:01:39 +02:00
def sensor(cls):
Sensor.register(cls)
return cls
class Sensor(object):
hostname = None
platform = None
_sensor_classes = {}
2019-07-31 22:31:50 +02:00
_mqtt_callbacks = {}
HAS_SENSOR_DATA = True
2019-07-09 22:01:39 +02:00
2021-01-10 23:32:09 +01:00
def __init__(self, id, name, type, pin, description, **sensor_conf):
self.id = id
2019-07-09 22:01:39 +02:00
self.name = name
2021-01-10 23:32:09 +01:00
self.type = type
2019-07-09 22:01:39 +02:00
self.pin = pin
self.description = description
@classmethod
def configure(cls, hostname, platform):
cls.hostname = hostname
cls.platform = platform
@classmethod
def register(cls, new_cls):
cls._sensor_classes[new_cls.sensor_class] = new_cls
@classmethod
def make_sensor(cls, **kwargs):
sensor_cls = cls._sensor_classes[kwargs['type']]
return sensor_cls(**kwargs)
def get_data(self):
raise NotImplementedError("You missed a spot!")
2021-01-10 23:32:09 +01:00
def gen_datapoint(self, legacy=True):
2019-07-09 22:01:39 +02:00
return {
'type': 'measurement',
'tags': {
'name': self.name,
'sub-id': self.id,
'sensor': self.type,
'hostname': self.hostname,
'platform': self.platform,
},
'fields': self.get_data()
}
2019-07-31 22:31:50 +02:00
def register_callbacks(self):
pass
def get_mqtt_topics(self):
return []
@classmethod
def register_callback(cls, mqtt_topic, callback):
cls._mqtt_callbacks[mqtt_topic] = callback
@classmethod
def run_mqtt_callback(cls, mqtt_topic, data):
2019-08-11 01:13:03 +02:00
try:
data = json.loads(data)
except ValueError:
print("Could not parse data {}".format(data))
2019-07-31 22:31:50 +02:00
if mqtt_topic in cls._mqtt_callbacks:
cls._mqtt_callbacks[mqtt_topic](data)
else:
print("Unknown callback for", mqtt_topic)
def get_topics(self):
return []