script-dump/cronplayer/cronplayer

526 lines
17 KiB
Python

#!/usr/bin/env python3
from datetime import datetime
import functools
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import os
from pathlib import Path
import queue
import shlex
import signal
import subprocess
import threading
import time

import click
from crontab import CronTab
import psutil
import requests
# Default PulseAudio client name: used as the default of Entry.play() and of
# the ``--pa-client-name`` command line options.
PA_CLIENT_NAME = 'cronplayer'
class Entry:
    """One playable unit: an optional cron schedule plus the files to play.

    Attributes:
        number: identifier given by the creator (the config-file line number
            when the entry was read from a config file).
        crontab: the parsed cron definition, or None.
        play_definitions: list of PlayDefinition objects, played in order.
    """

    # seconds to wait for the remote before giving up instead of hanging
    REMOTE_TIMEOUT = 10

    def __init__(self, number=None, crontab=None, play_definitions=None):
        self.number = number
        self.crontab = crontab
        # a fresh list per instance when nothing (or an empty list) is given
        self.play_definitions = play_definitions or []

    def __repr__(self):
        return f"Entry(number={self.number}, crontab={self.crontab}, play_definitions={self.play_definitions})"

    def play(self, pa_device_name=None, pa_client_name=PA_CLIENT_NAME, remote=None, debug=False, wait=True):
        """Play this entry locally via PulseAudio or hand it to a remote.

        Args:
            pa_device_name: text matched against the output of
                ``pactl list sinks`` to find the sink to play on.
            pa_client_name: client name passed to ``paplay``.
            remote: URL the play definitions are PUT to; wins over
                ``pa_device_name`` when both are given.
            debug: only print what would be done.
            wait: for local playback, block until playback finished.

        Returns:
            The ``subprocess.Popen`` object of the local playback pipeline
            when ``wait`` is false and ``debug`` is off, otherwise None.

        Raises:
            RuntimeError: no PulseAudio sink matches ``pa_device_name``.
        """
        if not pa_device_name and not remote:
            if debug:
                print("DEBUG: Neither pa_device_name nor remote given: not playing anything")
            return None
        if not self.play_definitions:
            if debug:
                print(f"DEBUG: Not playing {self} without definitions")
            return None
        if pa_device_name and remote and debug:
            print("Both pa_device_name and remote given: using remote to play")
        if remote:
            return self._play_remote(remote, debug)
        return self._play_pa(pa_device_name, pa_client_name, debug, wait)

    def _play_remote(self, remote, debug):
        """PUT the play definitions as a JSON list to ``remote``."""
        payload = [pd.to_dict() for pd in self.play_definitions]
        if debug:
            print(f"DEBUG: Would PUT {payload} to {remote}")
            return
        # without a timeout an unresponsive remote would block forever
        requests.put(remote, json=payload, timeout=self.REMOTE_TIMEOUT)

    def _play_pa(self, pa_device_name, pa_client_name, debug, wait):
        """Concatenate all files with ffmpeg and pipe the result to paplay."""
        # NOTE(review): the runtime dir of uid 1000 is hard-coded and replaces
        # the whole environment of the child processes -- confirm this is
        # intended for every deployment.
        env = {'XDG_RUNTIME_DIR': '/run/user/1000'}

        # Every interpolated value is shell-quoted: device name, client name
        # and file paths may come from the command line or from HTTP requests.
        lookup_cmd = (
            'pactl list sinks | grep -e Name: -e Description: | '
            f"grep -B 1 -e {shlex.quote(pa_device_name)} | "
            "head -n 1 | awk '{print $2;}'")
        lookup = subprocess.run(lookup_cmd, capture_output=True, shell=True, env=env, check=True,
                                encoding='utf-8')
        pa_device = lookup.stdout.strip()
        if not pa_device:
            raise RuntimeError(f"Could not find PA device with Description set to '{pa_device_name}'.")

        # Inside the concat list a single quote has to be written as '\''
        # so that ffmpeg's own parser keeps the path in one piece.
        concat_list = '\n'.join(
            "file '{}'".format(str(path).replace("'", "'\\''"))
            for pd in self.play_definitions
            for path in pd.paths)
        cmd = (
            'ffmpeg -loglevel 0 '
            f"-f concat -safe 0 -i <(printf '%s\\n' {shlex.quote(concat_list)}) "
            '-f wav - | '
            f"paplay --client-name {shlex.quote(pa_client_name)} --device {shlex.quote(pa_device)}")
        # Using this would output to pulseaudio directly, but this runs short as
        # ffmpeg quits once it's encoded everything and doesn't wait till that's
        # all played on pulse
        #   -f pulse -device "${DEVICE}" -name "Uhr" "Uhr"
        if debug:
            print(f"DEBUG: {cmd}")
            return None

        # bash is required for the <( ... ) process substitution
        kwargs = dict(shell=True, executable='/bin/bash', env=env)
        if wait:
            subprocess.run(cmd, **kwargs)
            return None
        return subprocess.Popen(cmd, **kwargs)
class PlayDefinitionError(Exception):
    """Raised when a play definition is incomplete, malformed or names a missing file."""
class PlayDefinition:
    """A single file together with the number of times it is played."""

    def __init__(self, path, repeat=1):
        self._path = path
        self._repeat = repeat

    @property
    def paths(self):
        """The file path, listed once per repetition."""
        return [self._path] * self._repeat

    @classmethod
    def create(cls, path, repeat=1, check_exists=True):
        """Build a definition from a path, expanding ``~`` and resolving it.

        Raises:
            PlayDefinitionError: ``check_exists`` is set and the file is missing.
        """
        path = Path(path).expanduser().resolve()
        if check_exists and not path.exists():
            raise PlayDefinitionError(f"{path} does not exist")
        return cls(path, repeat)

    @classmethod
    def create_from_dict(cls, parsed_definition, check_exists=True):
        """Build a definition from a dict with the keys 'file' and 'repeat'.

        'repeat' is either an int or a string. A string is first run through
        ``str.format`` with the current ``month``, ``day``, ``dow`` (Monday is
        1), ``hour`` (0 - 23), ``analog_hour`` (1 - 12) and ``minute`` and
        then evaluated as an expression that has to yield an integer.

        Raises:
            PlayDefinitionError: the definition is not a dict, lacks a key,
                cannot be formatted or evaluated, does not evaluate to an
                integer, or names an unusable or missing file.
        """
        if not isinstance(parsed_definition, dict):
            raise PlayDefinitionError("Entry must be a dict with the keys 'file' and 'repeat'")
        if 'repeat' not in parsed_definition:
            raise PlayDefinitionError("Entry is missing key 'repeat'")
        if 'file' not in parsed_definition:
            raise PlayDefinitionError("Entry is missing key 'file'")

        now = datetime.now()
        params = {
            'month': now.month,
            'day': now.day,
            'dow': now.isoweekday(),
            'hour': now.hour,
            'analog_hour': int(now.strftime('%I')),
            'minute': now.minute,
        }

        repeat = parsed_definition['repeat']
        if isinstance(repeat, int):
            repeat = str(repeat)
        try:
            repeat = repeat.format(**params)
        except Exception as e:
            raise PlayDefinitionError(f"definition formatting failed: {e}") from e

        # SECURITY: eval() runs arbitrary Python. Empty globals do NOT remove
        # the builtins, so whoever controls 'repeat' (the config file, or any
        # HTTP client of the daemon) can execute code. Kept because existing
        # definitions may rely on builtins; only expose this to trusted input.
        try:
            repeat = eval(repeat, {}, {})
        except Exception as e:
            # e.g. SyntaxError/NameError from a broken expression
            raise PlayDefinitionError(f"definition evaluation failed: {e}") from e
        if not isinstance(repeat, int):
            # anything else would only blow up later when building .paths
            raise PlayDefinitionError(f"'repeat' must evaluate to an integer, got {repeat!r}")

        try:
            return cls.create(parsed_definition['file'], repeat, check_exists=check_exists)
        except TypeError as e:
            # 'file' was not something a path can be built from
            raise PlayDefinitionError(f"invalid 'file' value: {e}") from e

    @classmethod
    def create_from_json(cls, json_definition, check_exists=True):
        """Parse ``json_definition`` and pass the result to create_from_dict().

        Invalid JSON raises ``json.JSONDecodeError`` unchanged.
        """
        return cls.create_from_dict(json.loads(json_definition), check_exists=check_exists)

    def __repr__(self):
        return f"PlayDefinition(path={self._path}, repeat={self._repeat})"

    def to_dict(self):
        """Return the definition as a dict with the keys 'file' and 'repeat'."""
        return {'file': str(self._path), 'repeat': self._repeat}

    def to_json(self):
        """Return to_dict() serialized as JSON."""
        return json.dumps(self.to_dict())
def client_options(f):
    """Decorator adding the shared output options to a click command.

    Adds ``--pa-device-name``, ``--pa-client-name``, ``--remote`` and
    ``--debug`` and rejects the combination of ``--remote`` with
    ``--pa-device-name`` before the command body runs.
    """
    @click.option('--pa-device-name')
    @click.option('--pa-client-name', default=PA_CLIENT_NAME)
    @click.option('--remote')
    @click.option('--debug', is_flag=True)
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if kwargs.get('pa_device_name') and kwargs.get('remote'):
            raise click.ClickException('"--remote" and "--pa-device-name" are mutually exclusive')
        # hand the callback's result through instead of dropping it
        return f(*args, **kwargs)
    return wrapper
# Root click command group; sub-commands are registered on it with
# @cli.command(). Deliberately documented with a comment only: click would
# show a docstring as help text.
@click.group()
def cli():
    ...
@cli.command()
@click.option('--file', 'file_path', type=click.Path(), required=True)
@click.option('--repeat', default="1")
@client_options
def play(file_path, repeat, pa_device_name, pa_client_name, remote, debug):
    """Play a file with the given repeat definition

    Construct a PlayDefinition and an Entry and play them as if they would have
    been read from the crontab-like config-file.
    """
    try:
        # the file only has to exist locally when it is played locally
        pd = PlayDefinition.create_from_dict(
            {'file': file_path, 'repeat': repeat}, check_exists=not remote)
    except PlayDefinitionError as e:
        # report bad input like every other usage error, not as a traceback
        raise click.ClickException(str(e)) from e
    entry = Entry(0, None, [pd])
    entry.play(pa_device_name, pa_client_name, remote, debug)
@cli.command()
@click.option('--remote', required=True)
def stop(remote):
    """Send the stop command to the given remote"""
    try:
        # an empty POST is the stop request; never wait forever for the answer
        response = requests.post(remote, timeout=10)
    except requests.RequestException as e:
        raise click.ClickException(f"Could not reach {remote}: {e}") from e
    if not response.ok:
        raise click.ClickException(f"{remote} answered with HTTP status {response.status_code}")
def _parse_play_definition(p_def):
    """Turn one '|'-separated item of a config line into a PlayDefinition.

    A JSON dict is handled as a full definition; everything else, including
    text that merely happens to be valid JSON such as a file named ``123``,
    is treated as a file path.

    Raises:
        PlayDefinitionError: the definition is invalid or the file is missing.
    """
    try:
        parsed = json.loads(p_def)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return PlayDefinition.create_from_dict(parsed)
    return PlayDefinition.create(p_def)


def _read_config_entries(config_file):
    """Parse the crontab-like config file into a list of Entry objects.

    Problems are reported on stdout and the offending line or definition is
    skipped; entries left without any valid play definition are dropped.
    """
    # config file entry starting with a # (and any space before) are ignored
    # entries contain a crontab definition followed by a ' = ' and then one or
    # more play definitions separated by '|'
    #
    # a play definition is either
    # * a path to a file (without '|')
    # * a JSON dict containing the keys "file" and "repeat", where "repeat" can
    #   contain mathematical expressions and can use Python formatting with the
    #   variables month, day, dow (starting at monday being 1), hour (0 - 23),
    #   analog_hour (1 - 12), minute containing the current run's values
    with open(config_file) as f:
        numbered_lines = [(i, raw.strip()) for i, raw in enumerate(f, start=1)]

    entries = []
    for i, line in numbered_lines:
        if not line or line.startswith('#'):
            continue
        if ' = ' not in line:
            print(f"Error: Config file {config_file} line {i} contains no ' = '. Ignoring.")
            continue
        cron_definition, play_definitions = line.split(' = ', 1)
        try:
            crontab = CronTab(cron_definition)
        except ValueError as e:
            print(f"Error: Config file {config_file} line {i} contains no valid cron definition: {e}")
            continue
        entry = Entry(i, crontab)
        for p_def in play_definitions.split('|'):
            try:
                entry.play_definitions.append(_parse_play_definition(p_def.strip()))
            except PlayDefinitionError as e:
                print(f"Error: Config file {config_file} line {i}: {e}")
        if entry.play_definitions:
            entries.append(entry)
    return entries


@cli.command()
@click.option('--config-file', type=click.Path(exists=True), required=True)
@client_options
def play_by_config(config_file, pa_device_name, pa_client_name, remote, debug):
    """Play the defined entries according to given crontab-like config-file

    Only entries for which the current time matches the definition in the
    crontab-like config-file are played.
    """
    from pprint import pprint

    entries = _read_config_entries(config_file)
    if debug:
        print("\n## All entries")
        pprint(entries)
        print()

    # we filter first into a new list, because multiple Entries might be valid
    # for this run and the farther down the entry is defined the later it would
    # be checked
    this_runs_entries = []
    for entry in entries:
        # check if this entry should be played right now
        # NOTE(review): presumably seconds relative to now, negative meaning
        # in the past -- confirm against the crontab library
        previous = entry.crontab.previous(default_utc=False)
        if debug:
            print(f"{previous} {entry}")
        if previous < -59:
            continue
        this_runs_entries.append(entry)
    if debug:
        print("\n## This run's entries")
        pprint(this_runs_entries)
        print()

    for entry in this_runs_entries:
        entry.play(pa_device_name, pa_client_name, remote, debug=debug)
class PlayerHttpServer(HTTPServer):
    """HTTPServer carrying the player object its request handlers talk to."""

    def __init__(self, *args, player, **kwargs):
        # keyword-only ``player`` is kept on the server; handlers read it
        # through their ``server`` attribute
        self.player = player
        HTTPServer.__init__(self, *args, **kwargs)
class PlayerHttpHandler(BaseHTTPRequestHandler):
    """HTTP front end of the player; only the path ``/`` is served.

    GET reports the last played entry, PUT queues a new entry and POST stops
    the entry that is currently playing. The player is reached through
    ``self.server.player``.
    """

    def _send_text(self, status, message=''):
        """Send a complete response: status line, end of headers, text body."""
        self.send_response(status)
        # without end_headers() the buffered status line is never written
        self.end_headers()
        if message:
            self.wfile.write(message.encode('utf-8'))

    def do_GET(self):
        """Return the last played thing on / only"""
        if self.path != '/':
            self.send_response(404, 'Invalid path')
            self.end_headers()
            return
        # return the last thing that was played
        player = self.server.player
        data = {'previous': None, 'playing': player.is_playing}
        if player.last_played:
            data['previous'] = [pd.to_dict() for pd in player.last_played.play_definitions]
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode('utf-8'))

    def do_PUT(self):
        """Accept something new to be played on / only"""
        if self.path != '/':
            self._send_text(404)
            return
        if self.headers.get('Content-Type') != 'application/json':
            self._send_text(400, 'Content-type application/json only')
            return
        try:
            length = int(self.headers.get('content-length') or 0)
        except ValueError:
            self._send_text(400, 'Invalid Content-Length')
            return
        # a negative length would make read() wait for the end of the stream
        if length <= 0:
            self._send_text(400, 'Got empty body')
            return
        data = self.rfile.read(length)
        try:
            entry_data = json.loads(data)
        except json.JSONDecodeError as e:
            self._send_text(400, f"Could not parse JSON: {e}")
            return
        if not isinstance(entry_data, list):
            self._send_text(400, 'Entry must be a list of dicts')
            return
        if not entry_data:
            self._send_text(400, 'Empty entry')
            return
        entry = Entry()
        for i, pd_data in enumerate(entry_data):
            try:
                entry.play_definitions.append(PlayDefinition.create_from_dict(pd_data))
            except Exception as e:
                # request boundary: whatever is wrong with the definition is
                # reported to the client instead of killing the handler
                self._send_text(400, f"Invalid PlayDefinition {i}: {e}")
                return
        self.server.player.append(entry)
        self._send_text(200)

    def do_POST(self):
        """Stop the currently-playing entry on /"""
        if self.path != '/':
            self._send_text(404)
            return
        self.server.player.stop()
        self._send_text(200)
class Player(threading.Thread):
    """Thread that plays one Entry through PulseAudio and can be stopped.

    Keyword-only arguments:
        done_event: event that is set once run() is finished.
        entry: the object whose ``play()`` starts the playback process.
        pa_device_name, pa_client_name, debug: passed on to ``entry.play()``.
    """

    def __init__(self, *args, done_event, entry, pa_device_name, pa_client_name, debug, **kwargs):
        super().__init__(*args, **kwargs)
        self._done_event = done_event
        self._entry = entry
        # the playback process, once it has been started
        self._process = None
        # pids of the processes started by the playback process
        self._child_pids = []
        # set as soon as _child_pids is not going to change any more
        self._child_pids_event = threading.Event()
        self._pa_device_name = pa_device_name
        self._pa_client_name = pa_client_name
        self._debug = debug

    def stop(self):
        """Terminate the playback process and the children recorded for it."""
        if not self._process:
            print("no process to stop")
            return
        self._child_pids_event.wait()
        for child_pid in self._child_pids:
            try:
                os.kill(child_pid, signal.SIGTERM)
            except ProcessLookupError:
                # the child finished on its own in the meantime
                pass
        self._process.terminate()
        try:
            self._process.wait(2)
        except subprocess.TimeoutExpired:
            self._process.kill()
        # TODO check if children are still running and try kill

    def _collect_child_pids(self):
        """Record the children of the playback process once some show up."""
        try:
            shell = psutil.Process(self._process.pid)
            # stop polling when the process ends without ever having had
            # children, otherwise this would loop forever
            while self._process.poll() is None:
                if shell.children():
                    self._child_pids.extend(child.pid for child in shell.children())
                    return
                time.sleep(0.2)
        except psutil.NoSuchProcess:
            # the process went away while it was being inspected
            pass

    def run(self):
        """Start the playback and wait until it has ended."""
        try:
            self._process = self._entry.play(pa_device_name=self._pa_device_name,
                                             pa_client_name=self._pa_client_name,
                                             debug=self._debug, wait=False)
            if self._process is None:
                # nothing was started, e.g. in debug mode
                return
            self._collect_child_pids()
            # let a waiting stop() proceed while the playback is running
            self._child_pids_event.set()
            self._process.wait()
        finally:
            # never leave stop() or the owner of done_event waiting
            self._child_pids_event.set()
            self._done_event.set()
class PlayerManager(threading.Thread):
    """Thread that plays queued entries one after another.

    Entries are handed in with append(); stop() ends the entry that is
    currently playing and stop_thread() ends the manager itself.
    """

    def __init__(self, *args, pa_device_name, pa_client_name, debug, **kwargs):
        super().__init__(*args, **kwargs)
        self._pa_device_name = pa_device_name
        self._pa_client_name = pa_client_name
        self._debug = debug
        self._queue = queue.SimpleQueue()
        # event signalling this PlayerManager thread to stop
        self._stop_thread_event = threading.Event()
        # a Player object currently running
        self._playing_thread = None
        # an event to be set by the currently running Player to signal it's
        # done
        self._playing_done_event = threading.Event()
        # event signalling to stop the current Player. It must not be called
        # ``_stop``: that name is an internal method of threading.Thread on
        # CPython before 3.13 and shadowing it breaks join()/is_alive().
        self._stop_playing_event = threading.Event()
        # an Entry object that was last played
        self.last_played = None

    def append(self, entry):
        """Queue ``entry`` for playing."""
        self._queue.put(entry)

    def stop(self):
        """Ask for the currently playing entry to be stopped."""
        self._stop_playing_event.set()

    def stop_thread(self):
        """Ask the manager loop to end and stop a Player that is still alive."""
        self._stop_thread_event.set()
        if self._playing_thread and self._playing_thread.is_alive():
            self._playing_thread.stop()

    @property
    def is_playing(self):
        """True while a Player is registered as running."""
        return bool(self._playing_thread)

    def run(self):
        """Supervise the running Player or start the next queued entry."""
        while not self._stop_thread_event.is_set():
            # check if something is currently playing
            if self._playing_thread:
                # try to stop what's currently happening
                if self._stop_playing_event.is_set():
                    self._playing_thread.stop()
                    self._stop_playing_event.clear()
                # clean up if playing was done
                if not self._playing_thread.is_alive():
                    self._playing_thread.join()
                    self._playing_done_event.clear()
                    self._playing_thread = None
                    continue
                # wait for playing to be done for some time, but continue to
                # let stop() have some effect
                self._playing_done_event.wait(1)
                continue
            try:
                entry = self._queue.get(block=True, timeout=1)
            except queue.Empty:
                continue
            self.last_played = entry
            # make sure we don't immediately try to stop it again, because it
            # was set some time ago
            self._stop_playing_event.clear()
            player = Player(entry=entry, done_event=self._playing_done_event,
                            pa_device_name=self._pa_device_name,
                            pa_client_name=self._pa_client_name, debug=self._debug, daemon=True)
            player.start()
            self._playing_thread = player
@cli.command()
@click.option('--listen-host', default='127.0.0.1')
@click.option('--listen-port', default=8227)
@click.option('--pa-device-name', required=True)
@click.option('--pa-client-name', default=PA_CLIENT_NAME)
@click.option('--debug', is_flag=True)
def daemon(listen_host, listen_port, pa_device_name, pa_client_name, debug):
    """Listen on given host:port with an HTTP server

    Via HTTP, new requests to play files are accepted as PlayDefinitions as new
    entry and currently playing entries can be stopped. The last played entry
    can be queried, too.
    """
    play_thread = PlayerManager(pa_device_name=pa_device_name, pa_client_name=pa_client_name,
                                debug=debug, daemon=True)
    play_thread.start()
    server_address = (listen_host, listen_port)
    httpd = PlayerHttpServer(server_address, PlayerHttpHandler, player=play_thread)
    try:
        httpd.serve_forever()
    finally:
        # e.g. on Ctrl-C: release the socket and do not leave a playback
        # process running behind the exiting daemon
        httpd.server_close()
        play_thread.stop_thread()
# Script entry point: run the click command group and exit with its result.
if __name__ == '__main__':
    # sys is only needed here, hence the local import
    import sys
    sys.exit(cli())