#!/usr/bin/env python3
"""Ambient sound player built on ``pygame.mixer``.

Samples are discovered below ``samples/<type>/`` (optionally one level deeper
in ``samples/<type>/<topic>/``). Types listed in
``CampAtmo.background_track_types`` are kept playing by one thread; a second
thread periodically plays a "lines" sample followed by "reactions" samples.
"""
import os
import random
import sys
import time
from glob import glob
from threading import Thread
from typing import Union, List, Mapping

from pygame import mixer

from atmorepl import AtmoReplRunner

# Messages passed to v() are printed only when this is >= 1.
VERBOSITY = 0


class SoundCollection:
    """Shuffled pool of ``mixer.Sound`` objects, optionally grouped by topic.

    ``files`` is either a flat list of paths or a mapping of topic name to a
    list of paths. A flat list is stored under the single pseudo-topic
    ``"None"``. While no topic is selected, sounds are drawn from one shuffled
    list holding every sound; after ``set_topic(name)`` they are drawn only
    from that topic's list.
    """

    def __init__(
        self, name, files: Union[List[str], Mapping[str, List[str]]], volume=0.5
    ):
        """Load every file in ``files`` as a sound at the given ``volume``."""
        self.name = name
        # Anything exposing .keys() is treated as a topic -> files mapping.
        self.topics = files.keys() if hasattr(files, "keys") else []
        # An empty mapping has no topics and is handled like a flat list.
        self.files = files if self.topics else {"None": files}
        self.flat_files = [
            path for topic_files in self.files.values() for path in topic_files
        ]
        self.volume = volume
        self.cur_topic = None
        self.load_all_files()
        self.sound_indices = {topic: 0 for topic in self.sounds}
        self.flat_sound_index = 0

    def load_all_files(self):
        """(Re)load the sounds of every topic and rebuild the flat pool."""
        self.sounds = {}
        for topic in self.topics or ["None"]:
            self.load_topic_files(topic)
        self.flat_sounds = [
            sound for topic_sounds in self.sounds.values() for sound in topic_sounds
        ]
        random.shuffle(self.flat_sounds)

    def load_topic_files(self, topic):
        """Load and shuffle the sounds of a single ``topic``."""
        loaded = []
        for path in self.files[topic]:
            sound = mixer.Sound(path)
            sound.set_volume(self.volume)
            loaded.append(sound)
        random.shuffle(loaded)
        self.sounds[topic] = loaded

    @staticmethod
    def _advance(pool, index):
        """Return the index following ``index``; reshuffle and wrap at the end."""
        index += 1
        if index >= len(pool):
            random.shuffle(pool)
            index = 0
        return index

    def next(self):
        """Return the next sound of the active pool, or ``None`` if it is empty.

        The index is advanced *before* the sound is read, so the element at
        index 0 of a freshly loaded pool is only reached after the first
        reshuffle.
        """
        topic = self.cur_topic
        if topic is None:
            if not self.flat_sounds:
                return None
            self.flat_sound_index = self._advance(
                self.flat_sounds, self.flat_sound_index
            )
            return self.flat_sounds[self.flat_sound_index]

        pool = self.sounds[topic]
        if not pool:
            return None
        self.sound_indices[topic] = self._advance(pool, self.sound_indices[topic])
        return pool[self.sound_indices[topic]]

    def set_topic(self, topic=None):
        """Select ``topic`` (``None`` = all sounds); ignored without topics.

        The topic name is not validated here: an unknown name makes later
        ``next()`` / ``len()`` calls raise ``KeyError``.
        """
        if self.topics:
            self.cur_topic = topic

    def __len__(self):
        """Return the number of sounds in the currently active pool."""
        if self.cur_topic is None:
            return len(self.flat_sounds)
        return len(self.sounds[self.cur_topic])


class CampAtmo:
    """Plays background loops and periodic "lines"/"reactions" samples."""

    # Number of mixer channels reserved for each sample type ...
    tracks_per_type = 2
    # ... except for these types, which get exactly one channel.
    single_track_types = ["lines"]
    # Types that run_forever() keeps queued; they are loaded at volume 0.1,
    # every other type at 0.9.
    background_track_types = ["background", "buzzwords"]
    # Arguments for random.randrange(): seconds to wait between two lines.
    lines_delta_sec = (5, 15)
    # Extra pause after a line has finished before reactions are played.
    line_reaction_wait_sec = 0.8

    def __init__(self):
        """Initialise the mixer and load all samples from disk."""
        self.stopped = False
        # Set up front so stop() is safe even if start() was never called.
        self.background_thread = None
        self.lines_thread = None
        mixer.init(frequency=22050 * 2)
        self.load_sounds()

    def start(self):
        """Start the background and lines worker threads (both daemons)."""
        self.background_thread = Thread(
            None, self.run_forever, "atmo-background", daemon=True
        )
        self.background_thread.start()
        self.lines_thread = Thread(
            None, self.run_lines_forever, "atmo-lines", daemon=True
        )
        self.lines_thread.start()

    def stop(self):
        """Ask both workers to stop and wait briefly for them to finish."""
        self.stopped = True
        if self.lines_thread is not None:
            self.lines_thread.join(2)
        if self.background_thread is not None:
            self.background_thread.join(0.5)

    def _channel_count(self, type_name):
        """Return how many mixer channels ``type_name`` is entitled to."""
        if type_name in self.single_track_types:
            return 1
        return self.tracks_per_type

    def load_sounds(self):
        """Scan ``samples/``, allocate mixer channels and load all sounds.

        Populates ``self.types`` (type name -> list of channels) and
        ``self.sounds`` (type name -> SoundCollection). When a type directory
        has ``.ogg`` files in sub-directories, those sub-directories become
        topics and ``.ogg`` files directly inside the type directory are not
        used.
        """
        type_files = {}
        for type_dir in glob("samples/*/"):
            # normpath drops the trailing separator so basename is the dir name.
            type_name = os.path.basename(os.path.normpath(type_dir))
            type_files[type_name] = glob(os.path.join(type_dir, "*.ogg"))
            if not glob(os.path.join(type_dir, "*", "*.ogg")):
                continue
            by_topic = {}
            for topic_dir in glob(os.path.join(type_dir, "*", "")):
                topic_name = os.path.basename(os.path.normpath(topic_dir))
                # Only this topic's own files (previously every topic received
                # the files of all topics).
                by_topic[topic_name] = glob(os.path.join(topic_dir, "*.ogg"))
            type_files[type_name] = by_topic

        mixer.set_num_channels(sum(self._channel_count(name) for name in type_files))
        v(f"{mixer.get_num_channels()} channels set")

        # Hand out consecutive channel ids, type by type.
        self.types = {}
        next_channel_id = 0
        for name in type_files:
            channels = []
            for _ in range(self._channel_count(name)):
                channels.append(mixer.Channel(next_channel_id))
                next_channel_id += 1
            self.types[name] = channels

        self.sounds = {}
        for name, files in type_files.items():
            self.sounds[name] = SoundCollection(
                name, files, volume=0.1 if name in self.background_track_types else 0.9
            )

    def manage_background_queue(self, name, type_channels):
        """Top up the queue of every channel belonging to background type ``name``.

        The caller is expected to skip types without sounds; ``next()`` would
        return ``None`` for them.
        """
        collection = self.sounds[name]
        for channel in type_channels:
            # A sound is already waiting on this channel: nothing to do. With
            # a single-sound collection the sound is re-queued on every call.
            if channel.get_queue() is not None and len(collection) > 1:
                continue
            v(name)
            channel.queue(collection.next())
            # NOTE(review): per the pygame docs, queue() on an idle channel
            # starts playback at once and leaves the queue empty - hence the
            # second queue() call so that a follow-up sound is lined up.
            if channel.get_queue() is None and len(collection) > 1:
                channel.queue(collection.next())

    def run_forever(self):
        """Keep all background types queued until ``self.stopped`` is set."""
        while not self.stopped:
            for name, type_channels in self.types.items():
                if name not in self.background_track_types or not len(
                    self.sounds[name]
                ):
                    continue
                self.manage_background_queue(name, type_channels)
            time.sleep(1)
        # NOTE(review): the source this was restored from had lost its
        # indentation, so it is unclear whether this reload ran once after the
        # loop (as here) or on every iteration - confirm the intended placement.
        self.load_sounds()

    def play_type_sounds(self, type_name) -> float:
        """Queue one sound on every channel of ``type_name``.

        Returns the length in seconds of the longest queued sound, or ``0.0``
        when the type is unknown or currently has no sounds.
        """
        collection = self.sounds.get(type_name)
        if collection is None or not len(collection):
            return 0.0
        max_sound_length = 0.0
        for channel in self.types[type_name]:
            sound = collection.next()
            max_sound_length = max(max_sound_length, sound.get_length())
            channel.queue(sound)
        return max_sound_length

    def run_lines_forever(self):
        """Alternate random pauses, a "lines" sample and "reactions" samples."""
        while not self.stopped:
            time.sleep(random.randrange(*self.lines_delta_sec))
            length_sec = self.play_type_sounds("lines")
            # Let the line finish (plus a short beat) before anyone reacts.
            time.sleep(length_sec + self.line_reaction_wait_sec)
            self.play_type_sounds("reactions")


def main():
    """Start the REPL runner and the player, then idle until Ctrl-C."""
    atmo = CampAtmo()
    atmo_repl = AtmoReplRunner(atmo, port=7723)
    atmo_repl.start()
    atmo.start()
    try:
        while True:
            time.sleep(100)
    except KeyboardInterrupt:
        print("exiting...")
        atmo.stop()
        sys.exit(0)


def v(*msg):
    """Print ``msg`` when ``VERBOSITY`` is at least 1."""
    if VERBOSITY >= 1:
        print(*msg)


if __name__ == "__main__":
    main()