# Licensed under GPLv3
# Written by Sebastian Lohff (seba@someserver.de)
# http://seba-geek.de/projects/seopardy/

from __future__ import print_function

import os
import serial
import socket

from PySide import QtCore

# Registry mapping an input type name (e.g. "Serial") to its implementing class.
_inputs = {}


def _add_input(inputType, inputClass):
    """Register inputClass under the name inputType for later use by get_input()."""
    _inputs[inputType] = inputClass


def get_input(inputType, args, app):
    """Instantiate the input registered under inputType.

    inputType - name the input class was registered with
    args      - sequence of positional arguments passed to the class after app
    app       - application object handed to the input as first argument

    Raises ValueError if inputType has not been registered.
    """
    if inputType not in _inputs:
        raise ValueError("'%s' is an unknown input type" % inputType)
    return _inputs[inputType](app, *args)


def _open_serial(device, baudrate, bytesize, parity, stopbits):
    """Open the serial port and announce it on stdout.

    Raises InputException if pySerial reports a SerialException.
    """
    try:
        port = serial.Serial(device, baudrate, bytesize, parity, stopbits)
    except serial.SerialException as e:
        raise InputException("Error: %s" % e)
    print(" >> serial %s initialized with rate %s %s%s%s"
          % (device, baudrate, bytesize, parity, stopbits))
    return port


class BaseInput(QtCore.QThread):
    """Base class for buzzer inputs.

    Subclasses implement run() (executed in the thread) and report pressed
    buttons through the buttonEvent signal.
    """

    buttonEvent = QtCore.Signal(int)

    def __init__(self, app, parent=None):
        super(BaseInput, self).__init__(parent)
        self._app = app

    def _sendButtonEvent(self, btn):
        """Emit buttonEvent with btn converted to int."""
        self.buttonEvent.emit(int(btn))

    def _sendDigitEvent(self, c):
        """Emit a button event if c is exactly one character in '1'..'9'.

        Works for both str and bytes input; anything else (including an
        empty read) is silently ignored.
        """
        if len(c) == 1 and ord('0') < ord(c) <= ord('9'):
            self._sendButtonEvent(c)

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """ Called when buzzers are opened or closed for answering

            isOpen - True if question got opened, False if closed

            The base implementation does nothing.
        """

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """ Called when a player got its turn to answer the question

            playerNo - number (int) of player which can answer (starting from 1)

            The base implementation does nothing.
        """


class SerialInput(BaseInput):
    """Reads single characters '1'..'9' from a serial port as button numbers."""

    def __init__(self, app, device, baudrate=9600, bytesize=8, parity='N', stopbits=1):
        super(SerialInput, self).__init__(app)
        self._serial = _open_serial(device, baudrate, bytesize, parity, stopbits)

    def run(self):
        """Read the port byte by byte forever, emitting an event per digit."""
        while True:
            self._sendDigitEvent(self._serial.read(1))


_add_input("Serial", SerialInput)


class BeopardySerialInput(BaseInput):
    """Serial input that maps the bytes 'a'..'d' to players 1..4.

    It also writes state changes back to the port: "ABCD" followed by "R"
    (buzzers open) or "r" (buzzers closed), and 'a'..'d' for the player
    whose turn it is.
    """

    def __init__(self, app, device, baudrate=9600, bytesize=8, parity='N', stopbits=1):
        super(BeopardySerialInput, self).__init__(app)
        self._serial = _open_serial(device, baudrate, bytesize, parity, stopbits)

        # Keys are bytes because that is what Serial.read() returns on
        # Python 3; on Python 2 b'a' is the same object type as 'a'.
        self._playerDict = {
            b'a': 1,
            b'b': 2,
            b'c': 3,
            b'd': 4,
        }

    def run(self):
        """Read the port byte by byte forever, emitting an event per known byte."""
        while True:
            c = self._serial.read(1)
            if len(c) == 1 and c in self._playerDict:
                self._sendButtonEvent(self._playerDict[c])

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """Write "ABCD" and then "R" (open) or "r" (closed) to the port."""
        self._serial.write(b"ABCD")
        if isOpen:
            # buzzers are open now
            self._serial.write(b"R")
        else:
            # tell them that buttons are closed now
            self._serial.write(b"r")

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """Write the letter for playerNo (1 -> 'a' ... 4 -> 'd') to the port.

        Player numbers outside 1..4 are ignored.
        """
        if not 1 <= playerNo <= 4:
            return
        # Slicing (instead of indexing) yields a one-byte bytes object on
        # both Python 2 and Python 3.
        self._serial.write(b"abcd"[playerNo - 1:playerNo])


_add_input("BeopardySerial", BeopardySerialInput)


class FifoInput(BaseInput):
    """Reads single characters '1'..'9' from a named pipe as button numbers."""

    def __init__(self, app, fifoPath, debug=False):
        """Create the fifo at fifoPath if nothing exists there yet.

        Raises InputException if the fifo cannot be created or if the path
        is a regular file or a directory.
        """
        super(FifoInput, self).__init__(app)
        self._path = fifoPath
        self._debug = debug

        if not os.path.exists(self._path):
            try:
                os.mkfifo(self._path)
            except OSError as e:
                raise InputException("Could not create fifo '%s': %s" % (self._path, e))

        if os.path.isfile(self._path) or os.path.isdir(self._path):
            raise InputException("Fifo '%s' is not a fifo" % self._path)

        print(" >> Fifo initialized, using '%s'" % self._path)

    def run(self):
        """Read the fifo byte by byte forever, emitting an event per digit.

        When the last writer closes the fifo, read() reports end-of-file
        immediately on every call; polling it would busy-loop.  The fifo is
        therefore closed and reopened, which waits for the next writer.
        """
        while True:
            # Binary mode: only ASCII digits matter and undecodable bytes
            # must not raise.
            with open(self._path, "rb") as fifo:
                while True:
                    c = fifo.read(1)
                    if not c:
                        # EOF: all writers are gone, reopen and wait
                        break
                    self._sendDigitEvent(c)


_add_input("Fifo", FifoInput)


class UnixInput(BaseInput):
    """Accepts one client at a time on a unix domain socket.

    Bytes '1'..'9' received from the client are button numbers.  The client
    is sent "O"/"C" when buzzers open/close and "T<n>" when player n gets
    its turn.
    """

    def __init__(self, app, socketPath, debug=False):
        """Remove a stale socket file at socketPath and bind a new socket.

        Raises InputException if an existing file at socketPath cannot be
        removed.
        """
        super(UnixInput, self).__init__(app)
        self._path = socketPath
        self._debug = debug
        self._currConn = None

        try:
            os.unlink(self._path)
        except OSError as e:
            # A missing file is fine; anything still in the way is not.
            if os.path.exists(self._path):
                raise InputException("Could not create domain socket '%s': %s" % (self._path, e))

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.bind(self._path)

        print(" >> Unix domain socket initialized, using '%s'" % self._path)

    def run(self):
        """Serve clients one after another, emitting an event per digit."""
        self.sock.listen(1)
        while True:
            conn, _ = self.sock.accept()
            self._currConn = conn
            try:
                while True:
                    c = conn.recv(1)
                    if not c:
                        # empty read: client closed the connection
                        # (recv returns b'' on Python 3, '' on Python 2)
                        break
                    self._sendDigitEvent(c)
            except socket.error as e:
                # A broken client must not terminate the input thread.
                print(" >> Unix domain socket: connection error: %s" % e)
            finally:
                self._currConn = None
                conn.close()

    def _sendToClient(self, data):
        """Send data (bytes) to the connected client, if there is one.

        The connection is read once into a local because run() resets it
        from the input thread.  Send errors are reported, not raised: the
        notification is best effort and run() handles the disconnect.
        """
        conn = self._currConn
        if conn is None:
            return
        try:
            conn.sendall(data)
        except socket.error as e:
            print(" >> Unix domain socket: could not send to client: %s" % e)

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """Send "O" (buzzers opened) or "C" (buzzers closed) to the client."""
        self._sendToClient(b"O" if isOpen else b"C")

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """Send "T<playerNo>" to the client."""
        self._sendToClient(("T%d" % playerNo).encode("ascii"))


_add_input("Unix", UnixInput)


class InputException(Exception):
    """Raised when an input cannot be set up (device, fifo or socket error)."""