118 lines
3.8 KiB
Python
118 lines
3.8 KiB
Python
import dht
|
|
import ds18x20
|
|
import machine
|
|
import onewire
|
|
import time
|
|
import ujson
|
|
|
|
from .base import Sensor, sensor
|
|
|
|
|
|
@sensor
class DS18B20(Sensor):
    """Temperature sensor backed by a DS18x20 device on a 1-Wire bus.

    The bus is opened on ``self.pin`` (expected to be provided by the
    ``Sensor`` base class from ``sensor_conf`` -- confirm against base) and
    the first device returned by the bus scan is used for every reading.
    """

    sensor_class = 'esp-ds18b20'

    # Time to wait between starting a conversion and reading the result.
    # The MicroPython DS18X20 driver documentation asks for at least 750 ms;
    # reading earlier can return a stale or incomplete value.
    _CONVERT_WAIT_MS = 750

    def __init__(self, **sensor_conf):
        """Open the 1-Wire bus and remember the first device found on it.

        Raises:
            IndexError: if the bus scan finds no device.
        """
        super(DS18B20, self).__init__(**sensor_conf)

        self.o = onewire.OneWire(machine.Pin(self.pin))
        self.ds = ds18x20.DS18X20(self.o)

        roms = self.ds.scan()
        if not roms:
            # Same exception type the bare ``scan()[0]`` would raise, but with
            # a message that points at the actual problem.
            raise IndexError("No DS18x20 device found on pin {}".format(self.pin))
        self.sID = roms[0]

    def get_data(self):
        """Trigger a conversion and return the reading as ``{"temp": value}``."""
        self.ds.convert_temp()
        # Block until the conversion has had time to finish before reading.
        time.sleep_ms(self._CONVERT_WAIT_MS)
        return {"temp": self.ds.read_temp(self.sID)}
|
|
|
|
|
|
@sensor
class DHT(Sensor):
    """Temperature/humidity sensor using a DHT11 or DHT22 on ``self.pin``.

    The concrete driver is chosen by the ``dht_type`` entry of the sensor
    configuration, which must be either ``"dht11"`` or ``"dht22"``.
    """

    sensor_class = 'esp-dht'

    def __init__(self, **sensor_conf):
        """Create the driver selected by ``sensor_conf["dht_type"]``.

        Raises:
            KeyError: if ``dht_type`` is missing from the configuration.
            ValueError: if ``dht_type`` is neither ``"dht11"`` nor ``"dht22"``.
        """
        super(DHT, self).__init__(**sensor_conf)

        kind = sensor_conf["dht_type"]
        if kind == "dht11":
            driver = dht.DHT11
        elif kind == "dht22":
            driver = dht.DHT22
        else:
            raise ValueError("Unknown type")

        self.dht = driver(machine.Pin(self.pin))

    def get_data(self):
        """Take a measurement and return ``{"temp": ..., "humidity": ...}``."""
        probe = self.dht
        probe.measure()
        time.sleep(0.1)

        temperature = probe.temperature()
        humidity = probe.humidity()
        return {"temp": temperature, "humidity": humidity}
|
|
|
|
|
|
@sensor
class PowerSwitch(Sensor):
    """Switch a group of sockets by replaying recorded pulse timings on a pin.

    The pulse codes are loaded from a JSON file. That file must contain an
    iterable of entries, each with a ``"name"``; the entry whose name equals
    ``socket_name`` is used and must provide ``"buckets"`` (a sequence of
    delays) and ``"codes"``, indexed as ``codes[socket][state]`` to give a
    sequence of timing strings. Successive codes of a socket/state pair are
    used in rotation.

    Switch requests arrive through the callback registered on ``mqtt_topic``.
    """

    sensor_class = 'esp-rcswitch'
    HAS_SENSOR_DATA = False

    def __init__(self, mqtt_topic, socket_name, socket_count, socket_json, **sensor_conf):
        """Set up per-socket state, the output pin and the code table.

        Args:
            mqtt_topic: topic on which switch requests are received.
            socket_name: name of the entry to pick from the JSON file.
            socket_count: number of sockets addressable by this instance.
            socket_json: path of the JSON file holding the pulse codes.
            **sensor_conf: forwarded unchanged to the ``Sensor`` base class.

        Raises:
            ValueError: if ``socket_name`` is not present in the JSON file.
        """
        super(PowerSwitch, self).__init__(**sensor_conf)

        self.mqtt_topic = mqtt_topic
        self.socket_name = socket_name
        self.socket_count = socket_count
        # Last state sent to each socket; everything starts as "off".
        self.socket_state = ["off"] * socket_count
        # Per socket and state: index of the next code to transmit.
        self.socket_code_state = [{'on': 0, 'off': 0} for _ in range(socket_count)]
        # Number of consecutive codes transmitted per switch request.
        self._codes_per_call = 2
        self._pin = machine.Pin(self.pin, machine.Pin.OUT)
        self._load_json(socket_json)

    def _load_json(self, path):
        """Load the entry named ``self.socket_name`` from the JSON file at ``path``.

        Raises:
            ValueError: if no entry with that name exists.
        """
        with open(path) as f:
            data = ujson.load(f)

        for entry in data:
            if entry["name"] == self.socket_name:
                self._socket_data = entry
                break
        else:
            raise ValueError("Could not find socket codes for {} in {}".format(self.socket_name, path))

    def send_by_compressed_timings(self, buckets, timings, repeats):
        """Transmit one code by toggling the output pin.

        Args:
            buckets: sequence of delays in microseconds.
            timings: string of digit characters; each digit is an index into
                ``buckets`` and describes how long one pin level is held.
            repeats: number of times the whole code is sent.
        """
        delays = [buckets[ord(t) - ord('0')] for t in timings]
        for _ in range(repeats):
            self._pin.value(0)
            state = 0
            t1 = time.ticks_us()
            for delay in delays:
                state ^= 1
                self._pin.value(state)
                # ticks_us() wraps around; ticks_diff() yields the correct
                # elapsed time across a wrap where plain subtraction would
                # produce a huge bogus delay.
                elapsed = time.ticks_diff(time.ticks_us(), t1)
                # The fixed 55 us correction presumably compensates for call
                # overhead -- TODO confirm on target hardware.
                time.sleep_us(delay - elapsed - 55)
                t1 = time.ticks_us()

    def switch_light(self, data):
        """Handle a switch request.

        Args:
            data: mapping with a ``"socket"`` entry (converted with ``int``)
                and a ``"state"`` entry that is ``"on"``, ``"off"`` or
                ``"toggle"``.

        Malformed or out-of-range requests are reported with ``print`` and
        otherwise ignored.
        """
        try:
            sock = int(data["socket"])
            # Validate the index before it is used to look up the current
            # state; otherwise a toggle on an unknown socket would escape
            # this handler as an IndexError.
            if sock < 0 or sock >= self.socket_count:
                raise ValueError("Socket {} is out of range".format(sock))

            requested = data["state"]
            if requested in ("on", "off"):
                state = requested
            elif requested == "toggle":
                state = "on" if self.socket_state[sock] == "off" else "off"
            else:
                raise ValueError("Unknown state {}".format(requested))
        except (KeyError, ValueError, TypeError) as e:
            # TypeError covers payloads of the wrong shape (e.g. a non-mapping
            # or a socket value that int() cannot convert).
            print("Could not load data:", e)
            return

        print("Switch light called with ", data)
        codes = self._socket_data["codes"]
        state_codes = codes[sock][state]
        # NOTE(review): the extent of this loop was reconstructed from a
        # listing without indentation -- confirm against the original file.
        for _ in range(self._codes_per_call):
            timings = state_codes[self.socket_code_state[sock][state]]
            self.send_by_compressed_timings(self._socket_data["buckets"], timings, 10)
            self.socket_state[sock] = state
            # Advance to the next code, wrapping at the end of the list.
            self.socket_code_state[sock][state] = (self.socket_code_state[sock][state] + 1) % len(state_codes)
        print("Switch socket {} to state {}".format(sock, state))

    def register_callbacks(self):
        """Register ``switch_light`` as the handler for ``mqtt_topic``."""
        self.register_callback(self.mqtt_topic, self.switch_light)

    def get_topics(self):
        """Return the list of topics this instance handles."""
        return [self.mqtt_topic]
|