123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- # Licensed under GPLv3
- # Written by Sebastian Lohff (seba@someserver.de)
- # http://seba-geek.de/projects/seopardy/
-
- from __future__ import print_function
-
- import os
- import serial
- import socket
- from PySide import QtCore
-
# Registry of available inputs: maps an input type name to its class.
_inputs = {}


def _add_input(inputType, inputClass):
    """ Register inputClass under the name inputType

    A later registration under the same name replaces the earlier one.
    """
    _inputs[inputType] = inputClass


def get_input(inputType, args, app):
    """ Create an instance of the input registered as inputType

    inputType - name the input class was registered under
    args      - sequence of extra positional constructor arguments
    app       - application object, passed as first constructor argument

    Raises ValueError if nothing is registered under inputType.
    """
    if inputType in _inputs:
        inputClass = _inputs[inputType]
        return inputClass(app, *args)
    raise ValueError("'%s' is an unknown input type" % inputType)
-
class BaseInput(QtCore.QThread):
    """ Base class of all buzzer inputs

    An input is a QThread that reports button presses through the
    buttonEvent signal. The two slots below are no-ops here; subclasses
    override them when they want to react to game state changes.
    """

    # Emitted with the number of the button that was pressed.
    buttonEvent = QtCore.Signal(int)

    def __init__(self, app, parent=None):
        """ Store the application object and set up the thread

        app    - application object, kept as self._app
        parent - optional Qt parent, handed to QThread
        """
        super(BaseInput, self).__init__(parent)
        self._app = app

    def _sendButtonEvent(self, btn):
        """ Emit buttonEvent for btn, which is converted with int() first """
        buttonNo = int(btn)
        self.buttonEvent.emit(buttonNo)

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """ Called when buzzers are opened or closed for answering

        isOpen - True if question got opened, False if closed
        """

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """ Called when a player got its turn to answer the question

        playerNo - number (int) of player which can answer (starting from 1)
        """
-
class SerialInput(BaseInput):
    """ Buzzer input reading button numbers from a serial port

    Every byte '1'..'9' read from the port is emitted as a button event
    with that number; everything else is ignored.
    """

    def __init__(self, app, device, baudrate=9600, bytesize=8, parity='N', stopbits=1):
        """ Open the serial port

        app      - application object, handed on to BaseInput
        device   - serial device to open
        baudrate, bytesize, parity, stopbits - passed on to serial.Serial

        Raises InputException if the port cannot be opened.
        """
        super(SerialInput, self).__init__(app)
        try:
            port = serial.Serial(device, baudrate, bytesize, parity, stopbits)
        except serial.SerialException as e:
            raise InputException("Error: %s" % e)
        self._serial = port

        settings = (device, baudrate, bytesize, parity, stopbits)
        print(" >> serial %s initialized with rate %s %s%s%s" % settings)

    def run(self):
        """ Thread body: read the port byte by byte, forever """
        while True:
            char = self._serial.read(1)
            if len(char) != 1:
                # nothing was read, try again
                continue
            if ord('0') < ord(char) <= ord('9'):
                self._sendButtonEvent(char)


_add_input("Serial", SerialInput)
-
class BeopardySerialInput(BaseInput):
    """ Buzzer input for serial hardware using a one-byte command protocol

    Bytes 'a'..'d' read from the port are reported as buttons 1..4. The
    commands written back ("ABCD", "R", "r" and 'a'..'d') are interpreted
    by the attached device; their exact effect is not visible from here.

    All protocol data is handled as bytes: pySerial returns bytes from
    read() and requires bytes for write() on Python 3, while on Python 2 a
    bytes literal is the same thing as a str literal.
    """

    # Byte sent to announce player 1..4 (index = player number - 1).
    _PLAYER_BYTES = b"abcd"

    def __init__(self, app, device, baudrate=9600, bytesize=8, parity='N', stopbits=1):
        """ Open the serial port

        app      - application object, handed on to BaseInput
        device   - serial device to open
        baudrate, bytesize, parity, stopbits - passed on to serial.Serial

        Raises InputException if the port cannot be opened.
        """
        super(BeopardySerialInput, self).__init__(app)
        try:
            self._serial = serial.Serial(device, baudrate, bytesize, parity, stopbits)
        except serial.SerialException as e:
            raise InputException("Error: %s" % e)

        # Maps a byte received from the device to the button number.
        # Keys are bytes so they compare equal to what read() returns.
        self._playerDict = {
            b'a': 1,
            b'b': 2,
            b'c': 3,
            b'd': 4,
        }

        print(" >> serial %s initialized with rate %s %s%s%s" % (device, baudrate, bytesize, parity, stopbits))

    def run(self):
        """ Thread body: read the port byte by byte, forever """
        while True:
            c = self._serial.read(1)
            if len(c) == 1 and c in self._playerDict:
                self._sendButtonEvent(self._playerDict[c])

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """ Tell the device that the buzzers got opened or closed

        isOpen - True if question got opened, False if closed

        Always writes "ABCD" first, then "R" when opened or "r" when closed.
        """
        # NOTE(review): "ABCD" presumably resets the per-player state on
        # the device -- confirm against the firmware.
        self._serial.write(b"ABCD")
        self._serial.write(b"R" if isOpen else b"r")

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """ Tell the device which player may answer

        playerNo - number (int) of player which can answer (starting from 1)

        Player numbers outside 1..4 are silently ignored.
        """
        if not 1 <= playerNo <= 4:
            return

        # Slice instead of index: indexing bytes yields an int on Python 3.
        self._serial.write(self._PLAYER_BYTES[playerNo - 1:playerNo])


_add_input("BeopardySerial", BeopardySerialInput)
-
class FifoInput(BaseInput):
    """ Buzzer input reading button numbers from a named pipe

    Every byte '1'..'9' read from the fifo is emitted as a button event
    with that number; everything else is ignored.
    """

    def __init__(self, app, fifoPath, debug=False):
        """ Prepare the fifo, creating it if the path does not exist yet

        app      - application object, handed on to BaseInput
        fifoPath - filesystem path of the fifo to read from
        debug    - stored on the instance; not used by this class itself

        Raises InputException if the fifo cannot be created or if the path
        is a regular file or a directory.
        """
        super(FifoInput, self).__init__(app)
        self._path = fifoPath
        self._debug = debug
        if not os.path.exists(self._path):
            try:
                os.mkfifo(self._path)
            except OSError as e:
                raise InputException("Could not create fifo '%s': %s" % (self._path, e))
        if os.path.isfile(self._path) or os.path.isdir(self._path):
            raise InputException("Fifo '%s' is not a fifo" % self._path)

        print(" >> Fifo initialized, using '%s'" % self._path)

    def run(self):
        """ Thread body: read the fifo byte by byte, forever

        The fifo is reopened whenever it reports end-of-file. Without that,
        read() keeps returning an empty result as soon as the last writer
        has closed its end, and the loop would spin without ever blocking.
        """
        while True:
            # For a fifo, opening for reading blocks until a writer shows
            # up. Binary mode keeps undecodable input from raising.
            with open(self._path, "rb") as fifo:
                while True:
                    c = fifo.read(1)
                    if not c:
                        # EOF: all writers are gone, wait for the next one
                        break
                    if ord('0') < ord(c) <= ord('9'):
                        self._sendButtonEvent(c)


_add_input("Fifo", FifoInput)
-
class UnixInput(BaseInput):
    """ Buzzer input served over a unix domain socket

    One client at a time is accepted. Every byte '1'..'9' it sends is
    emitted as a button event with that number. The connected client is
    told about game state: "O"/"C" when the buzzers open/close and
    "T<playerNo>" when a player gets to answer.

    All socket data is handled as bytes so the class behaves the same on
    Python 2 and Python 3.
    """

    def __init__(self, app, socketPath, debug=False):
        """ Create the listening socket at socketPath

        app        - application object, handed on to BaseInput
        socketPath - filesystem path to bind the socket to; an existing
                     file at that path is removed first
        debug      - stored on the instance; not used by this class itself

        Raises InputException if an existing file at socketPath cannot be
        removed. Errors from bind() are not caught here.
        """
        super(UnixInput, self).__init__(app)
        self._path = socketPath
        self._debug = debug
        self._currConn = None
        try:
            os.unlink(self._path)
        except OSError as e:
            # A missing file is fine; only fail if something is still there
            if os.path.exists(self._path):
                raise InputException("Could not create domain socket '%s': %s" % (self._path, e))

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.bind(self._path)
        print(" >> Unix domain socket initialized, using '%s'" % self._path)

    def run(self):
        """ Thread body: serve one client after another, forever """
        self.sock.listen(1)
        while True:
            conn, _ = self.sock.accept()
            self._currConn = conn
            try:
                while True:
                    c = conn.recv(1)
                    if not c:
                        # Empty result means the client disconnected. Test
                        # truthiness: recv() returns b'' on Python 3, which
                        # never equals ''.
                        break
                    if ord('0') < ord(c) <= ord('9'):
                        self._sendButtonEvent(c)
            finally:
                # Unpublish the connection before closing it so the slots
                # stop picking it up.
                self._currConn = None
                conn.close()

    def _notify(self, data):
        """ Send data (bytes) to the current client, if there is one """
        # Read the attribute once: run() resets it to None when the client
        # goes away, possibly between a check and the send.
        conn = self._currConn
        if conn:
            conn.send(data)

    @QtCore.Slot(bool)
    def buzzersOpen(self, isOpen):
        """ Send "O" (opened) or "C" (closed) to the connected client

        isOpen - True if question got opened, False if closed
        """
        self._notify(b"O" if isOpen else b"C")

    @QtCore.Slot(int)
    def playerGotQuestion(self, playerNo):
        """ Send "T<playerNo>" to the connected client

        playerNo - number (int) of player which can answer
        """
        self._notify(("T%d" % playerNo).encode("ascii"))


_add_input("Unix", UnixInput)
-
class InputException(Exception):
    """ Raised by the input classes when an input cannot be set up """
|