camp-atmo/campatmo.py

203 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
import time
import sys
import random
from glob import glob
from threading import Thread
from typing import Union, List, Mapping
from pygame import mixer
from atmorepl import AtmoReplRunner
# Verbosity level for diagnostic output: v() prints only when this is >= 1.
VERBOSITY = 0
class SoundCollection:
    """A named, shuffled pool of ``mixer.Sound`` objects, optionally split by topic.

    ``files`` is either a flat list of sound file paths or a mapping of
    topic name to such a list.  A flat list is stored under the single
    pseudo-topic ``"None"`` (the string) and the collection then has no
    selectable topics.

    While no topic is selected (``cur_topic is None``) sounds are drawn
    from the pool of all topics together; otherwise only from the selected
    topic.  Each pool is played in shuffled order and reshuffled whenever
    it has been played through.
    """

    def __init__(
        self, name, files: Union[List[str], Mapping[str, List[str]]], volume=0.5
    ):
        """Load every file in *files* and set each sound's volume to *volume*."""
        self.name = name
        # Snapshot the keys so later changes to the caller's mapping cannot
        # alter which topics this collection believes it has.
        self.topics = list(files.keys()) if hasattr(files, "keys") else []
        if self.topics:
            self.files = files
        else:
            self.files = {"None": list(files)}
        self.flat_files = [
            path for topic_files in self.files.values() for path in topic_files
        ]
        self.volume = volume
        self.cur_topic = None
        self.load_all_files()

    def load_all_files(self):
        """(Re)load the sounds of every topic and reset the playback positions."""
        self.sounds = {}
        for topic in self.files:
            self.load_topic_files(topic)
        self.flat_sounds = [
            sound for topic_sounds in self.sounds.values() for sound in topic_sounds
        ]
        random.shuffle(self.flat_sounds)
        # Positions start at -1 because next() advances *before* it reads, so
        # the first call returns element 0 instead of skipping it.
        self.sound_indices = {topic: -1 for topic in self.sounds}
        self.flat_sound_index = -1

    def load_topic_files(self, topic):
        """Load the files of one *topic* into ``self.sounds[topic]``, shuffled."""
        loaded = []
        for path in self.files[topic]:
            sound = mixer.Sound(path)
            sound.set_volume(self.volume)
            loaded.append(sound)
        random.shuffle(loaded)
        self.sounds[topic] = loaded

    @staticmethod
    def _advance(pool, index):
        """Return the position after *index* in *pool*, reshuffling on wrap-around."""
        index += 1
        if index >= len(pool):
            random.shuffle(pool)
            index = 0
        return index

    def next(self):
        """Return the next sound of the active pool, or ``None`` if it is empty."""
        if self.cur_topic is None:
            pool = self.flat_sounds
            if not pool:
                return None
            self.flat_sound_index = self._advance(pool, self.flat_sound_index)
            return pool[self.flat_sound_index]

        pool = self.sounds[self.cur_topic]
        if not pool:
            return None
        position = self._advance(pool, self.sound_indices.get(self.cur_topic, -1))
        self.sound_indices[self.cur_topic] = position
        return pool[position]

    def set_topic(self, topic=None):
        """Select *topic* as the active pool; ``None`` selects all topics together.

        The call is ignored when the collection has no topics, and also when
        *topic* is not one of its topics: accepting an unknown name would
        make every later ``next()`` / ``len()`` call raise ``KeyError``.
        """
        if not self.topics:
            return
        if topic is None or topic in self.sounds:
            self.cur_topic = topic

    def __len__(self):
        """Return the number of sounds in the currently active pool."""
        if self.cur_topic is None:
            return len(self.flat_sounds)
        return len(self.sounds[self.cur_topic])
class CampAtmo:
    """Ambient sound player that mixes looping background tracks with spoken lines.

    Sound files are discovered under ``samples/<type>/*.ogg`` or, when a type
    has topic sub-directories containing files, ``samples/<type>/<topic>/*.ogg``.
    Every type gets its own mixer channels.  One worker thread keeps the
    background types queued; a second one plays a "lines" sound at random
    intervals and follows it with "reactions" sounds.
    """

    # Directory that holds one sub-directory per sound type.
    samples_dir = "samples"
    # Number of mixer channels allocated to a type unless it is single-track.
    tracks_per_type = 2
    # Types that get exactly one channel.
    single_track_types = ["lines"]
    # Types kept playing by run_forever(); they are also loaded at low volume.
    background_track_types = ["background", "buzzwords"]
    # Arguments for random.randrange(): seconds to wait before each line.
    lines_delta_sec = (5, 15)
    # Extra pause between the end of a line and the reactions to it.
    line_reaction_wait_sec = 0.8

    def __init__(self):
        """Initialise the mixer and load all samples."""
        self.stopped = False
        self.background_thread = None
        self.lines_thread = None
        mixer.init(frequency=22050 * 2)
        self.load_sounds()

    def start(self):
        """Start the background and lines worker threads (both daemon threads)."""
        self.background_thread = Thread(
            None, self.run_forever, "atmo-background", daemon=True
        )
        self.background_thread.start()
        self.lines_thread = Thread(
            None, self.run_lines_forever, "atmo-lines", daemon=True
        )
        self.lines_thread.start()

    def stop(self):
        """Ask both workers to finish and wait briefly for them.

        Safe to call before ``start()``; threads that were never created are
        skipped.  The joins use timeouts, so a worker that is still sleeping
        is simply left behind.
        """
        self.stopped = True
        workers = (
            (getattr(self, "lines_thread", None), 2),
            (getattr(self, "background_thread", None), 0.5),
        )
        for worker, timeout in workers:
            if worker is not None:
                worker.join(timeout)

    @staticmethod
    def _collect_sample_files(samples_dir):
        """Map each type directory in *samples_dir* to its ``.ogg`` files.

        The value is a ``{topic: [paths]}`` dict when at least one topic
        sub-directory contains ``.ogg`` files (files directly inside the type
        directory are then not used), otherwise a flat list of the files
        directly inside the type directory.  Path handling assumes ``/`` as
        the separator.
        """
        root = samples_dir.rstrip("/")
        type_files = {}
        for type_dir in glob(f"{root}/*/"):
            type_dir = type_dir.rstrip("/")
            type_name = type_dir.rsplit("/", 1)[-1]
            by_topic = {}
            for topic_dir in glob(f"{type_dir}/*/"):
                topic_dir = topic_dir.rstrip("/")
                topic_name = topic_dir.rsplit("/", 1)[-1]
                # Only this topic's own files, not those of its siblings.
                by_topic[topic_name] = glob(f"{topic_dir}/*.ogg")
            if any(by_topic.values()):
                type_files[type_name] = by_topic
            else:
                type_files[type_name] = glob(f"{type_dir}/*.ogg")
        return type_files

    def _channel_count(self, name):
        """Return how many mixer channels the sound type *name* is given."""
        return 1 if name in self.single_track_types else self.tracks_per_type

    def load_sounds(self):
        """Discover the sample files, allocate channels and load every sound.

        Sets ``self.types`` (type name -> list of mixer channels) and
        ``self.sounds`` (type name -> ``SoundCollection``).
        """
        type_files = self._collect_sample_files(self.samples_dir)

        mixer.set_num_channels(sum(self._channel_count(name) for name in type_files))
        v(f"{mixer.get_num_channels()} channels set")

        # Build both tables locally and publish each with a single assignment
        # so a running worker never sees a half-filled dict.
        channels = {}
        next_id = 0
        for name in type_files:
            count = self._channel_count(name)
            channels[name] = [mixer.Channel(next_id + k) for k in range(count)]
            next_id += count
        self.types = channels

        self.sounds = {
            name: SoundCollection(
                name, files, volume=0.1 if name in self.background_track_types else 0.9
            )
            for name, files in type_files.items()
        }

    def manage_background_queue(self, name, type_channels):
        """Top up the queue of every channel belonging to background type *name*.

        A channel that already has a sound queued is left alone when the
        collection holds more than one sound.  Otherwise the next sound is
        queued; if the queue slot is still empty afterwards (presumably
        because the channel was idle and started playing the sound at once,
        verify against the pygame docs), a second sound is queued behind it.
        """
        collection = self.sounds[name]
        for channel in type_channels:
            if channel.get_queue() is not None and len(collection) > 1:
                continue
            v(name)
            channel.queue(collection.next())
            if channel.get_queue() is None and len(collection) > 1:
                channel.queue(collection.next())

    def run_forever(self):
        """Worker loop: once per second keep all background types queued."""
        while not self.stopped:
            for name, type_channels in self.types.items():
                if name not in self.background_track_types:
                    continue
                if len(self.sounds[name]) == 0:
                    continue
                self.manage_background_queue(name, type_channels)
            time.sleep(1)
        # NOTE(review): the nesting of this call could not be recovered from
        # the source; it is kept after the loop because reloading every second
        # would reset topics and playback positions on each tick. Confirm.
        self.load_sounds()

    def play_type_sounds(self, type_name) -> float:
        """Queue one sound on every channel of *type_name*.

        Returns the length in seconds of the longest sound queued, or ``0``
        when the type is unknown or has no sounds, so callers can always use
        the result in arithmetic.
        """
        collection = self.sounds.get(type_name)
        # Falsy for both a missing type (None) and an empty collection.
        if not collection:
            return 0
        longest = 0
        for channel in self.types.get(type_name, ()):
            sound = collection.next()
            longest = max(longest, sound.get_length())
            channel.queue(sound)
        return longest

    def run_lines_forever(self):
        """Worker loop: play a line after a random pause, then the reactions."""
        while not self.stopped:
            time.sleep(random.randrange(*self.lines_delta_sec))
            length_sec = self.play_type_sounds("lines")
            # Let the line finish (plus a short beat) before anyone reacts.
            time.sleep(length_sec + self.line_reaction_wait_sec)
            self.play_type_sounds("reactions")
def main():
    """Run the player together with its REPL until interrupted with Ctrl-C."""
    player = CampAtmo()
    repl_runner = AtmoReplRunner(player, port=7723)
    repl_runner.start()
    player.start()

    # All work happens on other threads; the main thread only idles here so
    # that it can receive KeyboardInterrupt.
    try:
        while True:
            time.sleep(100)
    except KeyboardInterrupt:
        print("exiting...")
        player.stop()
        sys.exit(0)
def v(*msg):
    """Print *msg* like ``print`` does, but only when VERBOSITY is at least 1."""
    if VERBOSITY < 1:
        return
    print(*msg)
# Run the player only when executed as a script, not when imported.
if __name__ == "__main__":
    main()