diff --git a/cronplayer/cronplayer b/cronplayer/cronplayer new file mode 100644 index 0000000..28c7ea2 --- /dev/null +++ b/cronplayer/cronplayer @@ -0,0 +1,525 @@ +#!/usr/bin/env python3 +from datetime import datetime +import functools +from http.server import BaseHTTPRequestHandler, HTTPServer +import json +import os +from pathlib import Path +import queue +import signal +import subprocess +import threading +import time + +import click +from crontab import CronTab +import requests +import psutil + + +PA_CLIENT_NAME = 'cronplayer' + + +class Entry: + + def __init__(self, number=None, crontab=None, play_definitions=None): + self.number = number + self.crontab = crontab + self.play_definitions = play_definitions or [] + + def __repr__(self): + return f"Entry(number={self.number}, crontab={self.crontab}, play_definitions={self.play_definitions})" + + def play(self, pa_device_name=None, pa_client_name=PA_CLIENT_NAME, remote=None, debug=False, wait=True): + if not pa_device_name and not remote: + if debug: + print("DEBUG: Neither pa_device_name nor remote given: not playing anything") + return + + if not self.play_definitions: + if debug: + print(f"DEBUG: Not playing {self} without definitions") + return + + if pa_device_name and remote and debug: + print("Both pa_device_name and remote given: using remote to play") + + if remote: + return self._play_remote(remote, debug) + else: + return self._play_pa(pa_device_name, pa_client_name, debug, wait) + + def _play_remote(self, remote, debug): + payload = [pd.to_dict() for pd in self.play_definitions] + if debug: + print(f"DEBUG: Would PUT {payload} to {remote}") + else: + requests.put(remote, json=payload) + + def _play_pa(self, pa_device_name, pa_client_name, debug, wait): + env = {'XDG_RUNTIME_DIR': '/run/user/1000'} + + p = subprocess.run(f"pactl list sinks | grep -e Name: -e Description: | grep '{pa_device_name}' -B 1 | " + "head -n 1 | awk '{print $2;}'", + capture_output=True, shell=True, env=env, check=True, encoding='utf-8') + pa_device = p.stdout.strip() + + if not pa_device: + raise RuntimeError(f"Could not find PA device with Description set to '{pa_device_name}'.") + + ffmpeg_file_input = '\n'.join(f"file '{p}'" for pd in self.play_definitions for p in pd.paths) + cmd = ( + 'ffmpeg -loglevel 0 ' + f"-f concat -safe 0 -i <(echo \"{ffmpeg_file_input}\") " + '-f wav - | ' + f"paplay --client-name {pa_client_name} --device '{pa_device}'") + # Using this would output to pulseaudio directly, but this runs short as + # ffmpeg quits once it's encoded everything and doesn't wait till that's + # all played on pulse + # -f pulse -device "${DEVICE}" -name "Uhr" "Uhr" + + if debug: + print(f"DEBUG: {cmd}") + else: + kwargs = dict(shell=True, executable='/bin/bash', env=env) + if wait: + subprocess.run(cmd, **kwargs) + else: + return subprocess.Popen(cmd, **kwargs) + + +class PlayDefinitionError(Exception): + pass + + +class PlayDefinition: + + def __init__(self, path, repeat=1): + self._path = path + self._repeat = repeat + + @property + def paths(self): + return [self._path] * self._repeat + + @classmethod + def create(cls, path, repeat=1, check_exists=True): + path = Path(path).expanduser().resolve() + if check_exists and not path.exists(): + raise PlayDefinitionError(f"{path} does not exist") + + return cls(path, repeat) + + @classmethod + def create_from_dict(cls, parsed_definition, check_exists=True): + if 'repeat' not in parsed_definition: + raise PlayDefinitionError("Entry is missing key 'repeat'") + if 'file' not in parsed_definition: + raise PlayDefinitionError("Entry is missing key 'file'") + + dt = datetime.now() + params = { + 'month': dt.month, + 'day': dt.day, + 'dow': dt.isoweekday(), + 'hour': dt.hour, + 'analog_hour': int(dt.strftime('%I')), + 'minute': dt.minute + } + + repeat = parsed_definition['repeat'] + if isinstance(repeat, int): + repeat = str(repeat) + try: + repeat = repeat.format(**params) + except Exception as e: + raise PlayDefinitionError(f"definition formatting failed: {e}") + + repeat = eval(repeat, {}, {}) + + return cls.create(parsed_definition['file'], repeat, check_exists=check_exists) + + @classmethod + def create_from_json(cls, json_definition, check_exists=True): + return cls.create_from_dict(json.loads(json_definition), check_exists=check_exists) + + def __repr__(self): + return f"PlayDefinition(path={self._path}, repeat={self._repeat})" + + def to_dict(self): + return {'file': str(self._path), 'repeat': self._repeat} + + def to_json(self): + return json.dumps(self.to_dict()) + + +def client_options(f): + @click.option('--pa-device-name') + @click.option('--pa-client-name', default=PA_CLIENT_NAME) + @click.option('--remote') + @click.option('--debug', is_flag=True) + @functools.wraps(f) + def wrapper(*args, **kwargs): + if kwargs.get('pa_device_name') and kwargs.get('remote'): + raise click.ClickException('"--remote" and "--pa-device-name" are mutually exclusive') + + f(*args, **kwargs) + + return wrapper + + +@click.group() +def cli(): + ... + + +@cli.command() +@click.option('--file', 'file_path', type=click.Path(), required=True) +@click.option('--repeat', default="1") +@client_options +def play(file_path, repeat, pa_device_name, pa_client_name, remote, debug): + """Play a file with the given repeat definition + + Construct a PlayDefinition and an Entry and play them as if they would have + been read from the crontab-like config-file. + """ + pd = PlayDefinition.create_from_json( + json.dumps({'file': file_path, 'repeat': repeat}), check_exists=not bool(remote)) + + entry = Entry(0, None, [pd]) + entry.play(pa_device_name, pa_client_name, remote, debug) + + +@cli.command() +@click.option('--remote', required=True) +def stop(remote): + """Send the stop command to the given remote""" + requests.post(remote) + + +@cli.command() +@click.option('--config-file', type=click.Path(exists=True), required=True) +@client_options +def play_by_config(config_file, pa_device_name, pa_client_name, remote, debug): + """Play the defined entries according to given crontab-like config-file + + Only entries for which the current time matches the definition in the + crontab-like config-file are played. + """ + # parse the config file + # config file entry starting with a # (and any space before) are ignored + # entries contain a crontab definition followed by a ' = ' and then one or + # more play definitions separated by '|' + # + # a play definition is either + # * a path to a file (without '|') + # * a JSON dict containing the keys "file" and "repeat", where "times" can + # contain mathematical expressions and can use Python formatting with the + # variables month, day, dow (starting at monday being 1), hour (0 - 23), + # analog_hour (1 - 12), minute containing the current run's values + with open(config_file) as f: + lines = [l.strip() for l in f.readlines()] + + lines = [(i, l) for (i, l) in enumerate(lines, start=1) + if l and not l.startswith('#')] + + entries = [] + for i, line in lines: + if ' = ' not in line: + print(f"Error: Config file {config_file} line {i} contains no ' = '. Ignoring.") + continue + + cron_definition, play_definitions = line.split(' = ', 1) + + try: + c = CronTab(cron_definition) + except ValueError as e: + print(f"Error: Config file {config_file} line {i} contains no valid cron definition: {e}") + continue + + entry = Entry(i, c) + + play_definitions = play_definitions.split('|') + for p_def in play_definitions: + p_def = p_def.strip() + try: + play_definition = PlayDefinition.create_from_json(p_def) + except PlayDefinitionError as e: + print(f"Error: Config file {config_file} line {i}: {e}") + continue + except json.decoder.JSONDecodeError: + # no valid JSON, treat as filepath + try: + play_definition = PlayDefinition.create(p_def) + except PlayDefinitionError as e: + print(f"Error: Config file {config_file} line {i}: {e}") + continue + + entry.play_definitions.append(play_definition) + + if entry.play_definitions: + entries.append(entry) + + if debug: + from pprint import pprint + print("\n## All entries") + pprint(entries) + print() + + this_runs_entries = [] + # we filter first into a new list, because multiple Entries might be valid + # for this run and the farther down the entry is defined the later it would + # be checked + for entry in entries: + # check if this entry should be played right now + if debug: + print(f"{entry.crontab.previous(default_utc=False)} {entry}") + + if entry.crontab.previous(default_utc=False) < -59: + continue + + this_runs_entries.append(entry) + + if debug: + from pprint import pprint + print("\n## This run's entries") + pprint(this_runs_entries) + print() + + for entry in this_runs_entries: + entry.play(pa_device_name, pa_client_name, remote, debug=debug) + + +class PlayerHttpServer(HTTPServer): + + def __init__(self, *args, player, **kwargs): + self.player = player + super().__init__(*args, **kwargs) + + +class PlayerHttpHandler(BaseHTTPRequestHandler): + + def do_GET(self): + """Return the last played thing on / only""" + if self.path != '/': + self.send_response(404, 'Invalid path') + return + + # return the last thing that was played + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + data = {'previous': None, 'playing': self.server.player.is_playing} + if self.server.player.last_played: + data['previous'] = [pd.to_dict() for pd in self.server.player.last_played.play_definitions] + self.wfile.write(bytes(json.dumps(data), 'utf-8')) + + def do_PUT(self): + """Accept something new to be played on / only""" + if self.path != '/': + self.send_response(404) + return + + if self.headers.get('Content-Type') != 'application/json': + self.send_response(400) + self.end_headers() + self.wfile.write(b'Content-type application/json only') + return + + content_length = self.headers.get('content-length') + length = int(content_length) if content_length else 0 + + if not length: + self.send_response(400) + self.end_headers() + self.wfile.write(b'Got empty body') + return + data = self.rfile.read(length) + try: + entry_data = json.loads(data) + except json.JSONDecodeError as e: + self.send_response(400) + self.end_headers() + self.wfile.write(bytes(f"Could not parse JSON: {e}", 'utf-8')) + return + + if not isinstance(entry_data, list): + self.send_response(400) + self.end_headers() + self.wfile.write(b'Entry must be a list of dicts') + return + if not entry_data: + self.send_response(400) + self.end_headers() + self.wfile.write(b'Empty entry') + return + + entry = Entry() + for i, pd_data in enumerate(entry_data): + try: + entry.play_definitions.append(PlayDefinition.create_from_dict(pd_data)) + except Exception as e: + self.send_response(400) + self.end_headers() + self.wfile.write(bytes(f"Invalid PlayDefinition {i}: {e}"), 'utf-8') + return + self.server.player.append(entry) + self.send_response(200) + self.end_headers() + + def do_POST(self): + """Stop the currently-playing entry on /""" + if self.path != '/': + self.send_response(404) + return + + self.server.player.stop() + self.send_response(200) + self.end_headers() + + +class Player(threading.Thread): + + def __init__(self, *args, done_event, entry, pa_device_name, pa_client_name, debug, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) + self._done_event = done_event + self._entry = entry + self._process = None + self._child_pids = [] + self._child_pids_event = threading.Event() + + self._pa_device_name = pa_device_name + self._pa_client_name = pa_client_name + self._debug = debug + + def stop(self): + if not self._process: + print("no process to stop") + return + + self._child_pids_event.wait() + + for child_pid in self._child_pids: + os.kill(child_pid, signal.SIGTERM) + self._process.terminate() + try: + self._process.wait(2) + except subprocess.TimeoutExpired: + self._process.kill() + # TODO check if children are still running and try kill + # for child_pid in self._child_pids: + + def run(self): + self._process = self._entry.play(pa_device_name=self._pa_device_name, pa_client_name=self._pa_client_name, + debug=self._debug, wait=False) + p = psutil.Process(self._process.pid) + while len(p.children()) < 1: + time.sleep(0.2) + for child in p.children(): + self._child_pids.append(child.pid) + self._child_pids_event.set() + self._process.wait() + + +class PlayerManager(threading.Thread): + + def __init__(self, *args, pa_device_name, pa_client_name, debug, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) + + self._pa_device_name = pa_device_name + self._pa_client_name = pa_client_name + self._debug = debug + + self._queue = queue.SimpleQueue() + # event signalling this PlayerThread to stop + self._stop_thread_event = threading.Event() + + # a Player object currently running + self._playing_thread = None + # an event to be set by the currently running Player to signal it's + # done + self._playing_done_event = threading.Event() + # event signalling to stop the current Player + self._stop = threading.Event() + + # an Entry object that was last played + self.last_played = None + + def append(self, entry): + self._queue.put(entry) + + def stop(self): + self._stop.set() + + def stop_thread(self): + self._stop_thread_event.set() + if self._playing_thread and self._playing_thread.is_alive(): + self._playing_thread.stop() + + @property + def is_playing(self): + return bool(self._playing_thread) + + def run(self): + while not self._stop_thread_event.is_set(): + # check if something is currently playing + if self._playing_thread: + # try to stop what's currently happening + if self._stop.is_set(): + self._playing_thread.stop() + self._stop.clear() + + # clean up if playing was done + if not self._playing_thread.is_alive(): + self._playing_thread.join() + self._playing_done_event.clear() + self._playing_thread = None + continue + # wait for playing to be done for some time, but continue to + # let stop() have some effect + self._playing_done_event.wait(1) + continue + + try: + entry = self._queue.get(block=True, timeout=1) + except queue.Empty: + continue + + self.last_played = entry + + # make sure we don't immediately try to stop it again, because it was set some time ago + self._stop.clear() + + player = Player(entry=entry, done_event=self._playing_done_event, pa_device_name=self._pa_device_name, + pa_client_name=self._pa_client_name, debug=self._debug, daemon=True) + player.start() + self._playing_thread = player + + +@cli.command() +@click.option('--listen-host', default='127.0.0.1') +@click.option('--listen-port', default=8227) +@click.option('--pa-device-name', required=True) +@click.option('--pa-client-name', default=PA_CLIENT_NAME) +@click.option('--debug', is_flag=True) +def daemon(listen_host, listen_port, pa_device_name, pa_client_name, debug): + """Listen on given host:port with an HTTP server + + Via HTTP, new requests to play files are accepted as PlayDefinitions as new + entry and currently playing entries can be stopped. The last playing entry + can be replayed, too. + + If --config-file is given, additionally play the crontab-like config + """ + play_thread = PlayerManager(pa_device_name=pa_device_name, pa_client_name=pa_client_name, + debug=debug, daemon=True) + play_thread.start() + + server_address = (listen_host, listen_port) + httpd = PlayerHttpServer(server_address, PlayerHttpHandler, player=play_thread) + httpd.serve_forever() + + +if __name__ == '__main__': + import sys + sys.exit(cli()) diff --git a/cronplayer/example.config b/cronplayer/example.config new file mode 100644 index 0000000..72e7b37 --- /dev/null +++ b/cronplayer/example.config @@ -0,0 +1,5 @@ +# # m h dom mon dow = play definition +15 * * * * = ~/Uhr/viertel.wav +30 * * * * = ~/Uhr/halb.wav +45 * * * * = ~/Uhr/dreiviertel.wav +00 * * * * = ~/Uhr/um.wav | {"file": "~/Uhr/firstgong.wav", "repeat": "{hour} - 1"} | ~/Uhr/endgong.wav diff --git a/cronplayer/setup.cfg b/cronplayer/setup.cfg new file mode 100644 index 0000000..0251394 --- /dev/null +++ b/cronplayer/setup.cfg @@ -0,0 +1,18 @@ +[metadata] +name = cronplayer +version = 0.1 +description = Play a defined bunch of files at a certain time as defined in cron-like config file - possibly remote via integrated webserver + +[options] +scripts = cronplayer +install_requires = + crontab + click + psutil + requests + + +[flake8] +max-line-length = 120 +exclude = .git,__pycache__,*.egg-info,*lib/python* +ignore = E241,E741,W503,W504 diff --git a/cronplayer/setup.py b/cronplayer/setup.py new file mode 100644 index 0000000..056ba45 --- /dev/null +++ b/cronplayer/setup.py @@ -0,0 +1,4 @@ +import setuptools + + +setuptools.setup()