import dht import ds18x20 import machine import onewire import time import ujson from .base import Sensor, sensor @sensor class DS18B20(Sensor): sensor_class = 'esp-ds18b20' def __init__(self, **sensor_conf): super(DS18B20, self).__init__(**sensor_conf) self.o = onewire.OneWire(machine.Pin(self.pin)) self.ds = ds18x20.DS18X20(self.o) self.sID = self.ds.scan()[0] def get_data(self): self.ds.convert_temp() time.sleep(0.1) return {"temp": self.ds.read_temp(self.sID)} @sensor class DHT(Sensor): sensor_class = 'esp-dht' def __init__(self, **sensor_conf): super(DHT, self).__init__(**sensor_conf) if sensor_conf["dht_type"] == "dht11": self.dht = dht.DHT11(machine.Pin(self.pin)) elif sensor_conf["dht_type"] == "dht22": self.dht = dht.DHT22(machine.Pin(self.pin)) else: raise ValueError("Unknown type") def get_data(self): self.dht.measure() time.sleep(0.1) return {"temp": self.dht.temperature(), "humidity": self.dht.humidity()} @sensor class PowerSwitch(Sensor): sensor_class = 'esp-rcswitch' HAS_SENSOR_DATA = False def __init__(self, mqtt_topic, socket_name, socket_count, socket_json, **sensor_conf): super(PowerSwitch, self).__init__(**sensor_conf) self.mqtt_topic = mqtt_topic self.socket_name = socket_name self.socket_count = socket_count self.socket_state = ["off"] * socket_count self.socket_code_state = [] self._codes_per_call = 2 for _ in range(socket_count): self.socket_code_state.append({'on': 0, 'off': 0}) self._pin = machine.Pin(self.pin, machine.Pin.OUT) self._load_json(socket_json) def _load_json(self, path): with open(path) as f: data = ujson.load(f) for entry in data: if entry["name"] == self.socket_name: self._socket_data = entry break else: raise ValueError("Could not find socket codes for {} in {}".format(self.socket_name, path)) def send_by_compressed_timings(self, buckets, timings, repeats): delays = [buckets[ord(t) - ord('0')] for t in timings] for repeat in range(repeats): self._pin.value(0) state = 0 t1 = time.ticks_us() for delay in delays: state ^= 1 self._pin.value(state) time.sleep_us(delay - (time.ticks_us() - t1) - 55) t1 = time.ticks_us() def switch_light(self, data): try: sock = int(data["socket"]) if data["state"] in ("on", "off"): state = data["state"] elif data["state"] == "toggle": state = "on" if self.socket_state[sock] == "off" else "off" else: raise ValueError("Unknown state {}".format(data["state"])) if sock < 0 or sock >= self.socket_count: raise ValueError("Socket {} is out of range".format(sock)) except (KeyError, ValueError) as e: print("Could not load data:", e) return print("Switch light called with ", data) codes = self._socket_data["codes"] for _ in range(self._codes_per_call): timings = codes[sock][state][self.socket_code_state[sock][state]] self.send_by_compressed_timings(self._socket_data["buckets"], timings, 10) self.socket_state[sock] = state self.socket_code_state[sock][state] = (self.socket_code_state[sock][state] + 1) % len(codes[sock][state]) print("Switch socket {} to state {}".format(sock, state)) def register_callbacks(self): self.register_callback(self.mqtt_topic, self.switch_light) def get_topics(self): return [self.mqtt_topic]