#!/usr/bin/env python3 import time import sys import random from glob import glob from pygame import mixer from typing import Union, List, Mapping from atmorepl import AtmoReplRunner VERBOSITY = 0 class SoundCollection: def __init__( self, name, files: Union[List[str], Mapping[str, List[str]]], volume=0.5 ): self.name = name self.topics = files.keys() if hasattr(files, "keys") else [] if self.topics: self.files = files else: self.files = {"None": files} self.flat_files = [f for topic in self.files for f in self.files[topic]] self.volume = volume self.cur_topic = None self.load_all_files() self.sound_indices = {name: 0 for name in self.sounds} self.flat_sound_index = 0 def load_all_files(self): self.sounds = {} if self.topics: for t in self.topics: self.load_topic_files(t) else: self.load_topic_files("None") self.flat_sounds = [s for topic in self.sounds for s in self.sounds[topic]] random.shuffle(self.flat_sounds) def load_topic_files(self, topic): self.sounds[topic] = [] for f in self.files[topic]: sound = mixer.Sound(f) sound.set_volume(self.volume) self.sounds[topic].append(sound) random.shuffle(self.sounds[topic]) def next(self): if self.cur_topic is None: if len(self.flat_sounds) == 0: return self.flat_sound_index += 1 if self.flat_sound_index >= len(self.flat_sounds): random.shuffle(self.flat_sounds) self.flat_sound_index = 0 return self.flat_sounds[self.flat_sound_index] else: if len(self.sounds[self.cur_topic]) == 0: return self.sound_indices[self.cur_topic] += 1 if self.sound_indices[self.cur_topic] >= len(self.sounds[self.cur_topic]): random.shuffle(self.sounds[self.cur_topic]) self.sound_indices[self.cur_topic] = 0 return self.sounds[self.cur_topic][self.sound_indices[self.cur_topic]] def set_topic(self, topic=None): if self.topics: self.cur_topic = topic def __len__(self): if self.cur_topic is None: return len(self.flat_sounds) else: return len(self.sounds[self.cur_topic]) class CampAtmo: tracks_per_type = 2 single_track_types = ["lines"] background_track_types = ["background", "buzzwords"] lines_delta_sec = (5, 15) line_reaction_wait_sec = 0.8 def __init__(self): mixer.init(frequency=22050 * 2) self.load_sounds() def load_sounds(self): type_names = glob("samples/*/") type_files = {} for type_dir in type_names: type_name = type_dir.split("/")[1] flat_files = glob(type_dir + "/*.ogg") type_files[type_name] = flat_files topic_files = glob(type_dir + "/*/*.ogg") if topic_files: type_files[type_name] = {} for topic_dir in glob(type_dir + "/*/"): topic_name = topic_dir.split("/")[2] type_files[type_name][topic_name] = glob(type_dir + "/*/*.ogg") self.types = {} i = 0 mixer.set_num_channels( sum( [ 1 if name in self.single_track_types else self.tracks_per_type for name in type_files ] ) ) v(f"{mixer.get_num_channels()} channels set") for name in type_files: self.types[name] = [] for k in range( 1 if name in self.single_track_types else self.tracks_per_type ): self.types[name].append(mixer.Channel(i)) i += 1 self.sounds = {} for name, files in type_files.items(): self.sounds[name] = SoundCollection(name, files) def manage_background_queue(self, name, type_channels): for c in type_channels: if c.get_queue() is not None and len(self.sounds[name]) > 1: continue v(name) c.queue(self.sounds[name].next()) if c.get_queue() is None and len(self.sounds[name]) > 1: c.queue(self.sounds[name].next()) def run_forever(self): while True: for name, type_channels in self.types.items(): if ( name not in self.background_track_types or len(self.sounds[name]) == 0 ): continue self.manage_background_queue(name, type_channels) time.sleep(1) self.load_sounds() def main(): atmo = CampAtmo() atmoRepl = AtmoReplRunner(atmo, port=7723) atmoRepl.start() try: atmo.run_forever() except KeyboardInterrupt: print("exit") sys.exit(0) def v(*msg): if VERBOSITY >= 1: print(*msg) if __name__ == "__main__": main()