191 lines
5.1 KiB
Python
191 lines
5.1 KiB
Python
# write a thread that reads button input from a fifo, pyserial or something similar
|
|
# https://gist.github.com/elshaka/2999082
|
|
# and make this configurable
|
|
from __future__ import print_function
|
|
|
|
import os
|
|
import serial
|
|
import socket
|
|
from PySide import QtCore
|
|
|
|
# Registry mapping an input type name to the class that implements it.
# Filled by _add_input() and looked up by get_input().
_inputs = {}
|
|
|
|
def _add_input(inputType, inputClass):
    """Register ``inputClass`` in the module registry under ``inputType``.

    Registering a second class under the same name replaces the first.
    """
    _inputs.update({inputType: inputClass})
|
|
|
|
def get_input(inputType, args, app):
    """Create an instance of the input class registered as ``inputType``.

    inputType - name the class was registered under
    args      - extra positional arguments, unpacked after ``app``
    app       - handed to the class as its first constructor argument

    Raises ValueError when nothing is registered under ``inputType``.
    """
    if inputType in _inputs:
        input_class = _inputs[inputType]
        return input_class(app, *args)
    raise ValueError("'%s' is an unknown input type" % inputType)
|
|
|
|
class BaseInput(QtCore.QThread):
    """Common base class of the button input threads in this module.

    Subclasses read button presses in run() and report them through the
    ``buttonEvent`` signal. The two slots are hooks for game state changes
    and do nothing unless a subclass overrides them.
    """

    # Carries the button number as an int.
    buttonEvent = QtCore.Signal(int)

    def __init__(self, app, parent=None):
        super(BaseInput, self).__init__(parent)
        self._app = app

    def _sendButtonEvent(self, btn):
        """Emit ``buttonEvent`` with ``btn`` converted via int()."""
        number = int(btn)
        self.buttonEvent.emit(number)

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """Hook: the buzzers were opened or closed for answering.

        isOpen - True if the question got opened, False if it got closed
        """

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """Hook: a player got the turn to answer the question.

        playerNo - number (int) of the player who may answer, counted from 1
        """
|
|
|
|
class SerialInput(BaseInput):
    """Input thread that reads button numbers '1'..'9' from a serial port."""

    def __init__(self, app, device, baudrate=9600, bytesize=8, parity='N', stopbits=1):
        super(SerialInput, self).__init__(app)
        try:
            port = serial.Serial(device, baudrate, bytesize, parity, stopbits)
        except serial.SerialException as e:
            raise InputException("Error: %s" % e)
        self._serial = port

        print(" >> serial %s initialized with rate %s %s%s%s" % (device, baudrate, bytesize, parity, stopbits))

    def run(self):
        """Read one byte at a time and emit a button event for each digit 1-9."""
        lowest, highest = ord('1'), ord('9')
        while True:
            c = self._serial.read(1)
            if len(c) != 1:
                # Nothing was read; try again.
                continue
            if lowest <= ord(c) <= highest:
                self._sendButtonEvent(c)
|
|
|
|
# Make SerialInput available to get_input() under the type name "Serial".
_add_input("Serial", SerialInput)
|
|
|
|
class BeopardySerialInput(BaseInput):
    """Input thread for a buzzer device attached to a serial port.

    The device reports a buzzer press as one of the bytes 'a'..'d', which
    are mapped to players 1..4. Game state changes are sent back to the
    device as single bytes (see buzzersOpen() and playerGotQuestion()).

    All serial data is handled as bytes: Serial.read() returns bytes and
    Serial.write() rejects unicode strings on Python 3. On Python 2 a bytes
    literal is a plain str, so behaviour there is unchanged.
    """

    # Byte the device uses for player 1, 2, 3 and 4 (in that order).
    _PLAYER_BYTES = b"abcd"

    def __init__(self, app, device, baudrate=9600, bytesize=8, parity='N', stopbits=1):
        """Open ``device`` with the given serial settings.

        Raises InputException if the serial port cannot be opened.
        """
        super(BeopardySerialInput, self).__init__(app)
        try:
            self._serial = serial.Serial(device, baudrate, bytesize, parity, stopbits)
        except serial.SerialException as e:
            raise InputException("Error: %s" % e)

        # Received byte -> player number. Keys are bytes so that lookups
        # with the result of Serial.read() also match on Python 3.
        self._playerDict = {
            b'a': 1,
            b'b': 2,
            b'c': 3,
            b'd': 4,
        }

        print(" >> serial %s initialized with rate %s %s%s%s" % (device, baudrate, bytesize, parity, stopbits))

    def run(self):
        """Read one byte at a time and emit a button event for known bytes."""
        while True:
            c = self._serial.read(1)
            # An empty read or an unknown byte yields None and is ignored.
            playerNo = self._playerDict.get(c)
            if playerNo is not None:
                self._sendButtonEvent(playerNo)

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """Tell the device that the buzzers were opened or closed.

        isOpen - True if the question got opened, False if it got closed
        """
        # NOTE(review): "ABCD" presumably clears the per-player state on the
        # device before the new state is sent -- confirm against the firmware.
        self._serial.write(b"ABCD")
        self._serial.write(b"R" if isOpen else b"r")

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """Send the byte of the player who may answer to the device.

        playerNo - number (int) of the player, 1..4; other values are ignored
        """
        if not 1 <= playerNo <= 4:
            return

        # Slice instead of index: indexing bytes yields an int on Python 3,
        # while a slice is a one-byte bytes object on both Python 2 and 3.
        self._serial.write(self._PLAYER_BYTES[playerNo - 1:playerNo])
|
|
|
|
|
|
# Make BeopardySerialInput available to get_input() as "BeopardySerial".
_add_input("BeopardySerial", BeopardySerialInput)
|
|
|
|
class FifoInput(BaseInput):
    """Input thread that reads button numbers '1'..'9' from a named pipe."""

    def __init__(self, app, fifoPath, debug=False):
        """Use the FIFO at ``fifoPath``, creating it if it does not exist.

        fifoPath - filesystem path of the FIFO
        debug    - stored on the instance; not used by this class itself

        Raises InputException if the FIFO cannot be created or if the path
        is a regular file or a directory.
        """
        super(FifoInput, self).__init__(app)
        self._path = fifoPath
        self._debug = debug
        if not os.path.exists(self._path):
            try:
                os.mkfifo(self._path)
            except OSError as e:
                raise InputException("Could not create fifo '%s': %s" % (self._path, e))
        if os.path.isfile(self._path) or os.path.isdir(self._path):
            raise InputException("Fifo '%s' is not a fifo" % self._path)

        print(" >> Fifo initialized, using '%s'" % self._path)

    def run(self):
        """Read characters from the FIFO and emit button events for 1-9.

        The FIFO is reopened whenever the writing side closes it, so several
        writers can connect one after another.
        """
        while True:
            # Opening a FIFO for reading blocks until a writer opens it, so
            # this outer loop sleeps in open() instead of spinning.
            with open(self._path) as fifo:
                while True:
                    c = fifo.read(1)
                    if not c:
                        # EOF: all writers closed the FIFO. Further reads
                        # would return '' immediately (busy loop), so close
                        # the file and wait for the next writer instead.
                        break
                    if ord('0') < ord(c) <= ord('9'):
                        self._sendButtonEvent(c)
|
|
|
|
# Make FifoInput available to get_input() under the type name "Fifo".
_add_input("Fifo", FifoInput)
|
|
|
|
class UnixInput(BaseInput):
    """Input thread that serves one client at a time on a Unix domain socket.

    The client sends button numbers as the characters '1'..'9'. Game state
    changes are sent back to the connected client as "O" (buzzers open),
    "C" (buzzers closed) and "T<n>" (player n may answer).

    All socket data is handled as bytes, which is what recv() returns and
    send() requires on Python 3. On Python 2 bytes is str, so nothing
    changes there.
    """

    def __init__(self, app, socketPath, debug=False):
        """Bind a stream socket to ``socketPath``, replacing a stale one.

        socketPath - filesystem path of the socket
        debug      - stored on the instance; not used by this class itself

        Raises InputException if an existing file at ``socketPath`` cannot
        be removed.
        """
        super(UnixInput, self).__init__(app)
        self._path = socketPath
        self._debug = debug
        self._currConn = None
        try:
            os.unlink(self._path)
        except OSError as e:
            # A failed unlink only matters if something is still in the way.
            if os.path.exists(self._path):
                raise InputException("Could not create domain socket '%s': %s" % (self._path, e))

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.bind(self._path)
        print(" >> Unix domain socket initialized, using '%s'" % self._path)

    def run(self):
        """Accept clients one after another and emit their button events."""
        self.sock.listen(1)
        while True:
            conn, _ = self.sock.accept()
            self._currConn = conn
            try:
                while True:
                    c = conn.recv(1)
                    if not c:
                        # Empty result means the client disconnected. This
                        # must match b'' on Python 3 as well as '' on
                        # Python 2, otherwise the loop never ends.
                        break
                    if ord('0') < ord(c) <= ord('9'):
                        self._sendButtonEvent(c)
            finally:
                # Hide the connection from the slots before closing it.
                self._currConn = None
                conn.close()

    def _sendToClient(self, data):
        """Send ``data`` (bytes) to the current client, if there is one."""
        # Read the attribute once: run() may reset it to None at any time.
        conn = self._currConn
        if conn is not None:
            conn.send(data)

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """Send "O" to the client if the buzzers opened, "C" if they closed."""
        self._sendToClient(b"O" if isOpen else b"C")

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """Send "T<playerNo>" to the client."""
        self._sendToClient(("T%d" % playerNo).encode("ascii"))
|
|
|
|
# Make UnixInput available to get_input() under the type name "Unix".
_add_input("Unix", UnixInput)
|
|
|
|
class InputException(Exception):
    """Raised by the input classes when their input source cannot be set up."""
|