Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

104 rader
3.4 KiB

#!/usr/bin/env python3
import socketserver
import threading
class WrongArgumentCount(Exception):
def __init__(self, command, expected, found):
if isinstance(expected, list):
expected = "{} to {}".format(**expected)
msg = "Command {} expects {} arguments, found {}".format(
command, expected, found
)
super(WrongArgumentCount, self).__init__(msg)
class CommandNotFound(Exception):
def __init__(self, command):
msg = "Command {} does not exist".format(command)
super(CommandNotFound, self).__init__(msg)
class AtmoClientHandler(socketserver.StreamRequestHandler):
def setup(self):
super(AtmoClientHandler, self).setup()
self.authenticated = True
self.quit = False
def handle(self):
try:
while not self.quit:
self.send(">>> ", end="")
data = self.rfile.readline().decode().strip()
tokens = data.split()
if len(tokens) > 0:
self.handle_command(tokens)
except BrokenPipeError:
print(" >> Client disconnected")
def handle_command(self, tokens):
cmd = tokens[0].lower()
args = tokens[1:]
try:
if cmd == "reload":
self._ensure_args(cmd, 0, args)
self.atmo.load_sounds()
self.send("Sounds reloaded")
elif cmd == "status":
self._ensure_args(cmd, 0, args)
self.send("Currently playing:")
s = self.atmo.get_status()
for line in s.splitlines():
self.send(line)
elif cmd == "pause":
self._ensure_args(cmd, 0, args)
is_paused = self.atmo.pause()
self.send("Paused" if is_paused else "Resume")
elif cmd == "help":
self._ensure_args(cmd, 0, args)
self.send_usage()
elif cmd == "exit":
self.quit = True
self.send("Bye")
else:
raise CommandNotFound(cmd)
except CommandNotFound as e:
self.send("Error: {}".format(e))
self.send_usage()
except WrongArgumentCount as e:
self.send("Error: {}".format(e))
def send_usage(self):
self.send("The following commands are available: ")
self.send(" reload")
self.send(" status")
self.send(" pause")
@staticmethod
def _ensure_args(cmd, expected, args):
if isinstance(expected, list):
if not expected[0] <= len(args) <= expected[1]:
raise WrongArgumentCount(cmd, expected, len(args))
else:
if len(args) != expected:
raise WrongArgumentCount(cmd, expected, len(args))
def send(self, line, end="\n"):
self.wfile.write("{}{}".format(line, end).encode())
class ReusableThreadingTCPServer(socketserver.ThreadingTCPServer):
daemon_threads = True
allow_reuse_address = True
class AtmoReplRunner(threading.Thread):
def __init__(self, atmo, host="0.0.0.0", port=7723):
super(AtmoReplRunner, self).__init__()
AtmoClientHandler.atmo = atmo
self._client = ReusableThreadingTCPServer((host, port), AtmoClientHandler)
self.daemon = True
def run(self):
self._client.serve_forever()