From 7729a8a9db7f165fb7c2b3c5d7508a68d5bbfce4 Mon Sep 17 00:00:00 2001 From: MasterofJOKers Date: Tue, 17 May 2022 23:41:31 +0200 Subject: [PATCH] Add cronplayer This command's main purpose is to get run regularly via cron and play (a list of) files possibly repeated to a specific PulseAudio device. Additonally, it can just play a command or start as an HTTP server providing an interface to play local files, stop the current playback and show the current status. Actual playback is done via ffmpeg and paplay. --- cronplayer/cronplayer | 525 ++++++++++++++++++++++++++++++++++++++ cronplayer/example.config | 5 + cronplayer/setup.cfg | 18 ++ cronplayer/setup.py | 4 + 4 files changed, 552 insertions(+) create mode 100644 cronplayer/cronplayer create mode 100644 cronplayer/example.config create mode 100644 cronplayer/setup.cfg create mode 100644 cronplayer/setup.py diff --git a/cronplayer/cronplayer b/cronplayer/cronplayer new file mode 100644 index 0000000..28c7ea2 --- /dev/null +++ b/cronplayer/cronplayer @@ -0,0 +1,525 @@ +#!/usr/bin/env python3 +from datetime import datetime +import functools +from http.server import BaseHTTPRequestHandler, HTTPServer +import json +import os +from pathlib import Path +import queue +import signal +import subprocess +import threading +import time + +import click +from crontab import CronTab +import requests +import psutil + + +PA_CLIENT_NAME = 'cronplayer' + + +class Entry: + + def __init__(self, number=None, crontab=None, play_definitions=None): + self.number = number + self.crontab = crontab + self.play_definitions = play_definitions or [] + + def __repr__(self): + return f"Entry(number={self.number}, crontab={self.crontab}, play_definitions={self.play_definitions})" + + def play(self, pa_device_name=None, pa_client_name=PA_CLIENT_NAME, remote=None, debug=False, wait=True): + if not pa_device_name and not remote: + if debug: + print("DEBUG: Neither pa_device_name nor remote given: not playing anything") + return + + if not self.play_definitions: + if debug: + print(f"DEBUG: Not playing {self} without definitions") + return + + if pa_device_name and remote and debug: + print("Both pa_device_name and remote given: using remote to play") + + if remote: + return self._play_remote(remote, debug) + else: + return self._play_pa(pa_device_name, pa_client_name, debug, wait) + + def _play_remote(self, remote, debug): + payload = [pd.to_dict() for pd in self.play_definitions] + if debug: + print(f"DEBUG: Would PUT {payload} to {remote}") + else: + requests.put(remote, json=payload) + + def _play_pa(self, pa_device_name, pa_client_name, debug, wait): + env = {'XDG_RUNTIME_DIR': '/run/user/1000'} + + p = subprocess.run(f"pactl list sinks | grep -e Name: -e Description: | grep '{pa_device_name}' -B 1 | " + "head -n 1 | awk '{print $2;}'", + capture_output=True, shell=True, env=env, check=True, encoding='utf-8') + pa_device = p.stdout.strip() + + if not pa_device: + raise RuntimeError(f"Could not find PA device with Description set to '{pa_device_name}'.") + + ffmpeg_file_input = '\n'.join(f"file '{p}'" for pd in self.play_definitions for p in pd.paths) + cmd = ( + 'ffmpeg -loglevel 0 ' + f"-f concat -safe 0 -i <(echo \"{ffmpeg_file_input}\") " + '-f wav - | ' + f"paplay --client-name {pa_client_name} --device '{pa_device}'") + # Using this would output to pulseaudio directly, but this runs short as + # ffmpeg quits once it's encoded everything and doesn't wait till that's + # all played on pulse + # -f pulse -device "${DEVICE}" -name "Uhr" "Uhr" + + if debug: + print(f"DEBUG: {cmd}") + else: + kwargs = dict(shell=True, executable='/bin/bash', env=env) + if wait: + subprocess.run(cmd, **kwargs) + else: + return subprocess.Popen(cmd, **kwargs) + + +class PlayDefinitionError(Exception): + pass + + +class PlayDefinition: + + def __init__(self, path, repeat=1): + self._path = path + self._repeat = repeat + + @property + def paths(self): + return [self._path] * self._repeat + + @classmethod + def create(cls, path, repeat=1, check_exists=True): + path = Path(path).expanduser().resolve() + if check_exists and not path.exists(): + raise PlayDefinitionError(f"{path} does not exist") + + return cls(path, repeat) + + @classmethod + def create_from_dict(cls, parsed_definition, check_exists=True): + if 'repeat' not in parsed_definition: + raise PlayDefinitionError("Entry is missing key 'repeat'") + if 'file' not in parsed_definition: + raise PlayDefinitionError("Entry is missing key 'file'") + + dt = datetime.now() + params = { + 'month': dt.month, + 'day': dt.day, + 'dow': dt.isoweekday(), + 'hour': dt.hour, + 'analog_hour': int(dt.strftime('%I')), + 'minute': dt.minute + } + + repeat = parsed_definition['repeat'] + if isinstance(repeat, int): + repeat = str(repeat) + try: + repeat = repeat.format(**params) + except Exception as e: + raise PlayDefinitionError(f"definition formatting failed: {e}") + + repeat = eval(repeat, {}, {}) + + return cls.create(parsed_definition['file'], repeat, check_exists=check_exists) + + @classmethod + def create_from_json(cls, json_definition, check_exists=True): + return cls.create_from_dict(json.loads(json_definition), check_exists=check_exists) + + def __repr__(self): + return f"PlayDefinition(path={self._path}, repeat={self._repeat})" + + def to_dict(self): + return {'file': str(self._path), 'repeat': self._repeat} + + def to_json(self): + return json.dumps(self.to_dict()) + + +def client_options(f): + @click.option('--pa-device-name') + @click.option('--pa-client-name', default=PA_CLIENT_NAME) + @click.option('--remote') + @click.option('--debug', is_flag=True) + @functools.wraps(f) + def wrapper(*args, **kwargs): + if kwargs.get('pa_device_name') and kwargs.get('remote'): + raise click.ClickException('"--remote" and "--pa-device-name" are mutually exclusive') + + f(*args, **kwargs) + + return wrapper + + +@click.group() +def cli(): + ... + + +@cli.command() +@click.option('--file', 'file_path', type=click.Path(), required=True) +@click.option('--repeat', default="1") +@client_options +def play(file_path, repeat, pa_device_name, pa_client_name, remote, debug): + """Play a file with the given repeat definition + + Construct a PlayDefinition and an Entry and play them as if they would have + been read from the crontab-like config-file. + """ + pd = PlayDefinition.create_from_json( + json.dumps({'file': file_path, 'repeat': repeat}), check_exists=not bool(remote)) + + entry = Entry(0, None, [pd]) + entry.play(pa_device_name, pa_client_name, remote, debug) + + +@cli.command() +@click.option('--remote', required=True) +def stop(remote): + """Send the stop command to the given remote""" + requests.post(remote) + + +@cli.command() +@click.option('--config-file', type=click.Path(exists=True), required=True) +@client_options +def play_by_config(config_file, pa_device_name, pa_client_name, remote, debug): + """Play the defined entries according to given crontab-like config-file + + Only entries for which the current time matches the definition in the + crontab-like config-file are played. + """ + # parse the config file + # config file entry starting with a # (and any space before) are ignored + # entries contain a crontab definition followed by a ' = ' and then one or + # more play definitions separated by '|' + # + # a play definition is either + # * a path to a file (without '|') + # * a JSON dict containing the keys "file" and "repeat", where "times" can + # contain mathematical expressions and can use Python formatting with the + # variables month, day, dow (starting at monday being 1), hour (0 - 23), + # analog_hour (1 - 12), minute containing the current run's values + with open(config_file) as f: + lines = [l.strip() for l in f.readlines()] + + lines = [(i, l) for (i, l) in enumerate(lines, start=1) + if l and not l.startswith('#')] + + entries = [] + for i, line in lines: + if ' = ' not in line: + print(f"Error: Config file {config_file} line {i} contains no ' = '. Ignoring.") + continue + + cron_definition, play_definitions = line.split(' = ', 1) + + try: + c = CronTab(cron_definition) + except ValueError as e: + print(f"Error: Config file {config_file} line {i} contains no valid cron definition: {e}") + continue + + entry = Entry(i, c) + + play_definitions = play_definitions.split('|') + for p_def in play_definitions: + p_def = p_def.strip() + try: + play_definition = PlayDefinition.create_from_json(p_def) + except PlayDefinitionError as e: + print(f"Error: Config file {config_file} line {i}: {e}") + continue + except json.decoder.JSONDecodeError: + # no valid JSON, treat as filepath + try: + play_definition = PlayDefinition.create(p_def) + except PlayDefinitionError as e: + print(f"Error: Config file {config_file} line {i}: {e}") + continue + + entry.play_definitions.append(play_definition) + + if entry.play_definitions: + entries.append(entry) + + if debug: + from pprint import pprint + print("\n## All entries") + pprint(entries) + print() + + this_runs_entries = [] + # we filter first into a new list, because multiple Entries might be valid + # for this run and the farther down the entry is defined the later it would + # be checked + for entry in entries: + # check if this entry should be played right now + if debug: + print(f"{entry.crontab.previous(default_utc=False)} {entry}") + + if entry.crontab.previous(default_utc=False) < -59: + continue + + this_runs_entries.append(entry) + + if debug: + from pprint import pprint + print("\n## This run's entries") + pprint(this_runs_entries) + print() + + for entry in this_runs_entries: + entry.play(pa_device_name, pa_client_name, remote, debug=debug) + + +class PlayerHttpServer(HTTPServer): + + def __init__(self, *args, player, **kwargs): + self.player = player + super().__init__(*args, **kwargs) + + +class PlayerHttpHandler(BaseHTTPRequestHandler): + + def do_GET(self): + """Return the last played thing on / only""" + if self.path != '/': + self.send_response(404, 'Invalid path') + return + + # return the last thing that was played + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + data = {'previous': None, 'playing': self.server.player.is_playing} + if self.server.player.last_played: + data['previous'] = [pd.to_dict() for pd in self.server.player.last_played.play_definitions] + self.wfile.write(bytes(json.dumps(data), 'utf-8')) + + def do_PUT(self): + """Accept something new to be played on / only""" + if self.path != '/': + self.send_response(404) + return + + if self.headers.get('Content-Type') != 'application/json': + self.send_response(400) + self.end_headers() + self.wfile.write(b'Content-type application/json only') + return + + content_length = self.headers.get('content-length') + length = int(content_length) if content_length else 0 + + if not length: + self.send_response(400) + self.end_headers() + self.wfile.write(b'Got empty body') + return + data = self.rfile.read(length) + try: + entry_data = json.loads(data) + except json.JSONDecodeError as e: + self.send_response(400) + self.end_headers() + self.wfile.write(bytes(f"Could not parse JSON: {e}", 'utf-8')) + return + + if not isinstance(entry_data, list): + self.send_response(400) + self.end_headers() + self.wfile.write(b'Entry must be a list of dicts') + return + if not entry_data: + self.send_response(400) + self.end_headers() + self.wfile.write(b'Empty entry') + return + + entry = Entry() + for i, pd_data in enumerate(entry_data): + try: + entry.play_definitions.append(PlayDefinition.create_from_dict(pd_data)) + except Exception as e: + self.send_response(400) + self.end_headers() + self.wfile.write(bytes(f"Invalid PlayDefinition {i}: {e}"), 'utf-8') + return + self.server.player.append(entry) + self.send_response(200) + self.end_headers() + + def do_POST(self): + """Stop the currently-playing entry on /""" + if self.path != '/': + self.send_response(404) + return + + self.server.player.stop() + self.send_response(200) + self.end_headers() + + +class Player(threading.Thread): + + def __init__(self, *args, done_event, entry, pa_device_name, pa_client_name, debug, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) + self._done_event = done_event + self._entry = entry + self._process = None + self._child_pids = [] + self._child_pids_event = threading.Event() + + self._pa_device_name = pa_device_name + self._pa_client_name = pa_client_name + self._debug = debug + + def stop(self): + if not self._process: + print("no process to stop") + return + + self._child_pids_event.wait() + + for child_pid in self._child_pids: + os.kill(child_pid, signal.SIGTERM) + self._process.terminate() + try: + self._process.wait(2) + except subprocess.TimeoutExpired: + self._process.kill() + # TODO check if children are still running and try kill + # for child_pid in self._child_pids: + + def run(self): + self._process = self._entry.play(pa_device_name=self._pa_device_name, pa_client_name=self._pa_client_name, + debug=self._debug, wait=False) + p = psutil.Process(self._process.pid) + while len(p.children()) < 1: + time.sleep(0.2) + for child in p.children(): + self._child_pids.append(child.pid) + self._child_pids_event.set() + self._process.wait() + + +class PlayerManager(threading.Thread): + + def __init__(self, *args, pa_device_name, pa_client_name, debug, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) + + self._pa_device_name = pa_device_name + self._pa_client_name = pa_client_name + self._debug = debug + + self._queue = queue.SimpleQueue() + # event signalling this PlayerThread to stop + self._stop_thread_event = threading.Event() + + # a Player object currently running + self._playing_thread = None + # an event to be set by the currently running Player to signal it's + # done + self._playing_done_event = threading.Event() + # event signalling to stop the current Player + self._stop = threading.Event() + + # an Entry object that was last played + self.last_played = None + + def append(self, entry): + self._queue.put(entry) + + def stop(self): + self._stop.set() + + def stop_thread(self): + self._stop_thread_event.set() + if self._playing_thread and self._playing_thread.is_alive(): + self._playing_thread.stop() + + @property + def is_playing(self): + return bool(self._playing_thread) + + def run(self): + while not self._stop_thread_event.is_set(): + # check if something is currently playing + if self._playing_thread: + # try to stop what's currently happening + if self._stop.is_set(): + self._playing_thread.stop() + self._stop.clear() + + # clean up if playing was done + if not self._playing_thread.is_alive(): + self._playing_thread.join() + self._playing_done_event.clear() + self._playing_thread = None + continue + # wait for playing to be done for some time, but continue to + # let stop() have some effect + self._playing_done_event.wait(1) + continue + + try: + entry = self._queue.get(block=True, timeout=1) + except queue.Empty: + continue + + self.last_played = entry + + # make sure we don't immediately try to stop it again, because it was set some time ago + self._stop.clear() + + player = Player(entry=entry, done_event=self._playing_done_event, pa_device_name=self._pa_device_name, + pa_client_name=self._pa_client_name, debug=self._debug, daemon=True) + player.start() + self._playing_thread = player + + +@cli.command() +@click.option('--listen-host', default='127.0.0.1') +@click.option('--listen-port', default=8227) +@click.option('--pa-device-name', required=True) +@click.option('--pa-client-name', default=PA_CLIENT_NAME) +@click.option('--debug', is_flag=True) +def daemon(listen_host, listen_port, pa_device_name, pa_client_name, debug): + """Listen on given host:port with an HTTP server + + Via HTTP, new requests to play files are accepted as PlayDefinitions as new + entry and currently playing entries can be stopped. The last playing entry + can be replayed, too. + + If --config-file is given, additionally play the crontab-like config + """ + play_thread = PlayerManager(pa_device_name=pa_device_name, pa_client_name=pa_client_name, + debug=debug, daemon=True) + play_thread.start() + + server_address = (listen_host, listen_port) + httpd = PlayerHttpServer(server_address, PlayerHttpHandler, player=play_thread) + httpd.serve_forever() + + +if __name__ == '__main__': + import sys + sys.exit(cli()) diff --git a/cronplayer/example.config b/cronplayer/example.config new file mode 100644 index 0000000..72e7b37 --- /dev/null +++ b/cronplayer/example.config @@ -0,0 +1,5 @@ +# # m h dom mon dow = play definition +15 * * * * = ~/Uhr/viertel.wav +30 * * * * = ~/Uhr/halb.wav +45 * * * * = ~/Uhr/dreiviertel.wav +00 * * * * = ~/Uhr/um.wav | {"file": "~/Uhr/firstgong.wav", "repeat": "{hour} - 1"} | ~/Uhr/endgong.wav diff --git a/cronplayer/setup.cfg b/cronplayer/setup.cfg new file mode 100644 index 0000000..0251394 --- /dev/null +++ b/cronplayer/setup.cfg @@ -0,0 +1,18 @@ +[metadata] +name = cronplayer +version = 0.1 +description = Play a defined bunch of files at a certain time as defined in cron-like config file - possibly remote via integrated webserver + +[options] +scripts = cronplayer +install_requires = + crontab + click + psutil + requests + + +[flake8] +max-line-length = 120 +exclude = .git,__pycache__,*.egg-info,*lib/python* +ignore = E241,E741,W503,W504 diff --git a/cronplayer/setup.py b/cronplayer/setup.py new file mode 100644 index 0000000..056ba45 --- /dev/null +++ b/cronplayer/setup.py @@ -0,0 +1,4 @@ +import setuptools + + +setuptools.setup()