#!/usr/bin/env python3 from datetime import datetime import functools from http.server import BaseHTTPRequestHandler, HTTPServer import json import os from pathlib import Path import queue import signal import subprocess import threading import time import click from crontab import CronTab import requests import psutil PA_CLIENT_NAME = 'cronplayer' class Entry: def __init__(self, number=None, crontab=None, play_definitions=None): self.number = number self.crontab = crontab self.play_definitions = play_definitions or [] def __repr__(self): return f"Entry(number={self.number}, crontab={self.crontab}, play_definitions={self.play_definitions})" def play(self, pa_device_name=None, pa_client_name=PA_CLIENT_NAME, remote=None, debug=False, wait=True): if not pa_device_name and not remote: if debug: print("DEBUG: Neither pa_device_name nor remote given: not playing anything") return if not self.play_definitions: if debug: print(f"DEBUG: Not playing {self} without definitions") return if pa_device_name and remote and debug: print("Both pa_device_name and remote given: using remote to play") if remote: return self._play_remote(remote, debug) else: return self._play_pa(pa_device_name, pa_client_name, debug, wait) def _play_remote(self, remote, debug): payload = [pd.to_dict() for pd in self.play_definitions] if debug: print(f"DEBUG: Would PUT {payload} to {remote}") else: requests.put(remote, json=payload) def _play_pa(self, pa_device_name, pa_client_name, debug, wait): env = {'XDG_RUNTIME_DIR': '/run/user/1000'} p = subprocess.run(f"pactl list sinks | grep -e Name: -e Description: | grep '{pa_device_name}' -B 1 | " "head -n 1 | awk '{print $2;}'", capture_output=True, shell=True, env=env, check=True, encoding='utf-8') pa_device = p.stdout.strip() if not pa_device: raise RuntimeError(f"Could not find PA device with Description set to '{pa_device_name}'.") ffmpeg_file_input = '\n'.join(f"file '{p}'" for pd in self.play_definitions for p in pd.paths) cmd = ( 'ffmpeg -loglevel 0 ' f"-f concat -safe 0 -i <(echo \"{ffmpeg_file_input}\") " '-f wav - | ' f"paplay --client-name {pa_client_name} --device '{pa_device}'") # Using this would output to pulseaudio directly, but this runs short as # ffmpeg quits once it's encoded everything and doesn't wait till that's # all played on pulse # -f pulse -device "${DEVICE}" -name "Uhr" "Uhr" if debug: print(f"DEBUG: {cmd}") else: kwargs = dict(shell=True, executable='/bin/bash', env=env) if wait: subprocess.run(cmd, **kwargs) else: return subprocess.Popen(cmd, **kwargs) class PlayDefinitionError(Exception): pass class PlayDefinition: def __init__(self, path, repeat=1): self._path = path self._repeat = repeat @property def paths(self): return [self._path] * self._repeat @classmethod def create(cls, path, repeat=1, check_exists=True): path = Path(path).expanduser().resolve() if check_exists and not path.exists(): raise PlayDefinitionError(f"{path} does not exist") return cls(path, repeat) @classmethod def create_from_dict(cls, parsed_definition, check_exists=True): if 'repeat' not in parsed_definition: raise PlayDefinitionError("Entry is missing key 'repeat'") if 'file' not in parsed_definition: raise PlayDefinitionError("Entry is missing key 'file'") dt = datetime.now() params = { 'month': dt.month, 'day': dt.day, 'dow': dt.isoweekday(), 'hour': dt.hour, 'analog_hour': int(dt.strftime('%I')), 'minute': dt.minute } repeat = parsed_definition['repeat'] if isinstance(repeat, int): repeat = str(repeat) try: repeat = repeat.format(**params) except Exception as e: raise PlayDefinitionError(f"definition formatting failed: {e}") repeat = eval(repeat, {}, {}) return cls.create(parsed_definition['file'], repeat, check_exists=check_exists) @classmethod def create_from_json(cls, json_definition, check_exists=True): return cls.create_from_dict(json.loads(json_definition), check_exists=check_exists) def __repr__(self): return f"PlayDefinition(path={self._path}, repeat={self._repeat})" def to_dict(self): return {'file': str(self._path), 'repeat': self._repeat} def to_json(self): return json.dumps(self.to_dict()) def client_options(f): @click.option('--pa-device-name') @click.option('--pa-client-name', default=PA_CLIENT_NAME) @click.option('--remote') @click.option('--debug', is_flag=True) @functools.wraps(f) def wrapper(*args, **kwargs): if kwargs.get('pa_device_name') and kwargs.get('remote'): raise click.ClickException('"--remote" and "--pa-device-name" are mutually exclusive') f(*args, **kwargs) return wrapper @click.group() def cli(): ... @cli.command() @click.option('--file', 'file_path', type=click.Path(), required=True) @click.option('--repeat', default="1") @client_options def play(file_path, repeat, pa_device_name, pa_client_name, remote, debug): """Play a file with the given repeat definition Construct a PlayDefinition and an Entry and play them as if they would have been read from the crontab-like config-file. """ pd = PlayDefinition.create_from_json( json.dumps({'file': file_path, 'repeat': repeat}), check_exists=not bool(remote)) entry = Entry(0, None, [pd]) entry.play(pa_device_name, pa_client_name, remote, debug) @cli.command() @click.option('--remote', required=True) def stop(remote): """Send the stop command to the given remote""" requests.post(remote) @cli.command() @click.option('--config-file', type=click.Path(exists=True), required=True) @client_options def play_by_config(config_file, pa_device_name, pa_client_name, remote, debug): """Play the defined entries according to given crontab-like config-file Only entries for which the current time matches the definition in the crontab-like config-file are played. """ # parse the config file # config file entry starting with a # (and any space before) are ignored # entries contain a crontab definition followed by a ' = ' and then one or # more play definitions separated by '|' # # a play definition is either # * a path to a file (without '|') # * a JSON dict containing the keys "file" and "repeat", where "times" can # contain mathematical expressions and can use Python formatting with the # variables month, day, dow (starting at monday being 1), hour (0 - 23), # analog_hour (1 - 12), minute containing the current run's values with open(config_file) as f: lines = [l.strip() for l in f.readlines()] lines = [(i, l) for (i, l) in enumerate(lines, start=1) if l and not l.startswith('#')] entries = [] for i, line in lines: if ' = ' not in line: print(f"Error: Config file {config_file} line {i} contains no ' = '. Ignoring.") continue cron_definition, play_definitions = line.split(' = ', 1) try: c = CronTab(cron_definition) except ValueError as e: print(f"Error: Config file {config_file} line {i} contains no valid cron definition: {e}") continue entry = Entry(i, c) play_definitions = play_definitions.split('|') for p_def in play_definitions: p_def = p_def.strip() try: play_definition = PlayDefinition.create_from_json(p_def) except PlayDefinitionError as e: print(f"Error: Config file {config_file} line {i}: {e}") continue except json.decoder.JSONDecodeError: # no valid JSON, treat as filepath try: play_definition = PlayDefinition.create(p_def) except PlayDefinitionError as e: print(f"Error: Config file {config_file} line {i}: {e}") continue entry.play_definitions.append(play_definition) if entry.play_definitions: entries.append(entry) if debug: from pprint import pprint print("\n## All entries") pprint(entries) print() this_runs_entries = [] # we filter first into a new list, because multiple Entries might be valid # for this run and the farther down the entry is defined the later it would # be checked for entry in entries: # check if this entry should be played right now if debug: print(f"{entry.crontab.previous(default_utc=False)} {entry}") if entry.crontab.previous(default_utc=False) < -59: continue this_runs_entries.append(entry) if debug: from pprint import pprint print("\n## This run's entries") pprint(this_runs_entries) print() for entry in this_runs_entries: entry.play(pa_device_name, pa_client_name, remote, debug=debug) class PlayerHttpServer(HTTPServer): def __init__(self, *args, player, **kwargs): self.player = player super().__init__(*args, **kwargs) class PlayerHttpHandler(BaseHTTPRequestHandler): def do_GET(self): """Return the last played thing on / only""" if self.path != '/': self.send_response(404, 'Invalid path') return # return the last thing that was played self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() data = {'previous': None, 'playing': self.server.player.is_playing} if self.server.player.last_played: data['previous'] = [pd.to_dict() for pd in self.server.player.last_played.play_definitions] self.wfile.write(bytes(json.dumps(data), 'utf-8')) def do_PUT(self): """Accept something new to be played on / only""" if self.path != '/': self.send_response(404) return if self.headers.get('Content-Type') != 'application/json': self.send_response(400) self.end_headers() self.wfile.write(b'Content-type application/json only') return content_length = self.headers.get('content-length') length = int(content_length) if content_length else 0 if not length: self.send_response(400) self.end_headers() self.wfile.write(b'Got empty body') return data = self.rfile.read(length) try: entry_data = json.loads(data) except json.JSONDecodeError as e: self.send_response(400) self.end_headers() self.wfile.write(bytes(f"Could not parse JSON: {e}", 'utf-8')) return if not isinstance(entry_data, list): self.send_response(400) self.end_headers() self.wfile.write(b'Entry must be a list of dicts') return if not entry_data: self.send_response(400) self.end_headers() self.wfile.write(b'Empty entry') return entry = Entry() for i, pd_data in enumerate(entry_data): try: entry.play_definitions.append(PlayDefinition.create_from_dict(pd_data)) except Exception as e: self.send_response(400) self.end_headers() self.wfile.write(bytes(f"Invalid PlayDefinition {i}: {e}"), 'utf-8') return self.server.player.append(entry) self.send_response(200) self.end_headers() def do_POST(self): """Stop the currently-playing entry on /""" if self.path != '/': self.send_response(404) return self.server.player.stop() self.send_response(200) self.end_headers() class Player(threading.Thread): def __init__(self, *args, done_event, entry, pa_device_name, pa_client_name, debug, **kwargs): threading.Thread.__init__(self, *args, **kwargs) self._done_event = done_event self._entry = entry self._process = None self._child_pids = [] self._child_pids_event = threading.Event() self._pa_device_name = pa_device_name self._pa_client_name = pa_client_name self._debug = debug def stop(self): if not self._process: print("no process to stop") return self._child_pids_event.wait() for child_pid in self._child_pids: os.kill(child_pid, signal.SIGTERM) self._process.terminate() try: self._process.wait(2) except subprocess.TimeoutExpired: self._process.kill() # TODO check if children are still running and try kill # for child_pid in self._child_pids: def run(self): self._process = self._entry.play(pa_device_name=self._pa_device_name, pa_client_name=self._pa_client_name, debug=self._debug, wait=False) p = psutil.Process(self._process.pid) while len(p.children()) < 1: time.sleep(0.2) for child in p.children(): self._child_pids.append(child.pid) self._child_pids_event.set() self._process.wait() class PlayerManager(threading.Thread): def __init__(self, *args, pa_device_name, pa_client_name, debug, **kwargs): threading.Thread.__init__(self, *args, **kwargs) self._pa_device_name = pa_device_name self._pa_client_name = pa_client_name self._debug = debug self._queue = queue.SimpleQueue() # event signalling this PlayerThread to stop self._stop_thread_event = threading.Event() # a Player object currently running self._playing_thread = None # an event to be set by the currently running Player to signal it's # done self._playing_done_event = threading.Event() # event signalling to stop the current Player self._stop = threading.Event() # an Entry object that was last played self.last_played = None def append(self, entry): self._queue.put(entry) def stop(self): self._stop.set() def stop_thread(self): self._stop_thread_event.set() if self._playing_thread and self._playing_thread.is_alive(): self._playing_thread.stop() @property def is_playing(self): return bool(self._playing_thread) def run(self): while not self._stop_thread_event.is_set(): # check if something is currently playing if self._playing_thread: # try to stop what's currently happening if self._stop.is_set(): self._playing_thread.stop() self._stop.clear() # clean up if playing was done if not self._playing_thread.is_alive(): self._playing_thread.join() self._playing_done_event.clear() self._playing_thread = None continue # wait for playing to be done for some time, but continue to # let stop() have some effect self._playing_done_event.wait(1) continue try: entry = self._queue.get(block=True, timeout=1) except queue.Empty: continue self.last_played = entry # make sure we don't immediately try to stop it again, because it was set some time ago self._stop.clear() player = Player(entry=entry, done_event=self._playing_done_event, pa_device_name=self._pa_device_name, pa_client_name=self._pa_client_name, debug=self._debug, daemon=True) player.start() self._playing_thread = player @cli.command() @click.option('--listen-host', default='127.0.0.1') @click.option('--listen-port', default=8227) @click.option('--pa-device-name', required=True) @click.option('--pa-client-name', default=PA_CLIENT_NAME) @click.option('--debug', is_flag=True) def daemon(listen_host, listen_port, pa_device_name, pa_client_name, debug): """Listen on given host:port with an HTTP server Via HTTP, new requests to play files are accepted as PlayDefinitions as new entry and currently playing entries can be stopped. The last playing entry can be replayed, too. If --config-file is given, additionally play the crontab-like config """ play_thread = PlayerManager(pa_device_name=pa_device_name, pa_client_name=pa_client_name, debug=debug, daemon=True) play_thread.start() server_address = (listen_host, listen_port) httpd = PlayerHttpServer(server_address, PlayerHttpHandler, player=play_thread) httpd.serve_forever() if __name__ == '__main__': import sys sys.exit(cli())